/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package glusterfs

import (
	"context"
	"crypto/tls"
	"fmt"
	"math"
	"math/rand"
	"net"
	"net/http"
	"os"
	"path/filepath"
	"runtime"
	"strconv"
	dstrings "strings"
	"sync"

	gcli "github.com/heketi/heketi/client/api/go-client"
	gapi "github.com/heketi/heketi/pkg/glusterfs/api"
	"k8s.io/klog/v2"
	"k8s.io/mount-utils"
	utilstrings "k8s.io/utils/strings"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/uuid"
	clientset "k8s.io/client-go/kubernetes"
	volumehelpers "k8s.io/cloud-provider/volume/helpers"
	storagehelpers "k8s.io/component-helpers/storage/volume"
	proxyutil "k8s.io/kubernetes/pkg/proxy/util"
	"k8s.io/kubernetes/pkg/volume"
	volutil "k8s.io/kubernetes/pkg/volume/util"
)

// ProbeVolumePlugins is the primary entrypoint for volume plugins.
func ProbeVolumePlugins() []volume.VolumePlugin {
	return []volume.VolumePlugin{&glusterfsPlugin{host: nil, gidTable: make(map[string]*MinMaxAllocator)}}
}

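// glusterfsPlugin implements the in-tree GlusterFS volume plugin. gidTable
// caches one GID allocator per StorageClass so that dynamically provisioned
// volumes receive unique supplemental GIDs; gidTableLock guards the map.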
type glusterfsPlugin struct {
	host         volume.VolumeHost
	gidTable     map[string]*MinMaxAllocator
	gidTableLock sync.Mutex
}

var _ volume.VolumePlugin = &glusterfsPlugin{}
var _ volume.PersistentVolumePlugin = &glusterfsPlugin{}
var _ volume.DeletableVolumePlugin = &glusterfsPlugin{}
var _ volume.ProvisionableVolumePlugin = &glusterfsPlugin{}
var _ volume.ExpandableVolumePlugin = &glusterfsPlugin{}
var _ volume.Provisioner = &glusterfsVolumeProvisioner{}
var _ volume.Deleter = &glusterfsVolumeDeleter{}

const (
	glusterfsPluginName            = "kubernetes.io/glusterfs"
	volPrefix                      = "vol_"
	dynamicEpSvcPrefix             = "glusterfs-dynamic"
	replicaCount                   = 3
	secretKeyName                  = "key" // key name used in secret
	gciLinuxGlusterMountBinaryPath = "/sbin/mount.glusterfs"
	defaultGidMin                  = 2000
	defaultGidMax                  = math.MaxInt32

	// maxCustomEpNamePrefixLen is the maximum number of characters
	// that can be used as an ep/svc name prefix. It is derived from:
	// max length of an ep name (63) minus the length of a PVC UUID
	// plus separator (37), i.e. 63 - 37 = 26.
	maxCustomEpNamePrefixLen = 26

	// absoluteGidMin/Max are currently the same as the
	// default values, but they play a different role and
	// could take a different value. The only invariant we need is:
	// absGidMin <= defGidMin <= defGidMax <= absGidMax
	absoluteGidMin = 2000
	absoluteGidMax = math.MaxInt32
	heketiAnn      = "heketi-dynamic-provisioner"
	glusterTypeAnn = "gluster.org/type"
	glusterDescAnn = "Gluster-Internal: Dynamically provisioned PV"
	heketiVolIDAnn = "gluster.kubernetes.io/heketi-volume-id"

	// Error string returned by heketi
	errIDNotFound = "Id not found"
)

func (plugin *glusterfsPlugin) Init(host volume.VolumeHost) error {
	plugin.host = host
	return nil
}

func (plugin *glusterfsPlugin) GetPluginName() string {
	return glusterfsPluginName
}

func (plugin *glusterfsPlugin) GetVolumeName(spec *volume.Spec) (string, error) {
	return "", fmt.Errorf("GetVolumeName() is unimplemented for GlusterFS")
}

func (plugin *glusterfsPlugin) CanSupport(spec *volume.Spec) bool {
	return (spec.PersistentVolume != nil && spec.PersistentVolume.Spec.Glusterfs != nil) ||
		(spec.Volume != nil && spec.Volume.Glusterfs != nil)
}

func (plugin *glusterfsPlugin) RequiresRemount(spec *volume.Spec) bool {
	return false
}

func (plugin *glusterfsPlugin) SupportsMountOption() bool {
	return true
}

func (plugin *glusterfsPlugin) SupportsBulkVolumeVerification() bool {
	return false
}

func (plugin *glusterfsPlugin) RequiresFSResize() bool {
	return false
}

func (plugin *glusterfsPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
	return []v1.PersistentVolumeAccessMode{
		v1.ReadWriteOnce,
		v1.ReadOnlyMany,
		v1.ReadWriteMany,
	}
}

func (plugin *glusterfsPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volume.VolumeOptions) (volume.Mounter, error) {
	epName, epNamespace, err := plugin.getEndpointNameAndNamespace(spec, pod.Namespace)
	if err != nil {
		return nil, err
	}
	kubeClient := plugin.host.GetKubeClient()
	if kubeClient == nil {
		return nil, fmt.Errorf("failed to get kube client to initialize mounter")
	}
	ep, err := kubeClient.CoreV1().Endpoints(epNamespace).Get(context.TODO(), epName, metav1.GetOptions{})
	if err != nil {
		klog.Errorf("failed to get endpoint %s: %v", epName, err)
		return nil, err
	}
	klog.V(4).Infof("glusterfs pv endpoint %v", ep)
	return plugin.newMounterInternal(spec, ep, pod, plugin.host.GetMounter(plugin.GetPluginName()))
}

func (plugin *glusterfsPlugin) getEndpointNameAndNamespace(spec *volume.Spec, defaultNamespace string) (string, string, error) {
	if spec.Volume != nil && spec.Volume.Glusterfs != nil {
		endpoints := spec.Volume.Glusterfs.EndpointsName
		if endpoints == "" {
			return "", "", fmt.Errorf("no glusterFS endpoint specified")
		}
		return endpoints, defaultNamespace, nil
	} else if spec.PersistentVolume != nil &&
		spec.PersistentVolume.Spec.Glusterfs != nil {
		endpoints := spec.PersistentVolume.Spec.Glusterfs.EndpointsName
		endpointsNs := defaultNamespace
		overriddenNs := spec.PersistentVolume.Spec.Glusterfs.EndpointsNamespace
		if overriddenNs != nil {
			if len(*overriddenNs) > 0 {
				endpointsNs = *overriddenNs
			} else {
				return "", "", fmt.Errorf("endpointnamespace field set, but no endpointnamespace specified")
			}
		}
		return endpoints, endpointsNs, nil
	}
	return "", "", fmt.Errorf("spec does not reference a GlusterFS volume type")
}

func (plugin *glusterfsPlugin) newMounterInternal(spec *volume.Spec, ep *v1.Endpoints, pod *v1.Pod, mounter mount.Interface) (volume.Mounter, error) {
	volPath, readOnly, err := getVolumeInfo(spec)
	if err != nil {
		klog.Errorf("failed to get volumesource: %v", err)
		return nil, err
	}
	return &glusterfsMounter{
		glusterfs: &glusterfs{
			volName:         spec.Name(),
			mounter:         mounter,
			pod:             pod,
			plugin:          plugin,
			MetricsProvider: volume.NewMetricsStatFS(plugin.host.GetPodVolumeDir(pod.UID, utilstrings.EscapeQualifiedName(glusterfsPluginName), spec.Name())),
		},
		hosts:        ep,
		path:         volPath,
		readOnly:     readOnly,
		mountOptions: volutil.MountOptionFromSpec(spec),
	}, nil
}

func (plugin *glusterfsPlugin) NewUnmounter(volName string, podUID types.UID) (volume.Unmounter, error) {
	return plugin.newUnmounterInternal(volName, podUID, plugin.host.GetMounter(plugin.GetPluginName()))
}

func (plugin *glusterfsPlugin) newUnmounterInternal(volName string, podUID types.UID, mounter mount.Interface) (volume.Unmounter, error) {
	return &glusterfsUnmounter{&glusterfs{
		volName:         volName,
		mounter:         mounter,
		pod:             &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: podUID}},
		plugin:          plugin,
		MetricsProvider: volume.NewMetricsStatFS(plugin.host.GetPodVolumeDir(podUID, utilstrings.EscapeQualifiedName(glusterfsPluginName), volName)),
	}}, nil
}

func (plugin *glusterfsPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
	// Reconstructing the volume spec requires the endpoint, which cannot be
	// recovered from the mount string, so return an error.
	return nil, fmt.Errorf("impossible to reconstruct glusterfs volume spec from volume mountpath")
}

// Glusterfs volumes represent a bare host file or directory mount of a GlusterFS export.
type glusterfs struct {
	volName string
	pod     *v1.Pod
	mounter mount.Interface
	plugin  *glusterfsPlugin
	volume.MetricsProvider
}

type glusterfsMounter struct {
	*glusterfs
	hosts        *v1.Endpoints
	path         string
	readOnly     bool
	mountOptions []string
}

var _ volume.Mounter = &glusterfsMounter{}

func (b *glusterfsMounter) GetAttributes() volume.Attributes {
	return volume.Attributes{
		ReadOnly:        b.readOnly,
		Managed:         false,
		SupportsSELinux: false,
	}
}

// CanMount checks, prior to mount operations, that the required components
// (binaries, etc.) to mount the volume are available on the underlying node.
// If not, it returns an error.
func (b *glusterfsMounter) CanMount() error {
	exe := b.plugin.host.GetExec(b.plugin.GetPluginName())
	switch runtime.GOOS {
	case "linux":
		if _, err := exe.Command("test", "-x", gciLinuxGlusterMountBinaryPath).CombinedOutput(); err != nil {
			return fmt.Errorf("required binary %s is missing", gciLinuxGlusterMountBinaryPath)
		}
	}
	return nil
}

// SetUp attaches the disk and bind mounts to the volume path.
func (b *glusterfsMounter) SetUp(mounterArgs volume.MounterArgs) error {
	return b.SetUpAt(b.GetPath(), mounterArgs)
}

func (b *glusterfsMounter) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
	notMnt, err := b.mounter.IsLikelyNotMountPoint(dir)
	klog.V(4).Infof("mount setup: %s %v %v", dir, !notMnt, err)
	if err != nil && !os.IsNotExist(err) {
		return err
	}
	if !notMnt {
		return nil
	}
	if err := os.MkdirAll(dir, 0750); err != nil {
		return err
	}
	err = b.setUpAtInternal(dir)
	if err == nil {
		return nil
	}

	// Cleanup upon failure.
	mount.CleanupMountPoint(dir, b.mounter, false)
	return err
}

func (glusterfsVolume *glusterfs) GetPath() string {
	name := glusterfsPluginName
	return glusterfsVolume.plugin.host.GetPodVolumeDir(glusterfsVolume.pod.UID, utilstrings.EscapeQualifiedName(name), glusterfsVolume.volName)
}

type glusterfsUnmounter struct {
	*glusterfs
}

var _ volume.Unmounter = &glusterfsUnmounter{}

func (c *glusterfsUnmounter) TearDown() error {
	return c.TearDownAt(c.GetPath())
}

func (c *glusterfsUnmounter) TearDownAt(dir string) error {
	return mount.CleanupMountPoint(dir, c.mounter, false)
}

func (b *glusterfsMounter) setUpAtInternal(dir string) error {
	var errs error
	options := []string{}
	hasLogFile := false
	hasLogLevel := false
	log := ""
	if b.readOnly {
		options = append(options, "ro")
	}

	// Check for the log-file and log-level options in the user-supplied
	// mount options; if provided, use those.
	for _, userOpt := range b.mountOptions {
		switch {
		case dstrings.HasPrefix(userOpt, "log-file"):
			klog.V(4).Infof("log-file mount option was provided")
			hasLogFile = true

		case dstrings.HasPrefix(userOpt, "log-level"):
			klog.V(4).Infof("log-level mount option was provided")
			hasLogLevel = true
		}
	}

	// If a log file has not been provided, create a driver-specific log file.
	if !hasLogFile {
		p := filepath.Join(b.glusterfs.plugin.host.GetPluginDir(glusterfsPluginName), b.glusterfs.volName)
		if err := os.MkdirAll(p, 0750); err != nil {
			return fmt.Errorf("failed to create directory %v: %v", p, err)
		}

		// Use a log path specific to PV + Pod so that each pod
		// gets its own log file.
		log = filepath.Join(p, b.pod.Name+"-glusterfs.log")

		// Use derived log file in gluster fuse mount
		options = append(options, "log-file="+log)
	}
	// Add log-level ERROR to remove noise, unless the user supplied a level.
	if !hasLogLevel {
		options = append(options, "log-level=ERROR")
	}
	var addrlist []string
	if b.hosts == nil {
		return fmt.Errorf("glusterfs endpoint is nil in mounter")
	}
	addr := sets.String{}
	if b.hosts.Subsets != nil {
		for _, s := range b.hosts.Subsets {
			for _, a := range s.Addresses {
				if !addr.Has(a.IP) {
					addr.Insert(a.IP)
					addrlist = append(addrlist, a.IP)
				}
			}
		}
	}

	if (len(addrlist) > 0) && (addrlist[0] != "") {
		ip := addrlist[rand.Intn(len(addrlist))]

		// Add backup-volfile-servers and auto_unmount options.
		// When ip is also in backup-volfile-servers, there will be a warning:
		// "gf_remember_backup_volfile_server] 0-glusterfs: failed to set volfile server: File exists".
		addr.Delete(ip)
		backups := addr.List()
		// Avoid an invalid empty backup-volfile-servers option.
		if len(backups) > 0 {
			options = append(options, "backup-volfile-servers="+dstrings.Join(backups[:], ":"))
		}
		options = append(options, "auto_unmount")

		mountOptions := volutil.JoinMountOptions(b.mountOptions, options)
		// With the `backup-volfile-servers` mount option in place, it is not necessary to
		// iterate over all the servers in the addrlist. A mount attempt with this option
		// will fetch all the servers mentioned in the backup-volfile-servers list.
		// Refer to backup-volfile-servers @ http://docs.gluster.org/en/latest/Administrator%20Guide/Setting%20Up%20Clients/

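		// For illustration, the mount attempted below is roughly equivalent to:
		//   mount -t glusterfs -o log-level=ERROR,log-file=<plugin-dir>/<vol>/<pod>-glusterfs.log,backup-volfile-servers=10.0.0.2:10.0.0.3,auto_unmount 10.0.0.1:<path> <dir>
		// where the 10.0.0.x addresses stand in for the endpoint IPs.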
		errs = b.mounter.Mount(ip+":"+b.path, dir, "glusterfs", mountOptions)
		if errs == nil {
			klog.Infof("successfully mounted directory %s", dir)
			return nil
		}
		if dstrings.Contains(errs.Error(), "Invalid option auto_unmount") ||
			dstrings.Contains(errs.Error(), "Invalid argument") {
			// Retry without the `auto_unmount` mount option, because
			// the gluster fuse client could be an older version that
			// is unaware of `auto_unmount`.
			noAutoMountOptions := make([]string, 0, len(mountOptions))
			for _, opt := range mountOptions {
				if opt != "auto_unmount" {
					noAutoMountOptions = append(noAutoMountOptions, opt)
				}
			}
			errs = b.mounter.Mount(ip+":"+b.path, dir, "glusterfs", noAutoMountOptions)
			if errs == nil {
				klog.Infof("successfully mounted %s", dir)
				return nil
			}
		}
	} else {
		return fmt.Errorf("failed to execute mount command: no valid IP address found in endpoint address list")
	}

	// Failed mount scenario.
	// Since glusterfs does not return error text,
	// it all goes in a log file; we read the log file here.
	logErr := readGlusterLog(log, b.pod.Name)
	if logErr != nil {
		return fmt.Errorf("mount failed: %v, the following error information was pulled from the glusterfs log to help diagnose this issue: %v", errs, logErr)
	}
	return fmt.Errorf("mount failed: %v", errs)
}

// getVolumeInfo returns the 'path' and 'readonly' field values from the provided glusterfs spec.
func getVolumeInfo(spec *volume.Spec) (string, bool, error) {
	if spec.Volume != nil && spec.Volume.Glusterfs != nil {
		return spec.Volume.Glusterfs.Path, spec.Volume.Glusterfs.ReadOnly, nil
	} else if spec.PersistentVolume != nil &&
		spec.PersistentVolume.Spec.Glusterfs != nil {
		return spec.PersistentVolume.Spec.Glusterfs.Path, spec.ReadOnly, nil
	}
	return "", false, fmt.Errorf("spec does not reference a Glusterfs volume type")
}

func (plugin *glusterfsPlugin) NewProvisioner(options volume.VolumeOptions) (volume.Provisioner, error) {
	return plugin.newProvisionerInternal(options)
}

func (plugin *glusterfsPlugin) newProvisionerInternal(options volume.VolumeOptions) (volume.Provisioner, error) {
	return &glusterfsVolumeProvisioner{
		glusterfsMounter: &glusterfsMounter{
			glusterfs: &glusterfs{
				plugin: plugin,
			},
		},
		options: options,
	}, nil
}

type provisionerConfig struct {
	url                string
	user               string
	userKey            string
	secretNamespace    string
	secretName         string
	secretValue        string `datapolicy:"token"`
	clusterID          string
	gidMin             int
	gidMax             int
	volumeType         gapi.VolumeDurabilityInfo
	volumeOptions      []string
	volumeNamePrefix   string
	thinPoolSnapFactor float32
	customEpNamePrefix string
}

type glusterfsVolumeProvisioner struct {
	*glusterfsMounter
	provisionerConfig
	options volume.VolumeOptions
}

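// convertGid parses a decimal GID string (e.g. "2000") into a non-negative
// int; negative values and non-numeric strings are rejected.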
func convertGid(gidString string) (int, error) {
	gid64, err := strconv.ParseInt(gidString, 10, 32)
	if err != nil {
		return 0, fmt.Errorf("failed to parse gid %v: %v", gidString, err)
	}
	if gid64 < 0 {
		return 0, fmt.Errorf("negative GIDs %v are not allowed", gidString)
	}

	// ParseInt returns an int64, but since we parsed only
	// for 32 bits, we can cast to int without loss:
	gid := int(gid64)
	return gid, nil
}

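// convertVolumeParam parses a numeric volumeType parameter, such as the
// replica count "3" in "replicate:3", into a non-negative int.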
func convertVolumeParam(volumeString string) (int, error) {
	count, err := strconv.Atoi(volumeString)
	if err != nil {
		return 0, fmt.Errorf("failed to parse volumestring %q: %v", volumeString, err)
	}

	if count < 0 {
		return 0, fmt.Errorf("negative values are not allowed")
	}
	return count, nil
}

func (plugin *glusterfsPlugin) NewDeleter(spec *volume.Spec) (volume.Deleter, error) {
	return plugin.newDeleterInternal(spec)
}

func (plugin *glusterfsPlugin) newDeleterInternal(spec *volume.Spec) (volume.Deleter, error) {
	if spec.PersistentVolume != nil && spec.PersistentVolume.Spec.Glusterfs == nil {
		return nil, fmt.Errorf("spec.PersistentVolume.Spec.Glusterfs is nil")
	}
	return &glusterfsVolumeDeleter{
		glusterfsMounter: &glusterfsMounter{
			glusterfs: &glusterfs{
				volName: spec.Name(),
				plugin:  plugin,
			},
			path: spec.PersistentVolume.Spec.Glusterfs.Path,
		},
		spec: spec.PersistentVolume,
	}, nil
}

type glusterfsVolumeDeleter struct {
	*glusterfsMounter
	provisionerConfig
	spec *v1.PersistentVolume
}

func (d *glusterfsVolumeDeleter) GetPath() string {
	name := glusterfsPluginName
	return d.plugin.host.GetPodVolumeDir(d.glusterfsMounter.glusterfs.pod.UID, utilstrings.EscapeQualifiedName(name), d.glusterfsMounter.glusterfs.volName)
}

// collectGids traverses the PVs of a given storage class, fetches their GIDs,
// and marks those GIDs as allocated in the table.
func (plugin *glusterfsPlugin) collectGids(className string, gidTable *MinMaxAllocator) error {
	kubeClient := plugin.host.GetKubeClient()
	if kubeClient == nil {
		return fmt.Errorf("failed to get kube client when collecting gids")
	}
	pvList, err := kubeClient.CoreV1().PersistentVolumes().List(context.TODO(), metav1.ListOptions{LabelSelector: labels.Everything().String()})
	if err != nil {
		return fmt.Errorf("failed to get existing persistent volumes")
	}
	for _, pv := range pvList.Items {
		if storagehelpers.GetPersistentVolumeClass(&pv) != className {
			continue
		}
		pvName := pv.ObjectMeta.Name
		gidStr, ok := pv.Annotations[volutil.VolumeGidAnnotationKey]
		if !ok {
			klog.Warningf("no GID found in pv %v", pvName)
			continue
		}
		gid, err := convertGid(gidStr)
		if err != nil {
			klog.Errorf("failed to parse gid %s: %v", gidStr, err)
			continue
		}
		_, err = gidTable.Allocate(gid)
		if err == ErrConflict {
			klog.Warningf("GID %v found in pv %v was already allocated", gid, pvName)
		} else if err != nil {
			return fmt.Errorf("failed to store gid %v found in pv %v: %v", gid, pvName, err)
		}
	}
	return nil
}

// getGidTable returns the gid table for a storage class.
// - If this is the first time, fill it with all the gids
//   used in PVs of this storage class by traversing the PVs.
// - Adapt the range of the table to the current range of the SC.
func (plugin *glusterfsPlugin) getGidTable(className string, min int, max int) (*MinMaxAllocator, error) {
	plugin.gidTableLock.Lock()
	gidTable, ok := plugin.gidTable[className]
	plugin.gidTableLock.Unlock()

	if ok {
		err := gidTable.SetRange(min, max)
		if err != nil {
			return nil, err
		}
		return gidTable, nil
	}

	// create a new table and fill it
	newGidTable, err := NewMinMaxAllocator(0, absoluteGidMax)
	if err != nil {
		return nil, err
	}

	// collect gids with the full range
	err = plugin.collectGids(className, newGidTable)
	if err != nil {
		return nil, err
	}

	// and only reduce the range afterwards
	err = newGidTable.SetRange(min, max)
	if err != nil {
		return nil, err
	}

	// if in the meantime a table appeared, use it
	plugin.gidTableLock.Lock()
	defer plugin.gidTableLock.Unlock()
	gidTable, ok = plugin.gidTable[className]
	if ok {
		err = gidTable.SetRange(min, max)
		if err != nil {
			return nil, err
		}
		return gidTable, nil
	}

	plugin.gidTable[className] = newGidTable
	return newGidTable, nil
}

func (d *glusterfsVolumeDeleter) getGid() (int, bool, error) {
	gidStr, ok := d.spec.Annotations[volutil.VolumeGidAnnotationKey]
	if !ok {
		return 0, false, nil
	}
	gid, err := convertGid(gidStr)
	return gid, true, err
}

func (d *glusterfsVolumeDeleter) Delete() error {
	klog.V(2).Infof("delete volume %s", d.glusterfsMounter.path)
	volumeName := d.glusterfsMounter.path
	volumeID, err := getVolumeID(d.spec, volumeName)
	if err != nil {
		return fmt.Errorf("failed to get volumeID: %v", err)
	}
	class, err := volutil.GetClassForVolume(d.plugin.host.GetKubeClient(), d.spec)
	if err != nil {
		return err
	}
	cfg, err := parseClassParameters(class.Parameters, d.plugin.host.GetKubeClient())
	if err != nil {
		return err
	}
	d.provisionerConfig = *cfg
	klog.V(4).Infof("deleting volume %q", volumeID)
	gid, exists, err := d.getGid()
	if err != nil {
		klog.Error(err)
	} else if exists {
		gidTable, err := d.plugin.getGidTable(class.Name, cfg.gidMin, cfg.gidMax)
		if err != nil {
			return fmt.Errorf("failed to get gidTable: %v", err)
		}
		err = gidTable.Release(gid)
		if err != nil {
			return fmt.Errorf("failed to release gid %v: %v", gid, err)
		}
	}
	cli := filterClient(gcli.NewClient(d.url, d.user, d.secretValue), d.plugin.host.GetFilteredDialOptions())
	if cli == nil {
		klog.Errorf("failed to create glusterfs REST client")
		return fmt.Errorf("failed to create glusterfs REST client, REST server authentication failed")
	}
	err = cli.VolumeDelete(volumeID)
	if err != nil {
		if dstrings.TrimSpace(err.Error()) != errIDNotFound {
			// don't log error details from client calls in events
			klog.V(4).Infof("failed to delete volume %s: %v", volumeName, err)
			return fmt.Errorf("failed to delete volume: see kube-controller-manager.log for details")
		}
		klog.V(2).Infof("volume %s not present in heketi, ignoring", volumeName)
	}
	klog.V(2).Infof("volume %s deleted successfully", volumeName)

	// The deleter takes the endpoint and namespace from the PV spec.
	pvSpec := d.spec.Spec
	var dynamicEndpoint, dynamicNamespace string
	if pvSpec.ClaimRef == nil {
		klog.Errorf("ClaimRef is nil")
		return fmt.Errorf("ClaimRef is nil")
	}
	if pvSpec.ClaimRef.Namespace == "" {
		klog.Errorf("namespace is empty")
		return fmt.Errorf("namespace is empty")
	}
	dynamicNamespace = pvSpec.ClaimRef.Namespace
	if pvSpec.Glusterfs.EndpointsName != "" {
		dynamicEndpoint = pvSpec.Glusterfs.EndpointsName
	}
	klog.V(3).Infof("dynamic namespace and endpoint %v/%v", dynamicNamespace, dynamicEndpoint)
	err = d.deleteEndpointService(dynamicNamespace, dynamicEndpoint)
	if err != nil {
		klog.Errorf("failed to delete endpoint/service %v/%v: %v", dynamicNamespace, dynamicEndpoint, err)
	} else {
		klog.V(1).Infof("endpoint %v/%v deleted successfully", dynamicNamespace, dynamicEndpoint)
	}
	return nil
}

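// filterClient restricts the heketi REST client's HTTP transport with the
// host's filtered dial options (when set), so that provisioner traffic cannot
// be redirected to addresses the host has chosen to block.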
func filterClient(client *gcli.Client, opts *proxyutil.FilteredDialOptions) *gcli.Client {
	if opts == nil {
		return client
	}
	dialer := proxyutil.NewFilteredDialContext(nil, nil, opts)
	client.SetClientFunc(func(tlsConfig *tls.Config, checkRedirect gcli.CheckRedirectFunc) (gcli.HttpPerformer, error) {
		transport := http.DefaultTransport.(*http.Transport).Clone()
		transport.DialContext = dialer
		transport.TLSClientConfig = tlsConfig
		return &http.Client{Transport: transport, CheckRedirect: checkRedirect}, nil
	})
	return client
}

func (p *glusterfsVolumeProvisioner) Provision(selectedNode *v1.Node, allowedTopologies []v1.TopologySelectorTerm) (*v1.PersistentVolume, error) {
	if !volutil.ContainsAllAccessModes(p.plugin.GetAccessModes(), p.options.PVC.Spec.AccessModes) {
		return nil, fmt.Errorf("invalid AccessModes %v: only AccessModes %v are supported", p.options.PVC.Spec.AccessModes, p.plugin.GetAccessModes())
	}
	if p.options.PVC.Spec.Selector != nil {
		klog.V(4).Infof("claim Selector is not supported")
		return nil, fmt.Errorf("claim Selector is not supported")
	}
	if volutil.CheckPersistentVolumeClaimModeBlock(p.options.PVC) {
		return nil, fmt.Errorf("%s does not support block volume provisioning", p.plugin.GetPluginName())
	}
	klog.V(4).Infof("provision volume with options %v", p.options)
	scName := storagehelpers.GetPersistentVolumeClaimClass(p.options.PVC)
	cfg, err := parseClassParameters(p.options.Parameters, p.plugin.host.GetKubeClient())
	if err != nil {
		return nil, err
	}
	p.provisionerConfig = *cfg

	gidTable, err := p.plugin.getGidTable(scName, cfg.gidMin, cfg.gidMax)
	if err != nil {
		return nil, fmt.Errorf("failed to get gidTable: %v", err)
	}
	gid, _, err := gidTable.AllocateNext()
	if err != nil {
		klog.Errorf("failed to reserve GID from table: %v", err)
		return nil, fmt.Errorf("failed to reserve GID from table: %v", err)
	}
	klog.V(2).Infof("allocated GID %d for PVC %s", gid, p.options.PVC.Name)
	glusterfs, sizeGiB, volID, err := p.CreateVolume(gid)
	if err != nil {
		if releaseErr := gidTable.Release(gid); releaseErr != nil {
			klog.Errorf("error when releasing GID in storageclass %s: %v", scName, releaseErr)
		}
		return nil, fmt.Errorf("failed to create volume: %v", err)
	}
	mode := v1.PersistentVolumeFilesystem
	pv := new(v1.PersistentVolume)
	pv.Spec.PersistentVolumeSource.Glusterfs = glusterfs
	pv.Spec.PersistentVolumeReclaimPolicy = p.options.PersistentVolumeReclaimPolicy
	pv.Spec.AccessModes = p.options.PVC.Spec.AccessModes
	pv.Spec.VolumeMode = &mode
	if len(pv.Spec.AccessModes) == 0 {
		pv.Spec.AccessModes = p.plugin.GetAccessModes()
	}
	pv.Spec.MountOptions = p.options.MountOptions
	gidStr := strconv.FormatInt(int64(gid), 10)
	pv.Annotations = map[string]string{
		volutil.VolumeGidAnnotationKey:        gidStr,
		volutil.VolumeDynamicallyCreatedByKey: heketiAnn,
		glusterTypeAnn:                        "file",
		"Description":                         glusterDescAnn,
		heketiVolIDAnn:                        volID,
	}
	pv.Spec.Capacity = v1.ResourceList{
		v1.ResourceName(v1.ResourceStorage): resource.MustParse(fmt.Sprintf("%dGi", sizeGiB)),
	}
	return pv, nil
}

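// CreateVolume asks heketi to provision a volume owned by the given GID and
// creates the endpoint/service pair that the resulting PV will reference. It
// returns the GlusterFS PV source, the rounded-up size in GiB, and the heketi
// volume ID.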
func (p *glusterfsVolumeProvisioner) CreateVolume(gid int) (r *v1.GlusterfsPersistentVolumeSource, size int, volID string, err error) {
	var clusterIDs []string
	customVolumeName := ""
	epServiceName := ""
	kubeClient := p.plugin.host.GetKubeClient()
	if kubeClient == nil {
		return nil, 0, "", fmt.Errorf("failed to get kube client to update endpoint")
	}
	if len(p.provisionerConfig.customEpNamePrefix) == 0 {
		epServiceName = string(p.options.PVC.UID)
	} else {
		epServiceName = p.provisionerConfig.customEpNamePrefix + "-" + string(p.options.PVC.UID)
	}
	epNamespace := p.options.PVC.Namespace
	endpoint, service, err := p.createOrGetEndpointService(epNamespace, epServiceName, p.options.PVC)
	if err != nil {
		klog.Errorf("failed to create endpoint/service %v/%v: %v", epNamespace, epServiceName, err)
		return nil, 0, "", fmt.Errorf("failed to create endpoint/service %v/%v: %v", epNamespace, epServiceName, err)
	}
	klog.V(3).Infof("dynamic endpoint %v and service %v", endpoint, service)
	capacity := p.options.PVC.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)]

	// GlusterFS/heketi creates volumes in units of GiB.
	sz, err := volumehelpers.RoundUpToGiBInt(capacity)
	if err != nil {
		return nil, 0, "", err
	}
	klog.V(2).Infof("create volume of size %dGiB", sz)
	if p.url == "" {
		return nil, 0, "", fmt.Errorf("failed to create glusterfs REST client, REST URL is empty")
	}
	cli := filterClient(gcli.NewClient(p.url, p.user, p.secretValue), p.plugin.host.GetFilteredDialOptions())
	if cli == nil {
		return nil, 0, "", fmt.Errorf("failed to create glusterfs REST client, REST server authentication failed")
	}
	if p.provisionerConfig.clusterID != "" {
		clusterIDs = dstrings.Split(p.clusterID, ",")
		klog.V(4).Infof("provided clusterIDs %v", clusterIDs)
	}

	if p.provisionerConfig.volumeNamePrefix != "" {
		customVolumeName = fmt.Sprintf("%s_%s_%s_%s", p.provisionerConfig.volumeNamePrefix, p.options.PVC.Namespace, p.options.PVC.Name, uuid.NewUUID())
	}
	gid64 := int64(gid)
	snaps := struct {
		Enable bool    `json:"enable"`
		Factor float32 `json:"factor"`
	}{
		true,
		p.provisionerConfig.thinPoolSnapFactor,
	}
	volumeReq := &gapi.VolumeCreateRequest{Size: sz, Name: customVolumeName, Clusters: clusterIDs, Gid: gid64, Durability: p.volumeType, GlusterVolumeOptions: p.volumeOptions, Snapshot: snaps}
	volume, err := cli.VolumeCreate(volumeReq)
	if err != nil {
		// don't log error details from client calls in events
		klog.V(4).Infof("failed to create volume: %v", err)
		return nil, 0, "", fmt.Errorf("failed to create volume: see kube-controller-manager.log for details")
	}
	klog.V(1).Infof("volume with size %d and name %s created", volume.Size, volume.Name)
	volID = volume.Id
	dynamicHostIps, err := getClusterNodes(cli, volume.Cluster)
	if err != nil {
		return nil, 0, "", fmt.Errorf("failed to get cluster nodes for volume %s: %v", volume, err)
	}
	addrlist := make([]v1.EndpointAddress, len(dynamicHostIps))
	for i, v := range dynamicHostIps {
		addrlist[i].IP = v
	}
	subset := make([]v1.EndpointSubset, 1)
	ports := []v1.EndpointPort{{Port: 1, Protocol: "TCP"}}
	endpoint.Subsets = subset
	endpoint.Subsets[0].Addresses = addrlist
	endpoint.Subsets[0].Ports = ports
	_, err = kubeClient.CoreV1().Endpoints(epNamespace).Update(context.TODO(), endpoint, metav1.UpdateOptions{})
	if err != nil {
		deleteErr := cli.VolumeDelete(volume.Id)
		if deleteErr != nil {
			// don't log error details from client calls in events
			klog.V(4).Infof("failed to delete volume: %v, manual deletion of the volume required", deleteErr)
		}
		klog.V(3).Infof("failed to update endpoint, deleting %s", endpoint)
		err = kubeClient.CoreV1().Services(epNamespace).Delete(context.TODO(), epServiceName, metav1.DeleteOptions{})
		if err != nil && errors.IsNotFound(err) {
			klog.V(1).Infof("service %s does not exist in namespace %s", epServiceName, epNamespace)
			err = nil
		}
		if err != nil {
			klog.Errorf("failed to delete service %s/%s: %v", epNamespace, epServiceName, err)
		}
		klog.V(1).Infof("service/endpoint: %s/%s deleted successfully", epNamespace, epServiceName)
		return nil, 0, "", fmt.Errorf("failed to update endpoint %s: %v", endpoint, err)
	}
	klog.V(3).Infof("endpoint %s updated successfully", endpoint)
	return &v1.GlusterfsPersistentVolumeSource{
		EndpointsName:      endpoint.Name,
		EndpointsNamespace: &epNamespace,
		Path:               volume.Name,
		ReadOnly:           false,
	}, sz, volID, nil
}

// createOrGetEndpointService ensures an endpoint and service exist for the
// given namespace, PVC, and endpoint name; the endpoint or service is only
// created if it does not exist yet.
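// With the default prefix, the generated name has the form
// "glusterfs-dynamic-<pvc-uid>" (illustrative).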
func (p *glusterfsVolumeProvisioner) createOrGetEndpointService(namespace string, epServiceName string, pvc *v1.PersistentVolumeClaim) (endpoint *v1.Endpoints, service *v1.Service, err error) {
	pvcNameOrID := ""
	if len(pvc.Name) >= 63 {
		pvcNameOrID = string(pvc.UID)
	} else {
		pvcNameOrID = pvc.Name
	}
	endpoint = &v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: namespace,
			Name:      epServiceName,
			Labels: map[string]string{
				"gluster.kubernetes.io/provisioned-for-pvc": pvcNameOrID,
			},
		},
	}
	kubeClient := p.plugin.host.GetKubeClient()
	if kubeClient == nil {
		return nil, nil, fmt.Errorf("failed to get kube client when creating endpoint service")
	}
	_, err = kubeClient.CoreV1().Endpoints(namespace).Create(context.TODO(), endpoint, metav1.CreateOptions{})
	if err != nil && errors.IsAlreadyExists(err) {
		klog.V(1).Infof("endpoint %s already exists in namespace %s", endpoint, namespace)
		err = nil
	}
	if err != nil {
		klog.Errorf("failed to create endpoint: %v", err)
		return nil, nil, fmt.Errorf("failed to create endpoint: %v", err)
	}
	service = &v1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      epServiceName,
			Namespace: namespace,
			Labels: map[string]string{
				"gluster.kubernetes.io/provisioned-for-pvc": pvcNameOrID,
			},
		},
		Spec: v1.ServiceSpec{
			Ports: []v1.ServicePort{
				{Protocol: "TCP", Port: 1}}}}
	_, err = kubeClient.CoreV1().Services(namespace).Create(context.TODO(), service, metav1.CreateOptions{})
	if err != nil && errors.IsAlreadyExists(err) {
		klog.V(1).Infof("service %s already exists in namespace %s", service, namespace)
		err = nil
	}
	if err != nil {
		klog.Errorf("failed to create service: %v", err)
		return nil, nil, fmt.Errorf("error creating service: %v", err)
	}
	return endpoint, service, nil
}

func (d *glusterfsVolumeDeleter) deleteEndpointService(namespace string, epServiceName string) (err error) {
	kubeClient := d.plugin.host.GetKubeClient()
	if kubeClient == nil {
		return fmt.Errorf("failed to get kube client when deleting endpoint service")
	}
	err = kubeClient.CoreV1().Services(namespace).Delete(context.TODO(), epServiceName, metav1.DeleteOptions{})
	if err != nil {
		return fmt.Errorf("failed to delete service %s/%s: %v", namespace, epServiceName, err)
	}
	klog.V(1).Infof("service/endpoint: %s/%s deleted successfully", namespace, epServiceName)
	return nil
}

// parseSecret finds a given Secret instance and reads the user password from it.
func parseSecret(namespace, secretName string, kubeClient clientset.Interface) (string, error) {
	secretMap, err := volutil.GetSecretForPV(namespace, secretName, glusterfsPluginName, kubeClient)
	if err != nil {
		klog.Errorf("failed to get secret %s/%s: %v", namespace, secretName, err)
		return "", fmt.Errorf("failed to get secret %s/%s: %v", namespace, secretName, err)
	}
	if len(secretMap) == 0 {
		return "", fmt.Errorf("empty secret map")
	}
	secret := ""
	for k, v := range secretMap {
		if k == secretKeyName {
			return v, nil
		}
		secret = v
	}

	// If the expected key is not found, the last secret in the map wins, as before.
	return secret, nil
}

// getClusterNodes returns the cluster nodes of a given cluster.
func getClusterNodes(cli *gcli.Client, cluster string) (dynamicHostIps []string, err error) {
	clusterinfo, err := cli.ClusterInfo(cluster)
	if err != nil {
		// don't log error details from client calls in events
		klog.V(4).Infof("failed to get cluster details: %v", err)
		return nil, fmt.Errorf("failed to get cluster details: see kube-controller-manager.log for details")
	}

	// For the dynamically provisioned volume, we gather the list of node IPs
	// of the cluster to which the provisioned volume belongs, as there can be
	// multiple clusters.
	for _, node := range clusterinfo.Nodes {
		nodeInfo, err := cli.NodeInfo(string(node))
		if err != nil {
			// don't log error details from client calls in events
			klog.V(4).Infof("failed to get host ipaddress: %v", err)
			return nil, fmt.Errorf("failed to get host ipaddress: see kube-controller-manager.log for details")
		}
		ipaddr := dstrings.Join(nodeInfo.NodeAddRequest.Hostnames.Storage, "")
		// ParseIP returns nil if the string is not a valid IP address.
		ip := net.ParseIP(ipaddr)
		if ip == nil {
			return nil, fmt.Errorf("glusterfs server node ip address %s must be a valid IP address, (e.g. 10.9.8.7)", ipaddr)
		}
		dynamicHostIps = append(dynamicHostIps, ipaddr)
	}
	klog.V(3).Infof("host list: %v", dynamicHostIps)
	if len(dynamicHostIps) == 0 {
		return nil, fmt.Errorf("no hosts found: %v", err)
	}
	return dynamicHostIps, nil
}

// parseClassParameters parses StorageClass parameters.
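// A typical parameter map looks like the following (values illustrative):
//   resturl:         "http://heketi.example.com:8081"
//   restuser:        "admin"
//   secretNamespace: "default"
//   secretName:      "heketi-secret"
//   gidMin:          "40000"
//   gidMax:          "50000"
//   volumetype:      "replicate:3"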
func parseClassParameters(params map[string]string, kubeClient clientset.Interface) (*provisionerConfig, error) {
	var cfg provisionerConfig
	var err error
	cfg.gidMin = defaultGidMin
	cfg.gidMax = defaultGidMax
	cfg.customEpNamePrefix = dynamicEpSvcPrefix

	authEnabled := true
	parseVolumeType := ""
	parseVolumeOptions := ""
	parseVolumeNamePrefix := ""
	parseThinPoolSnapFactor := ""

	// thin pool snap factor defaults to 1.0
	cfg.thinPoolSnapFactor = float32(1.0)

	for k, v := range params {
		switch dstrings.ToLower(k) {
		case "resturl":
			cfg.url = v
		case "restuser":
			cfg.user = v
		case "restuserkey":
			cfg.userKey = v
		case "secretname":
			cfg.secretName = v
		case "secretnamespace":
			cfg.secretNamespace = v
		case "clusterid":
			if len(v) != 0 {
				cfg.clusterID = v
			}
		case "restauthenabled":
			authEnabled = dstrings.ToLower(v) == "true"
		case "gidmin":
			parseGidMin, err := convertGid(v)
			if err != nil {
				return nil, fmt.Errorf("invalid gidMin value %q for volume plugin %s", v, glusterfsPluginName)
			}
			if parseGidMin < absoluteGidMin {
				return nil, fmt.Errorf("gidMin must be >= %v", absoluteGidMin)
			}
			if parseGidMin > absoluteGidMax {
				return nil, fmt.Errorf("gidMin must be <= %v", absoluteGidMax)
			}
			cfg.gidMin = parseGidMin
		case "gidmax":
			parseGidMax, err := convertGid(v)
			if err != nil {
				return nil, fmt.Errorf("invalid gidMax value %q for volume plugin %s", v, glusterfsPluginName)
			}
			if parseGidMax < absoluteGidMin {
				return nil, fmt.Errorf("gidMax must be >= %v", absoluteGidMin)
			}
			if parseGidMax > absoluteGidMax {
				return nil, fmt.Errorf("gidMax must be <= %v", absoluteGidMax)
			}
			cfg.gidMax = parseGidMax
		case "volumetype":
			parseVolumeType = v

		case "volumeoptions":
			if len(v) != 0 {
				parseVolumeOptions = v
			}
		case "volumenameprefix":
			if len(v) != 0 {
				parseVolumeNamePrefix = v
			}
		case "snapfactor":
			if len(v) != 0 {
				parseThinPoolSnapFactor = v
			}
		case "customepnameprefix":
			// If the prefix has more than 'maxCustomEpNamePrefixLen' chars, the
			// final endpoint name will exceed the 63-char limit, so fail if the
			// prefix is longer than that. This only applies to 'customepnameprefix';
			// the default ep name always passes.
			if len(v) <= maxCustomEpNamePrefixLen {
				cfg.customEpNamePrefix = v
			} else {
				return nil, fmt.Errorf("'customepnameprefix' value must not exceed %d characters", maxCustomEpNamePrefixLen)
			}
		default:
			return nil, fmt.Errorf("invalid option %q for volume plugin %s", k, glusterfsPluginName)
		}
	}
	if len(cfg.url) == 0 {
		return nil, fmt.Errorf("StorageClass for provisioner %s must contain 'resturl' parameter", glusterfsPluginName)
	}
	if len(parseVolumeType) == 0 {
		cfg.volumeType = gapi.VolumeDurabilityInfo{Type: gapi.DurabilityReplicate, Replicate: gapi.ReplicaDurability{Replica: replicaCount}}
	} else {
		parseVolumeTypeInfo := dstrings.Split(parseVolumeType, ":")
		switch parseVolumeTypeInfo[0] {
		case "replicate":
			if len(parseVolumeTypeInfo) >= 2 {
				newReplicaCount, err := convertVolumeParam(parseVolumeTypeInfo[1])
				if err != nil {
					return nil, fmt.Errorf("error parsing volumeType %q: %s", parseVolumeTypeInfo[1], err)
				}
				cfg.volumeType = gapi.VolumeDurabilityInfo{Type: gapi.DurabilityReplicate, Replicate: gapi.ReplicaDurability{Replica: newReplicaCount}}
			} else {
				cfg.volumeType = gapi.VolumeDurabilityInfo{Type: gapi.DurabilityReplicate, Replicate: gapi.ReplicaDurability{Replica: replicaCount}}
			}
		case "disperse":
			if len(parseVolumeTypeInfo) >= 3 {
				newDisperseData, err := convertVolumeParam(parseVolumeTypeInfo[1])
				if err != nil {
					return nil, fmt.Errorf("error parsing volumeType %q: %s", parseVolumeTypeInfo[1], err)
				}
				newDisperseRedundancy, err := convertVolumeParam(parseVolumeTypeInfo[2])
				if err != nil {
					return nil, fmt.Errorf("error parsing volumeType %q: %s", parseVolumeTypeInfo[2], err)
				}
				cfg.volumeType = gapi.VolumeDurabilityInfo{Type: gapi.DurabilityEC, Disperse: gapi.DisperseDurability{Data: newDisperseData, Redundancy: newDisperseRedundancy}}
			} else {
				return nil, fmt.Errorf("StorageClass for provisioner %q must have data:redundancy count set for disperse volumes in storage class option '%s'", glusterfsPluginName, "volumetype")
			}
		case "none":
			cfg.volumeType = gapi.VolumeDurabilityInfo{Type: gapi.DurabilityDistributeOnly}
		default:
			return nil, fmt.Errorf("error parsing value for option 'volumetype' for volume plugin %s", glusterfsPluginName)
		}
	}
	if !authEnabled {
		cfg.user = ""
		cfg.secretName = ""
		cfg.secretNamespace = ""
		cfg.userKey = ""
		cfg.secretValue = ""
	}

	if len(cfg.secretName) != 0 || len(cfg.secretNamespace) != 0 {
		// secretName + Namespace has precedence over userKey
		if len(cfg.secretName) != 0 && len(cfg.secretNamespace) != 0 {
			cfg.secretValue, err = parseSecret(cfg.secretNamespace, cfg.secretName, kubeClient)
			if err != nil {
				return nil, err
			}
		} else {
			return nil, fmt.Errorf("StorageClass for provisioner %q must have secretNamespace and secretName either both set or both empty", glusterfsPluginName)
		}
	} else {
		cfg.secretValue = cfg.userKey
	}
	if cfg.gidMin > cfg.gidMax {
		return nil, fmt.Errorf("StorageClass for provisioner %q must have gidMax value >= gidMin", glusterfsPluginName)
	}
	if len(parseVolumeOptions) != 0 {
		volOptions := dstrings.Split(parseVolumeOptions, ",")
		if len(volOptions) == 0 {
			return nil, fmt.Errorf("StorageClass for provisioner %q must have a valid volume option (e.g. 'client.ssl on')", glusterfsPluginName)
		}
		cfg.volumeOptions = volOptions
	}
	if len(parseVolumeNamePrefix) != 0 {
		if dstrings.Contains(parseVolumeNamePrefix, "_") {
			return nil, fmt.Errorf("StorageClass parameter 'volumenameprefix' must not contain '_' in its value")
		}
		cfg.volumeNamePrefix = parseVolumeNamePrefix
	}
	if len(parseThinPoolSnapFactor) != 0 {
		thinPoolSnapFactor, err := strconv.ParseFloat(parseThinPoolSnapFactor, 32)
		if err != nil {
			return nil, fmt.Errorf("failed to convert snapfactor %v to float: %v", parseThinPoolSnapFactor, err)
		}
		if thinPoolSnapFactor < 1.0 || thinPoolSnapFactor > 100.0 {
			return nil, fmt.Errorf("invalid snapshot factor %v, the value must be between 1 and 100", thinPoolSnapFactor)
		}
		cfg.thinPoolSnapFactor = float32(thinPoolSnapFactor)
	}
	return &cfg, nil
}

// getVolumeID returns the volume ID from the PV or, failing that, derives it from the volume name.
func getVolumeID(pv *v1.PersistentVolume, volumeName string) (string, error) {
	volumeID := ""

	// Get the volume ID from the PV spec if available, else fill it from the volume name.
	if pv != nil {
		if pv.Annotations[heketiVolIDAnn] != "" {
			volumeID = pv.Annotations[heketiVolIDAnn]
		} else {
			volumeID = dstrings.TrimPrefix(volumeName, volPrefix)
		}
	} else {
		return volumeID, fmt.Errorf("provided PV spec is nil")
	}
	if volumeID == "" {
		return volumeID, fmt.Errorf("volume ID is empty")
	}
	return volumeID, nil
}

func (plugin *glusterfsPlugin) ExpandVolumeDevice(spec *volume.Spec, newSize resource.Quantity, oldSize resource.Quantity) (resource.Quantity, error) {
	pvSpec := spec.PersistentVolume.Spec
	volumeName := pvSpec.Glusterfs.Path
	klog.V(2).Infof("received request to expand volume %s", volumeName)
	volumeID, err := getVolumeID(spec.PersistentVolume, volumeName)
	if err != nil {
		return oldSize, fmt.Errorf("failed to get volumeID for volume %s: %v", volumeName, err)
	}
	// Get details of the StorageClass.
	class, err := volutil.GetClassForVolume(plugin.host.GetKubeClient(), spec.PersistentVolume)
	if err != nil {
		return oldSize, err
	}
	cfg, err := parseClassParameters(class.Parameters, plugin.host.GetKubeClient())
	if err != nil {
		return oldSize, err
	}
	klog.V(4).Infof("expanding volume: %q", volumeID)

	// Create the REST server connection.
	cli := filterClient(gcli.NewClient(cfg.url, cfg.user, cfg.secretValue), plugin.host.GetFilteredDialOptions())
	if cli == nil {
		klog.Errorf("failed to create glusterfs REST client")
		return oldSize, fmt.Errorf("failed to create glusterfs REST client, REST server authentication failed")
	}

	// Find out the delta size.
	expansionSize := newSize
	expansionSize.Sub(oldSize)
	expansionSizeGiB, err := volumehelpers.RoundUpToGiBInt(expansionSize)
	if err != nil {
		return oldSize, err
	}
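
	// For example, expanding a 5Gi volume to 8Gi produces a 3GiB delta; the
	// expand request sent to heketi carries this delta, not the absolute size.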

	// Find out the requested size.
	requestGiB, err := volumehelpers.RoundUpToGiB(newSize)
	if err != nil {
		return oldSize, err
	}

	// Check the existing volume size.
	currentVolumeInfo, err := cli.VolumeInfo(volumeID)
	if err != nil {
		// don't log error details from client calls in events
		klog.V(4).Infof("error when fetching details of volume %s: %v", volumeName, err)
		return oldSize, fmt.Errorf("failed to get volume info %s: see kube-controller-manager.log for details", volumeName)
	}
	if int64(currentVolumeInfo.Size) >= requestGiB {
		return newSize, nil
	}

	// Make the volume expansion request.
	volumeExpandReq := &gapi.VolumeExpandRequest{Size: expansionSizeGiB}

	// Expand the volume.
	volumeInfoRes, err := cli.VolumeExpand(volumeID, volumeExpandReq)
	if err != nil {
		// don't log error details from client calls in events
		klog.V(4).Infof("failed to expand volume %s: %v", volumeName, err)
		return oldSize, fmt.Errorf("failed to expand volume: see kube-controller-manager.log for details")
	}
	klog.V(2).Infof("volume %s expanded to new size %d successfully", volumeName, volumeInfoRes.Size)
	newVolumeSize := resource.MustParse(fmt.Sprintf("%dGi", volumeInfoRes.Size))
	return newVolumeSize, nil
}