1/* 2Copyright 2016 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package statefulset 18 19import ( 20 "context" 21 "fmt" 22 "reflect" 23 "time" 24 25 apps "k8s.io/api/apps/v1" 26 v1 "k8s.io/api/core/v1" 27 "k8s.io/apimachinery/pkg/api/errors" 28 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" 29 "k8s.io/apimachinery/pkg/labels" 30 utilruntime "k8s.io/apimachinery/pkg/util/runtime" 31 "k8s.io/apimachinery/pkg/util/wait" 32 utilfeature "k8s.io/apiserver/pkg/util/feature" 33 appsinformers "k8s.io/client-go/informers/apps/v1" 34 coreinformers "k8s.io/client-go/informers/core/v1" 35 clientset "k8s.io/client-go/kubernetes" 36 "k8s.io/client-go/kubernetes/scheme" 37 v1core "k8s.io/client-go/kubernetes/typed/core/v1" 38 appslisters "k8s.io/client-go/listers/apps/v1" 39 corelisters "k8s.io/client-go/listers/core/v1" 40 "k8s.io/client-go/tools/cache" 41 "k8s.io/client-go/tools/record" 42 "k8s.io/client-go/util/workqueue" 43 podutil "k8s.io/kubernetes/pkg/api/v1/pod" 44 "k8s.io/kubernetes/pkg/controller" 45 "k8s.io/kubernetes/pkg/controller/history" 46 "k8s.io/kubernetes/pkg/features" 47 48 "k8s.io/klog/v2" 49) 50 51// controllerKind contains the schema.GroupVersionKind for this controller type. 52var controllerKind = apps.SchemeGroupVersion.WithKind("StatefulSet") 53 54// StatefulSetController controls statefulsets. 
type StatefulSetController struct {
	// kubeClient is the client interface used for direct (uncached) API calls.
	kubeClient clientset.Interface
	// control returns an interface capable of syncing a stateful set.
	// Abstracted out for testing.
	control StatefulSetControlInterface
	// podControl is used for patching pods.
	podControl controller.PodControlInterface
	// podLister is able to list/get pods from a shared informer's store
	podLister corelisters.PodLister
	// podListerSynced returns true if the pod shared informer has synced at least once
	podListerSynced cache.InformerSynced
	// setLister is able to list/get stateful sets from a shared informer's store
	setLister appslisters.StatefulSetLister
	// setListerSynced returns true if the stateful set shared informer has synced at least once
	setListerSynced cache.InformerSynced
	// pvcListerSynced returns true if the pvc shared informer has synced at least once
	pvcListerSynced cache.InformerSynced
	// revListerSynced returns true if the rev shared informer has synced at least once
	revListerSynced cache.InformerSynced
	// queue holds the keys of the StatefulSets that need to be synced.
	queue workqueue.RateLimitingInterface
}

// NewStatefulSetController creates a new statefulset controller. It wires an
// event recorder to kubeClient, builds the default StatefulSet control from the
// given informers' listers, and registers event handlers on the pod and
// StatefulSet informers so that affected StatefulSets are enqueued for sync.
80func NewStatefulSetController( 81 podInformer coreinformers.PodInformer, 82 setInformer appsinformers.StatefulSetInformer, 83 pvcInformer coreinformers.PersistentVolumeClaimInformer, 84 revInformer appsinformers.ControllerRevisionInformer, 85 kubeClient clientset.Interface, 86) *StatefulSetController { 87 eventBroadcaster := record.NewBroadcaster() 88 eventBroadcaster.StartStructuredLogging(0) 89 eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) 90 recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "statefulset-controller"}) 91 ssc := &StatefulSetController{ 92 kubeClient: kubeClient, 93 control: NewDefaultStatefulSetControl( 94 NewRealStatefulPodControl( 95 kubeClient, 96 setInformer.Lister(), 97 podInformer.Lister(), 98 pvcInformer.Lister(), 99 recorder), 100 NewRealStatefulSetStatusUpdater(kubeClient, setInformer.Lister()), 101 history.NewHistory(kubeClient, revInformer.Lister()), 102 recorder, 103 ), 104 pvcListerSynced: pvcInformer.Informer().HasSynced, 105 queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "statefulset"), 106 podControl: controller.RealPodControl{KubeClient: kubeClient, Recorder: recorder}, 107 108 revListerSynced: revInformer.Informer().HasSynced, 109 } 110 111 podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ 112 // lookup the statefulset and enqueue 113 AddFunc: ssc.addPod, 114 // lookup current and old statefulset if labels changed 115 UpdateFunc: ssc.updatePod, 116 // lookup statefulset accounting for deletion tombstones 117 DeleteFunc: ssc.deletePod, 118 }) 119 ssc.podLister = podInformer.Lister() 120 ssc.podListerSynced = podInformer.Informer().HasSynced 121 122 setInformer.Informer().AddEventHandler( 123 cache.ResourceEventHandlerFuncs{ 124 AddFunc: ssc.enqueueStatefulSet, 125 UpdateFunc: func(old, cur interface{}) { 126 oldPS := old.(*apps.StatefulSet) 127 curPS := cur.(*apps.StatefulSet) 128 
if oldPS.Status.Replicas != curPS.Status.Replicas { 129 klog.V(4).Infof("Observed updated replica count for StatefulSet: %v, %d->%d", curPS.Name, oldPS.Status.Replicas, curPS.Status.Replicas) 130 } 131 ssc.enqueueStatefulSet(cur) 132 }, 133 DeleteFunc: ssc.enqueueStatefulSet, 134 }, 135 ) 136 ssc.setLister = setInformer.Lister() 137 ssc.setListerSynced = setInformer.Informer().HasSynced 138 139 // TODO: Watch volumes 140 return ssc 141} 142 143// Run runs the statefulset controller. 144func (ssc *StatefulSetController) Run(workers int, stopCh <-chan struct{}) { 145 defer utilruntime.HandleCrash() 146 defer ssc.queue.ShutDown() 147 148 klog.Infof("Starting stateful set controller") 149 defer klog.Infof("Shutting down statefulset controller") 150 151 if !cache.WaitForNamedCacheSync("stateful set", stopCh, ssc.podListerSynced, ssc.setListerSynced, ssc.pvcListerSynced, ssc.revListerSynced) { 152 return 153 } 154 155 for i := 0; i < workers; i++ { 156 go wait.Until(ssc.worker, time.Second, stopCh) 157 } 158 159 <-stopCh 160} 161 162// addPod adds the statefulset for the pod to the sync queue 163func (ssc *StatefulSetController) addPod(obj interface{}) { 164 pod := obj.(*v1.Pod) 165 166 if pod.DeletionTimestamp != nil { 167 // on a restart of the controller manager, it's possible a new pod shows up in a state that 168 // is already pending deletion. Prevent the pod from being a creation observation. 169 ssc.deletePod(pod) 170 return 171 } 172 173 // If it has a ControllerRef, that's all that matters. 174 if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil { 175 set := ssc.resolveControllerRef(pod.Namespace, controllerRef) 176 if set == nil { 177 return 178 } 179 klog.V(4).Infof("Pod %s created, labels: %+v", pod.Name, pod.Labels) 180 ssc.enqueueStatefulSet(set) 181 return 182 } 183 184 // Otherwise, it's an orphan. Get a list of all matching controllers and sync 185 // them to see if anyone wants to adopt it. 
186 sets := ssc.getStatefulSetsForPod(pod) 187 if len(sets) == 0 { 188 return 189 } 190 klog.V(4).Infof("Orphan Pod %s created, labels: %+v", pod.Name, pod.Labels) 191 for _, set := range sets { 192 ssc.enqueueStatefulSet(set) 193 } 194} 195 196// updatePod adds the statefulset for the current and old pods to the sync queue. 197func (ssc *StatefulSetController) updatePod(old, cur interface{}) { 198 curPod := cur.(*v1.Pod) 199 oldPod := old.(*v1.Pod) 200 if curPod.ResourceVersion == oldPod.ResourceVersion { 201 // In the event of a re-list we may receive update events for all known pods. 202 // Two different versions of the same pod will always have different RVs. 203 return 204 } 205 206 labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels) 207 208 curControllerRef := metav1.GetControllerOf(curPod) 209 oldControllerRef := metav1.GetControllerOf(oldPod) 210 controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef) 211 if controllerRefChanged && oldControllerRef != nil { 212 // The ControllerRef was changed. Sync the old controller, if any. 213 if set := ssc.resolveControllerRef(oldPod.Namespace, oldControllerRef); set != nil { 214 ssc.enqueueStatefulSet(set) 215 } 216 } 217 218 // If it has a ControllerRef, that's all that matters. 219 if curControllerRef != nil { 220 set := ssc.resolveControllerRef(curPod.Namespace, curControllerRef) 221 if set == nil { 222 return 223 } 224 klog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta) 225 ssc.enqueueStatefulSet(set) 226 // TODO: MinReadySeconds in the Pod will generate an Available condition to be added in 227 // the Pod status which in turn will trigger a requeue of the owning replica set thus 228 // having its status updated with the newly available replica. 
229 if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetMinReadySeconds) && !podutil.IsPodReady(oldPod) && podutil.IsPodReady(curPod) && set.Spec.MinReadySeconds > 0 { 230 klog.V(2).Infof("StatefulSet %s will be enqueued after %ds for availability check", set.Name, set.Spec.MinReadySeconds) 231 // Add a second to avoid milliseconds skew in AddAfter. 232 // See https://github.com/kubernetes/kubernetes/issues/39785#issuecomment-279959133 for more info. 233 ssc.enqueueSSAfter(set, (time.Duration(set.Spec.MinReadySeconds)*time.Second)+time.Second) 234 } 235 return 236 } 237 238 // Otherwise, it's an orphan. If anything changed, sync matching controllers 239 // to see if anyone wants to adopt it now. 240 if labelChanged || controllerRefChanged { 241 sets := ssc.getStatefulSetsForPod(curPod) 242 if len(sets) == 0 { 243 return 244 } 245 klog.V(4).Infof("Orphan Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta) 246 for _, set := range sets { 247 ssc.enqueueStatefulSet(set) 248 } 249 } 250} 251 252// deletePod enqueues the statefulset for the pod accounting for deletion tombstones. 253func (ssc *StatefulSetController) deletePod(obj interface{}) { 254 pod, ok := obj.(*v1.Pod) 255 256 // When a delete is dropped, the relist will notice a pod in the store not 257 // in the list, leading to the insertion of a tombstone object which contains 258 // the deleted key/value. Note that this value might be stale. 259 if !ok { 260 tombstone, ok := obj.(cache.DeletedFinalStateUnknown) 261 if !ok { 262 utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj)) 263 return 264 } 265 pod, ok = tombstone.Obj.(*v1.Pod) 266 if !ok { 267 utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %+v", obj)) 268 return 269 } 270 } 271 272 controllerRef := metav1.GetControllerOf(pod) 273 if controllerRef == nil { 274 // No controller should care about orphans being deleted. 
275 return 276 } 277 set := ssc.resolveControllerRef(pod.Namespace, controllerRef) 278 if set == nil { 279 return 280 } 281 klog.V(4).Infof("Pod %s/%s deleted through %v.", pod.Namespace, pod.Name, utilruntime.GetCaller()) 282 ssc.enqueueStatefulSet(set) 283} 284 285// getPodsForStatefulSet returns the Pods that a given StatefulSet should manage. 286// It also reconciles ControllerRef by adopting/orphaning. 287// 288// NOTE: Returned Pods are pointers to objects from the cache. 289// If you need to modify one, you need to copy it first. 290func (ssc *StatefulSetController) getPodsForStatefulSet(set *apps.StatefulSet, selector labels.Selector) ([]*v1.Pod, error) { 291 // List all pods to include the pods that don't match the selector anymore but 292 // has a ControllerRef pointing to this StatefulSet. 293 pods, err := ssc.podLister.Pods(set.Namespace).List(labels.Everything()) 294 if err != nil { 295 return nil, err 296 } 297 298 filter := func(pod *v1.Pod) bool { 299 // Only claim if it matches our StatefulSet name. Otherwise release/ignore. 300 return isMemberOf(set, pod) 301 } 302 303 cm := controller.NewPodControllerRefManager(ssc.podControl, set, selector, controllerKind, ssc.canAdoptFunc(set)) 304 return cm.ClaimPods(pods, filter) 305} 306 307// If any adoptions are attempted, we should first recheck for deletion with 308// an uncached quorum read sometime after listing Pods/ControllerRevisions (see #42639). 
309func (ssc *StatefulSetController) canAdoptFunc(set *apps.StatefulSet) func() error { 310 return controller.RecheckDeletionTimestamp(func() (metav1.Object, error) { 311 fresh, err := ssc.kubeClient.AppsV1().StatefulSets(set.Namespace).Get(context.TODO(), set.Name, metav1.GetOptions{}) 312 if err != nil { 313 return nil, err 314 } 315 if fresh.UID != set.UID { 316 return nil, fmt.Errorf("original StatefulSet %v/%v is gone: got uid %v, wanted %v", set.Namespace, set.Name, fresh.UID, set.UID) 317 } 318 return fresh, nil 319 }) 320} 321 322// adoptOrphanRevisions adopts any orphaned ControllerRevisions matched by set's Selector. 323func (ssc *StatefulSetController) adoptOrphanRevisions(set *apps.StatefulSet) error { 324 revisions, err := ssc.control.ListRevisions(set) 325 if err != nil { 326 return err 327 } 328 orphanRevisions := make([]*apps.ControllerRevision, 0) 329 for i := range revisions { 330 if metav1.GetControllerOf(revisions[i]) == nil { 331 orphanRevisions = append(orphanRevisions, revisions[i]) 332 } 333 } 334 if len(orphanRevisions) > 0 { 335 canAdoptErr := ssc.canAdoptFunc(set)() 336 if canAdoptErr != nil { 337 return fmt.Errorf("can't adopt ControllerRevisions: %v", canAdoptErr) 338 } 339 return ssc.control.AdoptOrphanRevisions(set, orphanRevisions) 340 } 341 return nil 342} 343 344// getStatefulSetsForPod returns a list of StatefulSets that potentially match 345// a given pod. 346func (ssc *StatefulSetController) getStatefulSetsForPod(pod *v1.Pod) []*apps.StatefulSet { 347 sets, err := ssc.setLister.GetPodStatefulSets(pod) 348 if err != nil { 349 return nil 350 } 351 // More than one set is selecting the same Pod 352 if len(sets) > 1 { 353 // ControllerRef will ensure we don't do anything crazy, but more than one 354 // item in this list nevertheless constitutes user error. 
355 utilruntime.HandleError( 356 fmt.Errorf( 357 "user error: more than one StatefulSet is selecting pods with labels: %+v", 358 pod.Labels)) 359 } 360 return sets 361} 362 363// resolveControllerRef returns the controller referenced by a ControllerRef, 364// or nil if the ControllerRef could not be resolved to a matching controller 365// of the correct Kind. 366func (ssc *StatefulSetController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *apps.StatefulSet { 367 // We can't look up by UID, so look up by Name and then verify UID. 368 // Don't even try to look up by Name if it's the wrong Kind. 369 if controllerRef.Kind != controllerKind.Kind { 370 return nil 371 } 372 set, err := ssc.setLister.StatefulSets(namespace).Get(controllerRef.Name) 373 if err != nil { 374 return nil 375 } 376 if set.UID != controllerRef.UID { 377 // The controller we found with this Name is not the same one that the 378 // ControllerRef points to. 379 return nil 380 } 381 return set 382} 383 384// enqueueStatefulSet enqueues the given statefulset in the work queue. 385func (ssc *StatefulSetController) enqueueStatefulSet(obj interface{}) { 386 key, err := controller.KeyFunc(obj) 387 if err != nil { 388 utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err)) 389 return 390 } 391 ssc.queue.Add(key) 392} 393 394// enqueueStatefulSet enqueues the given statefulset in the work queue after given time 395func (ssc *StatefulSetController) enqueueSSAfter(ss *apps.StatefulSet, duration time.Duration) { 396 key, err := controller.KeyFunc(ss) 397 if err != nil { 398 utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", ss, err)) 399 return 400 } 401 ssc.queue.AddAfter(key, duration) 402} 403 404// processNextWorkItem dequeues items, processes them, and marks them done. It enforces that the syncHandler is never 405// invoked concurrently with the same key. 
406func (ssc *StatefulSetController) processNextWorkItem() bool { 407 key, quit := ssc.queue.Get() 408 if quit { 409 return false 410 } 411 defer ssc.queue.Done(key) 412 if err := ssc.sync(key.(string)); err != nil { 413 utilruntime.HandleError(fmt.Errorf("error syncing StatefulSet %v, requeuing: %v", key.(string), err)) 414 ssc.queue.AddRateLimited(key) 415 } else { 416 ssc.queue.Forget(key) 417 } 418 return true 419} 420 421// worker runs a worker goroutine that invokes processNextWorkItem until the controller's queue is closed 422func (ssc *StatefulSetController) worker() { 423 for ssc.processNextWorkItem() { 424 } 425} 426 427// sync syncs the given statefulset. 428func (ssc *StatefulSetController) sync(key string) error { 429 startTime := time.Now() 430 defer func() { 431 klog.V(4).Infof("Finished syncing statefulset %q (%v)", key, time.Since(startTime)) 432 }() 433 434 namespace, name, err := cache.SplitMetaNamespaceKey(key) 435 if err != nil { 436 return err 437 } 438 set, err := ssc.setLister.StatefulSets(namespace).Get(name) 439 if errors.IsNotFound(err) { 440 klog.Infof("StatefulSet has been deleted %v", key) 441 return nil 442 } 443 if err != nil { 444 utilruntime.HandleError(fmt.Errorf("unable to retrieve StatefulSet %v from store: %v", key, err)) 445 return err 446 } 447 448 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) 449 if err != nil { 450 utilruntime.HandleError(fmt.Errorf("error converting StatefulSet %v selector: %v", key, err)) 451 // This is a non-transient error, so don't retry. 452 return nil 453 } 454 455 if err := ssc.adoptOrphanRevisions(set); err != nil { 456 return err 457 } 458 459 pods, err := ssc.getPodsForStatefulSet(set, selector) 460 if err != nil { 461 return err 462 } 463 464 return ssc.syncStatefulSet(set, pods) 465} 466 467// syncStatefulSet syncs a tuple of (statefulset, []*v1.Pod). 
468func (ssc *StatefulSetController) syncStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) error { 469 klog.V(4).Infof("Syncing StatefulSet %v/%v with %d pods", set.Namespace, set.Name, len(pods)) 470 var status *apps.StatefulSetStatus 471 var err error 472 // TODO: investigate where we mutate the set during the update as it is not obvious. 473 status, err = ssc.control.UpdateStatefulSet(set.DeepCopy(), pods) 474 if err != nil { 475 return err 476 } 477 klog.V(4).Infof("Successfully synced StatefulSet %s/%s successful", set.Namespace, set.Name) 478 // One more sync to handle the clock skew. This is also helping in requeuing right after status update 479 if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetMinReadySeconds) && set.Spec.MinReadySeconds > 0 && status != nil && status.AvailableReplicas != *set.Spec.Replicas { 480 ssc.enqueueSSAfter(set, time.Duration(set.Spec.MinReadySeconds)*time.Second) 481 } 482 483 return nil 484} 485