/*
Copyright 2014 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller

import (
	"context"
	"encoding/binary"
	"encoding/json"
	"fmt"
	"hash/fnv"
	"math"
	"sort"
	"sync"
	"sync/atomic"
	"time"

	apps "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/clock"
	"k8s.io/apimachinery/pkg/util/rand"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
	"k8s.io/apimachinery/pkg/util/wait"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	clientretry "k8s.io/client-go/util/retry"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
	"k8s.io/kubernetes/pkg/apis/core/helper"
	_ "k8s.io/kubernetes/pkg/apis/core/install"
	"k8s.io/kubernetes/pkg/apis/core/validation"
	"k8s.io/kubernetes/pkg/features"
	hashutil "k8s.io/kubernetes/pkg/util/hash"
	taintutils "k8s.io/kubernetes/pkg/util/taints"
	"k8s.io/utils/integer"

	"k8s.io/klog/v2"
)

const (
	// If a watch drops a delete event for a pod, it'll take this long
	// before a dormant controller waiting for those events is woken up anyway. It is
	// specifically targeted at the case where some problem prevents an update
	// of expectations, without it the controller could stay asleep forever. This should
	// be set based on the expected latency of watch events.
	//
	// Currently a controller can service (create *and* observe the watch events for said
	// creation) about 10 pods a second, so it takes about 1 min to service
	// 500 pods. Just creation is limited to 20qps, and watching happens with ~10-30s
	// latency/pod at the scale of 3000 pods over 100 nodes.
	ExpectationsTimeout = 5 * time.Minute
	// When batching pod creates, SlowStartInitialBatchSize is the size of the
	// initial batch. The size of each successive batch is twice the size of
	// the previous batch. For example, for a value of 1, batch sizes would be
	// 1, 2, 4, 8, ... and for a value of 10, batch sizes would be
	// 10, 20, 40, 80, ... Setting the value higher means that quota denials
	// will result in more doomed API calls and associated event spam. Setting
	// the value lower will result in more API call round trip periods for
	// large batches.
	//
	// Given a number of pods to start "N":
	// The number of doomed calls per sync once quota is exceeded is given by:
	//  min(N,SlowStartInitialBatchSize)
	// The number of batches is given by:
	//  1+floor(log_2(ceil(N/SlowStartInitialBatchSize)))
	SlowStartInitialBatchSize = 1
)
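// exampleSlowStartBatchSizes is an illustrative sketch (it is not called by
// any controller; capping the final batch at the remaining count is an
// assumption about the caller) of the batch progression described above.
// For n=13 and an initial size of 1 it yields [1 2 4 6], i.e.
// 1+floor(log_2(ceil(13/1))) = 4 batches.
func exampleSlowStartBatchSizes(n int) []int {
	var sizes []int
	batch := SlowStartInitialBatchSize
	for remaining := n; remaining > 0; {
		if batch > remaining {
			batch = remaining
		}
		sizes = append(sizes, batch)
		remaining -= batch
		batch *= 2
	}
	return sizes
}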
var UpdateTaintBackoff = wait.Backoff{
	Steps:    5,
	Duration: 100 * time.Millisecond,
	Jitter:   1.0,
}

var UpdateLabelBackoff = wait.Backoff{
	Steps:    5,
	Duration: 100 * time.Millisecond,
	Jitter:   1.0,
}

var (
	KeyFunc           = cache.DeletionHandlingMetaNamespaceKeyFunc
	podPhaseToOrdinal = map[v1.PodPhase]int{v1.PodPending: 0, v1.PodUnknown: 1, v1.PodRunning: 2}
)

type ResyncPeriodFunc func() time.Duration

// NoResyncPeriodFunc returns 0 for resyncPeriod in case resyncing is not needed.
func NoResyncPeriodFunc() time.Duration {
	return 0
}

// StaticResyncPeriodFunc returns the resync period specified.
func StaticResyncPeriodFunc(resyncPeriod time.Duration) ResyncPeriodFunc {
	return func() time.Duration {
		return resyncPeriod
	}
}

// Expectations are a way for controllers to tell the controller manager what they expect. eg:
//	ControllerExpectations: {
//		controller1: expects 2 adds in 2 minutes
//		controller2: expects 2 dels in 2 minutes
//		controller3: expects -1 adds in 2 minutes => controller3's expectations have already been met
//	}
//
// Implementation:
//	ControlleeExpectation = pair of atomic counters to track controllee's creation/deletion
//	ControllerExpectationsStore = TTLStore + a ControlleeExpectation per controller
//
// * Once set expectations can only be lowered
// * A controller isn't synced till its expectations are either fulfilled, or expire
// * Controllers that don't set expectations will get woken up for every matching controllee
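// exampleExpectationsFlow is an illustrative sketch, not called anywhere in
// this package, of how the expectations API below is typically wired into a
// controller's sync loop and watch handlers; the key and pod count are
// hypothetical.
func exampleExpectationsFlow(exp ControllerExpectationsInterface, key string, podsToCreate int) {
	// At the start of a sync, only act if earlier expectations were met
	// (or expired, or were never set).
	if !exp.SatisfiedExpectations(key) {
		return // a watch event (or the TTL) will trigger another sync later
	}
	// Before issuing creates, record how many we are about to perform...
	if err := exp.ExpectCreations(key, podsToCreate); err != nil {
		return
	}
	// ...and in the pod-add watch handler, observe each creation as it
	// arrives so the counter drains back toward zero.
	exp.CreationObserved(key)
}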
// ExpKeyFunc to parse out the key from a ControlleeExpectation
var ExpKeyFunc = func(obj interface{}) (string, error) {
	if e, ok := obj.(*ControlleeExpectations); ok {
		return e.key, nil
	}
	return "", fmt.Errorf("could not find key for obj %#v", obj)
}

// ControllerExpectationsInterface is an interface that allows users to set and wait on expectations.
// Only abstracted out for testing.
// Warning: if using KeyFunc it is not safe to use a single ControllerExpectationsInterface with different
// types of controllers, because the keys might conflict across types.
type ControllerExpectationsInterface interface {
	GetExpectations(controllerKey string) (*ControlleeExpectations, bool, error)
	SatisfiedExpectations(controllerKey string) bool
	DeleteExpectations(controllerKey string)
	SetExpectations(controllerKey string, add, del int) error
	ExpectCreations(controllerKey string, adds int) error
	ExpectDeletions(controllerKey string, dels int) error
	CreationObserved(controllerKey string)
	DeletionObserved(controllerKey string)
	RaiseExpectations(controllerKey string, add, del int)
	LowerExpectations(controllerKey string, add, del int)
}

// ControllerExpectations is a cache mapping controllers to what they expect to see before being woken up for a sync.
type ControllerExpectations struct {
	cache.Store
}

// GetExpectations returns the ControlleeExpectations of the given controller.
func (r *ControllerExpectations) GetExpectations(controllerKey string) (*ControlleeExpectations, bool, error) {
	exp, exists, err := r.GetByKey(controllerKey)
	if err == nil && exists {
		return exp.(*ControlleeExpectations), true, nil
	}
	return nil, false, err
}

// DeleteExpectations deletes the expectations of the given controller from the TTLStore.
func (r *ControllerExpectations) DeleteExpectations(controllerKey string) {
	if exp, exists, err := r.GetByKey(controllerKey); err == nil && exists {
		if err := r.Delete(exp); err != nil {
			klog.V(2).Infof("Error deleting expectations for controller %v: %v", controllerKey, err)
		}
	}
}

// SatisfiedExpectations returns true if the required adds/dels for the given controller have been observed.
// Add/del counts are established by the controller at sync time, and updated as controllees are observed by the controller
// manager.
func (r *ControllerExpectations) SatisfiedExpectations(controllerKey string) bool {
	if exp, exists, err := r.GetExpectations(controllerKey); exists {
		if exp.Fulfilled() {
			klog.V(4).Infof("Controller expectations fulfilled %#v", exp)
			return true
		} else if exp.isExpired() {
			klog.V(4).Infof("Controller expectations expired %#v", exp)
			return true
		} else {
			klog.V(4).Infof("Controller still waiting on expectations %#v", exp)
			return false
		}
	} else if err != nil {
		klog.V(2).Infof("Error encountered while checking expectations %#v, forcing sync", err)
	} else {
		// When a new controller is created, it doesn't have expectations.
		// When it doesn't see expected watch events for > TTL, the expectations expire.
		//	- In this case it wakes up, creates/deletes controllees, and sets expectations again.
		// When it has satisfied expectations and no controllees need to be created/destroyed > TTL, the expectations expire.
		//	- In this case it continues without setting expectations till it needs to create/delete controllees.
		klog.V(4).Infof("Controller %v either never recorded expectations, or the ttl expired.", controllerKey)
	}
	// Trigger a sync if we either encountered an error (which shouldn't happen since we're
	// getting from local store) or this controller hasn't established expectations.
	return true
}

// TODO: Extend ExpirationCache to support explicit expiration.
// TODO: Make this possible to disable in tests.
// TODO: Support injection of clock.
func (exp *ControlleeExpectations) isExpired() bool {
	return clock.RealClock{}.Since(exp.timestamp) > ExpectationsTimeout
}

// SetExpectations registers new expectations for the given controller. Forgets existing expectations.
func (r *ControllerExpectations) SetExpectations(controllerKey string, add, del int) error {
	exp := &ControlleeExpectations{add: int64(add), del: int64(del), key: controllerKey, timestamp: clock.RealClock{}.Now()}
	klog.V(4).Infof("Setting expectations %#v", exp)
	return r.Add(exp)
}

// ExpectCreations records expectations for the given number of creations, against the given controller.
func (r *ControllerExpectations) ExpectCreations(controllerKey string, adds int) error {
	return r.SetExpectations(controllerKey, adds, 0)
}

// ExpectDeletions records expectations for the given number of deletions, against the given controller.
func (r *ControllerExpectations) ExpectDeletions(controllerKey string, dels int) error {
	return r.SetExpectations(controllerKey, 0, dels)
}
// LowerExpectations decrements the expectation counts of the given controller.
func (r *ControllerExpectations) LowerExpectations(controllerKey string, add, del int) {
	if exp, exists, err := r.GetExpectations(controllerKey); err == nil && exists {
		exp.Add(int64(-add), int64(-del))
		// The expectations might've been modified since the update on the previous line.
		klog.V(4).Infof("Lowered expectations %#v", exp)
	}
}

// RaiseExpectations increments the expectation counts of the given controller.
func (r *ControllerExpectations) RaiseExpectations(controllerKey string, add, del int) {
	if exp, exists, err := r.GetExpectations(controllerKey); err == nil && exists {
		exp.Add(int64(add), int64(del))
		// The expectations might've been modified since the update on the previous line.
		klog.V(4).Infof("Raised expectations %#v", exp)
	}
}

// CreationObserved atomically decrements the `add` expectation count of the given controller.
func (r *ControllerExpectations) CreationObserved(controllerKey string) {
	r.LowerExpectations(controllerKey, 1, 0)
}

// DeletionObserved atomically decrements the `del` expectation count of the given controller.
func (r *ControllerExpectations) DeletionObserved(controllerKey string) {
	r.LowerExpectations(controllerKey, 0, 1)
}

// ControlleeExpectations track controllee creates/deletes.
type ControlleeExpectations struct {
	// Important: Since these two int64 fields are accessed with sync/atomic,
	// they require 64-bit alignment; keeping them at the top of the struct
	// guarantees that on 32-bit platforms.
	// See https://golang.org/pkg/sync/atomic/ for more information.
	add       int64
	del       int64
	key       string
	timestamp time.Time
}

// Add increments the add and del counters.
func (e *ControlleeExpectations) Add(add, del int64) {
	atomic.AddInt64(&e.add, add)
	atomic.AddInt64(&e.del, del)
}

// Fulfilled returns true if this expectation has been fulfilled.
func (e *ControlleeExpectations) Fulfilled() bool {
	// TODO: think about why this line being atomic doesn't matter
	return atomic.LoadInt64(&e.add) <= 0 && atomic.LoadInt64(&e.del) <= 0
}

// GetExpectations returns the add and del expectations of the controllee.
func (e *ControlleeExpectations) GetExpectations() (int64, int64) {
	return atomic.LoadInt64(&e.add), atomic.LoadInt64(&e.del)
}

// NewControllerExpectations returns a store for ControllerExpectations.
func NewControllerExpectations() *ControllerExpectations {
	return &ControllerExpectations{cache.NewStore(ExpKeyFunc)}
}

// UIDSetKeyFunc to parse out the key from a UIDSet.
var UIDSetKeyFunc = func(obj interface{}) (string, error) {
	if u, ok := obj.(*UIDSet); ok {
		return u.key, nil
	}
	return "", fmt.Errorf("could not find key for obj %#v", obj)
}

// UIDSet holds a key and a set of UIDs. Used by the
// UIDTrackingControllerExpectations to remember which UIDs it has seen or is
// still waiting for.
type UIDSet struct {
	sets.String
	key string
}

// UIDTrackingControllerExpectations tracks the UIDs of the pods it deletes.
// This cache is needed over plain old expectations to safely handle graceful
// deletion. The desired behavior is to treat an update that sets the
// DeletionTimestamp on an object as a delete. To do so consistently, one needs
// to remember the expected deletes so they aren't double counted.
// TODO: Track creates as well (#22599)
type UIDTrackingControllerExpectations struct {
	ControllerExpectationsInterface
	// TODO: There is a much nicer way to do this that involves a single store,
	// a lock per entry, and a ControlleeExpectationsInterface type.
	uidStoreLock sync.Mutex
	// Store used for the UIDs associated with any expectation tracked via the
	// ControllerExpectationsInterface.
	uidStore cache.Store
}

// GetUIDs is a convenience method to avoid exposing the set of expected uids.
// The returned set is not thread safe, all modifications must be made holding
// the uidStoreLock.
func (u *UIDTrackingControllerExpectations) GetUIDs(controllerKey string) sets.String {
	if uid, exists, err := u.uidStore.GetByKey(controllerKey); err == nil && exists {
		return uid.(*UIDSet).String
	}
	return nil
}

// ExpectDeletions records expectations for the given deleteKeys, against the given controller.
func (u *UIDTrackingControllerExpectations) ExpectDeletions(rcKey string, deletedKeys []string) error {
	expectedUIDs := sets.NewString()
	for _, k := range deletedKeys {
		expectedUIDs.Insert(k)
	}
	klog.V(4).Infof("Controller %v waiting on deletions for: %+v", rcKey, deletedKeys)
	u.uidStoreLock.Lock()
	defer u.uidStoreLock.Unlock()

	if existing := u.GetUIDs(rcKey); existing != nil && existing.Len() != 0 {
		klog.Errorf("Clobbering existing delete keys: %+v", existing)
	}
	if err := u.uidStore.Add(&UIDSet{expectedUIDs, rcKey}); err != nil {
		return err
	}
	return u.ControllerExpectationsInterface.ExpectDeletions(rcKey, expectedUIDs.Len())
}

// DeletionObserved records the given deleteKey as a deletion, for the given rc.
func (u *UIDTrackingControllerExpectations) DeletionObserved(rcKey, deleteKey string) {
	u.uidStoreLock.Lock()
	defer u.uidStoreLock.Unlock()

	uids := u.GetUIDs(rcKey)
	if uids != nil && uids.Has(deleteKey) {
		klog.V(4).Infof("Controller %v received delete for pod %v", rcKey, deleteKey)
		u.ControllerExpectationsInterface.DeletionObserved(rcKey)
		uids.Delete(deleteKey)
	}
}

// DeleteExpectations deletes the UID set and invokes DeleteExpectations on the
// underlying ControllerExpectationsInterface.
func (u *UIDTrackingControllerExpectations) DeleteExpectations(rcKey string) {
	u.uidStoreLock.Lock()
	defer u.uidStoreLock.Unlock()

	u.ControllerExpectationsInterface.DeleteExpectations(rcKey)
	if uidExp, exists, err := u.uidStore.GetByKey(rcKey); err == nil && exists {
		if err := u.uidStore.Delete(uidExp); err != nil {
			klog.V(2).Infof("Error deleting uid expectations for controller %v: %v", rcKey, err)
		}
	}
}

// NewUIDTrackingControllerExpectations returns a wrapper around
// ControllerExpectations that is aware of deleteKeys.
func NewUIDTrackingControllerExpectations(ce ControllerExpectationsInterface) *UIDTrackingControllerExpectations {
	return &UIDTrackingControllerExpectations{ControllerExpectationsInterface: ce, uidStore: cache.NewStore(UIDSetKeyFunc)}
}
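// exampleGracefulDeleteHandler is an illustrative sketch (assuming a
// hypothetical informer update handler) of the graceful-deletion behavior
// described above: an update that sets DeletionTimestamp is counted as a
// delete exactly once, because DeletionObserved removes the UID from the
// tracked set on first observation and ignores keys it is not tracking.
func exampleGracefulDeleteHandler(u *UIDTrackingControllerExpectations, rcKey string, pod *v1.Pod) {
	if pod.DeletionTimestamp != nil {
		// PodKey yields the namespace/name key that ExpectDeletions was
		// called with; the later watch delete event for the same pod won't
		// be double counted.
		u.DeletionObserved(rcKey, PodKey(pod))
	}
}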
// Reasons for pod events
const (
	// FailedCreatePodReason is added in an event and in a replica set condition
	// when a pod for a replica set fails to be created.
	FailedCreatePodReason = "FailedCreate"
	// SuccessfulCreatePodReason is added in an event when a pod for a replica set
	// is successfully created.
	SuccessfulCreatePodReason = "SuccessfulCreate"
	// FailedDeletePodReason is added in an event and in a replica set condition
	// when a pod for a replica set fails to be deleted.
	FailedDeletePodReason = "FailedDelete"
	// SuccessfulDeletePodReason is added in an event when a pod for a replica set
	// is successfully deleted.
	SuccessfulDeletePodReason = "SuccessfulDelete"
)

// RSControlInterface is an interface that knows how to add or delete
// ReplicaSets, as well as increment or decrement them. It is used
// by the deployment controller to ease testing of actions that it takes.
type RSControlInterface interface {
	PatchReplicaSet(namespace, name string, data []byte) error
}

// RealRSControl is the default implementation of RSControlInterface.
type RealRSControl struct {
	KubeClient clientset.Interface
	Recorder   record.EventRecorder
}

var _ RSControlInterface = &RealRSControl{}

func (r RealRSControl) PatchReplicaSet(namespace, name string, data []byte) error {
	_, err := r.KubeClient.AppsV1().ReplicaSets(namespace).Patch(context.TODO(), name, types.StrategicMergePatchType, data, metav1.PatchOptions{})
	return err
}

// TODO: merge the controller revision interface in controller_history.go with this one
// ControllerRevisionControlInterface is an interface that knows how to patch
// ControllerRevisions, as well as increment or decrement them. It is used
// by the daemonset controller to ease testing of actions that it takes.
type ControllerRevisionControlInterface interface {
	PatchControllerRevision(namespace, name string, data []byte) error
}

// RealControllerRevisionControl is the default implementation of ControllerRevisionControlInterface.
type RealControllerRevisionControl struct {
	KubeClient clientset.Interface
}

var _ ControllerRevisionControlInterface = &RealControllerRevisionControl{}

func (r RealControllerRevisionControl) PatchControllerRevision(namespace, name string, data []byte) error {
	_, err := r.KubeClient.AppsV1().ControllerRevisions(namespace).Patch(context.TODO(), name, types.StrategicMergePatchType, data, metav1.PatchOptions{})
	return err
}

// PodControlInterface is an interface that knows how to add or delete pods,
// abstracted as an interface to allow testing.
type PodControlInterface interface {
	// CreatePods creates new pods according to the spec, and sets object as the pod's controller.
	CreatePods(namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error
	// CreatePodsWithGenerateName creates new pods according to the spec, sets object as the pod's controller and sets pod's generateName.
	CreatePodsWithGenerateName(namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference, generateName string) error
	// DeletePod deletes the pod identified by podID.
	DeletePod(namespace string, podID string, object runtime.Object) error
	// PatchPod patches the pod.
	PatchPod(namespace, name string, data []byte) error
}
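// exampleCreateControlledPod is an illustrative sketch (not used by this
// package) of driving PodControlInterface the way a replica-set-style
// controller does: build a controller ref with Controller and
// BlockOwnerDeletion set, then create pods from the owner's template.
func exampleCreateControlledPod(pc PodControlInterface, rs *apps.ReplicaSet) error {
	// metav1.NewControllerRef sets Controller and BlockOwnerDeletion to true,
	// which is exactly what validateControllerRef below requires.
	controllerRef := metav1.NewControllerRef(rs, apps.SchemeGroupVersion.WithKind("ReplicaSet"))
	return pc.CreatePods(rs.Namespace, &rs.Spec.Template, rs, controllerRef)
}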
// RealPodControl is the default implementation of PodControlInterface.
type RealPodControl struct {
	KubeClient clientset.Interface
	Recorder   record.EventRecorder
}

var _ PodControlInterface = &RealPodControl{}

func getPodsLabelSet(template *v1.PodTemplateSpec) labels.Set {
	desiredLabels := make(labels.Set)
	for k, v := range template.Labels {
		desiredLabels[k] = v
	}
	return desiredLabels
}

func getPodsFinalizers(template *v1.PodTemplateSpec) []string {
	desiredFinalizers := make([]string, len(template.Finalizers))
	copy(desiredFinalizers, template.Finalizers)
	return desiredFinalizers
}

func getPodsAnnotationSet(template *v1.PodTemplateSpec) labels.Set {
	desiredAnnotations := make(labels.Set)
	for k, v := range template.Annotations {
		desiredAnnotations[k] = v
	}
	return desiredAnnotations
}

func getPodsPrefix(controllerName string) string {
	// use the dash (if the name isn't too long) to make the pod name a bit prettier
	prefix := fmt.Sprintf("%s-", controllerName)
	if len(validation.ValidatePodName(prefix, true)) != 0 {
		prefix = controllerName
	}
	return prefix
}

func validateControllerRef(controllerRef *metav1.OwnerReference) error {
	if controllerRef == nil {
		return fmt.Errorf("controllerRef is nil")
	}
	if len(controllerRef.APIVersion) == 0 {
		return fmt.Errorf("controllerRef has empty APIVersion")
	}
	if len(controllerRef.Kind) == 0 {
		return fmt.Errorf("controllerRef has empty Kind")
	}
	if controllerRef.Controller == nil || !*controllerRef.Controller {
		return fmt.Errorf("controllerRef.Controller is not set to true")
	}
	if controllerRef.BlockOwnerDeletion == nil || !*controllerRef.BlockOwnerDeletion {
		return fmt.Errorf("controllerRef.BlockOwnerDeletion is not set")
	}
	return nil
}

func (r RealPodControl) CreatePods(namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference) error {
	return r.CreatePodsWithGenerateName(namespace, template, controllerObject, controllerRef, "")
}

func (r RealPodControl) CreatePodsWithGenerateName(namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference, generateName string) error {
	if err := validateControllerRef(controllerRef); err != nil {
		return err
	}
	pod, err := GetPodFromTemplate(template, controllerObject, controllerRef)
	if err != nil {
		return err
	}
	if len(generateName) > 0 {
		pod.ObjectMeta.GenerateName = generateName
	}
	return r.createPods(namespace, pod, controllerObject)
}

func (r RealPodControl) PatchPod(namespace, name string, data []byte) error {
	_, err := r.KubeClient.CoreV1().Pods(namespace).Patch(context.TODO(), name, types.StrategicMergePatchType, data, metav1.PatchOptions{})
	return err
}

func GetPodFromTemplate(template *v1.PodTemplateSpec, parentObject runtime.Object, controllerRef *metav1.OwnerReference) (*v1.Pod, error) {
	desiredLabels := getPodsLabelSet(template)
	desiredFinalizers := getPodsFinalizers(template)
	desiredAnnotations := getPodsAnnotationSet(template)
	accessor, err := meta.Accessor(parentObject)
	if err != nil {
		return nil, fmt.Errorf("parentObject does not have ObjectMeta, %v", err)
	}
	prefix := getPodsPrefix(accessor.GetName())

	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Labels:      desiredLabels,
			Annotations: desiredAnnotations,
			GenerateName: prefix,
			Finalizers:   desiredFinalizers,
		},
	}
	if controllerRef != nil {
		pod.OwnerReferences = append(pod.OwnerReferences, *controllerRef)
	}
	pod.Spec = *template.Spec.DeepCopy()
	return pod, nil
}

func (r RealPodControl) createPods(namespace string, pod *v1.Pod, object runtime.Object) error {
	if len(labels.Set(pod.Labels)) == 0 {
		return fmt.Errorf("unable to create pods, no labels")
	}
	newPod, err := r.KubeClient.CoreV1().Pods(namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
	if err != nil {
		// only send an event if the namespace isn't terminating
		if !apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
			r.Recorder.Eventf(object, v1.EventTypeWarning, FailedCreatePodReason, "Error creating: %v", err)
		}
		return err
	}
	accessor, err := meta.Accessor(object)
	if err != nil {
		klog.Errorf("parentObject does not have ObjectMeta, %v", err)
		return nil
	}
	klog.V(4).Infof("Controller %v created pod %v", accessor.GetName(), newPod.Name)
	r.Recorder.Eventf(object, v1.EventTypeNormal, SuccessfulCreatePodReason, "Created pod: %v", newPod.Name)

	return nil
}

func (r RealPodControl) DeletePod(namespace string, podID string, object runtime.Object) error {
	accessor, err := meta.Accessor(object)
	if err != nil {
		return fmt.Errorf("object does not have ObjectMeta, %v", err)
	}
	klog.V(2).InfoS("Deleting pod", "controller", accessor.GetName(), "pod", klog.KRef(namespace, podID))
	if err := r.KubeClient.CoreV1().Pods(namespace).Delete(context.TODO(), podID, metav1.DeleteOptions{}); err != nil {
		if apierrors.IsNotFound(err) {
			klog.V(4).Infof("pod %v/%v has already been deleted.", namespace, podID)
			return err
		}
		r.Recorder.Eventf(object, v1.EventTypeWarning, FailedDeletePodReason, "Error deleting: %v", err)
		return fmt.Errorf("unable to delete pods: %v", err)
	}
	r.Recorder.Eventf(object, v1.EventTypeNormal, SuccessfulDeletePodReason, "Deleted pod: %v", podID)

	return nil
}
// FakePodControl is a fake PodControlInterface for testing: it records the
// calls it receives instead of talking to the API server.
type FakePodControl struct {
	sync.Mutex
	Templates       []v1.PodTemplateSpec
	ControllerRefs  []metav1.OwnerReference
	DeletePodName   []string
	Patches         [][]byte
	Err             error
	CreateLimit     int
	CreateCallCount int
}

var _ PodControlInterface = &FakePodControl{}

func (f *FakePodControl) PatchPod(namespace, name string, data []byte) error {
	f.Lock()
	defer f.Unlock()
	f.Patches = append(f.Patches, data)
	if f.Err != nil {
		return f.Err
	}
	return nil
}

func (f *FakePodControl) CreatePods(namespace string, spec *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
	f.Lock()
	defer f.Unlock()
	f.CreateCallCount++
	if f.CreateLimit != 0 && f.CreateCallCount > f.CreateLimit {
		return fmt.Errorf("not creating pod, limit %d already reached (create call %d)", f.CreateLimit, f.CreateCallCount)
	}
	f.Templates = append(f.Templates, *spec)
	f.ControllerRefs = append(f.ControllerRefs, *controllerRef)
	if f.Err != nil {
		return f.Err
	}
	return nil
}

func (f *FakePodControl) CreatePodsWithGenerateName(namespace string, spec *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference, generateNamePrefix string) error {
	f.Lock()
	defer f.Unlock()
	f.CreateCallCount++
	if f.CreateLimit != 0 && f.CreateCallCount > f.CreateLimit {
		return fmt.Errorf("not creating pod, limit %d already reached (create call %d)", f.CreateLimit, f.CreateCallCount)
	}
	f.Templates = append(f.Templates, *spec)
	f.ControllerRefs = append(f.ControllerRefs, *controllerRef)
	if f.Err != nil {
		return f.Err
	}
	return nil
}

func (f *FakePodControl) DeletePod(namespace string, podID string, object runtime.Object) error {
	f.Lock()
	defer f.Unlock()
	f.DeletePodName = append(f.DeletePodName, podID)
	if f.Err != nil {
		return f.Err
	}
	return nil
}

func (f *FakePodControl) Clear() {
	f.Lock()
	defer f.Unlock()
	f.DeletePodName = []string{}
	f.Templates = []v1.PodTemplateSpec{}
	f.ControllerRefs = []metav1.OwnerReference{}
	f.Patches = [][]byte{}
	f.CreateLimit = 0
	f.CreateCallCount = 0
}
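// exampleFakePodControl is an illustrative sketch, not a real test in this
// package, of how FakePodControl records calls so tests can assert on them.
func exampleFakePodControl(rs *apps.ReplicaSet, controllerRef *metav1.OwnerReference) bool {
	fake := &FakePodControl{CreateLimit: 2}
	_ = fake.CreatePods(rs.Namespace, &rs.Spec.Template, rs, controllerRef)
	// One template and one controller ref were recorded by the fake.
	return len(fake.Templates) == 1 && fake.CreateCallCount == 1
}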
// ByLogging allows custom sorting of pods so the best one can be picked for getting its logs.
type ByLogging []*v1.Pod

func (s ByLogging) Len() int      { return len(s) }
func (s ByLogging) Swap(i, j int) { s[i], s[j] = s[j], s[i] }

func (s ByLogging) Less(i, j int) bool {
	// 1. assigned < unassigned
	if s[i].Spec.NodeName != s[j].Spec.NodeName && (len(s[i].Spec.NodeName) == 0 || len(s[j].Spec.NodeName) == 0) {
		return len(s[i].Spec.NodeName) > 0
	}
	// 2. PodRunning < PodUnknown < PodPending
	if s[i].Status.Phase != s[j].Status.Phase {
		return podPhaseToOrdinal[s[i].Status.Phase] > podPhaseToOrdinal[s[j].Status.Phase]
	}
	// 3. ready < not ready
	if podutil.IsPodReady(s[i]) != podutil.IsPodReady(s[j]) {
		return podutil.IsPodReady(s[i])
	}
	// TODO: take availability into account when we push minReadySeconds information from deployment into pods,
	//       see https://github.com/kubernetes/kubernetes/issues/22065
	// 4. Been ready for more time < less time < empty time
	if podutil.IsPodReady(s[i]) && podutil.IsPodReady(s[j]) {
		readyTime1 := podReadyTime(s[i])
		readyTime2 := podReadyTime(s[j])
		if !readyTime1.Equal(readyTime2) {
			return afterOrZero(readyTime2, readyTime1)
		}
	}
	// 5. Pods with containers with higher restart counts < lower restart counts
	if maxContainerRestarts(s[i]) != maxContainerRestarts(s[j]) {
		return maxContainerRestarts(s[i]) > maxContainerRestarts(s[j])
	}
	// 6. older pods < newer pods < empty timestamp pods
	if !s[i].CreationTimestamp.Equal(&s[j].CreationTimestamp) {
		return afterOrZero(&s[j].CreationTimestamp, &s[i].CreationTimestamp)
	}
	return false
}

// ActivePods type allows custom sorting of pods so a controller can pick the best ones to delete.
type ActivePods []*v1.Pod

func (s ActivePods) Len() int      { return len(s) }
func (s ActivePods) Swap(i, j int) { s[i], s[j] = s[j], s[i] }

func (s ActivePods) Less(i, j int) bool {
	// 1. Unassigned < assigned
	// If only one of the pods is unassigned, the unassigned one is smaller
	if s[i].Spec.NodeName != s[j].Spec.NodeName && (len(s[i].Spec.NodeName) == 0 || len(s[j].Spec.NodeName) == 0) {
		return len(s[i].Spec.NodeName) == 0
	}
	// 2. PodPending < PodUnknown < PodRunning
	if podPhaseToOrdinal[s[i].Status.Phase] != podPhaseToOrdinal[s[j].Status.Phase] {
		return podPhaseToOrdinal[s[i].Status.Phase] < podPhaseToOrdinal[s[j].Status.Phase]
	}
	// 3. Not ready < ready
	// If only one of the pods is not ready, the not ready one is smaller
	if podutil.IsPodReady(s[i]) != podutil.IsPodReady(s[j]) {
		return !podutil.IsPodReady(s[i])
	}
	// TODO: take availability into account when we push minReadySeconds information from deployment into pods,
	//       see https://github.com/kubernetes/kubernetes/issues/22065
	// 4. Been ready for empty time < less time < more time
	// If both pods are ready, the latest ready one is smaller
	if podutil.IsPodReady(s[i]) && podutil.IsPodReady(s[j]) {
		readyTime1 := podReadyTime(s[i])
		readyTime2 := podReadyTime(s[j])
		if !readyTime1.Equal(readyTime2) {
			return afterOrZero(readyTime1, readyTime2)
		}
	}
	// 5. Pods with containers with higher restart counts < lower restart counts
	if maxContainerRestarts(s[i]) != maxContainerRestarts(s[j]) {
		return maxContainerRestarts(s[i]) > maxContainerRestarts(s[j])
	}
	// 6. Empty creation time pods < newer pods < older pods
	if !s[i].CreationTimestamp.Equal(&s[j].CreationTimestamp) {
		return afterOrZero(&s[i].CreationTimestamp, &s[j].CreationTimestamp)
	}
	return false
}
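// exampleVictimSelection is an illustrative sketch (not used by this package)
// of how a controller consumes ActivePods when scaling down: sorting places
// the pods preferred for deletion first, so the victims are the prefix of the
// sorted slice.
func exampleVictimSelection(pods []*v1.Pod, diff int) []*v1.Pod {
	sort.Sort(ActivePods(pods))
	if diff > len(pods) {
		diff = len(pods)
	}
	return pods[:diff]
}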
// ActivePodsWithRanks is a sortable list of pods and a list of corresponding
// ranks which will be considered during sorting. The two lists must have equal
// length. After sorting, the pods will be ordered as follows, applying each
// rule in turn until one matches:
//
//  1. If only one of the pods is assigned to a node, the pod that is not
//     assigned comes before the pod that is.
//  2. If the pods' phases differ, a pending pod comes before a pod whose phase
//     is unknown, and a pod whose phase is unknown comes before a running pod.
//  3. If exactly one of the pods is ready, the pod that is not ready comes
//     before the ready pod.
//  4. If the controller.kubernetes.io/pod-deletion-cost annotation is set,
//     then the pod with the lower value will come first.
//  5. If the pods' ranks differ, the pod with greater rank comes before the
//     pod with lower rank.
//  6. If both pods are ready but have not been ready for the same amount of
//     time, the pod that has been ready for a shorter amount of time comes
//     before the pod that has been ready for longer.
//  7. If one pod has a container that has restarted more than any container in
//     the other pod, the pod with the container with more restarts comes
//     before the other pod.
//  8. If the pods' creation times differ, the pod that was created more
//     recently comes before the older pod.
//
// In 6 and 8, times are compared in a logarithmic scale. This allows a level
// of randomness among equivalent Pods when sorting. If two pods have the same
// logarithmic rank, they are sorted by UID to provide a pseudorandom order.
//
// If none of these rules matches, the second pod comes before the first pod.
//
// The intention of this ordering is to put pods that should be preferred for
// deletion first in the list.
type ActivePodsWithRanks struct {
	// Pods is a list of pods.
	Pods []*v1.Pod

	// Rank is a ranking of pods. This ranking is used during sorting when
	// comparing two pods that are both scheduled, in the same phase, and
	// having the same ready status.
	Rank []int

	// Now is a reference timestamp for doing logarithmic timestamp comparisons.
	// If zero, comparison happens without scaling.
	Now metav1.Time
}

func (s ActivePodsWithRanks) Len() int {
	return len(s.Pods)
}

func (s ActivePodsWithRanks) Swap(i, j int) {
	s.Pods[i], s.Pods[j] = s.Pods[j], s.Pods[i]
	s.Rank[i], s.Rank[j] = s.Rank[j], s.Rank[i]
}

// Less compares two pods with corresponding ranks and returns true if the first
// one should be preferred for deletion.
func (s ActivePodsWithRanks) Less(i, j int) bool {
	// 1. Unassigned < assigned
	// If only one of the pods is unassigned, the unassigned one is smaller
	if s.Pods[i].Spec.NodeName != s.Pods[j].Spec.NodeName && (len(s.Pods[i].Spec.NodeName) == 0 || len(s.Pods[j].Spec.NodeName) == 0) {
		return len(s.Pods[i].Spec.NodeName) == 0
	}
	// 2. PodPending < PodUnknown < PodRunning
	if podPhaseToOrdinal[s.Pods[i].Status.Phase] != podPhaseToOrdinal[s.Pods[j].Status.Phase] {
		return podPhaseToOrdinal[s.Pods[i].Status.Phase] < podPhaseToOrdinal[s.Pods[j].Status.Phase]
	}
	// 3. Not ready < ready
	// If only one of the pods is not ready, the not ready one is smaller
	if podutil.IsPodReady(s.Pods[i]) != podutil.IsPodReady(s.Pods[j]) {
		return !podutil.IsPodReady(s.Pods[i])
	}

	// 4. higher pod-deletion-cost < lower pod-deletion-cost
	if utilfeature.DefaultFeatureGate.Enabled(features.PodDeletionCost) {
		pi, _ := helper.GetDeletionCostFromPodAnnotations(s.Pods[i].Annotations)
		pj, _ := helper.GetDeletionCostFromPodAnnotations(s.Pods[j].Annotations)
		if pi != pj {
			return pi < pj
		}
	}

	// 5. Doubled up < not doubled up
	// If one of the two pods is on the same node as one or more additional
	// ready pods that belong to the same replicaset, whichever pod has more
	// colocated ready pods is less
	if s.Rank[i] != s.Rank[j] {
		return s.Rank[i] > s.Rank[j]
	}
	// TODO: take availability into account when we push minReadySeconds information from deployment into pods,
	//       see https://github.com/kubernetes/kubernetes/issues/22065
	// 6. Been ready for empty time < less time < more time
	// If both pods are ready, the latest ready one is smaller
	if podutil.IsPodReady(s.Pods[i]) && podutil.IsPodReady(s.Pods[j]) {
		readyTime1 := podReadyTime(s.Pods[i])
		readyTime2 := podReadyTime(s.Pods[j])
		if !readyTime1.Equal(readyTime2) {
			if !utilfeature.DefaultFeatureGate.Enabled(features.LogarithmicScaleDown) {
				return afterOrZero(readyTime1, readyTime2)
			}
			if s.Now.IsZero() || readyTime1.IsZero() || readyTime2.IsZero() {
				return afterOrZero(readyTime1, readyTime2)
			}
			rankDiff := logarithmicRankDiff(*readyTime1, *readyTime2, s.Now)
			if rankDiff == 0 {
				return s.Pods[i].UID < s.Pods[j].UID
			}
			return rankDiff < 0
		}
	}
	// 7. Pods with containers with higher restart counts < lower restart counts
	if maxContainerRestarts(s.Pods[i]) != maxContainerRestarts(s.Pods[j]) {
		return maxContainerRestarts(s.Pods[i]) > maxContainerRestarts(s.Pods[j])
	}
	// 8. Empty creation time pods < newer pods < older pods
	if !s.Pods[i].CreationTimestamp.Equal(&s.Pods[j].CreationTimestamp) {
		if !utilfeature.DefaultFeatureGate.Enabled(features.LogarithmicScaleDown) {
			return afterOrZero(&s.Pods[i].CreationTimestamp, &s.Pods[j].CreationTimestamp)
		}
		if s.Now.IsZero() || s.Pods[i].CreationTimestamp.IsZero() || s.Pods[j].CreationTimestamp.IsZero() {
			return afterOrZero(&s.Pods[i].CreationTimestamp, &s.Pods[j].CreationTimestamp)
		}
		rankDiff := logarithmicRankDiff(s.Pods[i].CreationTimestamp, s.Pods[j].CreationTimestamp, s.Now)
		if rankDiff == 0 {
			return s.Pods[i].UID < s.Pods[j].UID
		}
		return rankDiff < 0
	}
	return false
}
// afterOrZero checks if time t1 is after time t2; if one of them
// is zero, the zero time is seen as after non-zero time.
func afterOrZero(t1, t2 *metav1.Time) bool {
	if t1.Time.IsZero() || t2.Time.IsZero() {
		return t1.Time.IsZero()
	}
	return t1.After(t2.Time)
}

// logarithmicRankDiff calculates the base-2 logarithmic ranks of 2 timestamps,
// compared to the current timestamp.
func logarithmicRankDiff(t1, t2, now metav1.Time) int64 {
	d1 := now.Sub(t1.Time)
	d2 := now.Sub(t2.Time)
	r1 := int64(-1)
	r2 := int64(-1)
	if d1 > 0 {
		r1 = int64(math.Log2(float64(d1)))
	}
	if d2 > 0 {
		r2 = int64(math.Log2(float64(d2)))
	}
	return r1 - r2
}
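// exampleLogarithmicBuckets illustrates, with concrete numbers, the bucketing
// that logarithmicRankDiff performs (a sketch; not used by this package):
// ages of 90m and 100m both land in bucket floor(log2(d)) = 42 (d counted in
// nanoseconds), so their rank diff is 0 and the caller falls back to UID
// ordering, while 30m lands in bucket 40 and keeps its age-based order.
func exampleLogarithmicBuckets() (sameBucket, differentBucket bool) {
	now := metav1.Now()
	t90 := metav1.NewTime(now.Add(-90 * time.Minute))
	t100 := metav1.NewTime(now.Add(-100 * time.Minute))
	t30 := metav1.NewTime(now.Add(-30 * time.Minute))
	sameBucket = logarithmicRankDiff(t90, t100, now) == 0
	differentBucket = logarithmicRankDiff(t30, t100, now) != 0
	return sameBucket, differentBucket
}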
func podReadyTime(pod *v1.Pod) *metav1.Time {
	if podutil.IsPodReady(pod) {
		for _, c := range pod.Status.Conditions {
			// we only care about pod ready conditions
			if c.Type == v1.PodReady && c.Status == v1.ConditionTrue {
				return &c.LastTransitionTime
			}
		}
	}
	return &metav1.Time{}
}

func maxContainerRestarts(pod *v1.Pod) int {
	maxRestarts := 0
	for _, c := range pod.Status.ContainerStatuses {
		maxRestarts = integer.IntMax(maxRestarts, int(c.RestartCount))
	}
	return maxRestarts
}

// FilterActivePods returns pods that have not terminated.
func FilterActivePods(pods []*v1.Pod) []*v1.Pod {
	var result []*v1.Pod
	for _, p := range pods {
		if IsPodActive(p) {
			result = append(result, p)
		} else {
			klog.V(4).Infof("Ignoring inactive pod %v/%v in state %v, deletion time %v",
				p.Namespace, p.Name, p.Status.Phase, p.DeletionTimestamp)
		}
	}
	return result
}

// IsPodActive returns true if the pod has neither terminated nor been marked for deletion.
func IsPodActive(p *v1.Pod) bool {
	return v1.PodSucceeded != p.Status.Phase &&
		v1.PodFailed != p.Status.Phase &&
		p.DeletionTimestamp == nil
}

// FilterActiveReplicaSets returns replica sets that have (or at least ought to have) pods.
func FilterActiveReplicaSets(replicaSets []*apps.ReplicaSet) []*apps.ReplicaSet {
	activeFilter := func(rs *apps.ReplicaSet) bool {
		return rs != nil && *(rs.Spec.Replicas) > 0
	}
	return FilterReplicaSets(replicaSets, activeFilter)
}

type filterRS func(rs *apps.ReplicaSet) bool

// FilterReplicaSets returns replica sets that are filtered by filterFn (all returned ones should match filterFn).
func FilterReplicaSets(RSes []*apps.ReplicaSet, filterFn filterRS) []*apps.ReplicaSet {
	var filtered []*apps.ReplicaSet
	for i := range RSes {
		if filterFn(RSes[i]) {
			filtered = append(filtered, RSes[i])
		}
	}
	return filtered
}

// PodKey returns a key unique to the given pod within a cluster.
// It's used so we consistently use the same key scheme in this module.
// It does exactly what cache.MetaNamespaceKeyFunc would have done
// except there's no possibility of error since we know the exact type.
func PodKey(pod *v1.Pod) string {
	return fmt.Sprintf("%v/%v", pod.Namespace, pod.Name)
}

// ControllersByCreationTimestamp sorts a list of ReplicationControllers by creation timestamp, using their names as a tie breaker.
type ControllersByCreationTimestamp []*v1.ReplicationController

func (o ControllersByCreationTimestamp) Len() int      { return len(o) }
func (o ControllersByCreationTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
func (o ControllersByCreationTimestamp) Less(i, j int) bool {
	if o[i].CreationTimestamp.Equal(&o[j].CreationTimestamp) {
		return o[i].Name < o[j].Name
	}
	return o[i].CreationTimestamp.Before(&o[j].CreationTimestamp)
}

// ReplicaSetsByCreationTimestamp sorts a list of ReplicaSets by creation timestamp, using their names as a tie breaker.
type ReplicaSetsByCreationTimestamp []*apps.ReplicaSet

func (o ReplicaSetsByCreationTimestamp) Len() int      { return len(o) }
func (o ReplicaSetsByCreationTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
func (o ReplicaSetsByCreationTimestamp) Less(i, j int) bool {
	if o[i].CreationTimestamp.Equal(&o[j].CreationTimestamp) {
		return o[i].Name < o[j].Name
	}
	return o[i].CreationTimestamp.Before(&o[j].CreationTimestamp)
}

// ReplicaSetsBySizeOlder sorts a list of ReplicaSets by size in descending order, using their creation timestamp or name as a tie breaker.
// By using the creation timestamp, this sorts from old to new replica sets.
type ReplicaSetsBySizeOlder []*apps.ReplicaSet

func (o ReplicaSetsBySizeOlder) Len() int      { return len(o) }
func (o ReplicaSetsBySizeOlder) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
func (o ReplicaSetsBySizeOlder) Less(i, j int) bool {
	if *(o[i].Spec.Replicas) == *(o[j].Spec.Replicas) {
		return ReplicaSetsByCreationTimestamp(o).Less(i, j)
	}
	return *(o[i].Spec.Replicas) > *(o[j].Spec.Replicas)
}

// ReplicaSetsBySizeNewer sorts a list of ReplicaSets by size in descending order, using their creation timestamp or name as a tie breaker.
// By using the creation timestamp, this sorts from new to old replica sets.
type ReplicaSetsBySizeNewer []*apps.ReplicaSet

func (o ReplicaSetsBySizeNewer) Len() int      { return len(o) }
func (o ReplicaSetsBySizeNewer) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
func (o ReplicaSetsBySizeNewer) Less(i, j int) bool {
	if *(o[i].Spec.Replicas) == *(o[j].Spec.Replicas) {
		return ReplicaSetsByCreationTimestamp(o).Less(j, i)
	}
	return *(o[i].Spec.Replicas) > *(o[j].Spec.Replicas)
}
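// exampleActiveKeys is an illustrative sketch combining FilterActivePods and
// PodKey (the helper name is hypothetical): collect cache keys for only the
// pods a controller still counts against its replica total.
func exampleActiveKeys(pods []*v1.Pod) []string {
	var keys []string
	for _, p := range FilterActivePods(pods) {
		keys = append(keys, PodKey(p))
	}
	return keys
}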
// AddOrUpdateTaintOnNode adds taints to the node. If any taint was added or
// updated, it issues an API call to update the node; otherwise no API call is
// made. Returns an error if one occurs.
func AddOrUpdateTaintOnNode(c clientset.Interface, nodeName string, taints ...*v1.Taint) error {
	if len(taints) == 0 {
		return nil
	}
	firstTry := true
	return clientretry.RetryOnConflict(UpdateTaintBackoff, func() error {
		var err error
		var oldNode *v1.Node
		// First we try getting node from the API server cache, as it's cheaper. If it fails
		// we get it from etcd to be sure to have fresh data.
		if firstTry {
			oldNode, err = c.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{ResourceVersion: "0"})
			firstTry = false
		} else {
			oldNode, err = c.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
		}
		if err != nil {
			return err
		}

		var newNode *v1.Node
		oldNodeCopy := oldNode
		updated := false
		for _, taint := range taints {
			curNewNode, ok, err := taintutils.AddOrUpdateTaint(oldNodeCopy, taint)
			if err != nil {
				return fmt.Errorf("failed to update taint of node")
			}
			updated = updated || ok
			newNode = curNewNode
			oldNodeCopy = curNewNode
		}
		if !updated {
			return nil
		}
		return PatchNodeTaints(c, nodeName, oldNode, newNode)
	})
}

// RemoveTaintOffNode is for cleaning up taints temporarily added to a node;
// it won't fail if the target taint doesn't exist or has been removed.
// If passed a non-nil node, it first checks whether there's anything to be
// done; if none of the taints is present, it won't issue any API calls.
func RemoveTaintOffNode(c clientset.Interface, nodeName string, node *v1.Node, taints ...*v1.Taint) error {
	if len(taints) == 0 {
		return nil
	}
	// Short circuit for limiting amount of API calls.
	if node != nil {
		match := false
		for _, taint := range taints {
			if taintutils.TaintExists(node.Spec.Taints, taint) {
				match = true
				break
			}
		}
		if !match {
			return nil
		}
	}

	firstTry := true
	return clientretry.RetryOnConflict(UpdateTaintBackoff, func() error {
		var err error
		var oldNode *v1.Node
		// First we try getting node from the API server cache, as it's cheaper. If it fails
		// we get it from etcd to be sure to have fresh data.
		if firstTry {
			oldNode, err = c.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{ResourceVersion: "0"})
			firstTry = false
		} else {
			oldNode, err = c.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
		}
		if err != nil {
			return err
		}

		var newNode *v1.Node
		oldNodeCopy := oldNode
		updated := false
		for _, taint := range taints {
			curNewNode, ok, err := taintutils.RemoveTaint(oldNodeCopy, taint)
			if err != nil {
				return fmt.Errorf("failed to remove taint of node")
			}
			updated = updated || ok
			newNode = curNewNode
			oldNodeCopy = curNewNode
		}
		if !updated {
			return nil
		}
		return PatchNodeTaints(c, nodeName, oldNode, newNode)
	})
}
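// exampleTaintNode is an illustrative sketch of the add/remove pair above;
// the taint key is hypothetical and the function is not called anywhere in
// this package.
func exampleTaintNode(c clientset.Interface, nodeName string) error {
	taint := &v1.Taint{Key: "example.io/illustrative", Effect: v1.TaintEffectNoSchedule}
	if err := AddOrUpdateTaintOnNode(c, nodeName, taint); err != nil {
		return err
	}
	// Passing nil for node skips the short-circuit check and forces
	// RemoveTaintOffNode to fetch the node before deciding whether to patch.
	return RemoveTaintOffNode(c, nodeName, nil, taint)
}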
// PatchNodeTaints patches a node's taints.
func PatchNodeTaints(c clientset.Interface, nodeName string, oldNode *v1.Node, newNode *v1.Node) error {
	oldData, err := json.Marshal(oldNode)
	if err != nil {
		return fmt.Errorf("failed to marshal old node %#v for node %q: %v", oldNode, nodeName, err)
	}

	newTaints := newNode.Spec.Taints
	newNodeClone := oldNode.DeepCopy()
	newNodeClone.Spec.Taints = newTaints
	newData, err := json.Marshal(newNodeClone)
	if err != nil {
		return fmt.Errorf("failed to marshal new node %#v for node %q: %v", newNodeClone, nodeName, err)
	}

	patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{})
	if err != nil {
		return fmt.Errorf("failed to create patch for node %q: %v", nodeName, err)
	}

	_, err = c.CoreV1().Nodes().Patch(context.TODO(), nodeName, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
	return err
}

// ComputeHash returns a hash value calculated from pod template and
// a collisionCount to avoid hash collision. The hash will be safe encoded to
// avoid bad words.
func ComputeHash(template *v1.PodTemplateSpec, collisionCount *int32) string {
	podTemplateSpecHasher := fnv.New32a()
	hashutil.DeepHashObject(podTemplateSpecHasher, *template)

	// Add collisionCount in the hash if it exists.
	if collisionCount != nil {
		// Note: only the first 4 bytes of this 8-byte buffer are written; the
		// trailing zero bytes are nevertheless part of the hashed input and
		// must stay for hash stability.
		collisionCountBytes := make([]byte, 8)
		binary.LittleEndian.PutUint32(collisionCountBytes, uint32(*collisionCount))
		podTemplateSpecHasher.Write(collisionCountBytes)
	}

	return rand.SafeEncodeString(fmt.Sprint(podTemplateSpecHasher.Sum32()))
}

// AddOrUpdateLabelsOnNode updates the labels on the given node and returns an error if any call fails.
func AddOrUpdateLabelsOnNode(kubeClient clientset.Interface, nodeName string, labelsToUpdate map[string]string) error {
	firstTry := true
	return clientretry.RetryOnConflict(UpdateLabelBackoff, func() error {
		var err error
		var node *v1.Node
		// First we try getting node from the API server cache, as it's cheaper. If it fails
		// we get it from etcd to be sure to have fresh data.
		if firstTry {
			node, err = kubeClient.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{ResourceVersion: "0"})
			firstTry = false
		} else {
			node, err = kubeClient.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
		}
		if err != nil {
			return err
		}

		// Make a copy of the node and update the labels.
		newNode := node.DeepCopy()
		if newNode.Labels == nil {
			newNode.Labels = make(map[string]string)
		}
		for key, value := range labelsToUpdate {
			newNode.Labels[key] = value
		}

		oldData, err := json.Marshal(node)
		if err != nil {
			return fmt.Errorf("failed to marshal the existing node %#v: %v", node, err)
		}
		newData, err := json.Marshal(newNode)
		if err != nil {
			return fmt.Errorf("failed to marshal the new node %#v: %v", newNode, err)
		}
		patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Node{})
		if err != nil {
			return fmt.Errorf("failed to create a two-way merge patch: %v", err)
		}
		if _, err := kubeClient.CoreV1().Nodes().Patch(context.TODO(), node.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}); err != nil {
			return fmt.Errorf("failed to patch the node: %v", err)
		}
		return nil
	})
}
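// exampleTemplateHash is an illustrative sketch of how a deployment-style
// controller would derive a pod-template-hash value with ComputeHash, feeding
// in the collision count carried on the object's status.
func exampleTemplateHash(d *apps.Deployment) string {
	return ComputeHash(&d.Spec.Template, d.Status.CollisionCount)
}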