/*
Copyright 2020 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package storageversiongc

import (
	"context"
	"fmt"
	"time"

	apiserverinternalv1alpha1 "k8s.io/api/apiserverinternal/v1alpha1"
	coordinationv1 "k8s.io/api/coordination/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	utilerrors "k8s.io/apimachinery/pkg/util/errors"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/apiserver/pkg/storageversion"
	apiserverinternalinformers "k8s.io/client-go/informers/apiserverinternal/v1alpha1"
	coordinformers "k8s.io/client-go/informers/coordination/v1"
	"k8s.io/client-go/kubernetes"
	coordlisters "k8s.io/client-go/listers/coordination/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/kubernetes/pkg/controlplane"

	"k8s.io/klog/v2"
)

// Controller watches kube-apiserver identity leases and storageversions, and
// deletes stale storage version entries and objects.
type Controller struct {
	// kubeclientset is used for live reads of leases and for listing,
	// updating and deleting StorageVersion objects.
	kubeclientset kubernetes.Interface

	// leaseLister reads leases from the informer cache; it is consulted when
	// deciding whether an observed storage version needs to be enqueued.
	leaseLister coordlisters.LeaseLister
	// leasesSynced reports whether the lease informer cache has synced.
	leasesSynced cache.InformerSynced

	// storageVersionSynced reports whether the storage version informer cache
	// has synced.
	storageVersionSynced cache.InformerSynced

	// leaseQueue holds the names of deleted kube-apiserver identity leases.
	leaseQueue workqueue.RateLimitingInterface
	// storageVersionQueue holds the names of storage versions observed with an
	// entry whose apiserver ID has no matching identity lease in the cache.
	storageVersionQueue workqueue.RateLimitingInterface
}

// NewStorageVersionGC creates a new Controller.
58func NewStorageVersionGC(clientset kubernetes.Interface, leaseInformer coordinformers.LeaseInformer, storageVersionInformer apiserverinternalinformers.StorageVersionInformer) *Controller { 59 c := &Controller{ 60 kubeclientset: clientset, 61 leaseLister: leaseInformer.Lister(), 62 leasesSynced: leaseInformer.Informer().HasSynced, 63 storageVersionSynced: storageVersionInformer.Informer().HasSynced, 64 leaseQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "storage_version_garbage_collector_leases"), 65 storageVersionQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "storage_version_garbage_collector_storageversions"), 66 } 67 68 leaseInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ 69 DeleteFunc: c.onDeleteLease, 70 }) 71 // use the default resync period from the informer 72 storageVersionInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ 73 AddFunc: c.onAddStorageVersion, 74 UpdateFunc: c.onUpdateStorageVersion, 75 }) 76 77 return c 78} 79 80// Run starts one worker. 81func (c *Controller) Run(stopCh <-chan struct{}) { 82 defer utilruntime.HandleCrash() 83 defer c.leaseQueue.ShutDown() 84 defer c.storageVersionQueue.ShutDown() 85 defer klog.Infof("Shutting down storage version garbage collector") 86 87 klog.Infof("Starting storage version garbage collector") 88 89 if !cache.WaitForCacheSync(stopCh, c.leasesSynced, c.storageVersionSynced) { 90 utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync")) 91 return 92 } 93 94 // Identity lease deletion and storageversion update don't happen too often. Start one 95 // worker for each of them. 96 // runLeaseWorker handles legit identity lease deletion, while runStorageVersionWorker 97 // handles storageversion creation/update with non-existing id. The latter should rarely 98 // happen. It's okay for the two workers to conflict on update. 
99 go wait.Until(c.runLeaseWorker, time.Second, stopCh) 100 go wait.Until(c.runStorageVersionWorker, time.Second, stopCh) 101 102 <-stopCh 103} 104 105func (c *Controller) runLeaseWorker() { 106 for c.processNextLease() { 107 } 108} 109 110func (c *Controller) processNextLease() bool { 111 key, quit := c.leaseQueue.Get() 112 if quit { 113 return false 114 } 115 defer c.leaseQueue.Done(key) 116 117 err := c.processDeletedLease(key.(string)) 118 if err == nil { 119 c.leaseQueue.Forget(key) 120 return true 121 } 122 123 utilruntime.HandleError(fmt.Errorf("lease %v failed with: %v", key, err)) 124 c.leaseQueue.AddRateLimited(key) 125 return true 126} 127 128func (c *Controller) runStorageVersionWorker() { 129 for c.processNextStorageVersion() { 130 } 131} 132 133func (c *Controller) processNextStorageVersion() bool { 134 key, quit := c.storageVersionQueue.Get() 135 if quit { 136 return false 137 } 138 defer c.storageVersionQueue.Done(key) 139 140 err := c.syncStorageVersion(key.(string)) 141 if err == nil { 142 c.storageVersionQueue.Forget(key) 143 return true 144 } 145 146 utilruntime.HandleError(fmt.Errorf("storage version %v failed with: %v", key, err)) 147 c.storageVersionQueue.AddRateLimited(key) 148 return true 149} 150 151func (c *Controller) processDeletedLease(name string) error { 152 _, err := c.kubeclientset.CoordinationV1().Leases(metav1.NamespaceSystem).Get(context.TODO(), name, metav1.GetOptions{}) 153 // the lease isn't deleted, nothing we need to do here 154 if err == nil { 155 return nil 156 } 157 if !apierrors.IsNotFound(err) { 158 return err 159 } 160 // the frequency of this call won't be too high because we only trigger on identity lease deletions 161 storageVersionList, err := c.kubeclientset.InternalV1alpha1().StorageVersions().List(context.TODO(), metav1.ListOptions{}) 162 if err != nil { 163 return err 164 } 165 166 var errors []error 167 for _, sv := range storageVersionList.Items { 168 var serverStorageVersions 
[]apiserverinternalv1alpha1.ServerStorageVersion 169 hasStaleRecord := false 170 for _, ssv := range sv.Status.StorageVersions { 171 if ssv.APIServerID == name { 172 hasStaleRecord = true 173 continue 174 } 175 serverStorageVersions = append(serverStorageVersions, ssv) 176 } 177 if !hasStaleRecord { 178 continue 179 } 180 if err := c.updateOrDeleteStorageVersion(&sv, serverStorageVersions); err != nil { 181 errors = append(errors, err) 182 } 183 } 184 185 return utilerrors.NewAggregate(errors) 186} 187 188func (c *Controller) syncStorageVersion(name string) error { 189 sv, err := c.kubeclientset.InternalV1alpha1().StorageVersions().Get(context.TODO(), name, metav1.GetOptions{}) 190 if apierrors.IsNotFound(err) { 191 // The problematic storage version that was added/updated recently is gone. 192 // Nothing we need to do here. 193 return nil 194 } 195 if err != nil { 196 return err 197 } 198 199 hasInvalidID := false 200 var serverStorageVersions []apiserverinternalv1alpha1.ServerStorageVersion 201 for _, v := range sv.Status.StorageVersions { 202 lease, err := c.kubeclientset.CoordinationV1().Leases(metav1.NamespaceSystem).Get(context.TODO(), v.APIServerID, metav1.GetOptions{}) 203 if err != nil || lease == nil || lease.Labels == nil || 204 lease.Labels[controlplane.IdentityLeaseComponentLabelKey] != controlplane.KubeAPIServer { 205 // We cannot find a corresponding identity lease from apiserver as well. 206 // We need to clean up this storage version. 
207 hasInvalidID = true 208 continue 209 } 210 serverStorageVersions = append(serverStorageVersions, v) 211 } 212 if !hasInvalidID { 213 return nil 214 } 215 return c.updateOrDeleteStorageVersion(sv, serverStorageVersions) 216} 217 218func (c *Controller) onAddStorageVersion(obj interface{}) { 219 castObj := obj.(*apiserverinternalv1alpha1.StorageVersion) 220 c.enqueueStorageVersion(castObj) 221} 222 223func (c *Controller) onUpdateStorageVersion(oldObj, newObj interface{}) { 224 castNewObj := newObj.(*apiserverinternalv1alpha1.StorageVersion) 225 c.enqueueStorageVersion(castNewObj) 226} 227 228// enqueueStorageVersion enqueues the storage version if it has entry for invalid apiserver 229func (c *Controller) enqueueStorageVersion(obj *apiserverinternalv1alpha1.StorageVersion) { 230 for _, sv := range obj.Status.StorageVersions { 231 lease, err := c.leaseLister.Leases(metav1.NamespaceSystem).Get(sv.APIServerID) 232 if err != nil || lease == nil || lease.Labels == nil || 233 lease.Labels[controlplane.IdentityLeaseComponentLabelKey] != controlplane.KubeAPIServer { 234 // we cannot find a corresponding identity lease in cache, enqueue the storageversion 235 klog.V(4).Infof("Observed storage version %s with invalid apiserver entry", obj.Name) 236 c.storageVersionQueue.Add(obj.Name) 237 return 238 } 239 } 240} 241 242func (c *Controller) onDeleteLease(obj interface{}) { 243 castObj, ok := obj.(*coordinationv1.Lease) 244 if !ok { 245 tombstone, ok := obj.(cache.DeletedFinalStateUnknown) 246 if !ok { 247 utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj)) 248 return 249 } 250 castObj, ok = tombstone.Obj.(*coordinationv1.Lease) 251 if !ok { 252 utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a Lease %#v", obj)) 253 return 254 } 255 } 256 257 if castObj.Namespace == metav1.NamespaceSystem && 258 castObj.Labels != nil && 259 castObj.Labels[controlplane.IdentityLeaseComponentLabelKey] == controlplane.KubeAPIServer { 260 
klog.V(4).Infof("Observed lease %s deleted", castObj.Name) 261 c.enqueueLease(castObj) 262 } 263} 264 265func (c *Controller) enqueueLease(obj *coordinationv1.Lease) { 266 c.leaseQueue.Add(obj.Name) 267} 268 269func (c *Controller) updateOrDeleteStorageVersion(sv *apiserverinternalv1alpha1.StorageVersion, serverStorageVersions []apiserverinternalv1alpha1.ServerStorageVersion) error { 270 if len(serverStorageVersions) == 0 { 271 return c.kubeclientset.InternalV1alpha1().StorageVersions().Delete( 272 context.TODO(), sv.Name, metav1.DeleteOptions{}) 273 } 274 sv.Status.StorageVersions = serverStorageVersions 275 storageversion.SetCommonEncodingVersion(sv) 276 _, err := c.kubeclientset.InternalV1alpha1().StorageVersions().UpdateStatus( 277 context.TODO(), sv, metav1.UpdateOptions{}) 278 return err 279} 280