1/* 2Copyright 2015 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package flocker 18 19import ( 20 "fmt" 21 "os" 22 "path/filepath" 23 "time" 24 25 flockerapi "github.com/clusterhq/flocker-go" 26 "k8s.io/klog/v2" 27 "k8s.io/mount-utils" 28 utilstrings "k8s.io/utils/strings" 29 30 v1 "k8s.io/api/core/v1" 31 "k8s.io/apimachinery/pkg/types" 32 "k8s.io/kubernetes/pkg/util/env" 33 "k8s.io/kubernetes/pkg/volume" 34 "k8s.io/kubernetes/pkg/volume/util" 35) 36 37// ProbeVolumePlugins is the primary entrypoint for volume plugins. 38func ProbeVolumePlugins() []volume.VolumePlugin { 39 return []volume.VolumePlugin{&flockerPlugin{nil}} 40} 41 42type flockerPlugin struct { 43 host volume.VolumeHost 44} 45 46type flockerVolume struct { 47 volName string 48 podUID types.UID 49 // dataset metadata name deprecated 50 datasetName string 51 // dataset uuid 52 datasetUUID string 53 //pod *v1.Pod 54 flockerClient flockerapi.Clientable 55 manager volumeManager 56 plugin *flockerPlugin 57 mounter mount.Interface 58 volume.MetricsProvider 59} 60 61var _ volume.VolumePlugin = &flockerPlugin{} 62var _ volume.PersistentVolumePlugin = &flockerPlugin{} 63var _ volume.DeletableVolumePlugin = &flockerPlugin{} 64var _ volume.ProvisionableVolumePlugin = &flockerPlugin{} 65 66const ( 67 flockerPluginName = "kubernetes.io/flocker" 68 69 defaultHost = "localhost" 70 defaultPort = 4523 71 defaultCACertFile = "/etc/flocker/cluster.crt" 72 defaultClientKeyFile = "/etc/flocker/apiuser.key" 73 defaultClientCertFile = "/etc/flocker/apiuser.crt" 74 defaultMountPath = "/flocker" 75 76 timeoutWaitingForVolume = 2 * time.Minute 77 tickerWaitingForVolume = 5 * time.Second 78) 79 80func getPath(uid types.UID, volName string, host volume.VolumeHost) string { 81 return host.GetPodVolumeDir(uid, utilstrings.EscapeQualifiedName(flockerPluginName), volName) 82} 83 84func makeGlobalFlockerPath(datasetUUID string) string { 85 return filepath.Join(defaultMountPath, datasetUUID) 86} 87 88func (p *flockerPlugin) Init(host volume.VolumeHost) error { 89 p.host = host 90 return nil 91} 92 93func (p *flockerPlugin) GetPluginName() string { 94 return flockerPluginName 95} 96 97func (p *flockerPlugin) GetVolumeName(spec *volume.Spec) (string, error) { 98 volumeSource, _, err := getVolumeSource(spec) 99 if err != nil { 100 return "", err 101 } 102 103 return volumeSource.DatasetName, nil 104} 105 106func (p *flockerPlugin) CanSupport(spec *volume.Spec) bool { 107 return (spec.PersistentVolume != nil && spec.PersistentVolume.Spec.Flocker != nil) || 108 (spec.Volume != nil && spec.Volume.Flocker != nil) 109} 110 111func (p *flockerPlugin) RequiresRemount(spec *volume.Spec) bool { 112 return false 113} 114 115func (p *flockerPlugin) SupportsMountOption() bool { 116 return false 117} 118 119func (p *flockerPlugin) SupportsBulkVolumeVerification() bool { 120 return false 121} 122 123func (p *flockerPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode { 124 return []v1.PersistentVolumeAccessMode{ 125 v1.ReadWriteOnce, 126 } 127} 128 129func (p *flockerPlugin) getFlockerVolumeSource(spec *volume.Spec) (*v1.FlockerVolumeSource, bool) { 130 // AFAIK this will always be r/w, but perhaps for the future it will be needed 131 readOnly := false 132 133 if spec.Volume != nil && spec.Volume.Flocker != nil { 134 return spec.Volume.Flocker, readOnly 135 } 136 return spec.PersistentVolume.Spec.Flocker, readOnly 137} 138 139func (p *flockerPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volume.VolumeOptions) (volume.Mounter, error) { 140 // Inject real implementations here, test through the internal function. 141 return p.newMounterInternal(spec, pod.UID, &flockerUtil{}, p.host.GetMounter(p.GetPluginName())) 142} 143 144func (p *flockerPlugin) newMounterInternal(spec *volume.Spec, podUID types.UID, manager volumeManager, mounter mount.Interface) (volume.Mounter, error) { 145 volumeSource, readOnly, err := getVolumeSource(spec) 146 if err != nil { 147 return nil, err 148 } 149 150 datasetName := volumeSource.DatasetName 151 datasetUUID := volumeSource.DatasetUUID 152 153 return &flockerVolumeMounter{ 154 flockerVolume: &flockerVolume{ 155 podUID: podUID, 156 volName: spec.Name(), 157 datasetName: datasetName, 158 datasetUUID: datasetUUID, 159 mounter: mounter, 160 manager: manager, 161 plugin: p, 162 MetricsProvider: volume.NewMetricsStatFS(getPath(podUID, spec.Name(), p.host)), 163 }, 164 readOnly: readOnly}, nil 165} 166 167func (p *flockerPlugin) NewUnmounter(volName string, podUID types.UID) (volume.Unmounter, error) { 168 // Inject real implementations here, test through the internal function. 169 return p.newUnmounterInternal(volName, podUID, &flockerUtil{}, p.host.GetMounter(p.GetPluginName())) 170} 171 172func (p *flockerPlugin) newUnmounterInternal(volName string, podUID types.UID, manager volumeManager, mounter mount.Interface) (volume.Unmounter, error) { 173 return &flockerVolumeUnmounter{&flockerVolume{ 174 podUID: podUID, 175 volName: volName, 176 manager: manager, 177 mounter: mounter, 178 plugin: p, 179 MetricsProvider: volume.NewMetricsStatFS(getPath(podUID, volName, p.host)), 180 }}, nil 181} 182 183func (p *flockerPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) { 184 flockerVolume := &v1.Volume{ 185 Name: volumeName, 186 VolumeSource: v1.VolumeSource{ 187 Flocker: &v1.FlockerVolumeSource{ 188 DatasetName: volumeName, 189 }, 190 }, 191 } 192 return volume.NewSpecFromVolume(flockerVolume), nil 193} 194 195func (b *flockerVolume) GetDatasetUUID() (datasetUUID string, err error) { 196 197 // return UUID if set 198 if len(b.datasetUUID) > 0 { 199 return b.datasetUUID, nil 200 } 201 202 if b.flockerClient == nil { 203 return "", fmt.Errorf("flocker client is not initialized") 204 } 205 206 // lookup in flocker API otherwise 207 return b.flockerClient.GetDatasetID(b.datasetName) 208} 209 210type flockerVolumeMounter struct { 211 *flockerVolume 212 readOnly bool 213} 214 215func (b *flockerVolumeMounter) GetAttributes() volume.Attributes { 216 return volume.Attributes{ 217 ReadOnly: b.readOnly, 218 Managed: false, 219 SupportsSELinux: false, 220 } 221} 222 223// Checks prior to mount operations to verify that the required components (binaries, etc.) 224// to mount the volume are available on the underlying node. 225// If not, it returns an error 226func (b *flockerVolumeMounter) CanMount() error { 227 return nil 228} 229 230func (b *flockerVolumeMounter) GetPath() string { 231 return getPath(b.podUID, b.volName, b.plugin.host) 232} 233 234// SetUp bind mounts the disk global mount to the volume path. 235func (b *flockerVolumeMounter) SetUp(mounterArgs volume.MounterArgs) error { 236 return b.SetUpAt(b.GetPath(), mounterArgs) 237} 238 239// newFlockerClient uses environment variables and pod attributes to return a 240// flocker client capable of talking with the Flocker control service. 241func (p *flockerPlugin) newFlockerClient(hostIP string) (*flockerapi.Client, error) { 242 host := env.GetEnvAsStringOrFallback("FLOCKER_CONTROL_SERVICE_HOST", defaultHost) 243 port, err := env.GetEnvAsIntOrFallback("FLOCKER_CONTROL_SERVICE_PORT", defaultPort) 244 245 if err != nil { 246 return nil, err 247 } 248 caCertPath := env.GetEnvAsStringOrFallback("FLOCKER_CONTROL_SERVICE_CA_FILE", defaultCACertFile) 249 keyPath := env.GetEnvAsStringOrFallback("FLOCKER_CONTROL_SERVICE_CLIENT_KEY_FILE", defaultClientKeyFile) 250 certPath := env.GetEnvAsStringOrFallback("FLOCKER_CONTROL_SERVICE_CLIENT_CERT_FILE", defaultClientCertFile) 251 252 c, err := flockerapi.NewClient(host, port, hostIP, caCertPath, keyPath, certPath) 253 return c, err 254} 255 256func (b *flockerVolumeMounter) newFlockerClient() (*flockerapi.Client, error) { 257 258 hostIP, err := b.plugin.host.GetHostIP() 259 if err != nil { 260 return nil, err 261 } 262 263 return b.plugin.newFlockerClient(hostIP.String()) 264} 265 266/* 267SetUpAt will setup a Flocker volume following this flow of calls to the Flocker 268control service: 269 2701. Get the dataset id for the given volume name/dir 2712. It should already be there, if it's not the user needs to manually create it 2723. Check the current Primary UUID 2734. If it doesn't match with the Primary UUID that we got on 2, then we will 274 need to update the Primary UUID for this volume. 2755. Wait until the Primary UUID was updated or timeout. 276*/ 277func (b *flockerVolumeMounter) SetUpAt(dir string, mounterArgs volume.MounterArgs) error { 278 var err error 279 if b.flockerClient == nil { 280 b.flockerClient, err = b.newFlockerClient() 281 if err != nil { 282 return err 283 } 284 } 285 286 datasetUUID, err := b.GetDatasetUUID() 287 if err != nil { 288 return fmt.Errorf("the datasetUUID for volume with datasetName='%s' can not be found using flocker: %s", b.datasetName, err) 289 } 290 291 datasetState, err := b.flockerClient.GetDatasetState(datasetUUID) 292 if err != nil { 293 return fmt.Errorf("the datasetState for volume with datasetUUID='%s' could not determinted uusing flocker: %s", datasetUUID, err) 294 } 295 296 primaryUUID, err := b.flockerClient.GetPrimaryUUID() 297 if err != nil { 298 return err 299 } 300 301 if datasetState.Primary != primaryUUID { 302 if err := b.updateDatasetPrimary(datasetUUID, primaryUUID); err != nil { 303 return err 304 } 305 _, err := b.flockerClient.GetDatasetState(datasetUUID) 306 if err != nil { 307 return fmt.Errorf("the volume with datasetUUID='%s' migrated unsuccessfully", datasetUUID) 308 } 309 } 310 311 // TODO: handle failed mounts here. 312 notMnt, err := b.mounter.IsLikelyNotMountPoint(dir) 313 klog.V(4).Infof("flockerVolume set up: %s %v %v, datasetUUID %v readOnly %v", dir, !notMnt, err, datasetUUID, b.readOnly) 314 if err != nil && !os.IsNotExist(err) { 315 klog.Errorf("cannot validate mount point: %s %v", dir, err) 316 return err 317 } 318 if !notMnt { 319 return nil 320 } 321 322 if err := os.MkdirAll(dir, 0750); err != nil { 323 klog.Errorf("mkdir failed on disk %s (%v)", dir, err) 324 return err 325 } 326 327 // Perform a bind mount to the full path to allow duplicate mounts of the same PD. 328 options := []string{"bind"} 329 if b.readOnly { 330 options = append(options, "ro") 331 } 332 333 globalFlockerPath := makeGlobalFlockerPath(datasetUUID) 334 klog.V(4).Infof("attempting to mount %s", dir) 335 336 err = b.mounter.MountSensitiveWithoutSystemd(globalFlockerPath, dir, "", options, nil) 337 if err != nil { 338 notMnt, mntErr := b.mounter.IsLikelyNotMountPoint(dir) 339 if mntErr != nil { 340 klog.Errorf("isLikelyNotMountPoint check failed: %v", mntErr) 341 return err 342 } 343 if !notMnt { 344 if mntErr = b.mounter.Unmount(dir); mntErr != nil { 345 klog.Errorf("failed to unmount: %v", mntErr) 346 return err 347 } 348 notMnt, mntErr := b.mounter.IsLikelyNotMountPoint(dir) 349 if mntErr != nil { 350 klog.Errorf("isLikelyNotMountPoint check failed: %v", mntErr) 351 return err 352 } 353 if !notMnt { 354 // This is very odd, we don't expect it. We'll try again next sync loop. 355 klog.Errorf("%s is still mounted, despite call to unmount(). Will try again next sync loop.", dir) 356 return err 357 } 358 } 359 os.Remove(dir) 360 klog.Errorf("mount of disk %s failed: %v", dir, err) 361 return err 362 } 363 364 if !b.readOnly { 365 volume.SetVolumeOwnership(b, mounterArgs.FsGroup, mounterArgs.FSGroupChangePolicy, util.FSGroupCompleteHook(b.plugin, nil)) 366 } 367 368 klog.V(4).Infof("successfully mounted %s", dir) 369 return nil 370} 371 372// updateDatasetPrimary will update the primary in Flocker and wait for it to 373// be ready. If it never gets to ready state it will timeout and error. 374func (b *flockerVolumeMounter) updateDatasetPrimary(datasetUUID string, primaryUUID string) error { 375 // We need to update the primary and wait for it to be ready 376 _, err := b.flockerClient.UpdatePrimaryForDataset(primaryUUID, datasetUUID) 377 if err != nil { 378 return err 379 } 380 381 timeoutChan := time.NewTimer(timeoutWaitingForVolume) 382 defer timeoutChan.Stop() 383 tickChan := time.NewTicker(tickerWaitingForVolume) 384 defer tickChan.Stop() 385 386 for { 387 if s, err := b.flockerClient.GetDatasetState(datasetUUID); err == nil && s.Primary == primaryUUID { 388 return nil 389 } 390 391 select { 392 case <-timeoutChan.C: 393 return fmt.Errorf( 394 "Timed out waiting for the datasetUUID: '%s' to be moved to the primary: '%s'\n%v", 395 datasetUUID, primaryUUID, err, 396 ) 397 case <-tickChan.C: 398 } 399 } 400 401} 402 403func getVolumeSource(spec *volume.Spec) (*v1.FlockerVolumeSource, bool, error) { 404 if spec.Volume != nil && spec.Volume.Flocker != nil { 405 return spec.Volume.Flocker, spec.ReadOnly, nil 406 } else if spec.PersistentVolume != nil && 407 spec.PersistentVolume.Spec.Flocker != nil { 408 return spec.PersistentVolume.Spec.Flocker, spec.ReadOnly, nil 409 } 410 411 return nil, false, fmt.Errorf("Spec does not reference a Flocker volume type") 412} 413 414type flockerVolumeUnmounter struct { 415 *flockerVolume 416} 417 418var _ volume.Unmounter = &flockerVolumeUnmounter{} 419 420func (c *flockerVolumeUnmounter) GetPath() string { 421 return getPath(c.podUID, c.volName, c.plugin.host) 422} 423 424// Unmounts the bind mount, and detaches the disk only if the PD 425// resource was the last reference to that disk on the kubelet. 426func (c *flockerVolumeUnmounter) TearDown() error { 427 return c.TearDownAt(c.GetPath()) 428} 429 430// TearDownAt unmounts the bind mount 431func (c *flockerVolumeUnmounter) TearDownAt(dir string) error { 432 return mount.CleanupMountPoint(dir, c.mounter, false) 433} 434 435func (p *flockerPlugin) NewDeleter(spec *volume.Spec) (volume.Deleter, error) { 436 return p.newDeleterInternal(spec, &flockerUtil{}) 437} 438 439func (p *flockerPlugin) newDeleterInternal(spec *volume.Spec, manager volumeManager) (volume.Deleter, error) { 440 if spec.PersistentVolume != nil && spec.PersistentVolume.Spec.Flocker == nil { 441 return nil, fmt.Errorf("spec.PersistentVolumeSource.Flocker is nil") 442 } 443 return &flockerVolumeDeleter{ 444 flockerVolume: &flockerVolume{ 445 volName: spec.Name(), 446 datasetName: spec.PersistentVolume.Spec.Flocker.DatasetName, 447 datasetUUID: spec.PersistentVolume.Spec.Flocker.DatasetUUID, 448 manager: manager, 449 }}, nil 450} 451 452func (p *flockerPlugin) NewProvisioner(options volume.VolumeOptions) (volume.Provisioner, error) { 453 return p.newProvisionerInternal(options, &flockerUtil{}) 454} 455 456func (p *flockerPlugin) newProvisionerInternal(options volume.VolumeOptions, manager volumeManager) (volume.Provisioner, error) { 457 return &flockerVolumeProvisioner{ 458 flockerVolume: &flockerVolume{ 459 manager: manager, 460 plugin: p, 461 }, 462 options: options, 463 }, nil 464} 465