/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package disruption

import (
	"context"
	"fmt"
	"strings"
	"time"

	apps "k8s.io/api/apps/v1beta1"
	v1 "k8s.io/api/core/v1"
	"k8s.io/api/extensions/v1beta1"
	policy "k8s.io/api/policy/v1"
	apiequality "k8s.io/apimachinery/pkg/api/equality"
	"k8s.io/apimachinery/pkg/api/errors"
	apimeta "k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/intstr"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/discovery"
	appsv1informers "k8s.io/client-go/informers/apps/v1"
	coreinformers "k8s.io/client-go/informers/core/v1"
	policyinformers "k8s.io/client-go/informers/policy/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	appsv1listers "k8s.io/client-go/listers/apps/v1"
	corelisters "k8s.io/client-go/listers/core/v1"
	policylisters "k8s.io/client-go/listers/policy/v1"
	scaleclient "k8s.io/client-go/scale"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/workqueue"
	pdbhelper "k8s.io/component-helpers/apps/poddisruptionbudget"
	"k8s.io/klog/v2"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
	"k8s.io/kubernetes/pkg/controller"
)

// DeletionTimeout sets the maximum time from the moment a pod is added to DisruptedPods in PDB.Status
// to the time when the pod is expected to be seen by the PDB controller as having been marked for deletion.
// If the pod was not marked for deletion during that time it is assumed that it won't be deleted at
// all and the corresponding entry can be removed from pdb.Status.DisruptedPods. It is assumed that
// pod/pdb apiserver-to-controller latency is relatively small (like 1-2 sec), so the value below should
// be more than enough.
// If the controller is running on a different node it is important that the two nodes have synced
// clocks (via NTP, for example). Otherwise the PodDisruptionBudget controller may not provide enough
// protection against unwanted pod disruptions.
const (
	DeletionTimeout = 2 * 60 * time.Second
)

// updater persists changes to a PodDisruptionBudget's status.
type updater func(*policy.PodDisruptionBudget) error

// DisruptionController watches pods and PodDisruptionBudgets and keeps each
// PDB's status up to date: how many pods are currently healthy, how many are
// required to be healthy, how many pods are expected, and how many disruptions
// are currently allowed.
type DisruptionController struct {
	kubeClient clientset.Interface
	mapper     apimeta.RESTMapper

	scaleNamespacer scaleclient.ScalesGetter
	discoveryClient discovery.DiscoveryInterface

	pdbLister       policylisters.PodDisruptionBudgetLister
	pdbListerSynced cache.InformerSynced

	podLister       corelisters.PodLister
	podListerSynced cache.InformerSynced

	rcLister       corelisters.ReplicationControllerLister
	rcListerSynced cache.InformerSynced

	rsLister       appsv1listers.ReplicaSetLister
	rsListerSynced cache.InformerSynced

	dLister       appsv1listers.DeploymentLister
	dListerSynced cache.InformerSynced

	ssLister       appsv1listers.StatefulSetLister
	ssListerSynced cache.InformerSynced

	// PodDisruptionBudget keys that need to be synced.
	queue        workqueue.RateLimitingInterface
	recheckQueue workqueue.DelayingInterface

	broadcaster record.EventBroadcaster
	recorder    record.EventRecorder

	getUpdater func() updater
}

// controllerAndScale is used to return (controller, scale) pairs from the
// controller finder functions.
type controllerAndScale struct {
	types.UID
	scale int32
}

// podControllerFinder is a function type that maps a pod's controllerRef to
// the controller and its scale.
type podControllerFinder func(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error)

// NewDisruptionController creates a DisruptionController wired to the given
// informers. PDB status updates are written through kubeClient.
func NewDisruptionController(
	podInformer coreinformers.PodInformer,
	pdbInformer policyinformers.PodDisruptionBudgetInformer,
	rcInformer coreinformers.ReplicationControllerInformer,
	rsInformer appsv1informers.ReplicaSetInformer,
	dInformer appsv1informers.DeploymentInformer,
	ssInformer appsv1informers.StatefulSetInformer,
	kubeClient clientset.Interface,
	restMapper apimeta.RESTMapper,
	scaleNamespacer scaleclient.ScalesGetter,
	discoveryClient discovery.DiscoveryInterface,
) *DisruptionController {
	dc := &DisruptionController{
		kubeClient:   kubeClient,
		queue:        workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "disruption"),
		recheckQueue: workqueue.NewNamedDelayingQueue("disruption_recheck"),
		broadcaster:  record.NewBroadcaster(),
	}
	dc.recorder = dc.broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "controllermanager"})

	dc.getUpdater = func() updater { return dc.writePdbStatus }

	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    dc.addPod,
		UpdateFunc: dc.updatePod,
		DeleteFunc: dc.deletePod,
	})
	dc.podLister = podInformer.Lister()
	dc.podListerSynced = podInformer.Informer().HasSynced

	pdbInformer.Informer().AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc:    dc.addDb,
			UpdateFunc: dc.updateDb,
			DeleteFunc: dc.removeDb,
		},
	)
	dc.pdbLister = pdbInformer.Lister()
	dc.pdbListerSynced = pdbInformer.Informer().HasSynced

	dc.rcLister = rcInformer.Lister()
	dc.rcListerSynced = rcInformer.Informer().HasSynced

	dc.rsLister = rsInformer.Lister()
	dc.rsListerSynced = rsInformer.Informer().HasSynced

	dc.dLister = dInformer.Lister()
	dc.dListerSynced = dInformer.Informer().HasSynced

	dc.ssLister = ssInformer.Lister()
	dc.ssListerSynced = ssInformer.Informer().HasSynced

	dc.mapper = restMapper
	dc.scaleNamespacer = scaleNamespacer
	dc.discoveryClient = discoveryClient

	return dc
}

// The workload resources do implement the scale subresource, so it would
// be possible to only check the scale subresource here. But since there is no
// way to take advantage of listers with scale subresources, we use the workload
// resources directly and only fall back to the scale subresource when needed.
func (dc *DisruptionController) finders() []podControllerFinder {
	return []podControllerFinder{dc.getPodReplicationController, dc.getPodDeployment, dc.getPodReplicaSet,
		dc.getPodStatefulSet, dc.getScaleController}
}

var (
	controllerKindRS  = v1beta1.SchemeGroupVersion.WithKind("ReplicaSet")
	controllerKindSS  = apps.SchemeGroupVersion.WithKind("StatefulSet")
	controllerKindRC  = v1.SchemeGroupVersion.WithKind("ReplicationController")
	controllerKindDep = v1beta1.SchemeGroupVersion.WithKind("Deployment")
)

// getPodReplicaSet finds a replicaset which has no matching deployments.
func (dc *DisruptionController) getPodReplicaSet(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
	ok, err := verifyGroupKind(controllerRef, controllerKindRS.Kind, []string{"apps", "extensions"})
	if !ok || err != nil {
		return nil, err
	}
	rs, err := dc.rsLister.ReplicaSets(namespace).Get(controllerRef.Name)
	if err != nil {
		// The only possible error is NotFound, which is ok here.
		return nil, nil
	}
	if rs.UID != controllerRef.UID {
		return nil, nil
	}
	controllerRef = metav1.GetControllerOf(rs)
	if controllerRef != nil && controllerRef.Kind == controllerKindDep.Kind {
		// Skip RS if it's controlled by a Deployment.
		return nil, nil
	}
	return &controllerAndScale{rs.UID, *(rs.Spec.Replicas)}, nil
}

// getPodStatefulSet returns the statefulset referenced by the provided controllerRef.
func (dc *DisruptionController) getPodStatefulSet(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
	ok, err := verifyGroupKind(controllerRef, controllerKindSS.Kind, []string{"apps"})
	if !ok || err != nil {
		return nil, err
	}
	ss, err := dc.ssLister.StatefulSets(namespace).Get(controllerRef.Name)
	if err != nil {
		// The only possible error is NotFound, which is ok here.
		return nil, nil
	}
	if ss.UID != controllerRef.UID {
		return nil, nil
	}

	return &controllerAndScale{ss.UID, *(ss.Spec.Replicas)}, nil
}

// getPodDeployment finds the deployment for any replicaset that is being managed by a deployment.
func (dc *DisruptionController) getPodDeployment(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
	ok, err := verifyGroupKind(controllerRef, controllerKindRS.Kind, []string{"apps", "extensions"})
	if !ok || err != nil {
		return nil, err
	}
	rs, err := dc.rsLister.ReplicaSets(namespace).Get(controllerRef.Name)
	if err != nil {
		// The only possible error is NotFound, which is ok here.
		return nil, nil
	}
	if rs.UID != controllerRef.UID {
		return nil, nil
	}
	controllerRef = metav1.GetControllerOf(rs)
	if controllerRef == nil {
		return nil, nil
	}

	ok, err = verifyGroupKind(controllerRef, controllerKindDep.Kind, []string{"apps", "extensions"})
	if !ok || err != nil {
		return nil, err
	}
	deployment, err := dc.dLister.Deployments(rs.Namespace).Get(controllerRef.Name)
	if err != nil {
		// The only possible error is NotFound, which is ok here.
		return nil, nil
	}
	if deployment.UID != controllerRef.UID {
		return nil, nil
	}
	return &controllerAndScale{deployment.UID, *(deployment.Spec.Replicas)}, nil
}

func (dc *DisruptionController) getPodReplicationController(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
	ok, err := verifyGroupKind(controllerRef, controllerKindRC.Kind, []string{""})
	if !ok || err != nil {
		return nil, err
	}
	rc, err := dc.rcLister.ReplicationControllers(namespace).Get(controllerRef.Name)
	if err != nil {
		// The only possible error is NotFound, which is ok here.
		return nil, nil
	}
	if rc.UID != controllerRef.UID {
		return nil, nil
	}
	return &controllerAndScale{rc.UID, *(rc.Spec.Replicas)}, nil
}

func (dc *DisruptionController) getScaleController(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
	gv, err := schema.ParseGroupVersion(controllerRef.APIVersion)
	if err != nil {
		return nil, err
	}

	gk := schema.GroupKind{
		Group: gv.Group,
		Kind:  controllerRef.Kind,
	}

	mapping, err := dc.mapper.RESTMapping(gk, gv.Version)
	if err != nil {
		return nil, err
	}
	gr := mapping.Resource.GroupResource()

	scale, err := dc.scaleNamespacer.Scales(namespace).Get(context.TODO(), gr, controllerRef.Name, metav1.GetOptions{})
	if err != nil {
		if errors.IsNotFound(err) {
			// The IsNotFound error can mean either that the resource does not exist,
			// or it exists but doesn't implement the scale subresource. We check which
			// situation we are facing so we can give an appropriate error message.
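			// implementsScale consults discovery and reports whether this
			// group/version exposes a "<resource>/scale" subresource for the kind.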
			isScale, err := dc.implementsScale(gv, controllerRef.Kind)
			if err != nil {
				return nil, err
			}
			if !isScale {
				return nil, fmt.Errorf("%s does not implement the scale subresource", gr.String())
			}
			return nil, nil
		}
		return nil, err
	}
	if scale.UID != controllerRef.UID {
		return nil, nil
	}
	return &controllerAndScale{scale.UID, scale.Spec.Replicas}, nil
}

func (dc *DisruptionController) implementsScale(gv schema.GroupVersion, kind string) (bool, error) {
	resourceList, err := dc.discoveryClient.ServerResourcesForGroupVersion(gv.String())
	if err != nil {
		return false, err
	}
	for _, resource := range resourceList.APIResources {
		if resource.Kind != kind {
			continue
		}
		if strings.HasSuffix(resource.Name, "/scale") {
			return true, nil
		}
	}
	return false, nil
}

func verifyGroupKind(controllerRef *metav1.OwnerReference, expectedKind string, expectedGroups []string) (bool, error) {
	gv, err := schema.ParseGroupVersion(controllerRef.APIVersion)
	if err != nil {
		return false, err
	}

	if controllerRef.Kind != expectedKind {
		return false, nil
	}

	for _, group := range expectedGroups {
		if group == gv.Group {
			return true, nil
		}
	}

	return false, nil
}

// Run starts the controller's workers and blocks until stopCh is closed.
func (dc *DisruptionController) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer dc.queue.ShutDown()

	klog.Infof("Starting disruption controller")
	defer klog.Infof("Shutting down disruption controller")

	if !cache.WaitForNamedCacheSync("disruption", stopCh, dc.podListerSynced, dc.pdbListerSynced, dc.rcListerSynced, dc.rsListerSynced, dc.dListerSynced, dc.ssListerSynced) {
		return
	}

	if dc.kubeClient != nil {
		klog.Infof("Sending events to api server.")
		dc.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: dc.kubeClient.CoreV1().Events("")})
	} else {
		klog.Infof("No api server defined - no events will be sent to API server.")
	}
	go wait.Until(dc.worker, time.Second, stopCh)
	go wait.Until(dc.recheckWorker, time.Second, stopCh)

	<-stopCh
}

func (dc *DisruptionController) addDb(obj interface{}) {
	pdb := obj.(*policy.PodDisruptionBudget)
	klog.V(4).Infof("add DB %q", pdb.Name)
	dc.enqueuePdb(pdb)
}

func (dc *DisruptionController) updateDb(old, cur interface{}) {
	// TODO(mml) ignore updates where 'old' is equivalent to 'cur'.
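	// Note that periodic informer resyncs also surface here, with old and cur
	// referring to equal objects.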
	pdb := cur.(*policy.PodDisruptionBudget)
	klog.V(4).Infof("update DB %q", pdb.Name)
	dc.enqueuePdb(pdb)
}

func (dc *DisruptionController) removeDb(obj interface{}) {
	pdb, ok := obj.(*policy.PodDisruptionBudget)
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			klog.Errorf("Couldn't get object from tombstone %+v", obj)
			return
		}
		pdb, ok = tombstone.Obj.(*policy.PodDisruptionBudget)
		if !ok {
			klog.Errorf("Tombstone contained object that is not a pdb %+v", obj)
			return
		}
	}
	klog.V(4).Infof("remove DB %q", pdb.Name)
	dc.enqueuePdb(pdb)
}

func (dc *DisruptionController) addPod(obj interface{}) {
	pod := obj.(*v1.Pod)
	klog.V(4).Infof("addPod called on pod %q", pod.Name)
	pdb := dc.getPdbForPod(pod)
	if pdb == nil {
		klog.V(4).Infof("No matching pdb for pod %q", pod.Name)
		return
	}
	klog.V(4).Infof("addPod %q -> PDB %q", pod.Name, pdb.Name)
	dc.enqueuePdb(pdb)
}

func (dc *DisruptionController) updatePod(old, cur interface{}) {
	pod := cur.(*v1.Pod)
	klog.V(4).Infof("updatePod called on pod %q", pod.Name)
	pdb := dc.getPdbForPod(pod)
	if pdb == nil {
		klog.V(4).Infof("No matching pdb for pod %q", pod.Name)
		return
	}
	klog.V(4).Infof("updatePod %q -> PDB %q", pod.Name, pdb.Name)
	dc.enqueuePdb(pdb)
}

func (dc *DisruptionController) deletePod(obj interface{}) {
	pod, ok := obj.(*v1.Pod)
	// When a delete is dropped, the relist will notice a pod in the store not
	// in the list, leading to the insertion of a tombstone object which contains
	// the deleted key/value. Note that this value might be stale. If the pod
	// changed labels the new ReplicaSet will not be woken up till the periodic
	// resync.
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			klog.Errorf("Couldn't get object from tombstone %+v", obj)
			return
		}
		pod, ok = tombstone.Obj.(*v1.Pod)
		if !ok {
			klog.Errorf("Tombstone contained object that is not a pod %+v", obj)
			return
		}
	}
	klog.V(4).Infof("deletePod called on pod %q", pod.Name)
	pdb := dc.getPdbForPod(pod)
	if pdb == nil {
		klog.V(4).Infof("No matching pdb for pod %q", pod.Name)
		return
	}
	klog.V(4).Infof("deletePod %q -> PDB %q", pod.Name, pdb.Name)
	dc.enqueuePdb(pdb)
}

func (dc *DisruptionController) enqueuePdb(pdb *policy.PodDisruptionBudget) {
	key, err := controller.KeyFunc(pdb)
	if err != nil {
		klog.Errorf("Couldn't get key for PodDisruptionBudget object %+v: %v", pdb, err)
		return
	}
	dc.queue.Add(key)
}

func (dc *DisruptionController) enqueuePdbForRecheck(pdb *policy.PodDisruptionBudget, delay time.Duration) {
	key, err := controller.KeyFunc(pdb)
	if err != nil {
		klog.Errorf("Couldn't get key for PodDisruptionBudget object %+v: %v", pdb, err)
		return
	}
	dc.recheckQueue.AddAfter(key, delay)
}

func (dc *DisruptionController) getPdbForPod(pod *v1.Pod) *policy.PodDisruptionBudget {
	// GetPodPodDisruptionBudgets returns an error only if no
	// PodDisruptionBudgets are found. We don't return that as an error to the
	// caller.
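	// The lister matches the pod's labels against the selector of every PDB in
	// the pod's namespace.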
	pdbs, err := dc.pdbLister.GetPodPodDisruptionBudgets(pod)
	if err != nil {
		klog.V(4).Infof("No PodDisruptionBudgets found for pod %v, PodDisruptionBudget controller will avoid syncing.", pod.Name)
		return nil
	}

	if len(pdbs) > 1 {
		msg := fmt.Sprintf("Pod %q/%q matches multiple PodDisruptionBudgets. Chose %q arbitrarily.", pod.Namespace, pod.Name, pdbs[0].Name)
		klog.Warning(msg)
		dc.recorder.Event(pod, v1.EventTypeWarning, "MultiplePodDisruptionBudgets", msg)
	}
	return pdbs[0]
}

// getPodsForPdb returns the pods selected by the PodDisruptionBudget object.
// IMPORTANT NOTE: the returned pods should NOT be modified.
func (dc *DisruptionController) getPodsForPdb(pdb *policy.PodDisruptionBudget) ([]*v1.Pod, error) {
	sel, err := metav1.LabelSelectorAsSelector(pdb.Spec.Selector)
	if err != nil {
		return []*v1.Pod{}, err
	}
	pods, err := dc.podLister.Pods(pdb.Namespace).List(sel)
	if err != nil {
		return []*v1.Pod{}, err
	}
	return pods, nil
}

func (dc *DisruptionController) worker() {
	for dc.processNextWorkItem() {
	}
}

func (dc *DisruptionController) processNextWorkItem() bool {
	dKey, quit := dc.queue.Get()
	if quit {
		return false
	}
	defer dc.queue.Done(dKey)

	err := dc.sync(dKey.(string))
	if err == nil {
		dc.queue.Forget(dKey)
		return true
	}

	utilruntime.HandleError(fmt.Errorf("Error syncing PodDisruptionBudget %v, requeuing: %v", dKey.(string), err))
	dc.queue.AddRateLimited(dKey)

	return true
}

func (dc *DisruptionController) recheckWorker() {
	for dc.processNextRecheckWorkItem() {
	}
}

func (dc *DisruptionController) processNextRecheckWorkItem() bool {
	dKey, quit := dc.recheckQueue.Get()
	if quit {
		return false
	}
	defer dc.recheckQueue.Done(dKey)
	// Items that are due for a recheck are handed back to the main, rate-limited
	// queue so the regular worker re-syncs them.
	dc.queue.AddRateLimited(dKey)
	return true
}

func (dc *DisruptionController) sync(key string) error {
	startTime := time.Now()
	defer func() {
		klog.V(4).Infof("Finished syncing PodDisruptionBudget %q (%v)", key, time.Since(startTime))
	}()

	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	pdb, err := dc.pdbLister.PodDisruptionBudgets(namespace).Get(name)
	if errors.IsNotFound(err) {
		klog.V(4).Infof("PodDisruptionBudget %q has been deleted", key)
		return nil
	}
	if err != nil {
		return err
	}

	err = dc.trySync(pdb)
	// If the reason for failure was a conflict, then allow this PDB update to be
	// requeued without triggering the failSafe logic.
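	// A conflict means our cached copy of the PDB was stale when the status write
	// happened; the rate-limited requeue in processNextWorkItem will retry with a
	// fresher copy from the informer cache.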
	if errors.IsConflict(err) {
		return err
	}
	if err != nil {
		klog.Errorf("Failed to sync pdb %s/%s: %v", pdb.Namespace, pdb.Name, err)
		return dc.failSafe(pdb, err)
	}

	return nil
}

func (dc *DisruptionController) trySync(pdb *policy.PodDisruptionBudget) error {
	pods, err := dc.getPodsForPdb(pdb)
	if err != nil {
		dc.recorder.Eventf(pdb, v1.EventTypeWarning, "NoPods", "Failed to get pods: %v", err)
		return err
	}
	if len(pods) == 0 {
		dc.recorder.Eventf(pdb, v1.EventTypeNormal, "NoPods", "No matching pods found")
	}

	expectedCount, desiredHealthy, unmanagedPods, err := dc.getExpectedPodCount(pdb, pods)
	if err != nil {
		dc.recorder.Eventf(pdb, v1.EventTypeWarning, "CalculateExpectedPodCountFailed", "Failed to calculate the number of expected pods: %v", err)
		return err
	}
	// We have unmanaged pods; instead of erroring and hot-looping in the disruption controller, log and continue.
	if len(unmanagedPods) > 0 {
		klog.V(4).Infof("found unmanaged pods associated with this PDB: %v",
			strings.Join(unmanagedPods, ","))
	}

	currentTime := time.Now()
	disruptedPods, recheckTime := dc.buildDisruptedPodMap(pods, pdb, currentTime)
	currentHealthy := countHealthyPods(pods, disruptedPods, currentTime)
	err = dc.updatePdbStatus(pdb, currentHealthy, desiredHealthy, expectedCount, disruptedPods)

	if err == nil && recheckTime != nil {
		// There is always at most one PDB waiting with a particular name in the queue,
		// and each PDB in the queue is associated with the lowest timestamp
		// that was supplied when a PDB with that name was added.
		dc.enqueuePdbForRecheck(pdb, recheckTime.Sub(currentTime))
	}
	return err
}

func (dc *DisruptionController) getExpectedPodCount(pdb *policy.PodDisruptionBudget, pods []*v1.Pod) (expectedCount, desiredHealthy int32, unmanagedPods []string, err error) {
	err = nil
	// TODO(davidopp): consider making the way expectedCount and rules about
	// permitted controller configurations (specifically, considering it an error
	// if a pod covered by a PDB has 0 controllers or > 1 controller) should be
	// handled the same way for integer and percentage minAvailable

	if pdb.Spec.MaxUnavailable != nil {
		expectedCount, unmanagedPods, err = dc.getExpectedScale(pdb, pods)
		if err != nil {
			return
		}
		var maxUnavailable int
		maxUnavailable, err = intstr.GetScaledValueFromIntOrPercent(pdb.Spec.MaxUnavailable, int(expectedCount), true)
		if err != nil {
			return
		}
		desiredHealthy = expectedCount - int32(maxUnavailable)
		if desiredHealthy < 0 {
			desiredHealthy = 0
		}
	} else if pdb.Spec.MinAvailable != nil {
		if pdb.Spec.MinAvailable.Type == intstr.Int {
			desiredHealthy = pdb.Spec.MinAvailable.IntVal
			expectedCount = int32(len(pods))
		} else if pdb.Spec.MinAvailable.Type == intstr.String {
			expectedCount, unmanagedPods, err = dc.getExpectedScale(pdb, pods)
			if err != nil {
				return
			}

			var minAvailable int
			minAvailable, err = intstr.GetScaledValueFromIntOrPercent(pdb.Spec.MinAvailable, int(expectedCount), true)
			if err != nil {
				return
			}
			desiredHealthy = int32(minAvailable)
		}
	}
	return
}

func (dc *DisruptionController) getExpectedScale(pdb *policy.PodDisruptionBudget, pods []*v1.Pod) (expectedCount int32, unmanagedPods []string, err error) {
	// When the user specifies a fraction of pods that must be available, we
	// use as the fraction's denominator
	//     SUM_{all c in C} scale(c)
	// where C is the union of C_p1, C_p2, ..., C_pN
	// and each C_pi is the set of controllers controlling the pod pi.

	// k8s only defines what will happen when 0 or 1 controllers control a
	// given pod. We explicitly exclude the 0-controllers case here, and we
	// report an error if we find a pod with more than 1 controller. Thus in
	// practice each C_pi is a set of exactly 1 controller.

	// A mapping from controllers to their scale.
	controllerScale := map[types.UID]int32{}

	// 1. Find the controller for each pod.

	// As of now, we allow PDBs to be applied to pods via selectors, so there
	// can be unmanaged pods (pods that don't have backing controllers) that still
	// have a PDB associated with them. Such pods are collected and the PDB covering
	// them is enqueued, instead of immediately returning a sync error. This keeps
	// the disruption controller from frequently updating the status subresource,
	// preventing excessive and expensive writes to etcd.
	// With ControllerRef, a pod can only have 1 controller.
	for _, pod := range pods {
		controllerRef := metav1.GetControllerOf(pod)
		if controllerRef == nil {
			unmanagedPods = append(unmanagedPods, pod.Name)
			continue
		}

		// If we already know the scale of the controller there is no need to do anything.
		if _, found := controllerScale[controllerRef.UID]; found {
			continue
		}

		// Check all the supported controllers to find the desired scale.
		foundController := false
		for _, finder := range dc.finders() {
			var controllerNScale *controllerAndScale
			controllerNScale, err = finder(controllerRef, pod.Namespace)
			if err != nil {
				return
			}
			if controllerNScale != nil {
				controllerScale[controllerNScale.UID] = controllerNScale.scale
				foundController = true
				break
			}
		}
		if !foundController {
			err = fmt.Errorf("found no controllers for pod %q", pod.Name)
			return
		}
	}

	// 2. Add up all the controllers.
	expectedCount = 0
	for _, count := range controllerScale {
		expectedCount += count
	}

	return
}

func countHealthyPods(pods []*v1.Pod, disruptedPods map[string]metav1.Time, currentTime time.Time) (currentHealthy int32) {
	for _, pod := range pods {
		// Pod is being deleted.
		if pod.DeletionTimestamp != nil {
			continue
		}
		// Pod is expected to be deleted soon.
		if disruptionTime, found := disruptedPods[pod.Name]; found && disruptionTime.Time.Add(DeletionTimeout).After(currentTime) {
			continue
		}
		if podutil.IsPodReady(pod) {
			currentHealthy++
		}
	}

	return
}

// buildDisruptedPodMap builds a new DisruptedPods map, dropping entries that refer
// to pods that no longer exist, are already being deleted, or were never deleted
// within DeletionTimeout. It also returns the time at which this check should be
// repeated, if any.
func (dc *DisruptionController) buildDisruptedPodMap(pods []*v1.Pod, pdb *policy.PodDisruptionBudget, currentTime time.Time) (map[string]metav1.Time, *time.Time) {
	disruptedPods := pdb.Status.DisruptedPods
	result := make(map[string]metav1.Time)
	var recheckTime *time.Time

	if disruptedPods == nil {
		return result, recheckTime
	}
	for _, pod := range pods {
		if pod.DeletionTimestamp != nil {
			// Already being deleted.
			continue
		}
		disruptionTime, found := disruptedPods[pod.Name]
		if !found {
			// Pod not on the list.
			continue
		}
		expectedDeletion := disruptionTime.Time.Add(DeletionTimeout)
		if expectedDeletion.Before(currentTime) {
			klog.V(1).Infof("Pod %s/%s was expected to be deleted at %s but it wasn't, updating pdb %s/%s",
				pod.Namespace, pod.Name, disruptionTime.String(), pdb.Namespace, pdb.Name)
			dc.recorder.Eventf(pod, v1.EventTypeWarning, "NotDeleted", "Pod was expected by PDB %s/%s to be deleted but it wasn't",
				pdb.Namespace, pdb.Name)
		} else {
			if recheckTime == nil || expectedDeletion.Before(*recheckTime) {
				recheckTime = &expectedDeletion
			}
			result[pod.Name] = disruptionTime
		}
	}
	return result, recheckTime
}

// failSafe is an attempt to at least update the DisruptionsAllowed field to
// 0 if everything else has failed. This is one place we
// implement the "fail open" part of the design since if we manage to update
// this field correctly, we will prevent the /evict handler from approving an
// eviction when it may be unsafe to do so.
func (dc *DisruptionController) failSafe(pdb *policy.PodDisruptionBudget, err error) error {
	newPdb := pdb.DeepCopy()
	newPdb.Status.DisruptionsAllowed = 0

	if newPdb.Status.Conditions == nil {
		newPdb.Status.Conditions = make([]metav1.Condition, 0)
	}
	apimeta.SetStatusCondition(&newPdb.Status.Conditions, metav1.Condition{
		Type:               policy.DisruptionAllowedCondition,
		Status:             metav1.ConditionFalse,
		Reason:             policy.SyncFailedReason,
		Message:            err.Error(),
		ObservedGeneration: newPdb.Status.ObservedGeneration,
	})

	return dc.getUpdater()(newPdb)
}

func (dc *DisruptionController) updatePdbStatus(pdb *policy.PodDisruptionBudget, currentHealthy, desiredHealthy, expectedCount int32,
	disruptedPods map[string]metav1.Time) error {

	// We require expectedCount to be > 0 so that PDBs which currently match no
	// pods are in a safe state when their first pods appear but this controller
	// has not updated their status yet. This isn't the only race, but it's a
	// common one that's easy to detect.
	disruptionsAllowed := currentHealthy - desiredHealthy
	if expectedCount <= 0 || disruptionsAllowed <= 0 {
		disruptionsAllowed = 0
	}

	if pdb.Status.CurrentHealthy == currentHealthy &&
		pdb.Status.DesiredHealthy == desiredHealthy &&
		pdb.Status.ExpectedPods == expectedCount &&
		pdb.Status.DisruptionsAllowed == disruptionsAllowed &&
		apiequality.Semantic.DeepEqual(pdb.Status.DisruptedPods, disruptedPods) &&
		pdb.Status.ObservedGeneration == pdb.Generation &&
		pdbhelper.ConditionsAreUpToDate(pdb) {
		return nil
	}

	newPdb := pdb.DeepCopy()
	newPdb.Status = policy.PodDisruptionBudgetStatus{
		CurrentHealthy:     currentHealthy,
		DesiredHealthy:     desiredHealthy,
		ExpectedPods:       expectedCount,
		DisruptionsAllowed: disruptionsAllowed,
		DisruptedPods:      disruptedPods,
		ObservedGeneration: pdb.Generation,
	}

	pdbhelper.UpdateDisruptionAllowedCondition(newPdb)

	return dc.getUpdater()(newPdb)
}

func (dc *DisruptionController) writePdbStatus(pdb *policy.PodDisruptionBudget) error {
	// If this update fails, don't retry it. Allow the failure to get handled &
	// retried in `processNextWorkItem()`.
	_, err := dc.kubeClient.PolicyV1().PodDisruptionBudgets(pdb.Namespace).UpdateStatus(context.TODO(), pdb, metav1.UpdateOptions{})
	return err
}
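
// Illustrative wiring sketch: the controller is typically constructed from a
// shared informer factory and started as below (as kube-controller-manager
// does). The client, restMapper, scaleClient, resyncPeriod, and stopCh names
// are assumed to be provided by the caller.
//
//	factory := informers.NewSharedInformerFactory(client, resyncPeriod)
//	dc := NewDisruptionController(
//		factory.Core().V1().Pods(),
//		factory.Policy().V1().PodDisruptionBudgets(),
//		factory.Core().V1().ReplicationControllers(),
//		factory.Apps().V1().ReplicaSets(),
//		factory.Apps().V1().Deployments(),
//		factory.Apps().V1().StatefulSets(),
//		client, restMapper, scaleClient, client.Discovery(),
//	)
//	factory.Start(stopCh)
//	go dc.Run(stopCh)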