1/*
2Copyright 2015 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package flocker
18
19import (
20	"fmt"
21	"os"
22	"path/filepath"
23	"time"
24
25	flockerapi "github.com/clusterhq/flocker-go"
26	"k8s.io/klog/v2"
27	"k8s.io/mount-utils"
28	utilstrings "k8s.io/utils/strings"
29
30	v1 "k8s.io/api/core/v1"
31	"k8s.io/apimachinery/pkg/types"
32	"k8s.io/kubernetes/pkg/util/env"
33	"k8s.io/kubernetes/pkg/volume"
34	"k8s.io/kubernetes/pkg/volume/util"
35)
36
37// ProbeVolumePlugins is the primary entrypoint for volume plugins.
38func ProbeVolumePlugins() []volume.VolumePlugin {
39	return []volume.VolumePlugin{&flockerPlugin{nil}}
40}
41
// flockerPlugin is the volume plugin that reports flockerPluginName as its
// plugin name.
type flockerPlugin struct {
	// host is the volume host stored by Init.
	host volume.VolumeHost
}
45
// flockerVolume holds the per-volume state used by the mounter, unmounter,
// deleter and provisioner values constructed in this file.
type flockerVolume struct {
	volName string
	podUID  types.UID
	// datasetName is the dataset metadata name (deprecated). GetDatasetUUID
	// uses it to look the UUID up when datasetUUID is empty.
	datasetName string
	// datasetUUID is the dataset UUID; when non-empty it is returned by
	// GetDatasetUUID without contacting Flocker.
	datasetUUID string
	//pod           *v1.Pod
	flockerClient flockerapi.Clientable
	manager       volumeManager
	plugin        *flockerPlugin
	mounter       mount.Interface
	volume.MetricsProvider
}
60
61var _ volume.VolumePlugin = &flockerPlugin{}
62var _ volume.PersistentVolumePlugin = &flockerPlugin{}
63var _ volume.DeletableVolumePlugin = &flockerPlugin{}
64var _ volume.ProvisionableVolumePlugin = &flockerPlugin{}
65
// flockerPluginName is the name returned by GetPluginName.
const flockerPluginName = "kubernetes.io/flocker"

// Fallback values used when the corresponding FLOCKER_CONTROL_SERVICE_*
// environment variables are not set, plus the root of the global mount paths.
const (
	defaultHost           = "localhost"
	defaultPort           = 4523
	defaultCACertFile     = "/etc/flocker/cluster.crt"
	defaultClientKeyFile  = "/etc/flocker/apiuser.key"
	defaultClientCertFile = "/etc/flocker/apiuser.crt"
	defaultMountPath      = "/flocker"
)

// Timing used while waiting for a dataset to move to a new primary: the
// overall deadline and the interval between polls.
const (
	timeoutWaitingForVolume = time.Minute * 2
	tickerWaitingForVolume  = time.Second * 5
)
79
80func getPath(uid types.UID, volName string, host volume.VolumeHost) string {
81	return host.GetPodVolumeDir(uid, utilstrings.EscapeQualifiedName(flockerPluginName), volName)
82}
83
84func makeGlobalFlockerPath(datasetUUID string) string {
85	return filepath.Join(defaultMountPath, datasetUUID)
86}
87
88func (p *flockerPlugin) Init(host volume.VolumeHost) error {
89	p.host = host
90	return nil
91}
92
93func (p *flockerPlugin) GetPluginName() string {
94	return flockerPluginName
95}
96
97func (p *flockerPlugin) GetVolumeName(spec *volume.Spec) (string, error) {
98	volumeSource, _, err := getVolumeSource(spec)
99	if err != nil {
100		return "", err
101	}
102
103	return volumeSource.DatasetName, nil
104}
105
106func (p *flockerPlugin) CanSupport(spec *volume.Spec) bool {
107	return (spec.PersistentVolume != nil && spec.PersistentVolume.Spec.Flocker != nil) ||
108		(spec.Volume != nil && spec.Volume.Flocker != nil)
109}
110
111func (p *flockerPlugin) RequiresRemount(spec *volume.Spec) bool {
112	return false
113}
114
115func (p *flockerPlugin) SupportsMountOption() bool {
116	return false
117}
118
119func (p *flockerPlugin) SupportsBulkVolumeVerification() bool {
120	return false
121}
122
123func (p *flockerPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
124	return []v1.PersistentVolumeAccessMode{
125		v1.ReadWriteOnce,
126	}
127}
128
129func (p *flockerPlugin) getFlockerVolumeSource(spec *volume.Spec) (*v1.FlockerVolumeSource, bool) {
130	// AFAIK this will always be r/w, but perhaps for the future it will be needed
131	readOnly := false
132
133	if spec.Volume != nil && spec.Volume.Flocker != nil {
134		return spec.Volume.Flocker, readOnly
135	}
136	return spec.PersistentVolume.Spec.Flocker, readOnly
137}
138
139func (p *flockerPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volume.VolumeOptions) (volume.Mounter, error) {
140	// Inject real implementations here, test through the internal function.
141	return p.newMounterInternal(spec, pod.UID, &flockerUtil{}, p.host.GetMounter(p.GetPluginName()))
142}
143
144func (p *flockerPlugin) newMounterInternal(spec *volume.Spec, podUID types.UID, manager volumeManager, mounter mount.Interface) (volume.Mounter, error) {
145	volumeSource, readOnly, err := getVolumeSource(spec)
146	if err != nil {
147		return nil, err
148	}
149
150	datasetName := volumeSource.DatasetName
151	datasetUUID := volumeSource.DatasetUUID
152
153	return &flockerVolumeMounter{
154		flockerVolume: &flockerVolume{
155			podUID:          podUID,
156			volName:         spec.Name(),
157			datasetName:     datasetName,
158			datasetUUID:     datasetUUID,
159			mounter:         mounter,
160			manager:         manager,
161			plugin:          p,
162			MetricsProvider: volume.NewMetricsStatFS(getPath(podUID, spec.Name(), p.host)),
163		},
164		readOnly: readOnly}, nil
165}
166
167func (p *flockerPlugin) NewUnmounter(volName string, podUID types.UID) (volume.Unmounter, error) {
168	// Inject real implementations here, test through the internal function.
169	return p.newUnmounterInternal(volName, podUID, &flockerUtil{}, p.host.GetMounter(p.GetPluginName()))
170}
171
172func (p *flockerPlugin) newUnmounterInternal(volName string, podUID types.UID, manager volumeManager, mounter mount.Interface) (volume.Unmounter, error) {
173	return &flockerVolumeUnmounter{&flockerVolume{
174		podUID:          podUID,
175		volName:         volName,
176		manager:         manager,
177		mounter:         mounter,
178		plugin:          p,
179		MetricsProvider: volume.NewMetricsStatFS(getPath(podUID, volName, p.host)),
180	}}, nil
181}
182
183func (p *flockerPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
184	flockerVolume := &v1.Volume{
185		Name: volumeName,
186		VolumeSource: v1.VolumeSource{
187			Flocker: &v1.FlockerVolumeSource{
188				DatasetName: volumeName,
189			},
190		},
191	}
192	return volume.NewSpecFromVolume(flockerVolume), nil
193}
194
195func (b *flockerVolume) GetDatasetUUID() (datasetUUID string, err error) {
196
197	// return UUID if set
198	if len(b.datasetUUID) > 0 {
199		return b.datasetUUID, nil
200	}
201
202	if b.flockerClient == nil {
203		return "", fmt.Errorf("flocker client is not initialized")
204	}
205
206	// lookup in flocker API otherwise
207	return b.flockerClient.GetDatasetID(b.datasetName)
208}
209
// flockerVolumeMounter mounts a flocker volume; it embeds the shared
// flockerVolume state.
type flockerVolumeMounter struct {
	*flockerVolume
	// readOnly is reported by GetAttributes and adds the "ro" option to the
	// bind mount performed by SetUpAt.
	readOnly bool
}
214
215func (b *flockerVolumeMounter) GetAttributes() volume.Attributes {
216	return volume.Attributes{
217		ReadOnly:        b.readOnly,
218		Managed:         false,
219		SupportsSELinux: false,
220	}
221}
222
223// Checks prior to mount operations to verify that the required components (binaries, etc.)
224// to mount the volume are available on the underlying node.
225// If not, it returns an error
226func (b *flockerVolumeMounter) CanMount() error {
227	return nil
228}
229
230func (b *flockerVolumeMounter) GetPath() string {
231	return getPath(b.podUID, b.volName, b.plugin.host)
232}
233
234// SetUp bind mounts the disk global mount to the volume path.
235func (b *flockerVolumeMounter) SetUp(mounterArgs volume.MounterArgs) error {
236	return b.SetUpAt(b.GetPath(), mounterArgs)
237}
238
239// newFlockerClient uses environment variables and pod attributes to return a
240// flocker client capable of talking with the Flocker control service.
241func (p *flockerPlugin) newFlockerClient(hostIP string) (*flockerapi.Client, error) {
242	host := env.GetEnvAsStringOrFallback("FLOCKER_CONTROL_SERVICE_HOST", defaultHost)
243	port, err := env.GetEnvAsIntOrFallback("FLOCKER_CONTROL_SERVICE_PORT", defaultPort)
244
245	if err != nil {
246		return nil, err
247	}
248	caCertPath := env.GetEnvAsStringOrFallback("FLOCKER_CONTROL_SERVICE_CA_FILE", defaultCACertFile)
249	keyPath := env.GetEnvAsStringOrFallback("FLOCKER_CONTROL_SERVICE_CLIENT_KEY_FILE", defaultClientKeyFile)
250	certPath := env.GetEnvAsStringOrFallback("FLOCKER_CONTROL_SERVICE_CLIENT_CERT_FILE", defaultClientCertFile)
251
252	c, err := flockerapi.NewClient(host, port, hostIP, caCertPath, keyPath, certPath)
253	return c, err
254}
255
256func (b *flockerVolumeMounter) newFlockerClient() (*flockerapi.Client, error) {
257
258	hostIP, err := b.plugin.host.GetHostIP()
259	if err != nil {
260		return nil, err
261	}
262
263	return b.plugin.newFlockerClient(hostIP.String())
264}
265
266/*
267SetUpAt will setup a Flocker volume following this flow of calls to the Flocker
268control service:
269
2701. Get the dataset id for the given volume name/dir
2712. It should already be there, if it's not the user needs to manually create it
2723. Check the current Primary UUID
2734. If it doesn't match with the Primary UUID that we got on 2, then we will
274   need to update the Primary UUID for this volume.
2755. Wait until the Primary UUID was updated or timeout.
276*/
277func (b *flockerVolumeMounter) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
278	var err error
279	if b.flockerClient == nil {
280		b.flockerClient, err = b.newFlockerClient()
281		if err != nil {
282			return err
283		}
284	}
285
286	datasetUUID, err := b.GetDatasetUUID()
287	if err != nil {
288		return fmt.Errorf("the datasetUUID for volume with datasetName='%s' can not be found using flocker: %s", b.datasetName, err)
289	}
290
291	datasetState, err := b.flockerClient.GetDatasetState(datasetUUID)
292	if err != nil {
293		return fmt.Errorf("the datasetState for volume with datasetUUID='%s' could not determinted uusing flocker: %s", datasetUUID, err)
294	}
295
296	primaryUUID, err := b.flockerClient.GetPrimaryUUID()
297	if err != nil {
298		return err
299	}
300
301	if datasetState.Primary != primaryUUID {
302		if err := b.updateDatasetPrimary(datasetUUID, primaryUUID); err != nil {
303			return err
304		}
305		_, err := b.flockerClient.GetDatasetState(datasetUUID)
306		if err != nil {
307			return fmt.Errorf("the volume with datasetUUID='%s' migrated unsuccessfully", datasetUUID)
308		}
309	}
310
311	// TODO: handle failed mounts here.
312	notMnt, err := b.mounter.IsLikelyNotMountPoint(dir)
313	klog.V(4).Infof("flockerVolume set up: %s %v %v, datasetUUID %v readOnly %v", dir, !notMnt, err, datasetUUID, b.readOnly)
314	if err != nil && !os.IsNotExist(err) {
315		klog.Errorf("cannot validate mount point: %s %v", dir, err)
316		return err
317	}
318	if !notMnt {
319		return nil
320	}
321
322	if err := os.MkdirAll(dir, 0750); err != nil {
323		klog.Errorf("mkdir failed on disk %s (%v)", dir, err)
324		return err
325	}
326
327	// Perform a bind mount to the full path to allow duplicate mounts of the same PD.
328	options := []string{"bind"}
329	if b.readOnly {
330		options = append(options, "ro")
331	}
332
333	globalFlockerPath := makeGlobalFlockerPath(datasetUUID)
334	klog.V(4).Infof("attempting to mount %s", dir)
335
336	err = b.mounter.MountSensitiveWithoutSystemd(globalFlockerPath, dir, "", options, nil)
337	if err != nil {
338		notMnt, mntErr := b.mounter.IsLikelyNotMountPoint(dir)
339		if mntErr != nil {
340			klog.Errorf("isLikelyNotMountPoint check failed: %v", mntErr)
341			return err
342		}
343		if !notMnt {
344			if mntErr = b.mounter.Unmount(dir); mntErr != nil {
345				klog.Errorf("failed to unmount: %v", mntErr)
346				return err
347			}
348			notMnt, mntErr := b.mounter.IsLikelyNotMountPoint(dir)
349			if mntErr != nil {
350				klog.Errorf("isLikelyNotMountPoint check failed: %v", mntErr)
351				return err
352			}
353			if !notMnt {
354				// This is very odd, we don't expect it.  We'll try again next sync loop.
355				klog.Errorf("%s is still mounted, despite call to unmount().  Will try again next sync loop.", dir)
356				return err
357			}
358		}
359		os.Remove(dir)
360		klog.Errorf("mount of disk %s failed: %v", dir, err)
361		return err
362	}
363
364	if !b.readOnly {
365		volume.SetVolumeOwnership(b, mounterArgs.FsGroup, mounterArgs.FSGroupChangePolicy, util.FSGroupCompleteHook(b.plugin, nil))
366	}
367
368	klog.V(4).Infof("successfully mounted %s", dir)
369	return nil
370}
371
372// updateDatasetPrimary will update the primary in Flocker and wait for it to
373// be ready. If it never gets to ready state it will timeout and error.
374func (b *flockerVolumeMounter) updateDatasetPrimary(datasetUUID string, primaryUUID string) error {
375	// We need to update the primary and wait for it to be ready
376	_, err := b.flockerClient.UpdatePrimaryForDataset(primaryUUID, datasetUUID)
377	if err != nil {
378		return err
379	}
380
381	timeoutChan := time.NewTimer(timeoutWaitingForVolume)
382	defer timeoutChan.Stop()
383	tickChan := time.NewTicker(tickerWaitingForVolume)
384	defer tickChan.Stop()
385
386	for {
387		if s, err := b.flockerClient.GetDatasetState(datasetUUID); err == nil && s.Primary == primaryUUID {
388			return nil
389		}
390
391		select {
392		case <-timeoutChan.C:
393			return fmt.Errorf(
394				"Timed out waiting for the datasetUUID: '%s' to be moved to the primary: '%s'\n%v",
395				datasetUUID, primaryUUID, err,
396			)
397		case <-tickChan.C:
398		}
399	}
400
401}
402
403func getVolumeSource(spec *volume.Spec) (*v1.FlockerVolumeSource, bool, error) {
404	if spec.Volume != nil && spec.Volume.Flocker != nil {
405		return spec.Volume.Flocker, spec.ReadOnly, nil
406	} else if spec.PersistentVolume != nil &&
407		spec.PersistentVolume.Spec.Flocker != nil {
408		return spec.PersistentVolume.Spec.Flocker, spec.ReadOnly, nil
409	}
410
411	return nil, false, fmt.Errorf("Spec does not reference a Flocker volume type")
412}
413
// flockerVolumeUnmounter unmounts a flocker volume; it embeds the shared
// flockerVolume state.
type flockerVolumeUnmounter struct {
	*flockerVolume
}
417
418var _ volume.Unmounter = &flockerVolumeUnmounter{}
419
420func (c *flockerVolumeUnmounter) GetPath() string {
421	return getPath(c.podUID, c.volName, c.plugin.host)
422}
423
424// Unmounts the bind mount, and detaches the disk only if the PD
425// resource was the last reference to that disk on the kubelet.
426func (c *flockerVolumeUnmounter) TearDown() error {
427	return c.TearDownAt(c.GetPath())
428}
429
430// TearDownAt unmounts the bind mount
431func (c *flockerVolumeUnmounter) TearDownAt(dir string) error {
432	return mount.CleanupMountPoint(dir, c.mounter, false)
433}
434
435func (p *flockerPlugin) NewDeleter(spec *volume.Spec) (volume.Deleter, error) {
436	return p.newDeleterInternal(spec, &flockerUtil{})
437}
438
439func (p *flockerPlugin) newDeleterInternal(spec *volume.Spec, manager volumeManager) (volume.Deleter, error) {
440	if spec.PersistentVolume != nil && spec.PersistentVolume.Spec.Flocker == nil {
441		return nil, fmt.Errorf("spec.PersistentVolumeSource.Flocker is nil")
442	}
443	return &flockerVolumeDeleter{
444		flockerVolume: &flockerVolume{
445			volName:     spec.Name(),
446			datasetName: spec.PersistentVolume.Spec.Flocker.DatasetName,
447			datasetUUID: spec.PersistentVolume.Spec.Flocker.DatasetUUID,
448			manager:     manager,
449		}}, nil
450}
451
452func (p *flockerPlugin) NewProvisioner(options volume.VolumeOptions) (volume.Provisioner, error) {
453	return p.newProvisionerInternal(options, &flockerUtil{})
454}
455
456func (p *flockerPlugin) newProvisionerInternal(options volume.VolumeOptions, manager volumeManager) (volume.Provisioner, error) {
457	return &flockerVolumeProvisioner{
458		flockerVolume: &flockerVolume{
459			manager: manager,
460			plugin:  p,
461		},
462		options: options,
463	}, nil
464}
465