1/*
2Copyright 2016 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package statefulset
18
19import (
20	"context"
21	"fmt"
22	"reflect"
23	"time"
24
25	apps "k8s.io/api/apps/v1"
26	v1 "k8s.io/api/core/v1"
27	"k8s.io/apimachinery/pkg/api/errors"
28	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29	"k8s.io/apimachinery/pkg/labels"
30	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
31	"k8s.io/apimachinery/pkg/util/wait"
32	utilfeature "k8s.io/apiserver/pkg/util/feature"
33	appsinformers "k8s.io/client-go/informers/apps/v1"
34	coreinformers "k8s.io/client-go/informers/core/v1"
35	clientset "k8s.io/client-go/kubernetes"
36	"k8s.io/client-go/kubernetes/scheme"
37	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
38	appslisters "k8s.io/client-go/listers/apps/v1"
39	corelisters "k8s.io/client-go/listers/core/v1"
40	"k8s.io/client-go/tools/cache"
41	"k8s.io/client-go/tools/record"
42	"k8s.io/client-go/util/workqueue"
43	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
44	"k8s.io/kubernetes/pkg/controller"
45	"k8s.io/kubernetes/pkg/controller/history"
46	"k8s.io/kubernetes/pkg/features"
47
48	"k8s.io/klog/v2"
49)
50
// controllerKind contains the schema.GroupVersionKind for this controller type.
// Its Kind is compared against owner references in resolveControllerRef, and the
// whole value is handed to the PodControllerRefManager built in
// getPodsForStatefulSet.
var controllerKind = apps.SchemeGroupVersion.WithKind("StatefulSet")
53
// StatefulSetController controls statefulsets.
//
// It reacts to Pod and StatefulSet informer events by placing StatefulSet keys
// on a rate-limited work queue, and its workers reconcile each key through the
// control interface.
type StatefulSetController struct {
	// client interface; also used for the uncached StatefulSet read performed
	// before any adoption is attempted (see canAdoptFunc).
	kubeClient clientset.Interface
	// control returns an interface capable of syncing a stateful set.
	// Abstracted out for testing.
	control StatefulSetControlInterface
	// podControl is used for patching pods.
	// It is passed to the PodControllerRefManager that claims pods for a set.
	podControl controller.PodControlInterface
	// podLister is able to list/get pods from a shared informer's store
	podLister corelisters.PodLister
	// podListerSynced returns true if the pod shared informer has synced at least once
	podListerSynced cache.InformerSynced
	// setLister is able to list/get stateful sets from a shared informer's store
	setLister appslisters.StatefulSetLister
	// setListerSynced returns true if the stateful set shared informer has synced at least once
	setListerSynced cache.InformerSynced
	// pvcListerSynced returns true if the pvc shared informer has synced at least once
	pvcListerSynced cache.InformerSynced
	// revListerSynced returns true if the rev shared informer has synced at least once
	revListerSynced cache.InformerSynced
	// StatefulSets that need to be synced.
	// Entries are namespace/name keys produced by controller.KeyFunc.
	queue workqueue.RateLimitingInterface
}
78
79// NewStatefulSetController creates a new statefulset controller.
80func NewStatefulSetController(
81	podInformer coreinformers.PodInformer,
82	setInformer appsinformers.StatefulSetInformer,
83	pvcInformer coreinformers.PersistentVolumeClaimInformer,
84	revInformer appsinformers.ControllerRevisionInformer,
85	kubeClient clientset.Interface,
86) *StatefulSetController {
87	eventBroadcaster := record.NewBroadcaster()
88	eventBroadcaster.StartStructuredLogging(0)
89	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
90	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "statefulset-controller"})
91	ssc := &StatefulSetController{
92		kubeClient: kubeClient,
93		control: NewDefaultStatefulSetControl(
94			NewRealStatefulPodControl(
95				kubeClient,
96				setInformer.Lister(),
97				podInformer.Lister(),
98				pvcInformer.Lister(),
99				recorder),
100			NewRealStatefulSetStatusUpdater(kubeClient, setInformer.Lister()),
101			history.NewHistory(kubeClient, revInformer.Lister()),
102			recorder,
103		),
104		pvcListerSynced: pvcInformer.Informer().HasSynced,
105		queue:           workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "statefulset"),
106		podControl:      controller.RealPodControl{KubeClient: kubeClient, Recorder: recorder},
107
108		revListerSynced: revInformer.Informer().HasSynced,
109	}
110
111	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
112		// lookup the statefulset and enqueue
113		AddFunc: ssc.addPod,
114		// lookup current and old statefulset if labels changed
115		UpdateFunc: ssc.updatePod,
116		// lookup statefulset accounting for deletion tombstones
117		DeleteFunc: ssc.deletePod,
118	})
119	ssc.podLister = podInformer.Lister()
120	ssc.podListerSynced = podInformer.Informer().HasSynced
121
122	setInformer.Informer().AddEventHandler(
123		cache.ResourceEventHandlerFuncs{
124			AddFunc: ssc.enqueueStatefulSet,
125			UpdateFunc: func(old, cur interface{}) {
126				oldPS := old.(*apps.StatefulSet)
127				curPS := cur.(*apps.StatefulSet)
128				if oldPS.Status.Replicas != curPS.Status.Replicas {
129					klog.V(4).Infof("Observed updated replica count for StatefulSet: %v, %d->%d", curPS.Name, oldPS.Status.Replicas, curPS.Status.Replicas)
130				}
131				ssc.enqueueStatefulSet(cur)
132			},
133			DeleteFunc: ssc.enqueueStatefulSet,
134		},
135	)
136	ssc.setLister = setInformer.Lister()
137	ssc.setListerSynced = setInformer.Informer().HasSynced
138
139	// TODO: Watch volumes
140	return ssc
141}
142
143// Run runs the statefulset controller.
144func (ssc *StatefulSetController) Run(workers int, stopCh <-chan struct{}) {
145	defer utilruntime.HandleCrash()
146	defer ssc.queue.ShutDown()
147
148	klog.Infof("Starting stateful set controller")
149	defer klog.Infof("Shutting down statefulset controller")
150
151	if !cache.WaitForNamedCacheSync("stateful set", stopCh, ssc.podListerSynced, ssc.setListerSynced, ssc.pvcListerSynced, ssc.revListerSynced) {
152		return
153	}
154
155	for i := 0; i < workers; i++ {
156		go wait.Until(ssc.worker, time.Second, stopCh)
157	}
158
159	<-stopCh
160}
161
162// addPod adds the statefulset for the pod to the sync queue
163func (ssc *StatefulSetController) addPod(obj interface{}) {
164	pod := obj.(*v1.Pod)
165
166	if pod.DeletionTimestamp != nil {
167		// on a restart of the controller manager, it's possible a new pod shows up in a state that
168		// is already pending deletion. Prevent the pod from being a creation observation.
169		ssc.deletePod(pod)
170		return
171	}
172
173	// If it has a ControllerRef, that's all that matters.
174	if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {
175		set := ssc.resolveControllerRef(pod.Namespace, controllerRef)
176		if set == nil {
177			return
178		}
179		klog.V(4).Infof("Pod %s created, labels: %+v", pod.Name, pod.Labels)
180		ssc.enqueueStatefulSet(set)
181		return
182	}
183
184	// Otherwise, it's an orphan. Get a list of all matching controllers and sync
185	// them to see if anyone wants to adopt it.
186	sets := ssc.getStatefulSetsForPod(pod)
187	if len(sets) == 0 {
188		return
189	}
190	klog.V(4).Infof("Orphan Pod %s created, labels: %+v", pod.Name, pod.Labels)
191	for _, set := range sets {
192		ssc.enqueueStatefulSet(set)
193	}
194}
195
196// updatePod adds the statefulset for the current and old pods to the sync queue.
197func (ssc *StatefulSetController) updatePod(old, cur interface{}) {
198	curPod := cur.(*v1.Pod)
199	oldPod := old.(*v1.Pod)
200	if curPod.ResourceVersion == oldPod.ResourceVersion {
201		// In the event of a re-list we may receive update events for all known pods.
202		// Two different versions of the same pod will always have different RVs.
203		return
204	}
205
206	labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
207
208	curControllerRef := metav1.GetControllerOf(curPod)
209	oldControllerRef := metav1.GetControllerOf(oldPod)
210	controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
211	if controllerRefChanged && oldControllerRef != nil {
212		// The ControllerRef was changed. Sync the old controller, if any.
213		if set := ssc.resolveControllerRef(oldPod.Namespace, oldControllerRef); set != nil {
214			ssc.enqueueStatefulSet(set)
215		}
216	}
217
218	// If it has a ControllerRef, that's all that matters.
219	if curControllerRef != nil {
220		set := ssc.resolveControllerRef(curPod.Namespace, curControllerRef)
221		if set == nil {
222			return
223		}
224		klog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
225		ssc.enqueueStatefulSet(set)
226		// TODO: MinReadySeconds in the Pod will generate an Available condition to be added in
227		// the Pod status which in turn will trigger a requeue of the owning replica set thus
228		// having its status updated with the newly available replica.
229		if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetMinReadySeconds) && !podutil.IsPodReady(oldPod) && podutil.IsPodReady(curPod) && set.Spec.MinReadySeconds > 0 {
230			klog.V(2).Infof("StatefulSet %s will be enqueued after %ds for availability check", set.Name, set.Spec.MinReadySeconds)
231			// Add a second to avoid milliseconds skew in AddAfter.
232			// See https://github.com/kubernetes/kubernetes/issues/39785#issuecomment-279959133 for more info.
233			ssc.enqueueSSAfter(set, (time.Duration(set.Spec.MinReadySeconds)*time.Second)+time.Second)
234		}
235		return
236	}
237
238	// Otherwise, it's an orphan. If anything changed, sync matching controllers
239	// to see if anyone wants to adopt it now.
240	if labelChanged || controllerRefChanged {
241		sets := ssc.getStatefulSetsForPod(curPod)
242		if len(sets) == 0 {
243			return
244		}
245		klog.V(4).Infof("Orphan Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
246		for _, set := range sets {
247			ssc.enqueueStatefulSet(set)
248		}
249	}
250}
251
252// deletePod enqueues the statefulset for the pod accounting for deletion tombstones.
253func (ssc *StatefulSetController) deletePod(obj interface{}) {
254	pod, ok := obj.(*v1.Pod)
255
256	// When a delete is dropped, the relist will notice a pod in the store not
257	// in the list, leading to the insertion of a tombstone object which contains
258	// the deleted key/value. Note that this value might be stale.
259	if !ok {
260		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
261		if !ok {
262			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj))
263			return
264		}
265		pod, ok = tombstone.Obj.(*v1.Pod)
266		if !ok {
267			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %+v", obj))
268			return
269		}
270	}
271
272	controllerRef := metav1.GetControllerOf(pod)
273	if controllerRef == nil {
274		// No controller should care about orphans being deleted.
275		return
276	}
277	set := ssc.resolveControllerRef(pod.Namespace, controllerRef)
278	if set == nil {
279		return
280	}
281	klog.V(4).Infof("Pod %s/%s deleted through %v.", pod.Namespace, pod.Name, utilruntime.GetCaller())
282	ssc.enqueueStatefulSet(set)
283}
284
285// getPodsForStatefulSet returns the Pods that a given StatefulSet should manage.
286// It also reconciles ControllerRef by adopting/orphaning.
287//
288// NOTE: Returned Pods are pointers to objects from the cache.
289//       If you need to modify one, you need to copy it first.
290func (ssc *StatefulSetController) getPodsForStatefulSet(set *apps.StatefulSet, selector labels.Selector) ([]*v1.Pod, error) {
291	// List all pods to include the pods that don't match the selector anymore but
292	// has a ControllerRef pointing to this StatefulSet.
293	pods, err := ssc.podLister.Pods(set.Namespace).List(labels.Everything())
294	if err != nil {
295		return nil, err
296	}
297
298	filter := func(pod *v1.Pod) bool {
299		// Only claim if it matches our StatefulSet name. Otherwise release/ignore.
300		return isMemberOf(set, pod)
301	}
302
303	cm := controller.NewPodControllerRefManager(ssc.podControl, set, selector, controllerKind, ssc.canAdoptFunc(set))
304	return cm.ClaimPods(pods, filter)
305}
306
307// If any adoptions are attempted, we should first recheck for deletion with
308// an uncached quorum read sometime after listing Pods/ControllerRevisions (see #42639).
309func (ssc *StatefulSetController) canAdoptFunc(set *apps.StatefulSet) func() error {
310	return controller.RecheckDeletionTimestamp(func() (metav1.Object, error) {
311		fresh, err := ssc.kubeClient.AppsV1().StatefulSets(set.Namespace).Get(context.TODO(), set.Name, metav1.GetOptions{})
312		if err != nil {
313			return nil, err
314		}
315		if fresh.UID != set.UID {
316			return nil, fmt.Errorf("original StatefulSet %v/%v is gone: got uid %v, wanted %v", set.Namespace, set.Name, fresh.UID, set.UID)
317		}
318		return fresh, nil
319	})
320}
321
322// adoptOrphanRevisions adopts any orphaned ControllerRevisions matched by set's Selector.
323func (ssc *StatefulSetController) adoptOrphanRevisions(set *apps.StatefulSet) error {
324	revisions, err := ssc.control.ListRevisions(set)
325	if err != nil {
326		return err
327	}
328	orphanRevisions := make([]*apps.ControllerRevision, 0)
329	for i := range revisions {
330		if metav1.GetControllerOf(revisions[i]) == nil {
331			orphanRevisions = append(orphanRevisions, revisions[i])
332		}
333	}
334	if len(orphanRevisions) > 0 {
335		canAdoptErr := ssc.canAdoptFunc(set)()
336		if canAdoptErr != nil {
337			return fmt.Errorf("can't adopt ControllerRevisions: %v", canAdoptErr)
338		}
339		return ssc.control.AdoptOrphanRevisions(set, orphanRevisions)
340	}
341	return nil
342}
343
344// getStatefulSetsForPod returns a list of StatefulSets that potentially match
345// a given pod.
346func (ssc *StatefulSetController) getStatefulSetsForPod(pod *v1.Pod) []*apps.StatefulSet {
347	sets, err := ssc.setLister.GetPodStatefulSets(pod)
348	if err != nil {
349		return nil
350	}
351	// More than one set is selecting the same Pod
352	if len(sets) > 1 {
353		// ControllerRef will ensure we don't do anything crazy, but more than one
354		// item in this list nevertheless constitutes user error.
355		utilruntime.HandleError(
356			fmt.Errorf(
357				"user error: more than one StatefulSet is selecting pods with labels: %+v",
358				pod.Labels))
359	}
360	return sets
361}
362
363// resolveControllerRef returns the controller referenced by a ControllerRef,
364// or nil if the ControllerRef could not be resolved to a matching controller
365// of the correct Kind.
366func (ssc *StatefulSetController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *apps.StatefulSet {
367	// We can't look up by UID, so look up by Name and then verify UID.
368	// Don't even try to look up by Name if it's the wrong Kind.
369	if controllerRef.Kind != controllerKind.Kind {
370		return nil
371	}
372	set, err := ssc.setLister.StatefulSets(namespace).Get(controllerRef.Name)
373	if err != nil {
374		return nil
375	}
376	if set.UID != controllerRef.UID {
377		// The controller we found with this Name is not the same one that the
378		// ControllerRef points to.
379		return nil
380	}
381	return set
382}
383
384// enqueueStatefulSet enqueues the given statefulset in the work queue.
385func (ssc *StatefulSetController) enqueueStatefulSet(obj interface{}) {
386	key, err := controller.KeyFunc(obj)
387	if err != nil {
388		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
389		return
390	}
391	ssc.queue.Add(key)
392}
393
394// enqueueStatefulSet enqueues the given statefulset in the work queue after given time
395func (ssc *StatefulSetController) enqueueSSAfter(ss *apps.StatefulSet, duration time.Duration) {
396	key, err := controller.KeyFunc(ss)
397	if err != nil {
398		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", ss, err))
399		return
400	}
401	ssc.queue.AddAfter(key, duration)
402}
403
404// processNextWorkItem dequeues items, processes them, and marks them done. It enforces that the syncHandler is never
405// invoked concurrently with the same key.
406func (ssc *StatefulSetController) processNextWorkItem() bool {
407	key, quit := ssc.queue.Get()
408	if quit {
409		return false
410	}
411	defer ssc.queue.Done(key)
412	if err := ssc.sync(key.(string)); err != nil {
413		utilruntime.HandleError(fmt.Errorf("error syncing StatefulSet %v, requeuing: %v", key.(string), err))
414		ssc.queue.AddRateLimited(key)
415	} else {
416		ssc.queue.Forget(key)
417	}
418	return true
419}
420
421// worker runs a worker goroutine that invokes processNextWorkItem until the controller's queue is closed
422func (ssc *StatefulSetController) worker() {
423	for ssc.processNextWorkItem() {
424	}
425}
426
427// sync syncs the given statefulset.
428func (ssc *StatefulSetController) sync(key string) error {
429	startTime := time.Now()
430	defer func() {
431		klog.V(4).Infof("Finished syncing statefulset %q (%v)", key, time.Since(startTime))
432	}()
433
434	namespace, name, err := cache.SplitMetaNamespaceKey(key)
435	if err != nil {
436		return err
437	}
438	set, err := ssc.setLister.StatefulSets(namespace).Get(name)
439	if errors.IsNotFound(err) {
440		klog.Infof("StatefulSet has been deleted %v", key)
441		return nil
442	}
443	if err != nil {
444		utilruntime.HandleError(fmt.Errorf("unable to retrieve StatefulSet %v from store: %v", key, err))
445		return err
446	}
447
448	selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
449	if err != nil {
450		utilruntime.HandleError(fmt.Errorf("error converting StatefulSet %v selector: %v", key, err))
451		// This is a non-transient error, so don't retry.
452		return nil
453	}
454
455	if err := ssc.adoptOrphanRevisions(set); err != nil {
456		return err
457	}
458
459	pods, err := ssc.getPodsForStatefulSet(set, selector)
460	if err != nil {
461		return err
462	}
463
464	return ssc.syncStatefulSet(set, pods)
465}
466
467// syncStatefulSet syncs a tuple of (statefulset, []*v1.Pod).
468func (ssc *StatefulSetController) syncStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) error {
469	klog.V(4).Infof("Syncing StatefulSet %v/%v with %d pods", set.Namespace, set.Name, len(pods))
470	var status *apps.StatefulSetStatus
471	var err error
472	// TODO: investigate where we mutate the set during the update as it is not obvious.
473	status, err = ssc.control.UpdateStatefulSet(set.DeepCopy(), pods)
474	if err != nil {
475		return err
476	}
477	klog.V(4).Infof("Successfully synced StatefulSet %s/%s successful", set.Namespace, set.Name)
478	// One more sync to handle the clock skew. This is also helping in requeuing right after status update
479	if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetMinReadySeconds) && set.Spec.MinReadySeconds > 0 && status != nil && status.AvailableReplicas != *set.Spec.Replicas {
480		ssc.enqueueSSAfter(set, time.Duration(set.Spec.MinReadySeconds)*time.Second)
481	}
482
483	return nil
484}
485