1/*
2Copyright 2020 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package storageversiongc
18
19import (
20	"context"
21	"fmt"
22	"time"
23
24	apiserverinternalv1alpha1 "k8s.io/api/apiserverinternal/v1alpha1"
25	coordinationv1 "k8s.io/api/coordination/v1"
26	apierrors "k8s.io/apimachinery/pkg/api/errors"
27	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28	utilerrors "k8s.io/apimachinery/pkg/util/errors"
29	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
30	"k8s.io/apimachinery/pkg/util/wait"
31	"k8s.io/apiserver/pkg/storageversion"
32	apiserverinternalinformers "k8s.io/client-go/informers/apiserverinternal/v1alpha1"
33	coordinformers "k8s.io/client-go/informers/coordination/v1"
34	"k8s.io/client-go/kubernetes"
35	coordlisters "k8s.io/client-go/listers/coordination/v1"
36	"k8s.io/client-go/tools/cache"
37	"k8s.io/client-go/util/workqueue"
38	"k8s.io/kubernetes/pkg/controlplane"
39
40	"k8s.io/klog/v2"
41)
42
// Controller watches kube-apiserver identity leases and storageversions, and deletes
// storage version entries (and whole storageversion objects) whose apiserver ID no
// longer has a valid kube-apiserver identity lease.
type Controller struct {
	// kubeclientset is used for the live Get/List/Delete/UpdateStatus calls made
	// against leases and storageversions while processing queued keys.
	kubeclientset kubernetes.Interface

	// leaseLister reads leases from the lease informer cache; leasesSynced reports
	// whether that cache has completed its initial sync.
	leaseLister  coordlisters.LeaseLister
	leasesSynced cache.InformerSynced

	// storageVersionSynced reports whether the storageversion informer cache has
	// completed its initial sync.
	storageVersionSynced cache.InformerSynced

	// leaseQueue holds names of deleted kube-apiserver identity leases.
	// storageVersionQueue holds names of storageversions observed to reference an
	// apiserver ID with no valid identity lease in the lease cache.
	leaseQueue          workqueue.RateLimitingInterface
	storageVersionQueue workqueue.RateLimitingInterface
}
56
57// NewStorageVersionGC creates a new Controller.
58func NewStorageVersionGC(clientset kubernetes.Interface, leaseInformer coordinformers.LeaseInformer, storageVersionInformer apiserverinternalinformers.StorageVersionInformer) *Controller {
59	c := &Controller{
60		kubeclientset:        clientset,
61		leaseLister:          leaseInformer.Lister(),
62		leasesSynced:         leaseInformer.Informer().HasSynced,
63		storageVersionSynced: storageVersionInformer.Informer().HasSynced,
64		leaseQueue:           workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "storage_version_garbage_collector_leases"),
65		storageVersionQueue:  workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "storage_version_garbage_collector_storageversions"),
66	}
67
68	leaseInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
69		DeleteFunc: c.onDeleteLease,
70	})
71	// use the default resync period from the informer
72	storageVersionInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
73		AddFunc:    c.onAddStorageVersion,
74		UpdateFunc: c.onUpdateStorageVersion,
75	})
76
77	return c
78}
79
80// Run starts one worker.
81func (c *Controller) Run(stopCh <-chan struct{}) {
82	defer utilruntime.HandleCrash()
83	defer c.leaseQueue.ShutDown()
84	defer c.storageVersionQueue.ShutDown()
85	defer klog.Infof("Shutting down storage version garbage collector")
86
87	klog.Infof("Starting storage version garbage collector")
88
89	if !cache.WaitForCacheSync(stopCh, c.leasesSynced, c.storageVersionSynced) {
90		utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
91		return
92	}
93
94	// Identity lease deletion and storageversion update don't happen too often. Start one
95	// worker for each of them.
96	// runLeaseWorker handles legit identity lease deletion, while runStorageVersionWorker
97	// handles storageversion creation/update with non-existing id. The latter should rarely
98	// happen. It's okay for the two workers to conflict on update.
99	go wait.Until(c.runLeaseWorker, time.Second, stopCh)
100	go wait.Until(c.runStorageVersionWorker, time.Second, stopCh)
101
102	<-stopCh
103}
104
105func (c *Controller) runLeaseWorker() {
106	for c.processNextLease() {
107	}
108}
109
110func (c *Controller) processNextLease() bool {
111	key, quit := c.leaseQueue.Get()
112	if quit {
113		return false
114	}
115	defer c.leaseQueue.Done(key)
116
117	err := c.processDeletedLease(key.(string))
118	if err == nil {
119		c.leaseQueue.Forget(key)
120		return true
121	}
122
123	utilruntime.HandleError(fmt.Errorf("lease %v failed with: %v", key, err))
124	c.leaseQueue.AddRateLimited(key)
125	return true
126}
127
128func (c *Controller) runStorageVersionWorker() {
129	for c.processNextStorageVersion() {
130	}
131}
132
133func (c *Controller) processNextStorageVersion() bool {
134	key, quit := c.storageVersionQueue.Get()
135	if quit {
136		return false
137	}
138	defer c.storageVersionQueue.Done(key)
139
140	err := c.syncStorageVersion(key.(string))
141	if err == nil {
142		c.storageVersionQueue.Forget(key)
143		return true
144	}
145
146	utilruntime.HandleError(fmt.Errorf("storage version %v failed with: %v", key, err))
147	c.storageVersionQueue.AddRateLimited(key)
148	return true
149}
150
151func (c *Controller) processDeletedLease(name string) error {
152	_, err := c.kubeclientset.CoordinationV1().Leases(metav1.NamespaceSystem).Get(context.TODO(), name, metav1.GetOptions{})
153	// the lease isn't deleted, nothing we need to do here
154	if err == nil {
155		return nil
156	}
157	if !apierrors.IsNotFound(err) {
158		return err
159	}
160	// the frequency of this call won't be too high because we only trigger on identity lease deletions
161	storageVersionList, err := c.kubeclientset.InternalV1alpha1().StorageVersions().List(context.TODO(), metav1.ListOptions{})
162	if err != nil {
163		return err
164	}
165
166	var errors []error
167	for _, sv := range storageVersionList.Items {
168		var serverStorageVersions []apiserverinternalv1alpha1.ServerStorageVersion
169		hasStaleRecord := false
170		for _, ssv := range sv.Status.StorageVersions {
171			if ssv.APIServerID == name {
172				hasStaleRecord = true
173				continue
174			}
175			serverStorageVersions = append(serverStorageVersions, ssv)
176		}
177		if !hasStaleRecord {
178			continue
179		}
180		if err := c.updateOrDeleteStorageVersion(&sv, serverStorageVersions); err != nil {
181			errors = append(errors, err)
182		}
183	}
184
185	return utilerrors.NewAggregate(errors)
186}
187
188func (c *Controller) syncStorageVersion(name string) error {
189	sv, err := c.kubeclientset.InternalV1alpha1().StorageVersions().Get(context.TODO(), name, metav1.GetOptions{})
190	if apierrors.IsNotFound(err) {
191		// The problematic storage version that was added/updated recently is gone.
192		// Nothing we need to do here.
193		return nil
194	}
195	if err != nil {
196		return err
197	}
198
199	hasInvalidID := false
200	var serverStorageVersions []apiserverinternalv1alpha1.ServerStorageVersion
201	for _, v := range sv.Status.StorageVersions {
202		lease, err := c.kubeclientset.CoordinationV1().Leases(metav1.NamespaceSystem).Get(context.TODO(), v.APIServerID, metav1.GetOptions{})
203		if err != nil || lease == nil || lease.Labels == nil ||
204			lease.Labels[controlplane.IdentityLeaseComponentLabelKey] != controlplane.KubeAPIServer {
205			// We cannot find a corresponding identity lease from apiserver as well.
206			// We need to clean up this storage version.
207			hasInvalidID = true
208			continue
209		}
210		serverStorageVersions = append(serverStorageVersions, v)
211	}
212	if !hasInvalidID {
213		return nil
214	}
215	return c.updateOrDeleteStorageVersion(sv, serverStorageVersions)
216}
217
218func (c *Controller) onAddStorageVersion(obj interface{}) {
219	castObj := obj.(*apiserverinternalv1alpha1.StorageVersion)
220	c.enqueueStorageVersion(castObj)
221}
222
223func (c *Controller) onUpdateStorageVersion(oldObj, newObj interface{}) {
224	castNewObj := newObj.(*apiserverinternalv1alpha1.StorageVersion)
225	c.enqueueStorageVersion(castNewObj)
226}
227
228// enqueueStorageVersion enqueues the storage version if it has entry for invalid apiserver
229func (c *Controller) enqueueStorageVersion(obj *apiserverinternalv1alpha1.StorageVersion) {
230	for _, sv := range obj.Status.StorageVersions {
231		lease, err := c.leaseLister.Leases(metav1.NamespaceSystem).Get(sv.APIServerID)
232		if err != nil || lease == nil || lease.Labels == nil ||
233			lease.Labels[controlplane.IdentityLeaseComponentLabelKey] != controlplane.KubeAPIServer {
234			// we cannot find a corresponding identity lease in cache, enqueue the storageversion
235			klog.V(4).Infof("Observed storage version %s with invalid apiserver entry", obj.Name)
236			c.storageVersionQueue.Add(obj.Name)
237			return
238		}
239	}
240}
241
242func (c *Controller) onDeleteLease(obj interface{}) {
243	castObj, ok := obj.(*coordinationv1.Lease)
244	if !ok {
245		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
246		if !ok {
247			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
248			return
249		}
250		castObj, ok = tombstone.Obj.(*coordinationv1.Lease)
251		if !ok {
252			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a Lease %#v", obj))
253			return
254		}
255	}
256
257	if castObj.Namespace == metav1.NamespaceSystem &&
258		castObj.Labels != nil &&
259		castObj.Labels[controlplane.IdentityLeaseComponentLabelKey] == controlplane.KubeAPIServer {
260		klog.V(4).Infof("Observed lease %s deleted", castObj.Name)
261		c.enqueueLease(castObj)
262	}
263}
264
265func (c *Controller) enqueueLease(obj *coordinationv1.Lease) {
266	c.leaseQueue.Add(obj.Name)
267}
268
269func (c *Controller) updateOrDeleteStorageVersion(sv *apiserverinternalv1alpha1.StorageVersion, serverStorageVersions []apiserverinternalv1alpha1.ServerStorageVersion) error {
270	if len(serverStorageVersions) == 0 {
271		return c.kubeclientset.InternalV1alpha1().StorageVersions().Delete(
272			context.TODO(), sv.Name, metav1.DeleteOptions{})
273	}
274	sv.Status.StorageVersions = serverStorageVersions
275	storageversion.SetCommonEncodingVersion(sv)
276	_, err := c.kubeclientset.InternalV1alpha1().StorageVersions().UpdateStatus(
277		context.TODO(), sv, metav1.UpdateOptions{})
278	return err
279}
280