1/*
2Copyright 2017 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package expand
18
19import (
20	"context"
21	"fmt"
22	"net"
23	"time"
24
25	"k8s.io/klog/v2"
26	"k8s.io/mount-utils"
27	utilexec "k8s.io/utils/exec"
28
29	authenticationv1 "k8s.io/api/authentication/v1"
30	v1 "k8s.io/api/core/v1"
31	"k8s.io/apimachinery/pkg/api/errors"
32	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33	"k8s.io/apimachinery/pkg/types"
34	"k8s.io/apimachinery/pkg/util/runtime"
35	"k8s.io/apimachinery/pkg/util/wait"
36	coreinformers "k8s.io/client-go/informers/core/v1"
37	clientset "k8s.io/client-go/kubernetes"
38	"k8s.io/client-go/kubernetes/scheme"
39	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
40	corelisters "k8s.io/client-go/listers/core/v1"
41	"k8s.io/client-go/tools/cache"
42	kcache "k8s.io/client-go/tools/cache"
43	"k8s.io/client-go/tools/record"
44	"k8s.io/client-go/util/workqueue"
45	cloudprovider "k8s.io/cloud-provider"
46	"k8s.io/kubernetes/pkg/controller/volume/events"
47	proxyutil "k8s.io/kubernetes/pkg/proxy/util"
48	"k8s.io/kubernetes/pkg/volume"
49	"k8s.io/kubernetes/pkg/volume/csimigration"
50	"k8s.io/kubernetes/pkg/volume/util"
51	"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
52	"k8s.io/kubernetes/pkg/volume/util/subpath"
53	"k8s.io/kubernetes/pkg/volume/util/volumepathhandler"
54)
55
// defaultWorkerCount is how many worker goroutines Run starts to drain the
// expansion work queue.
const defaultWorkerCount = 10

// ExpandController is the handle returned by NewExpandController.
// The implementation in this file blocks in Run until stopCh is closed.
type ExpandController interface {
	Run(stopCh <-chan struct{})
}

// CSINameTranslator resolves the CSI driver name that corresponds to an
// in-tree volume plugin name (used when a volume is CSI-migratable).
type CSINameTranslator interface {
	GetCSINameFromInTreeName(pluginName string) (string, error)
}
70
// expandController reconciles PVC storage expansion requests. It also acts as
// the VolumeHost passed to volumePluginMgr.InitPlugins in NewExpandController.
type expandController struct {
	// kubeClient is the kube API client used by volumehost to communicate with
	// the API server. It is also used directly by this controller to fetch PVs
	// and to patch PVCs/PVs during expansion.
	kubeClient clientset.Interface

	// pvcLister is the shared PVC lister used to fetch and store PVC
	// objects from the API server. It is shared with other controllers and
	// therefore the PVC objects in its store should be treated as immutable.
	pvcLister  corelisters.PersistentVolumeClaimLister
	pvcsSynced kcache.InformerSynced

	// NOTE(review): pvLister is populated but not read anywhere in this file;
	// getPersistentVolume queries the API server directly instead.
	pvLister corelisters.PersistentVolumeLister
	// pvSynced is waited on (together with pvcsSynced) before Run starts workers.
	pvSynced kcache.InformerSynced

	// cloud provider used by volume host (returned by GetCloudProvider).
	cloud cloudprovider.Interface

	// volumePluginMgr used to initialize and fetch volume plugins; syncHandler
	// uses it to find an in-tree plugin able to expand a given volume spec.
	volumePluginMgr volume.VolumePluginMgr

	// recorder is used to record events in the API server
	recorder record.EventRecorder

	// operationGenerator builds the in-tree expand operation run by expand.
	operationGenerator operationexecutor.OperationGenerator

	// queue holds "namespace/name" PVC keys produced by enqueuePVC and
	// consumed by the workers started in Run.
	queue workqueue.RateLimitingInterface

	// translator maps an in-tree plugin name to its CSI driver name for
	// CSI-migratable volumes.
	translator CSINameTranslator

	// csiMigratedPluginManager decides whether a volume spec is CSI-migratable
	// and which in-tree plugin it belongs to.
	csiMigratedPluginManager csimigration.PluginManager

	// filteredDialOptions is handed out unchanged by GetFilteredDialOptions.
	filteredDialOptions *proxyutil.FilteredDialOptions
}
104
105// NewExpandController expands the pvs
106func NewExpandController(
107	kubeClient clientset.Interface,
108	pvcInformer coreinformers.PersistentVolumeClaimInformer,
109	pvInformer coreinformers.PersistentVolumeInformer,
110	cloud cloudprovider.Interface,
111	plugins []volume.VolumePlugin,
112	translator CSINameTranslator,
113	csiMigratedPluginManager csimigration.PluginManager,
114	filteredDialOptions *proxyutil.FilteredDialOptions) (ExpandController, error) {
115
116	expc := &expandController{
117		kubeClient:               kubeClient,
118		cloud:                    cloud,
119		pvcLister:                pvcInformer.Lister(),
120		pvcsSynced:               pvcInformer.Informer().HasSynced,
121		pvLister:                 pvInformer.Lister(),
122		pvSynced:                 pvInformer.Informer().HasSynced,
123		queue:                    workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "volume_expand"),
124		translator:               translator,
125		csiMigratedPluginManager: csiMigratedPluginManager,
126		filteredDialOptions:      filteredDialOptions,
127	}
128
129	if err := expc.volumePluginMgr.InitPlugins(plugins, nil, expc); err != nil {
130		return nil, fmt.Errorf("could not initialize volume plugins for Expand Controller : %+v", err)
131	}
132
133	eventBroadcaster := record.NewBroadcaster()
134	eventBroadcaster.StartStructuredLogging(0)
135	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
136	expc.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "volume_expand"})
137	blkutil := volumepathhandler.NewBlockVolumePathHandler()
138
139	expc.operationGenerator = operationexecutor.NewOperationGenerator(
140		kubeClient,
141		&expc.volumePluginMgr,
142		expc.recorder,
143		false,
144		blkutil)
145
146	pvcInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{
147		AddFunc: expc.enqueuePVC,
148		UpdateFunc: func(old, new interface{}) {
149			oldPVC, ok := old.(*v1.PersistentVolumeClaim)
150			if !ok {
151				return
152			}
153
154			oldReq := oldPVC.Spec.Resources.Requests[v1.ResourceStorage]
155			oldCap := oldPVC.Status.Capacity[v1.ResourceStorage]
156			newPVC, ok := new.(*v1.PersistentVolumeClaim)
157			if !ok {
158				return
159			}
160			newReq := newPVC.Spec.Resources.Requests[v1.ResourceStorage]
161			newCap := newPVC.Status.Capacity[v1.ResourceStorage]
162			// PVC will be enqueued under 2 circumstances
163			// 1. User has increased PVC's request capacity --> volume needs to be expanded
164			// 2. PVC status capacity has been expanded --> claim's bound PV has likely recently gone through filesystem resize, so remove AnnPreResizeCapacity annotation from PV
165			if newReq.Cmp(oldReq) > 0 || newCap.Cmp(oldCap) > 0 {
166				expc.enqueuePVC(new)
167			}
168		},
169		DeleteFunc: expc.enqueuePVC,
170	})
171
172	return expc, nil
173}
174
175func (expc *expandController) enqueuePVC(obj interface{}) {
176	pvc, ok := obj.(*v1.PersistentVolumeClaim)
177	if !ok {
178		return
179	}
180
181	if pvc.Status.Phase == v1.ClaimBound {
182		key, err := kcache.DeletionHandlingMetaNamespaceKeyFunc(pvc)
183		if err != nil {
184			runtime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", pvc, err))
185			return
186		}
187		expc.queue.Add(key)
188	}
189}
190
191func (expc *expandController) processNextWorkItem() bool {
192	key, shutdown := expc.queue.Get()
193	if shutdown {
194		return false
195	}
196	defer expc.queue.Done(key)
197
198	err := expc.syncHandler(key.(string))
199	if err == nil {
200		expc.queue.Forget(key)
201		return true
202	}
203
204	runtime.HandleError(fmt.Errorf("%v failed with : %v", key, err))
205	expc.queue.AddRateLimited(key)
206
207	return true
208}
209
210// syncHandler performs actual expansion of volume. If an error is returned
211// from this function - PVC will be requeued for resizing.
212func (expc *expandController) syncHandler(key string) error {
213	namespace, name, err := kcache.SplitMetaNamespaceKey(key)
214	if err != nil {
215		return err
216	}
217	pvc, err := expc.pvcLister.PersistentVolumeClaims(namespace).Get(name)
218	if errors.IsNotFound(err) {
219		return nil
220	}
221	if err != nil {
222		klog.V(5).Infof("Error getting PVC %q (uid: %q) from informer : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), pvc.UID, err)
223		return err
224	}
225
226	pv, err := expc.getPersistentVolume(pvc)
227	if err != nil {
228		klog.V(5).Infof("Error getting Persistent Volume for PVC %q (uid: %q) from informer : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), pvc.UID, err)
229		return err
230	}
231
232	if pv.Spec.ClaimRef == nil || pvc.Namespace != pv.Spec.ClaimRef.Namespace || pvc.UID != pv.Spec.ClaimRef.UID {
233		err := fmt.Errorf("persistent Volume is not bound to PVC being updated : %s", util.ClaimToClaimKey(pvc))
234		klog.V(4).Infof("%v", err)
235		return err
236	}
237
238	pvcRequestSize := pvc.Spec.Resources.Requests[v1.ResourceStorage]
239	pvcStatusSize := pvc.Status.Capacity[v1.ResourceStorage]
240
241	// call expand operation only under two condition
242	// 1. pvc's request size has been expanded and is larger than pvc's current status size
243	// 2. pv has an pre-resize capacity annotation
244	if pvcRequestSize.Cmp(pvcStatusSize) <= 0 && !metav1.HasAnnotation(pv.ObjectMeta, util.AnnPreResizeCapacity) {
245		return nil
246	}
247
248	volumeSpec := volume.NewSpecFromPersistentVolume(pv, false)
249	migratable, err := expc.csiMigratedPluginManager.IsMigratable(volumeSpec)
250	if err != nil {
251		klog.V(4).Infof("failed to check CSI migration status for PVC: %s with error: %v", util.ClaimToClaimKey(pvc), err)
252		return nil
253	}
254	// handle CSI migration scenarios before invoking FindExpandablePluginBySpec for in-tree
255	if migratable {
256		inTreePluginName, err := expc.csiMigratedPluginManager.GetInTreePluginNameFromSpec(volumeSpec.PersistentVolume, volumeSpec.Volume)
257		if err != nil {
258			klog.V(4).Infof("Error getting in-tree plugin name from persistent volume %s: %v", volumeSpec.PersistentVolume.Name, err)
259			return err
260		}
261
262		msg := fmt.Sprintf("CSI migration enabled for %s; waiting for external resizer to expand the pvc", inTreePluginName)
263		expc.recorder.Event(pvc, v1.EventTypeNormal, events.ExternalExpanding, msg)
264		csiResizerName, err := expc.translator.GetCSINameFromInTreeName(inTreePluginName)
265		if err != nil {
266			errorMsg := fmt.Sprintf("error getting CSI driver name for pvc %s, with error %v", util.ClaimToClaimKey(pvc), err)
267			expc.recorder.Event(pvc, v1.EventTypeWarning, events.ExternalExpanding, errorMsg)
268			return fmt.Errorf(errorMsg)
269		}
270
271		pvc, err := util.SetClaimResizer(pvc, csiResizerName, expc.kubeClient)
272		if err != nil {
273			errorMsg := fmt.Sprintf("error setting resizer annotation to pvc %s, with error %v", util.ClaimToClaimKey(pvc), err)
274			expc.recorder.Event(pvc, v1.EventTypeWarning, events.ExternalExpanding, errorMsg)
275			return fmt.Errorf(errorMsg)
276		}
277		return nil
278	}
279
280	volumePlugin, err := expc.volumePluginMgr.FindExpandablePluginBySpec(volumeSpec)
281	if err != nil || volumePlugin == nil {
282		msg := fmt.Errorf("didn't find a plugin capable of expanding the volume; " +
283			"waiting for an external controller to process this PVC")
284		eventType := v1.EventTypeNormal
285		if err != nil {
286			eventType = v1.EventTypeWarning
287		}
288		expc.recorder.Event(pvc, eventType, events.ExternalExpanding, fmt.Sprintf("Ignoring the PVC: %v.", msg))
289		klog.Infof("Ignoring the PVC %q (uid: %q) : %v.", util.GetPersistentVolumeClaimQualifiedName(pvc), pvc.UID, msg)
290		// If we are expecting that an external plugin will handle resizing this volume then
291		// is no point in requeuing this PVC.
292		return nil
293	}
294
295	volumeResizerName := volumePlugin.GetPluginName()
296	return expc.expand(pvc, pv, volumeResizerName)
297}
298
299func (expc *expandController) expand(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume, resizerName string) error {
300	// if node expand is complete and pv's annotation can be removed, remove the annotation from pv and return
301	if expc.isNodeExpandComplete(pvc, pv) && metav1.HasAnnotation(pv.ObjectMeta, util.AnnPreResizeCapacity) {
302		return util.DeleteAnnPreResizeCapacity(pv, expc.GetKubeClient())
303	}
304
305	pvc, err := util.MarkResizeInProgressWithResizer(pvc, resizerName, expc.kubeClient)
306	if err != nil {
307		klog.V(5).Infof("Error setting PVC %s in progress with error : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
308		return err
309	}
310
311	generatedOperations, err := expc.operationGenerator.GenerateExpandVolumeFunc(pvc, pv)
312	if err != nil {
313		klog.Errorf("Error starting ExpandVolume for pvc %s with %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
314		return err
315	}
316	klog.V(5).Infof("Starting ExpandVolume for volume %s", util.GetPersistentVolumeClaimQualifiedName(pvc))
317	_, detailedErr := generatedOperations.Run()
318
319	return detailedErr
320}
321
322// TODO make concurrency configurable (workers/threadiness argument). previously, nestedpendingoperations spawned unlimited goroutines
323func (expc *expandController) Run(stopCh <-chan struct{}) {
324	defer runtime.HandleCrash()
325	defer expc.queue.ShutDown()
326
327	klog.Infof("Starting expand controller")
328	defer klog.Infof("Shutting down expand controller")
329
330	if !cache.WaitForNamedCacheSync("expand", stopCh, expc.pvcsSynced, expc.pvSynced) {
331		return
332	}
333
334	for i := 0; i < defaultWorkerCount; i++ {
335		go wait.Until(expc.runWorker, time.Second, stopCh)
336	}
337
338	<-stopCh
339}
340
341func (expc *expandController) runWorker() {
342	for expc.processNextWorkItem() {
343	}
344}
345
346func (expc *expandController) getPersistentVolume(pvc *v1.PersistentVolumeClaim) (*v1.PersistentVolume, error) {
347	volumeName := pvc.Spec.VolumeName
348	pv, err := expc.kubeClient.CoreV1().PersistentVolumes().Get(context.TODO(), volumeName, metav1.GetOptions{})
349
350	if err != nil {
351		return nil, fmt.Errorf("failed to get PV %q: %v", volumeName, err)
352	}
353
354	return pv.DeepCopy(), nil
355}
356
357// isNodeExpandComplete returns true if  pvc.Status.Capacity >= pv.Spec.Capacity
358func (expc *expandController) isNodeExpandComplete(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) bool {
359	klog.V(4).Infof("pv %q capacity = %v, pvc %s capacity = %v", pv.Name, pv.Spec.Capacity[v1.ResourceStorage], pvc.ObjectMeta.Name, pvc.Status.Capacity[v1.ResourceStorage])
360	pvcCap, pvCap := pvc.Status.Capacity[v1.ResourceStorage], pv.Spec.Capacity[v1.ResourceStorage]
361	return pvcCap.Cmp(pvCap) >= 0
362}
363
364// Implementing VolumeHost interface
365func (expc *expandController) GetPluginDir(pluginName string) string {
366	return ""
367}
368
369func (expc *expandController) GetVolumeDevicePluginDir(pluginName string) string {
370	return ""
371}
372
373func (expc *expandController) GetPodsDir() string {
374	return ""
375}
376
377func (expc *expandController) GetPodVolumeDir(podUID types.UID, pluginName string, volumeName string) string {
378	return ""
379}
380
381func (expc *expandController) GetPodVolumeDeviceDir(podUID types.UID, pluginName string) string {
382	return ""
383}
384
385func (expc *expandController) GetPodPluginDir(podUID types.UID, pluginName string) string {
386	return ""
387}
388
389func (expc *expandController) GetKubeClient() clientset.Interface {
390	return expc.kubeClient
391}
392
393func (expc *expandController) NewWrapperMounter(volName string, spec volume.Spec, pod *v1.Pod, opts volume.VolumeOptions) (volume.Mounter, error) {
394	return nil, fmt.Errorf("NewWrapperMounter not supported by expand controller's VolumeHost implementation")
395}
396
397func (expc *expandController) NewWrapperUnmounter(volName string, spec volume.Spec, podUID types.UID) (volume.Unmounter, error) {
398	return nil, fmt.Errorf("NewWrapperUnmounter not supported by expand controller's VolumeHost implementation")
399}
400
401func (expc *expandController) GetCloudProvider() cloudprovider.Interface {
402	return expc.cloud
403}
404
405func (expc *expandController) GetMounter(pluginName string) mount.Interface {
406	return nil
407}
408
409func (expc *expandController) GetExec(pluginName string) utilexec.Interface {
410	return utilexec.New()
411}
412
413func (expc *expandController) GetHostName() string {
414	return ""
415}
416
417func (expc *expandController) GetHostIP() (net.IP, error) {
418	return nil, fmt.Errorf("GetHostIP not supported by expand controller's VolumeHost implementation")
419}
420
421func (expc *expandController) GetNodeAllocatable() (v1.ResourceList, error) {
422	return v1.ResourceList{}, nil
423}
424
425func (expc *expandController) GetSecretFunc() func(namespace, name string) (*v1.Secret, error) {
426	return func(_, _ string) (*v1.Secret, error) {
427		return nil, fmt.Errorf("GetSecret unsupported in expandController")
428	}
429}
430
431func (expc *expandController) GetConfigMapFunc() func(namespace, name string) (*v1.ConfigMap, error) {
432	return func(_, _ string) (*v1.ConfigMap, error) {
433		return nil, fmt.Errorf("GetConfigMap unsupported in expandController")
434	}
435}
436
437func (expc *expandController) GetServiceAccountTokenFunc() func(_, _ string, _ *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) {
438	return func(_, _ string, _ *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) {
439		return nil, fmt.Errorf("GetServiceAccountToken unsupported in expandController")
440	}
441}
442
443func (expc *expandController) DeleteServiceAccountTokenFunc() func(types.UID) {
444	return func(types.UID) {
445		klog.Errorf("DeleteServiceAccountToken unsupported in expandController")
446	}
447}
448
449func (expc *expandController) GetNodeLabels() (map[string]string, error) {
450	return nil, fmt.Errorf("GetNodeLabels unsupported in expandController")
451}
452
453func (expc *expandController) GetNodeName() types.NodeName {
454	return ""
455}
456
457func (expc *expandController) GetEventRecorder() record.EventRecorder {
458	return expc.recorder
459}
460
461func (expc *expandController) GetSubpather() subpath.Interface {
462	// not needed for expand controller
463	return nil
464}
465
466func (expc *expandController) GetFilteredDialOptions() *proxyutil.FilteredDialOptions {
467	return expc.filteredDialOptions
468}
469