/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package disruption

import (
	"context"
	"fmt"
	"strings"
	"time"

	apps "k8s.io/api/apps/v1beta1"
	v1 "k8s.io/api/core/v1"
	"k8s.io/api/extensions/v1beta1"
	policy "k8s.io/api/policy/v1"
	apiequality "k8s.io/apimachinery/pkg/api/equality"
	"k8s.io/apimachinery/pkg/api/errors"
	apimeta "k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/intstr"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/discovery"
	appsv1informers "k8s.io/client-go/informers/apps/v1"
	coreinformers "k8s.io/client-go/informers/core/v1"
	policyinformers "k8s.io/client-go/informers/policy/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	appsv1listers "k8s.io/client-go/listers/apps/v1"
	corelisters "k8s.io/client-go/listers/core/v1"
	policylisters "k8s.io/client-go/listers/policy/v1"
	scaleclient "k8s.io/client-go/scale"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/workqueue"
	pdbhelper "k8s.io/component-helpers/apps/poddisruptionbudget"
	"k8s.io/klog/v2"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
	"k8s.io/kubernetes/pkg/controller"
)

// DeletionTimeout sets the maximum time from the moment a pod is added to DisruptedPods in PDB.Status
// to the time when the pod is expected to be seen by the PDB controller as having been marked for deletion.
// If the pod is not marked for deletion during that time, it is assumed that it won't be deleted at
// all and the corresponding entry can be removed from pdb.Status.DisruptedPods. It is assumed that
// pod/pdb apiserver-to-controller latency is relatively small (like 1-2 sec), so the value below should
// be more than enough.
// If the controller is running on a different node, it is important that the two nodes have synced
// clocks (via ntp, for example). Otherwise the PodDisruptionBudget controller may not provide enough
// protection against unwanted pod disruptions.
const (
	DeletionTimeout = 2 * 60 * time.Second
)

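// updater persists a PodDisruptionBudget status update. The controller indirects
// through getUpdater so the write path can be swapped out (e.g. in tests).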
type updater func(*policy.PodDisruptionBudget) error

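// DisruptionController watches PodDisruptionBudgets and the pods they select, and keeps
// each PDB's status (allowed disruptions, healthy counts, disrupted pods) up to date.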
type DisruptionController struct {
	kubeClient clientset.Interface
	mapper     apimeta.RESTMapper

	scaleNamespacer scaleclient.ScalesGetter
	discoveryClient discovery.DiscoveryInterface

	pdbLister       policylisters.PodDisruptionBudgetLister
	pdbListerSynced cache.InformerSynced

	podLister       corelisters.PodLister
	podListerSynced cache.InformerSynced

	rcLister       corelisters.ReplicationControllerLister
	rcListerSynced cache.InformerSynced

	rsLister       appsv1listers.ReplicaSetLister
	rsListerSynced cache.InformerSynced

	dLister       appsv1listers.DeploymentLister
	dListerSynced cache.InformerSynced

	ssLister       appsv1listers.StatefulSetLister
	ssListerSynced cache.InformerSynced

	// PodDisruptionBudget keys that need to be synced.
	queue        workqueue.RateLimitingInterface
	recheckQueue workqueue.DelayingInterface

	broadcaster record.EventBroadcaster
	recorder    record.EventRecorder

	getUpdater func() updater
}

// controllerAndScale is used to return (controller, scale) pairs from the
// controller finder functions.
type controllerAndScale struct {
	types.UID
	scale int32
}

// podControllerFinder is a function type that maps an owner reference to the workload
// controller it refers to (if any) and that controller's scale.
type podControllerFinder func(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error)

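// NewDisruptionController builds a DisruptionController that watches the given informers
// and writes PDB status updates through the provided kubeClient.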
func NewDisruptionController(
	podInformer coreinformers.PodInformer,
	pdbInformer policyinformers.PodDisruptionBudgetInformer,
	rcInformer coreinformers.ReplicationControllerInformer,
	rsInformer appsv1informers.ReplicaSetInformer,
	dInformer appsv1informers.DeploymentInformer,
	ssInformer appsv1informers.StatefulSetInformer,
	kubeClient clientset.Interface,
	restMapper apimeta.RESTMapper,
	scaleNamespacer scaleclient.ScalesGetter,
	discoveryClient discovery.DiscoveryInterface,
) *DisruptionController {
	dc := &DisruptionController{
		kubeClient:   kubeClient,
		queue:        workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "disruption"),
		recheckQueue: workqueue.NewNamedDelayingQueue("disruption_recheck"),
		broadcaster:  record.NewBroadcaster(),
	}
	dc.recorder = dc.broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "controllermanager"})

	dc.getUpdater = func() updater { return dc.writePdbStatus }

	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    dc.addPod,
		UpdateFunc: dc.updatePod,
		DeleteFunc: dc.deletePod,
	})
	dc.podLister = podInformer.Lister()
	dc.podListerSynced = podInformer.Informer().HasSynced

	pdbInformer.Informer().AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc:    dc.addDb,
			UpdateFunc: dc.updateDb,
			DeleteFunc: dc.removeDb,
		},
	)
	dc.pdbLister = pdbInformer.Lister()
	dc.pdbListerSynced = pdbInformer.Informer().HasSynced

	dc.rcLister = rcInformer.Lister()
	dc.rcListerSynced = rcInformer.Informer().HasSynced

	dc.rsLister = rsInformer.Lister()
	dc.rsListerSynced = rsInformer.Informer().HasSynced

	dc.dLister = dInformer.Lister()
	dc.dListerSynced = dInformer.Informer().HasSynced

	dc.ssLister = ssInformer.Lister()
	dc.ssListerSynced = ssInformer.Informer().HasSynced

	dc.mapper = restMapper
	dc.scaleNamespacer = scaleNamespacer
	dc.discoveryClient = discoveryClient

	return dc
}

// The workload resources do implement the scale subresource, so it would
// be possible to only check the scale subresource here. But since there is no
// way to take advantage of listers with scale subresources, we use the workload
// resources directly and only fall back to the scale subresource when needed.
func (dc *DisruptionController) finders() []podControllerFinder {
	return []podControllerFinder{dc.getPodReplicationController, dc.getPodDeployment, dc.getPodReplicaSet,
		dc.getPodStatefulSet, dc.getScaleController}
}

var (
	controllerKindRS  = v1beta1.SchemeGroupVersion.WithKind("ReplicaSet")
	controllerKindSS  = apps.SchemeGroupVersion.WithKind("StatefulSet")
	controllerKindRC  = v1.SchemeGroupVersion.WithKind("ReplicationController")
	controllerKindDep = v1beta1.SchemeGroupVersion.WithKind("Deployment")
)

// getPodReplicaSet finds a replicaset which has no matching deployments.
func (dc *DisruptionController) getPodReplicaSet(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
	ok, err := verifyGroupKind(controllerRef, controllerKindRS.Kind, []string{"apps", "extensions"})
	if !ok || err != nil {
		return nil, err
	}
	rs, err := dc.rsLister.ReplicaSets(namespace).Get(controllerRef.Name)
	if err != nil {
		// The only possible error is NotFound, which is ok here.
		return nil, nil
	}
	if rs.UID != controllerRef.UID {
		return nil, nil
	}
	controllerRef = metav1.GetControllerOf(rs)
	if controllerRef != nil && controllerRef.Kind == controllerKindDep.Kind {
		// Skip RS if it's controlled by a Deployment.
		return nil, nil
	}
	return &controllerAndScale{rs.UID, *(rs.Spec.Replicas)}, nil
}

// getPodStatefulSet returns the statefulset referenced by the provided controllerRef.
func (dc *DisruptionController) getPodStatefulSet(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
	ok, err := verifyGroupKind(controllerRef, controllerKindSS.Kind, []string{"apps"})
	if !ok || err != nil {
		return nil, err
	}
	ss, err := dc.ssLister.StatefulSets(namespace).Get(controllerRef.Name)
	if err != nil {
		// The only possible error is NotFound, which is ok here.
		return nil, nil
	}
	if ss.UID != controllerRef.UID {
		return nil, nil
	}

	return &controllerAndScale{ss.UID, *(ss.Spec.Replicas)}, nil
}

// getPodDeployment finds the deployment for a replicaset which is being managed by a deployment.
func (dc *DisruptionController) getPodDeployment(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
	ok, err := verifyGroupKind(controllerRef, controllerKindRS.Kind, []string{"apps", "extensions"})
	if !ok || err != nil {
		return nil, err
	}
	rs, err := dc.rsLister.ReplicaSets(namespace).Get(controllerRef.Name)
	if err != nil {
		// The only possible error is NotFound, which is ok here.
		return nil, nil
	}
	if rs.UID != controllerRef.UID {
		return nil, nil
	}
	controllerRef = metav1.GetControllerOf(rs)
	if controllerRef == nil {
		return nil, nil
	}

	ok, err = verifyGroupKind(controllerRef, controllerKindDep.Kind, []string{"apps", "extensions"})
	if !ok || err != nil {
		return nil, err
	}
	deployment, err := dc.dLister.Deployments(rs.Namespace).Get(controllerRef.Name)
	if err != nil {
		// The only possible error is NotFound, which is ok here.
		return nil, nil
	}
	if deployment.UID != controllerRef.UID {
		return nil, nil
	}
	return &controllerAndScale{deployment.UID, *(deployment.Spec.Replicas)}, nil
}

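// getPodReplicationController returns the ReplicationController referenced by the
// provided controllerRef, together with its scale, or nil if it does not match.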
func (dc *DisruptionController) getPodReplicationController(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
	ok, err := verifyGroupKind(controllerRef, controllerKindRC.Kind, []string{""})
	if !ok || err != nil {
		return nil, err
	}
	rc, err := dc.rcLister.ReplicationControllers(namespace).Get(controllerRef.Name)
	if err != nil {
		// The only possible error is NotFound, which is ok here.
		return nil, nil
	}
	if rc.UID != controllerRef.UID {
		return nil, nil
	}
	return &controllerAndScale{rc.UID, *(rc.Spec.Replicas)}, nil
}

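// getScaleController is the fallback finder: it resolves arbitrary controllers through
// the scale subresource when none of the built-in workload finders matched.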
func (dc *DisruptionController) getScaleController(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
	gv, err := schema.ParseGroupVersion(controllerRef.APIVersion)
	if err != nil {
		return nil, err
	}

	gk := schema.GroupKind{
		Group: gv.Group,
		Kind:  controllerRef.Kind,
	}

	mapping, err := dc.mapper.RESTMapping(gk, gv.Version)
	if err != nil {
		return nil, err
	}
	gr := mapping.Resource.GroupResource()

	scale, err := dc.scaleNamespacer.Scales(namespace).Get(context.TODO(), gr, controllerRef.Name, metav1.GetOptions{})
	if err != nil {
		if errors.IsNotFound(err) {
			// The IsNotFound error can mean either that the resource does not exist,
			// or that it exists but doesn't implement the scale subresource. We check which
			// situation we are facing so we can give an appropriate error message.
			isScale, err := dc.implementsScale(gv, controllerRef.Kind)
			if err != nil {
				return nil, err
			}
			if !isScale {
				return nil, fmt.Errorf("%s does not implement the scale subresource", gr.String())
			}
			return nil, nil
		}
		return nil, err
	}
	if scale.UID != controllerRef.UID {
		return nil, nil
	}
	return &controllerAndScale{scale.UID, scale.Spec.Replicas}, nil
}

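// implementsScale reports whether the given kind exposes a scale subresource, based on
// the API resources advertised by the discovery client.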
func (dc *DisruptionController) implementsScale(gv schema.GroupVersion, kind string) (bool, error) {
	resourceList, err := dc.discoveryClient.ServerResourcesForGroupVersion(gv.String())
	if err != nil {
		return false, err
	}
	for _, resource := range resourceList.APIResources {
		if resource.Kind != kind {
			continue
		}
		if strings.HasSuffix(resource.Name, "/scale") {
			return true, nil
		}
	}
	return false, nil
}

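// verifyGroupKind reports whether the controllerRef's kind matches expectedKind and its
// API group is one of expectedGroups.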
func verifyGroupKind(controllerRef *metav1.OwnerReference, expectedKind string, expectedGroups []string) (bool, error) {
	gv, err := schema.ParseGroupVersion(controllerRef.APIVersion)
	if err != nil {
		return false, err
	}

	if controllerRef.Kind != expectedKind {
		return false, nil
	}

	for _, group := range expectedGroups {
		if group == gv.Group {
			return true, nil
		}
	}

	return false, nil
}

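// Run waits for the informer caches to sync, then runs the sync and recheck workers
// until stopCh is closed.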
func (dc *DisruptionController) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer dc.queue.ShutDown()

	klog.Infof("Starting disruption controller")
	defer klog.Infof("Shutting down disruption controller")

	if !cache.WaitForNamedCacheSync("disruption", stopCh, dc.podListerSynced, dc.pdbListerSynced, dc.rcListerSynced, dc.rsListerSynced, dc.dListerSynced, dc.ssListerSynced) {
		return
	}

	if dc.kubeClient != nil {
		klog.Infof("Sending events to api server.")
		dc.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: dc.kubeClient.CoreV1().Events("")})
	} else {
		klog.Infof("No api server defined - no events will be sent to API server.")
	}
	go wait.Until(dc.worker, time.Second, stopCh)
	go wait.Until(dc.recheckWorker, time.Second, stopCh)

	<-stopCh
}

func (dc *DisruptionController) addDb(obj interface{}) {
	pdb := obj.(*policy.PodDisruptionBudget)
	klog.V(4).Infof("add DB %q", pdb.Name)
	dc.enqueuePdb(pdb)
}

func (dc *DisruptionController) updateDb(old, cur interface{}) {
	// TODO(mml) ignore updates where 'old' is equivalent to 'cur'.
	pdb := cur.(*policy.PodDisruptionBudget)
	klog.V(4).Infof("update DB %q", pdb.Name)
	dc.enqueuePdb(pdb)
}

func (dc *DisruptionController) removeDb(obj interface{}) {
	pdb, ok := obj.(*policy.PodDisruptionBudget)
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			klog.Errorf("Couldn't get object from tombstone %+v", obj)
			return
		}
		pdb, ok = tombstone.Obj.(*policy.PodDisruptionBudget)
		if !ok {
			klog.Errorf("Tombstone contained object that is not a pdb %+v", obj)
			return
		}
	}
	klog.V(4).Infof("remove DB %q", pdb.Name)
	dc.enqueuePdb(pdb)
}

func (dc *DisruptionController) addPod(obj interface{}) {
	pod := obj.(*v1.Pod)
	klog.V(4).Infof("addPod called on pod %q", pod.Name)
	pdb := dc.getPdbForPod(pod)
	if pdb == nil {
		klog.V(4).Infof("No matching pdb for pod %q", pod.Name)
		return
	}
	klog.V(4).Infof("addPod %q -> PDB %q", pod.Name, pdb.Name)
	dc.enqueuePdb(pdb)
}

func (dc *DisruptionController) updatePod(old, cur interface{}) {
	pod := cur.(*v1.Pod)
	klog.V(4).Infof("updatePod called on pod %q", pod.Name)
	pdb := dc.getPdbForPod(pod)
	if pdb == nil {
		klog.V(4).Infof("No matching pdb for pod %q", pod.Name)
		return
	}
	klog.V(4).Infof("updatePod %q -> PDB %q", pod.Name, pdb.Name)
	dc.enqueuePdb(pdb)
}

func (dc *DisruptionController) deletePod(obj interface{}) {
	pod, ok := obj.(*v1.Pod)
	// When a delete is dropped, the relist will notice a pod in the store not
	// in the list, leading to the insertion of a tombstone object which contains
	// the deleted key/value. Note that this value might be stale. If the pod
	// changed labels, the matching PodDisruptionBudget will not be woken up until
	// the periodic resync.
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			klog.Errorf("Couldn't get object from tombstone %+v", obj)
			return
		}
		pod, ok = tombstone.Obj.(*v1.Pod)
		if !ok {
			klog.Errorf("Tombstone contained object that is not a pod %+v", obj)
			return
		}
	}
	klog.V(4).Infof("deletePod called on pod %q", pod.Name)
	pdb := dc.getPdbForPod(pod)
	if pdb == nil {
		klog.V(4).Infof("No matching pdb for pod %q", pod.Name)
		return
	}
	klog.V(4).Infof("deletePod %q -> PDB %q", pod.Name, pdb.Name)
	dc.enqueuePdb(pdb)
}

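// enqueuePdb adds the PDB's key to the main work queue for immediate processing.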
func (dc *DisruptionController) enqueuePdb(pdb *policy.PodDisruptionBudget) {
	key, err := controller.KeyFunc(pdb)
	if err != nil {
		klog.Errorf("Couldn't get key for PodDisruptionBudget object %+v: %v", pdb, err)
		return
	}
	dc.queue.Add(key)
}

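// enqueuePdbForRecheck schedules the PDB's key on the recheck queue so it is synced
// again after the given delay (e.g. while a disrupted pod's deletion is still pending).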
func (dc *DisruptionController) enqueuePdbForRecheck(pdb *policy.PodDisruptionBudget, delay time.Duration) {
	key, err := controller.KeyFunc(pdb)
	if err != nil {
		klog.Errorf("Couldn't get key for PodDisruptionBudget object %+v: %v", pdb, err)
		return
	}
	dc.recheckQueue.AddAfter(key, delay)
}

func (dc *DisruptionController) getPdbForPod(pod *v1.Pod) *policy.PodDisruptionBudget {
	// GetPodPodDisruptionBudgets returns an error only if no
	// PodDisruptionBudgets are found.  We don't return that as an error to the
	// caller.
	pdbs, err := dc.pdbLister.GetPodPodDisruptionBudgets(pod)
	if err != nil {
		klog.V(4).Infof("No PodDisruptionBudgets found for pod %v, PodDisruptionBudget controller will avoid syncing.", pod.Name)
		return nil
	}

	if len(pdbs) > 1 {
		msg := fmt.Sprintf("Pod %q/%q matches multiple PodDisruptionBudgets.  Chose %q arbitrarily.", pod.Namespace, pod.Name, pdbs[0].Name)
		klog.Warning(msg)
		dc.recorder.Event(pod, v1.EventTypeWarning, "MultiplePodDisruptionBudgets", msg)
	}
	return pdbs[0]
}

// getPodsForPdb returns the pods selected by the PodDisruptionBudget object.
// IMPORTANT NOTE: the returned pods should NOT be modified.
func (dc *DisruptionController) getPodsForPdb(pdb *policy.PodDisruptionBudget) ([]*v1.Pod, error) {
	sel, err := metav1.LabelSelectorAsSelector(pdb.Spec.Selector)
	if err != nil {
		return []*v1.Pod{}, err
	}
	pods, err := dc.podLister.Pods(pdb.Namespace).List(sel)
	if err != nil {
		return []*v1.Pod{}, err
	}
	return pods, nil
}

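// worker processes items from the main queue until the queue is shut down.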
func (dc *DisruptionController) worker() {
	for dc.processNextWorkItem() {
	}
}

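// processNextWorkItem syncs a single PDB key from the queue; failed syncs are requeued
// with rate limiting. It returns false only when the queue is shutting down.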
func (dc *DisruptionController) processNextWorkItem() bool {
	dKey, quit := dc.queue.Get()
	if quit {
		return false
	}
	defer dc.queue.Done(dKey)

	err := dc.sync(dKey.(string))
	if err == nil {
		dc.queue.Forget(dKey)
		return true
	}

	utilruntime.HandleError(fmt.Errorf("Error syncing PodDisruptionBudget %v, requeuing: %v", dKey.(string), err))
	dc.queue.AddRateLimited(dKey)

	return true
}

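// recheckWorker drains the recheck queue until it is shut down.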
func (dc *DisruptionController) recheckWorker() {
	for dc.processNextRecheckWorkItem() {
	}
}

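// processNextRecheckWorkItem moves a due recheck key onto the main work queue so the
// corresponding PDB is synced again.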
func (dc *DisruptionController) processNextRecheckWorkItem() bool {
	dKey, quit := dc.recheckQueue.Get()
	if quit {
		return false
	}
	defer dc.recheckQueue.Done(dKey)
	dc.queue.AddRateLimited(dKey)
	return true
}

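// sync fetches the PDB for the given key and attempts to sync it, falling back to
// failSafe on non-conflict errors.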
func (dc *DisruptionController) sync(key string) error {
	startTime := time.Now()
	defer func() {
		klog.V(4).Infof("Finished syncing PodDisruptionBudget %q (%v)", key, time.Since(startTime))
	}()

	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	pdb, err := dc.pdbLister.PodDisruptionBudgets(namespace).Get(name)
	if errors.IsNotFound(err) {
		klog.V(4).Infof("PodDisruptionBudget %q has been deleted", key)
		return nil
	}
	if err != nil {
		return err
	}

	err = dc.trySync(pdb)
	// If the reason for failure was a conflict, then allow this PDB update to be
	// requeued without triggering the failSafe logic.
	if errors.IsConflict(err) {
		return err
	}
	if err != nil {
		klog.Errorf("Failed to sync pdb %s/%s: %v", pdb.Namespace, pdb.Name, err)
		return dc.failSafe(pdb, err)
	}

	return nil
}

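// trySync recomputes the expected, desired-healthy and current-healthy pod counts for
// the PDB and writes the updated status, scheduling a recheck if disrupted pods are
// still pending deletion.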
func (dc *DisruptionController) trySync(pdb *policy.PodDisruptionBudget) error {
	pods, err := dc.getPodsForPdb(pdb)
	if err != nil {
		dc.recorder.Eventf(pdb, v1.EventTypeWarning, "NoPods", "Failed to get pods: %v", err)
		return err
	}
	if len(pods) == 0 {
		dc.recorder.Eventf(pdb, v1.EventTypeNormal, "NoPods", "No matching pods found")
	}

	expectedCount, desiredHealthy, unmanagedPods, err := dc.getExpectedPodCount(pdb, pods)
	if err != nil {
		dc.recorder.Eventf(pdb, v1.EventTypeWarning, "CalculateExpectedPodCountFailed", "Failed to calculate the number of expected pods: %v", err)
		return err
	}
	// We have unmanaged pods; instead of erroring and hot-looping in the disruption controller, log and continue.
	if len(unmanagedPods) > 0 {
		klog.V(4).Infof("found unmanaged pods associated with this PDB: %v",
			strings.Join(unmanagedPods, ", "))
	}

	currentTime := time.Now()
	disruptedPods, recheckTime := dc.buildDisruptedPodMap(pods, pdb, currentTime)
	currentHealthy := countHealthyPods(pods, disruptedPods, currentTime)
	err = dc.updatePdbStatus(pdb, currentHealthy, desiredHealthy, expectedCount, disruptedPods)

	if err == nil && recheckTime != nil {
		// There is always at most one PDB waiting with a particular name in the queue,
		// and each PDB in the queue is associated with the lowest timestamp
		// that was supplied when a PDB with that name was added.
		dc.enqueuePdbForRecheck(pdb, recheckTime.Sub(currentTime))
	}
	return err
}

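// getExpectedPodCount derives the expected number of pods and the desired number of
// healthy pods for the PDB, honoring either maxUnavailable or minAvailable.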
func (dc *DisruptionController) getExpectedPodCount(pdb *policy.PodDisruptionBudget, pods []*v1.Pod) (expectedCount, desiredHealthy int32, unmanagedPods []string, err error) {
	err = nil
	// TODO(davidopp): consider whether expectedCount and the rules about
	// permitted controller configurations (specifically, treating it as an error
	// if a pod covered by a PDB has 0 controllers or > 1 controller) should be
	// handled the same way for integer and percentage minAvailable

	if pdb.Spec.MaxUnavailable != nil {
		expectedCount, unmanagedPods, err = dc.getExpectedScale(pdb, pods)
		if err != nil {
			return
		}
		var maxUnavailable int
		maxUnavailable, err = intstr.GetScaledValueFromIntOrPercent(pdb.Spec.MaxUnavailable, int(expectedCount), true)
		if err != nil {
			return
		}
		desiredHealthy = expectedCount - int32(maxUnavailable)
		if desiredHealthy < 0 {
			desiredHealthy = 0
		}
	} else if pdb.Spec.MinAvailable != nil {
		if pdb.Spec.MinAvailable.Type == intstr.Int {
			desiredHealthy = pdb.Spec.MinAvailable.IntVal
			expectedCount = int32(len(pods))
		} else if pdb.Spec.MinAvailable.Type == intstr.String {
			expectedCount, unmanagedPods, err = dc.getExpectedScale(pdb, pods)
			if err != nil {
				return
			}

			var minAvailable int
			minAvailable, err = intstr.GetScaledValueFromIntOrPercent(pdb.Spec.MinAvailable, int(expectedCount), true)
			if err != nil {
				return
			}
			desiredHealthy = int32(minAvailable)
		}
	}
	return
}

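// getExpectedScale sums the scale of every controller backing the given pods, and also
// returns the names of pods that have no controller at all.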
func (dc *DisruptionController) getExpectedScale(pdb *policy.PodDisruptionBudget, pods []*v1.Pod) (expectedCount int32, unmanagedPods []string, err error) {
	// When the user specifies a fraction of pods that must be available, we
	// use as the fraction's denominator
	// SUM_{all c in C} scale(c)
	// where C is the union of C_p1, C_p2, ..., C_pN
	// and each C_pi is the set of controllers controlling the pod pi

	// k8s only defines what happens when 0 or 1 controllers control a
	// given pod.  We explicitly exclude the 0 controllers case here, and we
	// report an error if we find a pod with more than 1 controller.  Thus in
	// practice each C_pi is a set of exactly 1 controller.

	// A mapping from controllers to their scale.
	controllerScale := map[types.UID]int32{}

	// 1. Find the controller for each pod.

	// Since PDBs are applied to pods via selectors, a PDB can match unmanaged pods
	// (pods that don't have a backing controller). Such pods are collected and logged
	// instead of immediately returning a sync error, which would cause the disruption
	// controller to hot-loop and frequently update the status subresource, resulting in
	// excessive and expensive writes to etcd.
	// With ControllerRef, a pod can only have 1 controller.
	for _, pod := range pods {
		controllerRef := metav1.GetControllerOf(pod)
		if controllerRef == nil {
			unmanagedPods = append(unmanagedPods, pod.Name)
			continue
		}

		// If we already know the scale of the controller there is no need to do anything.
		if _, found := controllerScale[controllerRef.UID]; found {
			continue
		}

		// Check all the supported controllers to find the desired scale.
		foundController := false
		for _, finder := range dc.finders() {
			var controllerNScale *controllerAndScale
			controllerNScale, err = finder(controllerRef, pod.Namespace)
			if err != nil {
				return
			}
			if controllerNScale != nil {
				controllerScale[controllerNScale.UID] = controllerNScale.scale
				foundController = true
				break
			}
		}
		if !foundController {
			err = fmt.Errorf("found no controllers for pod %q", pod.Name)
			return
		}
	}

	// 2. Add up all the controllers.
	expectedCount = 0
	for _, count := range controllerScale {
		expectedCount += count
	}

	return
}

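// countHealthyPods returns the number of matching pods that are ready and neither being
// deleted nor expected to be deleted soon (per the disruptedPods map).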
func countHealthyPods(pods []*v1.Pod, disruptedPods map[string]metav1.Time, currentTime time.Time) (currentHealthy int32) {
	for _, pod := range pods {
		// Pod is being deleted.
		if pod.DeletionTimestamp != nil {
			continue
		}
		// Pod is expected to be deleted soon.
		if disruptionTime, found := disruptedPods[pod.Name]; found && disruptionTime.Time.Add(DeletionTimeout).After(currentTime) {
			continue
		}
		if podutil.IsPodReady(pod) {
			currentHealthy++
		}
	}

	return
}

// buildDisruptedPodMap builds an updated DisruptedPods map, dropping entries for pods
// that no longer exist, are already being deleted, or were not deleted within
// DeletionTimeout. It also returns the earliest time at which this check should be repeated.
func (dc *DisruptionController) buildDisruptedPodMap(pods []*v1.Pod, pdb *policy.PodDisruptionBudget, currentTime time.Time) (map[string]metav1.Time, *time.Time) {
	disruptedPods := pdb.Status.DisruptedPods
	result := make(map[string]metav1.Time)
	var recheckTime *time.Time

	if disruptedPods == nil {
		return result, recheckTime
	}
	for _, pod := range pods {
		if pod.DeletionTimestamp != nil {
			// Already being deleted.
			continue
		}
		disruptionTime, found := disruptedPods[pod.Name]
		if !found {
			// Pod not on the list.
			continue
		}
		expectedDeletion := disruptionTime.Time.Add(DeletionTimeout)
		if expectedDeletion.Before(currentTime) {
			klog.V(1).Infof("Pod %s/%s was expected to be deleted at %s but it wasn't, updating pdb %s/%s",
				pod.Namespace, pod.Name, disruptionTime.String(), pdb.Namespace, pdb.Name)
			dc.recorder.Eventf(pod, v1.EventTypeWarning, "NotDeleted", "Pod was expected by PDB %s/%s to be deleted but it wasn't",
				pdb.Namespace, pdb.Name)
		} else {
			if recheckTime == nil || expectedDeletion.Before(*recheckTime) {
				recheckTime = &expectedDeletion
			}
			result[pod.Name] = disruptionTime
		}
	}
	return result, recheckTime
}

// failSafe is an attempt to at least update the DisruptionsAllowed field to
// 0 if everything else has failed. This is one place we
// implement the "fail open" part of the design since if we manage to update
// this field correctly, we will prevent the /evict handler from approving an
// eviction when it may be unsafe to do so.
func (dc *DisruptionController) failSafe(pdb *policy.PodDisruptionBudget, err error) error {
	newPdb := pdb.DeepCopy()
	newPdb.Status.DisruptionsAllowed = 0

	if newPdb.Status.Conditions == nil {
		newPdb.Status.Conditions = make([]metav1.Condition, 0)
	}
	apimeta.SetStatusCondition(&newPdb.Status.Conditions, metav1.Condition{
		Type:               policy.DisruptionAllowedCondition,
		Status:             metav1.ConditionFalse,
		Reason:             policy.SyncFailedReason,
		Message:            err.Error(),
		ObservedGeneration: newPdb.Status.ObservedGeneration,
	})

	return dc.getUpdater()(newPdb)
}

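// updatePdbStatus recomputes DisruptionsAllowed and writes the PDB status, skipping the
// write when nothing has changed.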
func (dc *DisruptionController) updatePdbStatus(pdb *policy.PodDisruptionBudget, currentHealthy, desiredHealthy, expectedCount int32,
	disruptedPods map[string]metav1.Time) error {

	// We require expectedCount to be > 0 so that PDBs which currently match no
	// pods are in a safe state when their first pods appear but this controller
	// has not updated their status yet.  This isn't the only race, but it's a
	// common one that's easy to detect.
	disruptionsAllowed := currentHealthy - desiredHealthy
	if expectedCount <= 0 || disruptionsAllowed <= 0 {
		disruptionsAllowed = 0
	}

	if pdb.Status.CurrentHealthy == currentHealthy &&
		pdb.Status.DesiredHealthy == desiredHealthy &&
		pdb.Status.ExpectedPods == expectedCount &&
		pdb.Status.DisruptionsAllowed == disruptionsAllowed &&
		apiequality.Semantic.DeepEqual(pdb.Status.DisruptedPods, disruptedPods) &&
		pdb.Status.ObservedGeneration == pdb.Generation &&
		pdbhelper.ConditionsAreUpToDate(pdb) {
		return nil
	}

	newPdb := pdb.DeepCopy()
	newPdb.Status = policy.PodDisruptionBudgetStatus{
		CurrentHealthy:     currentHealthy,
		DesiredHealthy:     desiredHealthy,
		ExpectedPods:       expectedCount,
		DisruptionsAllowed: disruptionsAllowed,
		DisruptedPods:      disruptedPods,
		ObservedGeneration: pdb.Generation,
	}

	pdbhelper.UpdateDisruptionAllowedCondition(newPdb)

	return dc.getUpdater()(newPdb)
}

func (dc *DisruptionController) writePdbStatus(pdb *policy.PodDisruptionBudget) error {
	// If this update fails, don't retry it. Allow the failure to get handled &
	// retried in `processNextWorkItem()`.
	_, err := dc.kubeClient.PolicyV1().PodDisruptionBudgets(pdb.Namespace).UpdateStatus(context.TODO(), pdb, metav1.UpdateOptions{})
	return err
}