1/* 2Copyright 2017 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package expand 18 19import ( 20 "context" 21 "fmt" 22 "net" 23 "time" 24 25 "k8s.io/klog/v2" 26 "k8s.io/mount-utils" 27 utilexec "k8s.io/utils/exec" 28 29 authenticationv1 "k8s.io/api/authentication/v1" 30 v1 "k8s.io/api/core/v1" 31 "k8s.io/apimachinery/pkg/api/errors" 32 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" 33 "k8s.io/apimachinery/pkg/types" 34 "k8s.io/apimachinery/pkg/util/runtime" 35 "k8s.io/apimachinery/pkg/util/wait" 36 coreinformers "k8s.io/client-go/informers/core/v1" 37 clientset "k8s.io/client-go/kubernetes" 38 "k8s.io/client-go/kubernetes/scheme" 39 v1core "k8s.io/client-go/kubernetes/typed/core/v1" 40 corelisters "k8s.io/client-go/listers/core/v1" 41 "k8s.io/client-go/tools/cache" 42 kcache "k8s.io/client-go/tools/cache" 43 "k8s.io/client-go/tools/record" 44 "k8s.io/client-go/util/workqueue" 45 cloudprovider "k8s.io/cloud-provider" 46 "k8s.io/kubernetes/pkg/controller/volume/events" 47 proxyutil "k8s.io/kubernetes/pkg/proxy/util" 48 "k8s.io/kubernetes/pkg/volume" 49 "k8s.io/kubernetes/pkg/volume/csimigration" 50 "k8s.io/kubernetes/pkg/volume/util" 51 "k8s.io/kubernetes/pkg/volume/util/operationexecutor" 52 "k8s.io/kubernetes/pkg/volume/util/subpath" 53 "k8s.io/kubernetes/pkg/volume/util/volumepathhandler" 54) 55 56const ( 57 // number of default volume expansion workers 58 defaultWorkerCount = 10 59) 60 61// ExpandController expands the pvs 62type ExpandController interface { 63 Run(stopCh <-chan struct{}) 64} 65 66// CSINameTranslator can get the CSI Driver name based on the in-tree plugin name 67type CSINameTranslator interface { 68 GetCSINameFromInTreeName(pluginName string) (string, error) 69} 70 71type expandController struct { 72 // kubeClient is the kube API client used by volumehost to communicate with 73 // the API server. 74 kubeClient clientset.Interface 75 76 // pvcLister is the shared PVC lister used to fetch and store PVC 77 // objects from the API server. It is shared with other controllers and 78 // therefore the PVC objects in its store should be treated as immutable. 79 pvcLister corelisters.PersistentVolumeClaimLister 80 pvcsSynced kcache.InformerSynced 81 82 pvLister corelisters.PersistentVolumeLister 83 pvSynced kcache.InformerSynced 84 85 // cloud provider used by volume host 86 cloud cloudprovider.Interface 87 88 // volumePluginMgr used to initialize and fetch volume plugins 89 volumePluginMgr volume.VolumePluginMgr 90 91 // recorder is used to record events in the API server 92 recorder record.EventRecorder 93 94 operationGenerator operationexecutor.OperationGenerator 95 96 queue workqueue.RateLimitingInterface 97 98 translator CSINameTranslator 99 100 csiMigratedPluginManager csimigration.PluginManager 101 102 filteredDialOptions *proxyutil.FilteredDialOptions 103} 104 105// NewExpandController expands the pvs 106func NewExpandController( 107 kubeClient clientset.Interface, 108 pvcInformer coreinformers.PersistentVolumeClaimInformer, 109 pvInformer coreinformers.PersistentVolumeInformer, 110 cloud cloudprovider.Interface, 111 plugins []volume.VolumePlugin, 112 translator CSINameTranslator, 113 csiMigratedPluginManager csimigration.PluginManager, 114 filteredDialOptions *proxyutil.FilteredDialOptions) (ExpandController, error) { 115 116 expc := &expandController{ 117 kubeClient: kubeClient, 118 cloud: cloud, 119 pvcLister: pvcInformer.Lister(), 120 pvcsSynced: pvcInformer.Informer().HasSynced, 121 pvLister: pvInformer.Lister(), 122 pvSynced: pvInformer.Informer().HasSynced, 123 queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "volume_expand"), 124 translator: translator, 125 csiMigratedPluginManager: csiMigratedPluginManager, 126 filteredDialOptions: filteredDialOptions, 127 } 128 129 if err := expc.volumePluginMgr.InitPlugins(plugins, nil, expc); err != nil { 130 return nil, fmt.Errorf("could not initialize volume plugins for Expand Controller : %+v", err) 131 } 132 133 eventBroadcaster := record.NewBroadcaster() 134 eventBroadcaster.StartStructuredLogging(0) 135 eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) 136 expc.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "volume_expand"}) 137 blkutil := volumepathhandler.NewBlockVolumePathHandler() 138 139 expc.operationGenerator = operationexecutor.NewOperationGenerator( 140 kubeClient, 141 &expc.volumePluginMgr, 142 expc.recorder, 143 false, 144 blkutil) 145 146 pvcInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{ 147 AddFunc: expc.enqueuePVC, 148 UpdateFunc: func(old, new interface{}) { 149 oldPVC, ok := old.(*v1.PersistentVolumeClaim) 150 if !ok { 151 return 152 } 153 154 oldReq := oldPVC.Spec.Resources.Requests[v1.ResourceStorage] 155 oldCap := oldPVC.Status.Capacity[v1.ResourceStorage] 156 newPVC, ok := new.(*v1.PersistentVolumeClaim) 157 if !ok { 158 return 159 } 160 newReq := newPVC.Spec.Resources.Requests[v1.ResourceStorage] 161 newCap := newPVC.Status.Capacity[v1.ResourceStorage] 162 // PVC will be enqueued under 2 circumstances 163 // 1. User has increased PVC's request capacity --> volume needs to be expanded 164 // 2. PVC status capacity has been expanded --> claim's bound PV has likely recently gone through filesystem resize, so remove AnnPreResizeCapacity annotation from PV 165 if newReq.Cmp(oldReq) > 0 || newCap.Cmp(oldCap) > 0 { 166 expc.enqueuePVC(new) 167 } 168 }, 169 DeleteFunc: expc.enqueuePVC, 170 }) 171 172 return expc, nil 173} 174 175func (expc *expandController) enqueuePVC(obj interface{}) { 176 pvc, ok := obj.(*v1.PersistentVolumeClaim) 177 if !ok { 178 return 179 } 180 181 if pvc.Status.Phase == v1.ClaimBound { 182 key, err := kcache.DeletionHandlingMetaNamespaceKeyFunc(pvc) 183 if err != nil { 184 runtime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", pvc, err)) 185 return 186 } 187 expc.queue.Add(key) 188 } 189} 190 191func (expc *expandController) processNextWorkItem() bool { 192 key, shutdown := expc.queue.Get() 193 if shutdown { 194 return false 195 } 196 defer expc.queue.Done(key) 197 198 err := expc.syncHandler(key.(string)) 199 if err == nil { 200 expc.queue.Forget(key) 201 return true 202 } 203 204 runtime.HandleError(fmt.Errorf("%v failed with : %v", key, err)) 205 expc.queue.AddRateLimited(key) 206 207 return true 208} 209 210// syncHandler performs actual expansion of volume. If an error is returned 211// from this function - PVC will be requeued for resizing. 212func (expc *expandController) syncHandler(key string) error { 213 namespace, name, err := kcache.SplitMetaNamespaceKey(key) 214 if err != nil { 215 return err 216 } 217 pvc, err := expc.pvcLister.PersistentVolumeClaims(namespace).Get(name) 218 if errors.IsNotFound(err) { 219 return nil 220 } 221 if err != nil { 222 klog.V(5).Infof("Error getting PVC %q (uid: %q) from informer : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), pvc.UID, err) 223 return err 224 } 225 226 pv, err := expc.getPersistentVolume(pvc) 227 if err != nil { 228 klog.V(5).Infof("Error getting Persistent Volume for PVC %q (uid: %q) from informer : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), pvc.UID, err) 229 return err 230 } 231 232 if pv.Spec.ClaimRef == nil || pvc.Namespace != pv.Spec.ClaimRef.Namespace || pvc.UID != pv.Spec.ClaimRef.UID { 233 err := fmt.Errorf("persistent Volume is not bound to PVC being updated : %s", util.ClaimToClaimKey(pvc)) 234 klog.V(4).Infof("%v", err) 235 return err 236 } 237 238 pvcRequestSize := pvc.Spec.Resources.Requests[v1.ResourceStorage] 239 pvcStatusSize := pvc.Status.Capacity[v1.ResourceStorage] 240 241 // call expand operation only under two condition 242 // 1. pvc's request size has been expanded and is larger than pvc's current status size 243 // 2. pv has an pre-resize capacity annotation 244 if pvcRequestSize.Cmp(pvcStatusSize) <= 0 && !metav1.HasAnnotation(pv.ObjectMeta, util.AnnPreResizeCapacity) { 245 return nil 246 } 247 248 volumeSpec := volume.NewSpecFromPersistentVolume(pv, false) 249 migratable, err := expc.csiMigratedPluginManager.IsMigratable(volumeSpec) 250 if err != nil { 251 klog.V(4).Infof("failed to check CSI migration status for PVC: %s with error: %v", util.ClaimToClaimKey(pvc), err) 252 return nil 253 } 254 // handle CSI migration scenarios before invoking FindExpandablePluginBySpec for in-tree 255 if migratable { 256 inTreePluginName, err := expc.csiMigratedPluginManager.GetInTreePluginNameFromSpec(volumeSpec.PersistentVolume, volumeSpec.Volume) 257 if err != nil { 258 klog.V(4).Infof("Error getting in-tree plugin name from persistent volume %s: %v", volumeSpec.PersistentVolume.Name, err) 259 return err 260 } 261 262 msg := fmt.Sprintf("CSI migration enabled for %s; waiting for external resizer to expand the pvc", inTreePluginName) 263 expc.recorder.Event(pvc, v1.EventTypeNormal, events.ExternalExpanding, msg) 264 csiResizerName, err := expc.translator.GetCSINameFromInTreeName(inTreePluginName) 265 if err != nil { 266 errorMsg := fmt.Sprintf("error getting CSI driver name for pvc %s, with error %v", util.ClaimToClaimKey(pvc), err) 267 expc.recorder.Event(pvc, v1.EventTypeWarning, events.ExternalExpanding, errorMsg) 268 return fmt.Errorf(errorMsg) 269 } 270 271 pvc, err := util.SetClaimResizer(pvc, csiResizerName, expc.kubeClient) 272 if err != nil { 273 errorMsg := fmt.Sprintf("error setting resizer annotation to pvc %s, with error %v", util.ClaimToClaimKey(pvc), err) 274 expc.recorder.Event(pvc, v1.EventTypeWarning, events.ExternalExpanding, errorMsg) 275 return fmt.Errorf(errorMsg) 276 } 277 return nil 278 } 279 280 volumePlugin, err := expc.volumePluginMgr.FindExpandablePluginBySpec(volumeSpec) 281 if err != nil || volumePlugin == nil { 282 msg := fmt.Errorf("didn't find a plugin capable of expanding the volume; " + 283 "waiting for an external controller to process this PVC") 284 eventType := v1.EventTypeNormal 285 if err != nil { 286 eventType = v1.EventTypeWarning 287 } 288 expc.recorder.Event(pvc, eventType, events.ExternalExpanding, fmt.Sprintf("Ignoring the PVC: %v.", msg)) 289 klog.Infof("Ignoring the PVC %q (uid: %q) : %v.", util.GetPersistentVolumeClaimQualifiedName(pvc), pvc.UID, msg) 290 // If we are expecting that an external plugin will handle resizing this volume then 291 // is no point in requeuing this PVC. 292 return nil 293 } 294 295 volumeResizerName := volumePlugin.GetPluginName() 296 return expc.expand(pvc, pv, volumeResizerName) 297} 298 299func (expc *expandController) expand(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume, resizerName string) error { 300 // if node expand is complete and pv's annotation can be removed, remove the annotation from pv and return 301 if expc.isNodeExpandComplete(pvc, pv) && metav1.HasAnnotation(pv.ObjectMeta, util.AnnPreResizeCapacity) { 302 return util.DeleteAnnPreResizeCapacity(pv, expc.GetKubeClient()) 303 } 304 305 pvc, err := util.MarkResizeInProgressWithResizer(pvc, resizerName, expc.kubeClient) 306 if err != nil { 307 klog.V(5).Infof("Error setting PVC %s in progress with error : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err) 308 return err 309 } 310 311 generatedOperations, err := expc.operationGenerator.GenerateExpandVolumeFunc(pvc, pv) 312 if err != nil { 313 klog.Errorf("Error starting ExpandVolume for pvc %s with %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err) 314 return err 315 } 316 klog.V(5).Infof("Starting ExpandVolume for volume %s", util.GetPersistentVolumeClaimQualifiedName(pvc)) 317 _, detailedErr := generatedOperations.Run() 318 319 return detailedErr 320} 321 322// TODO make concurrency configurable (workers/threadiness argument). previously, nestedpendingoperations spawned unlimited goroutines 323func (expc *expandController) Run(stopCh <-chan struct{}) { 324 defer runtime.HandleCrash() 325 defer expc.queue.ShutDown() 326 327 klog.Infof("Starting expand controller") 328 defer klog.Infof("Shutting down expand controller") 329 330 if !cache.WaitForNamedCacheSync("expand", stopCh, expc.pvcsSynced, expc.pvSynced) { 331 return 332 } 333 334 for i := 0; i < defaultWorkerCount; i++ { 335 go wait.Until(expc.runWorker, time.Second, stopCh) 336 } 337 338 <-stopCh 339} 340 341func (expc *expandController) runWorker() { 342 for expc.processNextWorkItem() { 343 } 344} 345 346func (expc *expandController) getPersistentVolume(pvc *v1.PersistentVolumeClaim) (*v1.PersistentVolume, error) { 347 volumeName := pvc.Spec.VolumeName 348 pv, err := expc.kubeClient.CoreV1().PersistentVolumes().Get(context.TODO(), volumeName, metav1.GetOptions{}) 349 350 if err != nil { 351 return nil, fmt.Errorf("failed to get PV %q: %v", volumeName, err) 352 } 353 354 return pv.DeepCopy(), nil 355} 356 357// isNodeExpandComplete returns true if pvc.Status.Capacity >= pv.Spec.Capacity 358func (expc *expandController) isNodeExpandComplete(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) bool { 359 klog.V(4).Infof("pv %q capacity = %v, pvc %s capacity = %v", pv.Name, pv.Spec.Capacity[v1.ResourceStorage], pvc.ObjectMeta.Name, pvc.Status.Capacity[v1.ResourceStorage]) 360 pvcCap, pvCap := pvc.Status.Capacity[v1.ResourceStorage], pv.Spec.Capacity[v1.ResourceStorage] 361 return pvcCap.Cmp(pvCap) >= 0 362} 363 364// Implementing VolumeHost interface 365func (expc *expandController) GetPluginDir(pluginName string) string { 366 return "" 367} 368 369func (expc *expandController) GetVolumeDevicePluginDir(pluginName string) string { 370 return "" 371} 372 373func (expc *expandController) GetPodsDir() string { 374 return "" 375} 376 377func (expc *expandController) GetPodVolumeDir(podUID types.UID, pluginName string, volumeName string) string { 378 return "" 379} 380 381func (expc *expandController) GetPodVolumeDeviceDir(podUID types.UID, pluginName string) string { 382 return "" 383} 384 385func (expc *expandController) GetPodPluginDir(podUID types.UID, pluginName string) string { 386 return "" 387} 388 389func (expc *expandController) GetKubeClient() clientset.Interface { 390 return expc.kubeClient 391} 392 393func (expc *expandController) NewWrapperMounter(volName string, spec volume.Spec, pod *v1.Pod, opts volume.VolumeOptions) (volume.Mounter, error) { 394 return nil, fmt.Errorf("NewWrapperMounter not supported by expand controller's VolumeHost implementation") 395} 396 397func (expc *expandController) NewWrapperUnmounter(volName string, spec volume.Spec, podUID types.UID) (volume.Unmounter, error) { 398 return nil, fmt.Errorf("NewWrapperUnmounter not supported by expand controller's VolumeHost implementation") 399} 400 401func (expc *expandController) GetCloudProvider() cloudprovider.Interface { 402 return expc.cloud 403} 404 405func (expc *expandController) GetMounter(pluginName string) mount.Interface { 406 return nil 407} 408 409func (expc *expandController) GetExec(pluginName string) utilexec.Interface { 410 return utilexec.New() 411} 412 413func (expc *expandController) GetHostName() string { 414 return "" 415} 416 417func (expc *expandController) GetHostIP() (net.IP, error) { 418 return nil, fmt.Errorf("GetHostIP not supported by expand controller's VolumeHost implementation") 419} 420 421func (expc *expandController) GetNodeAllocatable() (v1.ResourceList, error) { 422 return v1.ResourceList{}, nil 423} 424 425func (expc *expandController) GetSecretFunc() func(namespace, name string) (*v1.Secret, error) { 426 return func(_, _ string) (*v1.Secret, error) { 427 return nil, fmt.Errorf("GetSecret unsupported in expandController") 428 } 429} 430 431func (expc *expandController) GetConfigMapFunc() func(namespace, name string) (*v1.ConfigMap, error) { 432 return func(_, _ string) (*v1.ConfigMap, error) { 433 return nil, fmt.Errorf("GetConfigMap unsupported in expandController") 434 } 435} 436 437func (expc *expandController) GetServiceAccountTokenFunc() func(_, _ string, _ *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) { 438 return func(_, _ string, _ *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) { 439 return nil, fmt.Errorf("GetServiceAccountToken unsupported in expandController") 440 } 441} 442 443func (expc *expandController) DeleteServiceAccountTokenFunc() func(types.UID) { 444 return func(types.UID) { 445 klog.Errorf("DeleteServiceAccountToken unsupported in expandController") 446 } 447} 448 449func (expc *expandController) GetNodeLabels() (map[string]string, error) { 450 return nil, fmt.Errorf("GetNodeLabels unsupported in expandController") 451} 452 453func (expc *expandController) GetNodeName() types.NodeName { 454 return "" 455} 456 457func (expc *expandController) GetEventRecorder() record.EventRecorder { 458 return expc.recorder 459} 460 461func (expc *expandController) GetSubpather() subpath.Interface { 462 // not needed for expand controller 463 return nil 464} 465 466func (expc *expandController) GetFilteredDialOptions() *proxyutil.FilteredDialOptions { 467 return expc.filteredDialOptions 468} 469