1/* 2Copyright 2015 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package glusterfs 18 19import ( 20 "context" 21 "crypto/tls" 22 "fmt" 23 "math" 24 "math/rand" 25 "net" 26 "net/http" 27 "os" 28 "path/filepath" 29 "runtime" 30 "strconv" 31 dstrings "strings" 32 "sync" 33 34 gcli "github.com/heketi/heketi/client/api/go-client" 35 gapi "github.com/heketi/heketi/pkg/glusterfs/api" 36 "k8s.io/klog/v2" 37 "k8s.io/mount-utils" 38 utilstrings "k8s.io/utils/strings" 39 40 v1 "k8s.io/api/core/v1" 41 "k8s.io/apimachinery/pkg/api/errors" 42 "k8s.io/apimachinery/pkg/api/resource" 43 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" 44 "k8s.io/apimachinery/pkg/labels" 45 "k8s.io/apimachinery/pkg/types" 46 "k8s.io/apimachinery/pkg/util/sets" 47 "k8s.io/apimachinery/pkg/util/uuid" 48 clientset "k8s.io/client-go/kubernetes" 49 volumehelpers "k8s.io/cloud-provider/volume/helpers" 50 storagehelpers "k8s.io/component-helpers/storage/volume" 51 proxyutil "k8s.io/kubernetes/pkg/proxy/util" 52 "k8s.io/kubernetes/pkg/volume" 53 volutil "k8s.io/kubernetes/pkg/volume/util" 54) 55 56// ProbeVolumePlugins is the primary entrypoint for volume plugins. 57func ProbeVolumePlugins() []volume.VolumePlugin { 58 return []volume.VolumePlugin{&glusterfsPlugin{host: nil, gidTable: make(map[string]*MinMaxAllocator)}} 59} 60 61type glusterfsPlugin struct { 62 host volume.VolumeHost 63 gidTable map[string]*MinMaxAllocator 64 gidTableLock sync.Mutex 65} 66 67var _ volume.VolumePlugin = &glusterfsPlugin{} 68var _ volume.PersistentVolumePlugin = &glusterfsPlugin{} 69var _ volume.DeletableVolumePlugin = &glusterfsPlugin{} 70var _ volume.ProvisionableVolumePlugin = &glusterfsPlugin{} 71var _ volume.ExpandableVolumePlugin = &glusterfsPlugin{} 72var _ volume.Provisioner = &glusterfsVolumeProvisioner{} 73var _ volume.Deleter = &glusterfsVolumeDeleter{} 74 75const ( 76 glusterfsPluginName = "kubernetes.io/glusterfs" 77 volPrefix = "vol_" 78 dynamicEpSvcPrefix = "glusterfs-dynamic" 79 replicaCount = 3 80 secretKeyName = "key" // key name used in secret 81 gciLinuxGlusterMountBinaryPath = "/sbin/mount.glusterfs" 82 defaultGidMin = 2000 83 defaultGidMax = math.MaxInt32 84 85 // maxCustomEpNamePrefix is the maximum number of chars. 86 // which can be used as ep/svc name prefix. This number is carved 87 // out from below formula. 88 // max length of name of an ep - length of pvc uuid 89 // where max length of name of an ep is 63 and length of uuid is 37 90 maxCustomEpNamePrefixLen = 26 91 92 // absoluteGidMin/Max are currently the same as the 93 // default values, but they play a different role and 94 // could take a different value. Only thing we need is: 95 // absGidMin <= defGidMin <= defGidMax <= absGidMax 96 absoluteGidMin = 2000 97 absoluteGidMax = math.MaxInt32 98 heketiAnn = "heketi-dynamic-provisioner" 99 glusterTypeAnn = "gluster.org/type" 100 glusterDescAnn = "Gluster-Internal: Dynamically provisioned PV" 101 heketiVolIDAnn = "gluster.kubernetes.io/heketi-volume-id" 102 103 // Error string returned by heketi 104 errIDNotFound = "Id not found" 105) 106 107func (plugin *glusterfsPlugin) Init(host volume.VolumeHost) error { 108 plugin.host = host 109 return nil 110} 111 112func (plugin *glusterfsPlugin) GetPluginName() string { 113 return glusterfsPluginName 114} 115 116func (plugin *glusterfsPlugin) GetVolumeName(spec *volume.Spec) (string, error) { 117 return "", fmt.Errorf("GetVolumeName() is unimplemented for GlusterFS") 118} 119 120func (plugin *glusterfsPlugin) CanSupport(spec *volume.Spec) bool { 121 return (spec.PersistentVolume != nil && spec.PersistentVolume.Spec.Glusterfs != nil) || 122 (spec.Volume != nil && spec.Volume.Glusterfs != nil) 123} 124 125func (plugin *glusterfsPlugin) RequiresRemount(spec *volume.Spec) bool { 126 return false 127} 128 129func (plugin *glusterfsPlugin) SupportsMountOption() bool { 130 return true 131} 132 133func (plugin *glusterfsPlugin) SupportsBulkVolumeVerification() bool { 134 return false 135} 136 137func (plugin *glusterfsPlugin) RequiresFSResize() bool { 138 return false 139} 140 141func (plugin *glusterfsPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode { 142 return []v1.PersistentVolumeAccessMode{ 143 v1.ReadWriteOnce, 144 v1.ReadOnlyMany, 145 v1.ReadWriteMany, 146 } 147} 148 149func (plugin *glusterfsPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volume.VolumeOptions) (volume.Mounter, error) { 150 epName, epNamespace, err := plugin.getEndpointNameAndNamespace(spec, pod.Namespace) 151 if err != nil { 152 return nil, err 153 } 154 kubeClient := plugin.host.GetKubeClient() 155 if kubeClient == nil { 156 return nil, fmt.Errorf("failed to get kube client to initialize mounter") 157 } 158 ep, err := kubeClient.CoreV1().Endpoints(epNamespace).Get(context.TODO(), epName, metav1.GetOptions{}) 159 if err != nil { 160 klog.Errorf("failed to get endpoint %s: %v", epName, err) 161 return nil, err 162 } 163 klog.V(4).Infof("glusterfs pv endpoint %v", ep) 164 return plugin.newMounterInternal(spec, ep, pod, plugin.host.GetMounter(plugin.GetPluginName())) 165} 166 167func (plugin *glusterfsPlugin) getEndpointNameAndNamespace(spec *volume.Spec, defaultNamespace string) (string, string, error) { 168 if spec.Volume != nil && spec.Volume.Glusterfs != nil { 169 endpoints := spec.Volume.Glusterfs.EndpointsName 170 if endpoints == "" { 171 return "", "", fmt.Errorf("no glusterFS endpoint specified") 172 } 173 return endpoints, defaultNamespace, nil 174 } else if spec.PersistentVolume != nil && 175 spec.PersistentVolume.Spec.Glusterfs != nil { 176 endpoints := spec.PersistentVolume.Spec.Glusterfs.EndpointsName 177 endpointsNs := defaultNamespace 178 overriddenNs := spec.PersistentVolume.Spec.Glusterfs.EndpointsNamespace 179 if overriddenNs != nil { 180 if len(*overriddenNs) > 0 { 181 endpointsNs = *overriddenNs 182 } else { 183 return "", "", fmt.Errorf("endpointnamespace field set, but no endpointnamespace specified") 184 } 185 } 186 return endpoints, endpointsNs, nil 187 } 188 return "", "", fmt.Errorf("spec does not reference a GlusterFS volume type") 189 190} 191func (plugin *glusterfsPlugin) newMounterInternal(spec *volume.Spec, ep *v1.Endpoints, pod *v1.Pod, mounter mount.Interface) (volume.Mounter, error) { 192 volPath, readOnly, err := getVolumeInfo(spec) 193 if err != nil { 194 klog.Errorf("failed to get volumesource: %v", err) 195 return nil, err 196 } 197 return &glusterfsMounter{ 198 glusterfs: &glusterfs{ 199 volName: spec.Name(), 200 mounter: mounter, 201 pod: pod, 202 plugin: plugin, 203 MetricsProvider: volume.NewMetricsStatFS(plugin.host.GetPodVolumeDir(pod.UID, utilstrings.EscapeQualifiedName(glusterfsPluginName), spec.Name())), 204 }, 205 hosts: ep, 206 path: volPath, 207 readOnly: readOnly, 208 mountOptions: volutil.MountOptionFromSpec(spec), 209 }, nil 210} 211 212func (plugin *glusterfsPlugin) NewUnmounter(volName string, podUID types.UID) (volume.Unmounter, error) { 213 return plugin.newUnmounterInternal(volName, podUID, plugin.host.GetMounter(plugin.GetPluginName())) 214} 215 216func (plugin *glusterfsPlugin) newUnmounterInternal(volName string, podUID types.UID, mounter mount.Interface) (volume.Unmounter, error) { 217 return &glusterfsUnmounter{&glusterfs{ 218 volName: volName, 219 mounter: mounter, 220 pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: podUID}}, 221 plugin: plugin, 222 MetricsProvider: volume.NewMetricsStatFS(plugin.host.GetPodVolumeDir(podUID, utilstrings.EscapeQualifiedName(glusterfsPluginName), volName)), 223 }}, nil 224} 225 226func (plugin *glusterfsPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) { 227 228 // To reconstruct volume spec we need endpoint where fetching endpoint from mount 229 // string looks to be impossible, so returning error. 230 return nil, fmt.Errorf("impossible to reconstruct glusterfs volume spec from volume mountpath") 231} 232 233// Glusterfs volumes represent a bare host file or directory mount of an Glusterfs export. 234type glusterfs struct { 235 volName string 236 pod *v1.Pod 237 mounter mount.Interface 238 plugin *glusterfsPlugin 239 volume.MetricsProvider 240} 241 242type glusterfsMounter struct { 243 *glusterfs 244 hosts *v1.Endpoints 245 path string 246 readOnly bool 247 mountOptions []string 248} 249 250var _ volume.Mounter = &glusterfsMounter{} 251 252func (b *glusterfsMounter) GetAttributes() volume.Attributes { 253 return volume.Attributes{ 254 ReadOnly: b.readOnly, 255 Managed: false, 256 SupportsSELinux: false, 257 } 258} 259 260// Checks prior to mount operations to verify that the required components (binaries, etc.) 261// to mount the volume are available on the underlying node. 262// If not, it returns an error 263func (b *glusterfsMounter) CanMount() error { 264 exe := b.plugin.host.GetExec(b.plugin.GetPluginName()) 265 switch runtime.GOOS { 266 case "linux": 267 if _, err := exe.Command("test", "-x", gciLinuxGlusterMountBinaryPath).CombinedOutput(); err != nil { 268 return fmt.Errorf("required binary %s is missing", gciLinuxGlusterMountBinaryPath) 269 } 270 } 271 return nil 272} 273 274// SetUp attaches the disk and bind mounts to the volume path. 275func (b *glusterfsMounter) SetUp(mounterArgs volume.MounterArgs) error { 276 return b.SetUpAt(b.GetPath(), mounterArgs) 277} 278 279func (b *glusterfsMounter) SetUpAt(dir string, mounterArgs volume.MounterArgs) error { 280 notMnt, err := b.mounter.IsLikelyNotMountPoint(dir) 281 klog.V(4).Infof("mount setup: %s %v %v", dir, !notMnt, err) 282 if err != nil && !os.IsNotExist(err) { 283 return err 284 } 285 if !notMnt { 286 return nil 287 } 288 if err := os.MkdirAll(dir, 0750); err != nil { 289 return err 290 } 291 err = b.setUpAtInternal(dir) 292 if err == nil { 293 return nil 294 } 295 296 // Cleanup upon failure. 297 mount.CleanupMountPoint(dir, b.mounter, false) 298 return err 299} 300 301func (glusterfsVolume *glusterfs) GetPath() string { 302 name := glusterfsPluginName 303 return glusterfsVolume.plugin.host.GetPodVolumeDir(glusterfsVolume.pod.UID, utilstrings.EscapeQualifiedName(name), glusterfsVolume.volName) 304} 305 306type glusterfsUnmounter struct { 307 *glusterfs 308} 309 310var _ volume.Unmounter = &glusterfsUnmounter{} 311 312func (c *glusterfsUnmounter) TearDown() error { 313 return c.TearDownAt(c.GetPath()) 314} 315 316func (c *glusterfsUnmounter) TearDownAt(dir string) error { 317 return mount.CleanupMountPoint(dir, c.mounter, false) 318} 319 320func (b *glusterfsMounter) setUpAtInternal(dir string) error { 321 var errs error 322 options := []string{} 323 hasLogFile := false 324 hasLogLevel := false 325 log := "" 326 if b.readOnly { 327 options = append(options, "ro") 328 } 329 330 // Check for log-file,log-level options existence in user supplied mount options, if provided, use those. 331 for _, userOpt := range b.mountOptions { 332 switch { 333 case dstrings.HasPrefix(userOpt, "log-file"): 334 klog.V(4).Infof("log-file mount option has provided") 335 hasLogFile = true 336 337 case dstrings.HasPrefix(userOpt, "log-level"): 338 klog.V(4).Infof("log-level mount option has provided") 339 hasLogLevel = true 340 } 341 } 342 343 // If logfile has not been provided, create driver specific log file. 344 if !hasLogFile { 345 p := filepath.Join(b.glusterfs.plugin.host.GetPluginDir(glusterfsPluginName), b.glusterfs.volName) 346 if err := os.MkdirAll(p, 0750); err != nil { 347 return fmt.Errorf("failed to create directory %v: %v", p, err) 348 } 349 350 // adding log-level ERROR to remove noise 351 // and more specific log path so each pod has 352 // its own log based on PV + Pod 353 log = filepath.Join(p, b.pod.Name+"-glusterfs.log") 354 355 // Use derived log file in gluster fuse mount 356 options = append(options, "log-file="+log) 357 } 358 if !hasLogLevel { 359 options = append(options, "log-level=ERROR") 360 } 361 var addrlist []string 362 if b.hosts == nil { 363 return fmt.Errorf("glusterfs endpoint is nil in mounter") 364 } 365 addr := sets.String{} 366 if b.hosts.Subsets != nil { 367 for _, s := range b.hosts.Subsets { 368 for _, a := range s.Addresses { 369 if !addr.Has(a.IP) { 370 addr.Insert(a.IP) 371 addrlist = append(addrlist, a.IP) 372 } 373 } 374 } 375 } 376 377 if (len(addrlist) > 0) && (addrlist[0] != "") { 378 ip := addrlist[rand.Intn(len(addrlist))] 379 380 // Add backup-volfile-servers and auto_unmount options. 381 // When ip is also in backup-volfile-servers, there will be a warning: 382 // "gf_remember_backup_volfile_server] 0-glusterfs: failed to set volfile server: File exists". 383 addr.Delete(ip) 384 backups := addr.List() 385 // Avoid an invalid empty backup-volfile-servers option. 386 if len(backups) > 0 { 387 options = append(options, "backup-volfile-servers="+dstrings.Join(addrlist[:], ":")) 388 } 389 options = append(options, "auto_unmount") 390 391 mountOptions := volutil.JoinMountOptions(b.mountOptions, options) 392 // with `backup-volfile-servers` mount option in place, it is not required to 393 // iterate over all the servers in the addrlist. A mount attempt with this option 394 // will fetch all the servers mentioned in the backup-volfile-servers list. 395 // Refer to backup-volfile-servers @ http://docs.gluster.org/en/latest/Administrator%20Guide/Setting%20Up%20Clients/ 396 397 errs = b.mounter.Mount(ip+":"+b.path, dir, "glusterfs", mountOptions) 398 if errs == nil { 399 klog.Infof("successfully mounted directory %s", dir) 400 return nil 401 } 402 if dstrings.Contains(errs.Error(), "Invalid option auto_unmount") || 403 dstrings.Contains(errs.Error(), "Invalid argument") { 404 // Give a try without `auto_unmount` mount option, because 405 // it could be that gluster fuse client is older version and 406 // mount.glusterfs is unaware of `auto_unmount`. 407 noAutoMountOptions := make([]string, 0, len(mountOptions)) 408 for _, opt := range mountOptions { 409 if opt != "auto_unmount" { 410 noAutoMountOptions = append(noAutoMountOptions, opt) 411 } 412 } 413 errs = b.mounter.Mount(ip+":"+b.path, dir, "glusterfs", noAutoMountOptions) 414 if errs == nil { 415 klog.Infof("successfully mounted %s", dir) 416 return nil 417 } 418 } 419 } else { 420 return fmt.Errorf("failed to execute mount command:[no valid ipaddress found in endpoint address list]") 421 } 422 423 // Failed mount scenario. 424 // Since glusterfs does not return error text 425 // it all goes in a log file, we will read the log file 426 logErr := readGlusterLog(log, b.pod.Name) 427 if logErr != nil { 428 return fmt.Errorf("mount failed: %v, the following error information was pulled from the glusterfs log to help diagnose this issue: %v", errs, logErr) 429 } 430 return fmt.Errorf("mount failed: %v", errs) 431 432} 433 434//getVolumeInfo returns 'path' and 'readonly' field values from the provided glusterfs spec. 435func getVolumeInfo(spec *volume.Spec) (string, bool, error) { 436 if spec.Volume != nil && spec.Volume.Glusterfs != nil { 437 return spec.Volume.Glusterfs.Path, spec.Volume.Glusterfs.ReadOnly, nil 438 } else if spec.PersistentVolume != nil && 439 spec.PersistentVolume.Spec.Glusterfs != nil { 440 return spec.PersistentVolume.Spec.Glusterfs.Path, spec.ReadOnly, nil 441 } 442 return "", false, fmt.Errorf("spec does not reference a Glusterfs volume type") 443} 444 445func (plugin *glusterfsPlugin) NewProvisioner(options volume.VolumeOptions) (volume.Provisioner, error) { 446 return plugin.newProvisionerInternal(options) 447} 448 449func (plugin *glusterfsPlugin) newProvisionerInternal(options volume.VolumeOptions) (volume.Provisioner, error) { 450 return &glusterfsVolumeProvisioner{ 451 glusterfsMounter: &glusterfsMounter{ 452 glusterfs: &glusterfs{ 453 plugin: plugin, 454 }, 455 }, 456 options: options, 457 }, nil 458} 459 460type provisionerConfig struct { 461 url string 462 user string 463 userKey string 464 secretNamespace string 465 secretName string 466 secretValue string `datapolicy:"token"` 467 clusterID string 468 gidMin int 469 gidMax int 470 volumeType gapi.VolumeDurabilityInfo 471 volumeOptions []string 472 volumeNamePrefix string 473 thinPoolSnapFactor float32 474 customEpNamePrefix string 475} 476 477type glusterfsVolumeProvisioner struct { 478 *glusterfsMounter 479 provisionerConfig 480 options volume.VolumeOptions 481} 482 483func convertGid(gidString string) (int, error) { 484 gid64, err := strconv.ParseInt(gidString, 10, 32) 485 if err != nil { 486 return 0, fmt.Errorf("failed to parse gid %v: %v", gidString, err) 487 } 488 if gid64 < 0 { 489 return 0, fmt.Errorf("negative GIDs %v are not allowed", gidString) 490 } 491 492 // ParseInt returns a int64, but since we parsed only 493 // for 32 bit, we can cast to int without loss: 494 gid := int(gid64) 495 return gid, nil 496} 497 498func convertVolumeParam(volumeString string) (int, error) { 499 count, err := strconv.Atoi(volumeString) 500 if err != nil { 501 return 0, fmt.Errorf("failed to parse volumestring %q: %v", volumeString, err) 502 } 503 504 if count < 0 { 505 return 0, fmt.Errorf("negative values are not allowed") 506 } 507 return count, nil 508} 509 510func (plugin *glusterfsPlugin) NewDeleter(spec *volume.Spec) (volume.Deleter, error) { 511 return plugin.newDeleterInternal(spec) 512} 513 514func (plugin *glusterfsPlugin) newDeleterInternal(spec *volume.Spec) (volume.Deleter, error) { 515 if spec.PersistentVolume != nil && spec.PersistentVolume.Spec.Glusterfs == nil { 516 return nil, fmt.Errorf("spec.PersistentVolume.Spec.Glusterfs is nil") 517 } 518 return &glusterfsVolumeDeleter{ 519 glusterfsMounter: &glusterfsMounter{ 520 glusterfs: &glusterfs{ 521 volName: spec.Name(), 522 plugin: plugin, 523 }, 524 path: spec.PersistentVolume.Spec.Glusterfs.Path, 525 }, 526 spec: spec.PersistentVolume, 527 }, nil 528} 529 530type glusterfsVolumeDeleter struct { 531 *glusterfsMounter 532 provisionerConfig 533 spec *v1.PersistentVolume 534} 535 536func (d *glusterfsVolumeDeleter) GetPath() string { 537 name := glusterfsPluginName 538 return d.plugin.host.GetPodVolumeDir(d.glusterfsMounter.glusterfs.pod.UID, utilstrings.EscapeQualifiedName(name), d.glusterfsMounter.glusterfs.volName) 539} 540 541// Traverse the PVs, fetching all the GIDs from those 542// in a given storage class, and mark them in the table. 543func (plugin *glusterfsPlugin) collectGids(className string, gidTable *MinMaxAllocator) error { 544 kubeClient := plugin.host.GetKubeClient() 545 if kubeClient == nil { 546 return fmt.Errorf("failed to get kube client when collecting gids") 547 } 548 pvList, err := kubeClient.CoreV1().PersistentVolumes().List(context.TODO(), metav1.ListOptions{LabelSelector: labels.Everything().String()}) 549 if err != nil { 550 return fmt.Errorf("failed to get existing persistent volumes") 551 } 552 for _, pv := range pvList.Items { 553 if storagehelpers.GetPersistentVolumeClass(&pv) != className { 554 continue 555 } 556 pvName := pv.ObjectMeta.Name 557 gidStr, ok := pv.Annotations[volutil.VolumeGidAnnotationKey] 558 if !ok { 559 klog.Warningf("no GID found in pv %v", pvName) 560 continue 561 } 562 gid, err := convertGid(gidStr) 563 if err != nil { 564 klog.Errorf("failed to parse gid %s: %v", gidStr, err) 565 continue 566 } 567 _, err = gidTable.Allocate(gid) 568 if err == ErrConflict { 569 klog.Warningf("GID %v found in pv %v was already allocated", gid, pvName) 570 } else if err != nil { 571 return fmt.Errorf("failed to store gid %v found in pv %v: %v", gid, pvName, err) 572 } 573 } 574 return nil 575} 576 577// Return the gid table for a storage class. 578// - If this is the first time, fill it with all the gids 579// used in PVs of this storage class by traversing the PVs. 580// - Adapt the range of the table to the current range of the SC. 581func (plugin *glusterfsPlugin) getGidTable(className string, min int, max int) (*MinMaxAllocator, error) { 582 plugin.gidTableLock.Lock() 583 gidTable, ok := plugin.gidTable[className] 584 plugin.gidTableLock.Unlock() 585 586 if ok { 587 err := gidTable.SetRange(min, max) 588 if err != nil { 589 return nil, err 590 } 591 return gidTable, nil 592 } 593 594 // create a new table and fill it 595 newGidTable, err := NewMinMaxAllocator(0, absoluteGidMax) 596 if err != nil { 597 return nil, err 598 } 599 600 // collect gids with the full range 601 err = plugin.collectGids(className, newGidTable) 602 if err != nil { 603 return nil, err 604 } 605 606 // and only reduce the range afterwards 607 err = newGidTable.SetRange(min, max) 608 if err != nil { 609 return nil, err 610 } 611 612 // if in the meantime a table appeared, use it 613 plugin.gidTableLock.Lock() 614 defer plugin.gidTableLock.Unlock() 615 gidTable, ok = plugin.gidTable[className] 616 if ok { 617 err = gidTable.SetRange(min, max) 618 if err != nil { 619 return nil, err 620 } 621 return gidTable, nil 622 } 623 624 plugin.gidTable[className] = newGidTable 625 return newGidTable, nil 626} 627 628func (d *glusterfsVolumeDeleter) getGid() (int, bool, error) { 629 gidStr, ok := d.spec.Annotations[volutil.VolumeGidAnnotationKey] 630 if !ok { 631 return 0, false, nil 632 } 633 gid, err := convertGid(gidStr) 634 return gid, true, err 635} 636 637func (d *glusterfsVolumeDeleter) Delete() error { 638 klog.V(2).Infof("delete volume %s", d.glusterfsMounter.path) 639 volumeName := d.glusterfsMounter.path 640 volumeID, err := getVolumeID(d.spec, volumeName) 641 if err != nil { 642 return fmt.Errorf("failed to get volumeID: %v", err) 643 } 644 class, err := volutil.GetClassForVolume(d.plugin.host.GetKubeClient(), d.spec) 645 if err != nil { 646 return err 647 } 648 cfg, err := parseClassParameters(class.Parameters, d.plugin.host.GetKubeClient()) 649 if err != nil { 650 return err 651 } 652 d.provisionerConfig = *cfg 653 klog.V(4).Infof("deleting volume %q", volumeID) 654 gid, exists, err := d.getGid() 655 if err != nil { 656 klog.Error(err) 657 } else if exists { 658 gidTable, err := d.plugin.getGidTable(class.Name, cfg.gidMin, cfg.gidMax) 659 if err != nil { 660 return fmt.Errorf("failed to get gidTable: %v", err) 661 } 662 err = gidTable.Release(gid) 663 if err != nil { 664 return fmt.Errorf("failed to release gid %v: %v", gid, err) 665 } 666 } 667 cli := filterClient(gcli.NewClient(d.url, d.user, d.secretValue), d.plugin.host.GetFilteredDialOptions()) 668 if cli == nil { 669 klog.Errorf("failed to create glusterfs REST client") 670 return fmt.Errorf("failed to create glusterfs REST client, REST server authentication failed") 671 } 672 err = cli.VolumeDelete(volumeID) 673 if err != nil { 674 if dstrings.TrimSpace(err.Error()) != errIDNotFound { 675 // don't log error details from client calls in events 676 klog.V(4).Infof("failed to delete volume %s: %v", volumeName, err) 677 return fmt.Errorf("failed to delete volume: see kube-controller-manager.log for details") 678 } 679 klog.V(2).Infof("volume %s not present in heketi, ignoring", volumeName) 680 } 681 klog.V(2).Infof("volume %s deleted successfully", volumeName) 682 683 //Deleter takes endpoint and namespace from pv spec. 684 pvSpec := d.spec.Spec 685 var dynamicEndpoint, dynamicNamespace string 686 if pvSpec.ClaimRef == nil { 687 klog.Errorf("ClaimRef is nil") 688 return fmt.Errorf("ClaimRef is nil") 689 } 690 if pvSpec.ClaimRef.Namespace == "" { 691 klog.Errorf("namespace is nil") 692 return fmt.Errorf("namespace is nil") 693 } 694 dynamicNamespace = pvSpec.ClaimRef.Namespace 695 if pvSpec.Glusterfs.EndpointsName != "" { 696 dynamicEndpoint = pvSpec.Glusterfs.EndpointsName 697 } 698 klog.V(3).Infof("dynamic namespace and endpoint %v/%v", dynamicNamespace, dynamicEndpoint) 699 err = d.deleteEndpointService(dynamicNamespace, dynamicEndpoint) 700 if err != nil { 701 klog.Errorf("failed to delete endpoint/service %v/%v: %v", dynamicNamespace, dynamicEndpoint, err) 702 } else { 703 klog.V(1).Infof("endpoint %v/%v is deleted successfully ", dynamicNamespace, dynamicEndpoint) 704 } 705 return nil 706} 707 708func filterClient(client *gcli.Client, opts *proxyutil.FilteredDialOptions) *gcli.Client { 709 if opts == nil { 710 return client 711 } 712 dialer := proxyutil.NewFilteredDialContext(nil, nil, opts) 713 client.SetClientFunc(func(tlsConfig *tls.Config, checkRedirect gcli.CheckRedirectFunc) (gcli.HttpPerformer, error) { 714 transport := http.DefaultTransport.(*http.Transport).Clone() 715 transport.DialContext = dialer 716 transport.TLSClientConfig = tlsConfig 717 return &http.Client{Transport: transport, CheckRedirect: checkRedirect}, nil 718 }) 719 return client 720} 721 722func (p *glusterfsVolumeProvisioner) Provision(selectedNode *v1.Node, allowedTopologies []v1.TopologySelectorTerm) (*v1.PersistentVolume, error) { 723 if !volutil.ContainsAllAccessModes(p.plugin.GetAccessModes(), p.options.PVC.Spec.AccessModes) { 724 return nil, fmt.Errorf("invalid AccessModes %v: only AccessModes %v are supported", p.options.PVC.Spec.AccessModes, p.plugin.GetAccessModes()) 725 } 726 if p.options.PVC.Spec.Selector != nil { 727 klog.V(4).Infof("not able to parse your claim Selector") 728 return nil, fmt.Errorf("not able to parse your claim Selector") 729 } 730 if volutil.CheckPersistentVolumeClaimModeBlock(p.options.PVC) { 731 return nil, fmt.Errorf("%s does not support block volume provisioning", p.plugin.GetPluginName()) 732 } 733 klog.V(4).Infof("provision volume with options %v", p.options) 734 scName := storagehelpers.GetPersistentVolumeClaimClass(p.options.PVC) 735 cfg, err := parseClassParameters(p.options.Parameters, p.plugin.host.GetKubeClient()) 736 if err != nil { 737 return nil, err 738 } 739 p.provisionerConfig = *cfg 740 741 gidTable, err := p.plugin.getGidTable(scName, cfg.gidMin, cfg.gidMax) 742 if err != nil { 743 return nil, fmt.Errorf("failed to get gidTable: %v", err) 744 } 745 gid, _, err := gidTable.AllocateNext() 746 if err != nil { 747 klog.Errorf("failed to reserve GID from table: %v", err) 748 return nil, fmt.Errorf("failed to reserve GID from table: %v", err) 749 } 750 klog.V(2).Infof("allocated GID %d for PVC %s", gid, p.options.PVC.Name) 751 glusterfs, sizeGiB, volID, err := p.CreateVolume(gid) 752 if err != nil { 753 if releaseErr := gidTable.Release(gid); releaseErr != nil { 754 klog.Errorf("error when releasing GID in storageclass %s: %v", scName, releaseErr) 755 } 756 return nil, fmt.Errorf("failed to create volume: %v", err) 757 } 758 mode := v1.PersistentVolumeFilesystem 759 pv := new(v1.PersistentVolume) 760 pv.Spec.PersistentVolumeSource.Glusterfs = glusterfs 761 pv.Spec.PersistentVolumeReclaimPolicy = p.options.PersistentVolumeReclaimPolicy 762 pv.Spec.AccessModes = p.options.PVC.Spec.AccessModes 763 pv.Spec.VolumeMode = &mode 764 if len(pv.Spec.AccessModes) == 0 { 765 pv.Spec.AccessModes = p.plugin.GetAccessModes() 766 } 767 pv.Spec.MountOptions = p.options.MountOptions 768 gidStr := strconv.FormatInt(int64(gid), 10) 769 pv.Annotations = map[string]string{ 770 volutil.VolumeGidAnnotationKey: gidStr, 771 volutil.VolumeDynamicallyCreatedByKey: heketiAnn, 772 glusterTypeAnn: "file", 773 "Description": glusterDescAnn, 774 heketiVolIDAnn: volID, 775 } 776 pv.Spec.Capacity = v1.ResourceList{ 777 v1.ResourceName(v1.ResourceStorage): resource.MustParse(fmt.Sprintf("%dGi", sizeGiB)), 778 } 779 return pv, nil 780} 781 782func (p *glusterfsVolumeProvisioner) CreateVolume(gid int) (r *v1.GlusterfsPersistentVolumeSource, size int, volID string, err error) { 783 var clusterIDs []string 784 customVolumeName := "" 785 epServiceName := "" 786 kubeClient := p.plugin.host.GetKubeClient() 787 if kubeClient == nil { 788 return nil, 0, "", fmt.Errorf("failed to get kube client to update endpoint") 789 } 790 if len(p.provisionerConfig.customEpNamePrefix) == 0 { 791 epServiceName = string(p.options.PVC.UID) 792 } else { 793 epServiceName = p.provisionerConfig.customEpNamePrefix + "-" + string(p.options.PVC.UID) 794 } 795 epNamespace := p.options.PVC.Namespace 796 endpoint, service, err := p.createOrGetEndpointService(epNamespace, epServiceName, p.options.PVC) 797 if err != nil { 798 klog.Errorf("failed to create endpoint/service %v/%v: %v", epNamespace, epServiceName, err) 799 return nil, 0, "", fmt.Errorf("failed to create endpoint/service %v/%v: %v", epNamespace, epServiceName, err) 800 } 801 klog.V(3).Infof("dynamic endpoint %v and service %v ", endpoint, service) 802 capacity := p.options.PVC.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)] 803 804 // GlusterFS/heketi creates volumes in units of GiB. 805 sz, err := volumehelpers.RoundUpToGiBInt(capacity) 806 if err != nil { 807 return nil, 0, "", err 808 } 809 klog.V(2).Infof("create volume of size %dGiB", sz) 810 if p.url == "" { 811 return nil, 0, "", fmt.Errorf("failed to create glusterfs REST client, REST URL is empty") 812 } 813 cli := filterClient(gcli.NewClient(p.url, p.user, p.secretValue), p.plugin.host.GetFilteredDialOptions()) 814 if cli == nil { 815 return nil, 0, "", fmt.Errorf("failed to create glusterfs REST client, REST server authentication failed") 816 } 817 if p.provisionerConfig.clusterID != "" { 818 clusterIDs = dstrings.Split(p.clusterID, ",") 819 klog.V(4).Infof("provided clusterIDs %v", clusterIDs) 820 } 821 822 if p.provisionerConfig.volumeNamePrefix != "" { 823 customVolumeName = fmt.Sprintf("%s_%s_%s_%s", p.provisionerConfig.volumeNamePrefix, p.options.PVC.Namespace, p.options.PVC.Name, uuid.NewUUID()) 824 } 825 gid64 := int64(gid) 826 snaps := struct { 827 Enable bool `json:"enable"` 828 Factor float32 `json:"factor"` 829 }{ 830 true, 831 p.provisionerConfig.thinPoolSnapFactor, 832 } 833 volumeReq := &gapi.VolumeCreateRequest{Size: sz, Name: customVolumeName, Clusters: clusterIDs, Gid: gid64, Durability: p.volumeType, GlusterVolumeOptions: p.volumeOptions, Snapshot: snaps} 834 volume, err := cli.VolumeCreate(volumeReq) 835 if err != nil { 836 // don't log error details from client calls in events 837 klog.V(4).Infof("failed to create volume: %v", err) 838 return nil, 0, "", fmt.Errorf("failed to create volume: see kube-controller-manager.log for details") 839 } 840 klog.V(1).Infof("volume with size %d and name %s created", volume.Size, volume.Name) 841 volID = volume.Id 842 dynamicHostIps, err := getClusterNodes(cli, volume.Cluster) 843 if err != nil { 844 return nil, 0, "", fmt.Errorf("failed to get cluster nodes for volume %s: %v", volume, err) 845 } 846 addrlist := make([]v1.EndpointAddress, len(dynamicHostIps)) 847 for i, v := range dynamicHostIps { 848 addrlist[i].IP = v 849 } 850 subset := make([]v1.EndpointSubset, 1) 851 ports := []v1.EndpointPort{{Port: 1, Protocol: "TCP"}} 852 endpoint.Subsets = subset 853 endpoint.Subsets[0].Addresses = addrlist 854 endpoint.Subsets[0].Ports = ports 855 _, err = kubeClient.CoreV1().Endpoints(epNamespace).Update(context.TODO(), endpoint, metav1.UpdateOptions{}) 856 if err != nil { 857 deleteErr := cli.VolumeDelete(volume.Id) 858 if deleteErr != nil { 859 // don't log error details from client calls in events 860 klog.V(4).Infof("failed to delete volume: %v, manual deletion of the volume required", deleteErr) 861 } 862 klog.V(3).Infof("failed to update endpoint, deleting %s", endpoint) 863 err = kubeClient.CoreV1().Services(epNamespace).Delete(context.TODO(), epServiceName, metav1.DeleteOptions{}) 864 if err != nil && errors.IsNotFound(err) { 865 klog.V(1).Infof("service %s does not exist in namespace %s", epServiceName, epNamespace) 866 err = nil 867 } 868 if err != nil { 869 klog.Errorf("failed to delete service %s/%s: %v", epNamespace, epServiceName, err) 870 } 871 klog.V(1).Infof("service/endpoint: %s/%s deleted successfully", epNamespace, epServiceName) 872 return nil, 0, "", fmt.Errorf("failed to update endpoint %s: %v", endpoint, err) 873 } 874 klog.V(3).Infof("endpoint %s updated successfully", endpoint) 875 return &v1.GlusterfsPersistentVolumeSource{ 876 EndpointsName: endpoint.Name, 877 EndpointsNamespace: &epNamespace, 878 Path: volume.Name, 879 ReadOnly: false, 880 }, sz, volID, nil 881} 882 883// createOrGetEndpointService() makes sure an endpoint and service 884// exist for the given namespace, PVC name, endpoint name 885// I.e. the endpoint or service is only created 886// if it does not exist yet. 887func (p *glusterfsVolumeProvisioner) createOrGetEndpointService(namespace string, epServiceName string, pvc *v1.PersistentVolumeClaim) (endpoint *v1.Endpoints, service *v1.Service, err error) { 888 pvcNameOrID := "" 889 if len(pvc.Name) >= 63 { 890 pvcNameOrID = string(pvc.UID) 891 } else { 892 pvcNameOrID = pvc.Name 893 } 894 endpoint = &v1.Endpoints{ 895 ObjectMeta: metav1.ObjectMeta{ 896 Namespace: namespace, 897 Name: epServiceName, 898 Labels: map[string]string{ 899 "gluster.kubernetes.io/provisioned-for-pvc": pvcNameOrID, 900 }, 901 }, 902 } 903 kubeClient := p.plugin.host.GetKubeClient() 904 if kubeClient == nil { 905 return nil, nil, fmt.Errorf("failed to get kube client when creating endpoint service") 906 } 907 _, err = kubeClient.CoreV1().Endpoints(namespace).Create(context.TODO(), endpoint, metav1.CreateOptions{}) 908 if err != nil && errors.IsAlreadyExists(err) { 909 klog.V(1).Infof("endpoint %s already exist in namespace %s", endpoint, namespace) 910 err = nil 911 } 912 if err != nil { 913 klog.Errorf("failed to create endpoint: %v", err) 914 return nil, nil, fmt.Errorf("failed to create endpoint: %v", err) 915 } 916 service = &v1.Service{ 917 ObjectMeta: metav1.ObjectMeta{ 918 Name: epServiceName, 919 Namespace: namespace, 920 Labels: map[string]string{ 921 "gluster.kubernetes.io/provisioned-for-pvc": pvcNameOrID, 922 }, 923 }, 924 Spec: v1.ServiceSpec{ 925 Ports: []v1.ServicePort{ 926 {Protocol: "TCP", Port: 1}}}} 927 _, err = kubeClient.CoreV1().Services(namespace).Create(context.TODO(), service, metav1.CreateOptions{}) 928 if err != nil && errors.IsAlreadyExists(err) { 929 klog.V(1).Infof("service %s already exist in namespace %s", service, namespace) 930 err = nil 931 } 932 if err != nil { 933 klog.Errorf("failed to create service: %v", err) 934 return nil, nil, fmt.Errorf("error creating service: %v", err) 935 } 936 return endpoint, service, nil 937} 938 939func (d *glusterfsVolumeDeleter) deleteEndpointService(namespace string, epServiceName string) (err error) { 940 kubeClient := d.plugin.host.GetKubeClient() 941 if kubeClient == nil { 942 return fmt.Errorf("failed to get kube client when deleting endpoint service") 943 } 944 err = kubeClient.CoreV1().Services(namespace).Delete(context.TODO(), epServiceName, metav1.DeleteOptions{}) 945 if err != nil { 946 return fmt.Errorf("failed to delete service %s/%s: %v", namespace, epServiceName, err) 947 } 948 klog.V(1).Infof("service/endpoint: %s/%s deleted successfully", namespace, epServiceName) 949 return nil 950} 951 952// parseSecret finds a given Secret instance and reads user password from it. 953func parseSecret(namespace, secretName string, kubeClient clientset.Interface) (string, error) { 954 secretMap, err := volutil.GetSecretForPV(namespace, secretName, glusterfsPluginName, kubeClient) 955 if err != nil { 956 klog.Errorf("failed to get secret: %s/%s: %v", namespace, secretName, err) 957 return "", fmt.Errorf("failed to get secret %s/%s: %v", namespace, secretName, err) 958 } 959 if len(secretMap) == 0 { 960 return "", fmt.Errorf("empty secret map") 961 } 962 secret := "" 963 for k, v := range secretMap { 964 if k == secretKeyName { 965 return v, nil 966 } 967 secret = v 968 } 969 970 // If not found, the last secret in the map wins as done before 971 return secret, nil 972} 973 974// getClusterNodes() returns the cluster nodes of a given cluster 975func getClusterNodes(cli *gcli.Client, cluster string) (dynamicHostIps []string, err error) { 976 clusterinfo, err := cli.ClusterInfo(cluster) 977 if err != nil { 978 // don't log error details from client calls in events 979 klog.V(4).Infof("failed to get cluster details: %v", err) 980 return nil, fmt.Errorf("failed to get cluster details: see kube-controller-manager.log for details") 981 } 982 983 // For the dynamically provisioned volume, we gather the list of node IPs 984 // of the cluster on which provisioned volume belongs to, as there can be multiple 985 // clusters. 986 for _, node := range clusterinfo.Nodes { 987 nodeInfo, err := cli.NodeInfo(string(node)) 988 if err != nil { 989 // don't log error details from client calls in events 990 klog.V(4).Infof("failed to get host ipaddress: %v", err) 991 return nil, fmt.Errorf("failed to get host ipaddress: see kube-controller-manager.log for details") 992 } 993 ipaddr := dstrings.Join(nodeInfo.NodeAddRequest.Hostnames.Storage, "") 994 // IP validates if a string is a valid IP address. 995 ip := net.ParseIP(ipaddr) 996 if ip == nil { 997 return nil, fmt.Errorf("glusterfs server node ip address %s must be a valid IP address, (e.g. 10.9.8.7)", ipaddr) 998 } 999 dynamicHostIps = append(dynamicHostIps, ipaddr) 1000 } 1001 klog.V(3).Infof("host list :%v", dynamicHostIps) 1002 if len(dynamicHostIps) == 0 { 1003 return nil, fmt.Errorf("no hosts found: %v", err) 1004 } 1005 return dynamicHostIps, nil 1006} 1007 1008// parseClassParameters parses StorageClass parameters. 1009func parseClassParameters(params map[string]string, kubeClient clientset.Interface) (*provisionerConfig, error) { 1010 var cfg provisionerConfig 1011 var err error 1012 cfg.gidMin = defaultGidMin 1013 cfg.gidMax = defaultGidMax 1014 cfg.customEpNamePrefix = dynamicEpSvcPrefix 1015 1016 authEnabled := true 1017 parseVolumeType := "" 1018 parseVolumeOptions := "" 1019 parseVolumeNamePrefix := "" 1020 parseThinPoolSnapFactor := "" 1021 1022 //thin pool snap factor default to 1.0 1023 cfg.thinPoolSnapFactor = float32(1.0) 1024 1025 for k, v := range params { 1026 switch dstrings.ToLower(k) { 1027 case "resturl": 1028 cfg.url = v 1029 case "restuser": 1030 cfg.user = v 1031 case "restuserkey": 1032 cfg.userKey = v 1033 case "secretname": 1034 cfg.secretName = v 1035 case "secretnamespace": 1036 cfg.secretNamespace = v 1037 case "clusterid": 1038 if len(v) != 0 { 1039 cfg.clusterID = v 1040 } 1041 case "restauthenabled": 1042 authEnabled = dstrings.ToLower(v) == "true" 1043 case "gidmin": 1044 parseGidMin, err := convertGid(v) 1045 if err != nil { 1046 return nil, fmt.Errorf("invalid gidMin value %q for volume plugin %s", k, glusterfsPluginName) 1047 } 1048 if parseGidMin < absoluteGidMin { 1049 return nil, fmt.Errorf("gidMin must be >= %v", absoluteGidMin) 1050 } 1051 if parseGidMin > absoluteGidMax { 1052 return nil, fmt.Errorf("gidMin must be <= %v", absoluteGidMax) 1053 } 1054 cfg.gidMin = parseGidMin 1055 case "gidmax": 1056 parseGidMax, err := convertGid(v) 1057 if err != nil { 1058 return nil, fmt.Errorf("invalid gidMax value %q for volume plugin %s", k, glusterfsPluginName) 1059 } 1060 if parseGidMax < absoluteGidMin { 1061 return nil, fmt.Errorf("gidMax must be >= %v", absoluteGidMin) 1062 } 1063 if parseGidMax > absoluteGidMax { 1064 return nil, fmt.Errorf("gidMax must be <= %v", absoluteGidMax) 1065 } 1066 cfg.gidMax = parseGidMax 1067 case "volumetype": 1068 parseVolumeType = v 1069 1070 case "volumeoptions": 1071 if len(v) != 0 { 1072 parseVolumeOptions = v 1073 } 1074 case "volumenameprefix": 1075 if len(v) != 0 { 1076 parseVolumeNamePrefix = v 1077 } 1078 case "snapfactor": 1079 if len(v) != 0 { 1080 parseThinPoolSnapFactor = v 1081 } 1082 case "customepnameprefix": 1083 // If the string has > 'maxCustomEpNamePrefixLen' chars, the final endpoint name will 1084 // exceed the limitation of 63 chars, so fail if prefix is > 'maxCustomEpNamePrefixLen' 1085 // characters. This is only applicable for 'customepnameprefix' string and default ep name 1086 // string will always pass. 1087 if len(v) <= maxCustomEpNamePrefixLen { 1088 cfg.customEpNamePrefix = v 1089 } else { 1090 return nil, fmt.Errorf("'customepnameprefix' value should be < %d characters", maxCustomEpNamePrefixLen) 1091 } 1092 default: 1093 return nil, fmt.Errorf("invalid option %q for volume plugin %s", k, glusterfsPluginName) 1094 } 1095 } 1096 if len(cfg.url) == 0 { 1097 return nil, fmt.Errorf("StorageClass for provisioner %s must contain 'resturl' parameter", glusterfsPluginName) 1098 } 1099 if len(parseVolumeType) == 0 { 1100 cfg.volumeType = gapi.VolumeDurabilityInfo{Type: gapi.DurabilityReplicate, Replicate: gapi.ReplicaDurability{Replica: replicaCount}} 1101 } else { 1102 parseVolumeTypeInfo := dstrings.Split(parseVolumeType, ":") 1103 switch parseVolumeTypeInfo[0] { 1104 case "replicate": 1105 if len(parseVolumeTypeInfo) >= 2 { 1106 newReplicaCount, err := convertVolumeParam(parseVolumeTypeInfo[1]) 1107 if err != nil { 1108 return nil, fmt.Errorf("error parsing volumeType %q: %s", parseVolumeTypeInfo[1], err) 1109 } 1110 cfg.volumeType = gapi.VolumeDurabilityInfo{Type: gapi.DurabilityReplicate, Replicate: gapi.ReplicaDurability{Replica: newReplicaCount}} 1111 } else { 1112 cfg.volumeType = gapi.VolumeDurabilityInfo{Type: gapi.DurabilityReplicate, Replicate: gapi.ReplicaDurability{Replica: replicaCount}} 1113 } 1114 case "disperse": 1115 if len(parseVolumeTypeInfo) >= 3 { 1116 newDisperseData, err := convertVolumeParam(parseVolumeTypeInfo[1]) 1117 if err != nil { 1118 return nil, fmt.Errorf("error parsing volumeType %q: %s", parseVolumeTypeInfo[1], err) 1119 } 1120 newDisperseRedundancy, err := convertVolumeParam(parseVolumeTypeInfo[2]) 1121 if err != nil { 1122 return nil, fmt.Errorf("error parsing volumeType %q: %s", parseVolumeTypeInfo[2], err) 1123 } 1124 cfg.volumeType = gapi.VolumeDurabilityInfo{Type: gapi.DurabilityEC, Disperse: gapi.DisperseDurability{Data: newDisperseData, Redundancy: newDisperseRedundancy}} 1125 } else { 1126 return nil, fmt.Errorf("StorageClass for provisioner %q must have data:redundancy count set for disperse volumes in storage class option '%s'", glusterfsPluginName, "volumetype") 1127 } 1128 case "none": 1129 cfg.volumeType = gapi.VolumeDurabilityInfo{Type: gapi.DurabilityDistributeOnly} 1130 default: 1131 return nil, fmt.Errorf("error parsing value for option 'volumetype' for volume plugin %s", glusterfsPluginName) 1132 } 1133 } 1134 if !authEnabled { 1135 cfg.user = "" 1136 cfg.secretName = "" 1137 cfg.secretNamespace = "" 1138 cfg.userKey = "" 1139 cfg.secretValue = "" 1140 } 1141 1142 if len(cfg.secretName) != 0 || len(cfg.secretNamespace) != 0 { 1143 // secretName + Namespace has precedence over userKey 1144 if len(cfg.secretName) != 0 && len(cfg.secretNamespace) != 0 { 1145 cfg.secretValue, err = parseSecret(cfg.secretNamespace, cfg.secretName, kubeClient) 1146 if err != nil { 1147 return nil, err 1148 } 1149 } else { 1150 return nil, fmt.Errorf("StorageClass for provisioner %q must have secretNamespace and secretName either both set or both empty", glusterfsPluginName) 1151 } 1152 } else { 1153 cfg.secretValue = cfg.userKey 1154 } 1155 if cfg.gidMin > cfg.gidMax { 1156 return nil, fmt.Errorf("storageClass for provisioner %q must have gidMax value >= gidMin", glusterfsPluginName) 1157 } 1158 if len(parseVolumeOptions) != 0 { 1159 volOptions := dstrings.Split(parseVolumeOptions, ",") 1160 if len(volOptions) == 0 { 1161 return nil, fmt.Errorf("storageClass for provisioner %q must have valid (for e.g., 'client.ssl on') volume option", glusterfsPluginName) 1162 } 1163 cfg.volumeOptions = volOptions 1164 } 1165 if len(parseVolumeNamePrefix) != 0 { 1166 if dstrings.Contains(parseVolumeNamePrefix, "_") { 1167 return nil, fmt.Errorf("storageclass parameter 'volumenameprefix' should not contain '_' in its value") 1168 } 1169 cfg.volumeNamePrefix = parseVolumeNamePrefix 1170 } 1171 if len(parseThinPoolSnapFactor) != 0 { 1172 thinPoolSnapFactor, err := strconv.ParseFloat(parseThinPoolSnapFactor, 32) 1173 if err != nil { 1174 return nil, fmt.Errorf("failed to convert snapfactor %v to float: %v", parseThinPoolSnapFactor, err) 1175 } 1176 if thinPoolSnapFactor < 1.0 || thinPoolSnapFactor > 100.0 { 1177 return nil, fmt.Errorf("invalid snapshot factor %v, the value must be between 1 to 100", thinPoolSnapFactor) 1178 } 1179 cfg.thinPoolSnapFactor = float32(thinPoolSnapFactor) 1180 } 1181 return &cfg, nil 1182} 1183 1184// getVolumeID returns volumeID from the PV or volumename. 1185func getVolumeID(pv *v1.PersistentVolume, volumeName string) (string, error) { 1186 volumeID := "" 1187 1188 // Get volID from pvspec if available, else fill it from volumename. 1189 if pv != nil { 1190 if pv.Annotations[heketiVolIDAnn] != "" { 1191 volumeID = pv.Annotations[heketiVolIDAnn] 1192 } else { 1193 volumeID = dstrings.TrimPrefix(volumeName, volPrefix) 1194 } 1195 } else { 1196 return volumeID, fmt.Errorf("provided PV spec is nil") 1197 } 1198 if volumeID == "" { 1199 return volumeID, fmt.Errorf("volume ID is empty") 1200 } 1201 return volumeID, nil 1202} 1203 1204func (plugin *glusterfsPlugin) ExpandVolumeDevice(spec *volume.Spec, newSize resource.Quantity, oldSize resource.Quantity) (resource.Quantity, error) { 1205 pvSpec := spec.PersistentVolume.Spec 1206 volumeName := pvSpec.Glusterfs.Path 1207 klog.V(2).Infof("received request to expand volume %s", volumeName) 1208 volumeID, err := getVolumeID(spec.PersistentVolume, volumeName) 1209 if err != nil { 1210 return oldSize, fmt.Errorf("failed to get volumeID for volume %s: %v", volumeName, err) 1211 } 1212 //Get details of StorageClass. 1213 class, err := volutil.GetClassForVolume(plugin.host.GetKubeClient(), spec.PersistentVolume) 1214 if err != nil { 1215 return oldSize, err 1216 } 1217 cfg, err := parseClassParameters(class.Parameters, plugin.host.GetKubeClient()) 1218 if err != nil { 1219 return oldSize, err 1220 } 1221 klog.V(4).Infof("expanding volume: %q", volumeID) 1222 1223 //Create REST server connection 1224 cli := filterClient(gcli.NewClient(cfg.url, cfg.user, cfg.secretValue), plugin.host.GetFilteredDialOptions()) 1225 if cli == nil { 1226 klog.Errorf("failed to create glusterfs REST client") 1227 return oldSize, fmt.Errorf("failed to create glusterfs REST client, REST server authentication failed") 1228 } 1229 1230 // Find out delta size 1231 expansionSize := newSize 1232 expansionSize.Sub(oldSize) 1233 expansionSizeGiB, err := volumehelpers.RoundUpToGiBInt(expansionSize) 1234 if err != nil { 1235 return oldSize, err 1236 } 1237 1238 // Find out requested Size 1239 requestGiB, err := volumehelpers.RoundUpToGiB(newSize) 1240 if err != nil { 1241 return oldSize, err 1242 } 1243 1244 //Check the existing volume size 1245 currentVolumeInfo, err := cli.VolumeInfo(volumeID) 1246 if err != nil { 1247 // don't log error details from client calls in events 1248 klog.V(4).Infof("error when fetching details of volume %s: %v", volumeName, err) 1249 return oldSize, fmt.Errorf("failed to get volume info %s: see kube-controller-manager.log for details", volumeName) 1250 } 1251 if int64(currentVolumeInfo.Size) >= requestGiB { 1252 return newSize, nil 1253 } 1254 1255 // Make volume expansion request 1256 volumeExpandReq := &gapi.VolumeExpandRequest{Size: expansionSizeGiB} 1257 1258 // Expand the volume 1259 volumeInfoRes, err := cli.VolumeExpand(volumeID, volumeExpandReq) 1260 if err != nil { 1261 // don't log error details from client calls in events 1262 klog.V(4).Infof("failed to expand volume %s: %v", volumeName, err) 1263 return oldSize, fmt.Errorf("failed to expand volume: see kube-controller-manager.log for details") 1264 } 1265 klog.V(2).Infof("volume %s expanded to new size %d successfully", volumeName, volumeInfoRes.Size) 1266 newVolumeSize := resource.MustParse(fmt.Sprintf("%dGi", volumeInfoRes.Size)) 1267 return newVolumeSize, nil 1268} 1269