1/* 2Copyright 2017 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package scheduling 18 19import ( 20 "fmt" 21 storagehelpers "k8s.io/component-helpers/storage/volume" 22 "strconv" 23 "sync" 24 25 "k8s.io/klog/v2" 26 27 v1 "k8s.io/api/core/v1" 28 "k8s.io/apimachinery/pkg/api/meta" 29 "k8s.io/client-go/tools/cache" 30) 31 32// AssumeCache is a cache on top of the informer that allows for updating 33// objects outside of informer events and also restoring the informer 34// cache's version of the object. Objects are assumed to be 35// Kubernetes API objects that implement meta.Interface 36type AssumeCache interface { 37 // Assume updates the object in-memory only 38 Assume(obj interface{}) error 39 40 // Restore the informer cache's version of the object 41 Restore(objName string) 42 43 // Get the object by name 44 Get(objName string) (interface{}, error) 45 46 // Get the API object by name 47 GetAPIObj(objName string) (interface{}, error) 48 49 // List all the objects in the cache 50 List(indexObj interface{}) []interface{} 51} 52 53type errWrongType struct { 54 typeName string 55 object interface{} 56} 57 58func (e *errWrongType) Error() string { 59 return fmt.Sprintf("could not convert object to type %v: %+v", e.typeName, e.object) 60} 61 62type errNotFound struct { 63 typeName string 64 objectName string 65} 66 67func (e *errNotFound) Error() string { 68 return fmt.Sprintf("could not find %v %q", e.typeName, e.objectName) 69} 70 71type errObjectName struct { 72 detailedErr error 73} 74 75func (e *errObjectName) Error() string { 76 return fmt.Sprintf("failed to get object name: %v", e.detailedErr) 77} 78 79// assumeCache stores two pointers to represent a single object: 80// * The pointer to the informer object. 81// * The pointer to the latest object, which could be the same as 82// the informer object, or an in-memory object. 83// 84// An informer update always overrides the latest object pointer. 85// 86// Assume() only updates the latest object pointer. 87// Restore() sets the latest object pointer back to the informer object. 88// Get/List() always returns the latest object pointer. 89type assumeCache struct { 90 // Synchronizes updates to store 91 rwMutex sync.RWMutex 92 93 // describes the object stored 94 description string 95 96 // Stores objInfo pointers 97 store cache.Indexer 98 99 // Index function for object 100 indexFunc cache.IndexFunc 101 indexName string 102} 103 104type objInfo struct { 105 // name of the object 106 name string 107 108 // Latest version of object could be cached-only or from informer 109 latestObj interface{} 110 111 // Latest object from informer 112 apiObj interface{} 113} 114 115func objInfoKeyFunc(obj interface{}) (string, error) { 116 objInfo, ok := obj.(*objInfo) 117 if !ok { 118 return "", &errWrongType{"objInfo", obj} 119 } 120 return objInfo.name, nil 121} 122 123func (c *assumeCache) objInfoIndexFunc(obj interface{}) ([]string, error) { 124 objInfo, ok := obj.(*objInfo) 125 if !ok { 126 return []string{""}, &errWrongType{"objInfo", obj} 127 } 128 return c.indexFunc(objInfo.latestObj) 129} 130 131// NewAssumeCache creates an assume cache for general objects. 132func NewAssumeCache(informer cache.SharedIndexInformer, description, indexName string, indexFunc cache.IndexFunc) AssumeCache { 133 c := &assumeCache{ 134 description: description, 135 indexFunc: indexFunc, 136 indexName: indexName, 137 } 138 indexers := cache.Indexers{} 139 if indexName != "" && indexFunc != nil { 140 indexers[indexName] = c.objInfoIndexFunc 141 } 142 c.store = cache.NewIndexer(objInfoKeyFunc, indexers) 143 144 // Unit tests don't use informers 145 if informer != nil { 146 informer.AddEventHandler( 147 cache.ResourceEventHandlerFuncs{ 148 AddFunc: c.add, 149 UpdateFunc: c.update, 150 DeleteFunc: c.delete, 151 }, 152 ) 153 } 154 return c 155} 156 157func (c *assumeCache) add(obj interface{}) { 158 if obj == nil { 159 return 160 } 161 162 name, err := cache.MetaNamespaceKeyFunc(obj) 163 if err != nil { 164 klog.Errorf("add failed: %v", &errObjectName{err}) 165 return 166 } 167 168 c.rwMutex.Lock() 169 defer c.rwMutex.Unlock() 170 171 if objInfo, _ := c.getObjInfo(name); objInfo != nil { 172 newVersion, err := c.getObjVersion(name, obj) 173 if err != nil { 174 klog.Errorf("add: couldn't get object version: %v", err) 175 return 176 } 177 178 storedVersion, err := c.getObjVersion(name, objInfo.latestObj) 179 if err != nil { 180 klog.Errorf("add: couldn't get stored object version: %v", err) 181 return 182 } 183 184 // Only update object if version is newer. 185 // This is so we don't override assumed objects due to informer resync. 186 if newVersion <= storedVersion { 187 klog.V(10).Infof("Skip adding %v %v to assume cache because version %v is not newer than %v", c.description, name, newVersion, storedVersion) 188 return 189 } 190 } 191 192 objInfo := &objInfo{name: name, latestObj: obj, apiObj: obj} 193 if err = c.store.Update(objInfo); err != nil { 194 klog.Warningf("got error when updating stored object : %v", err) 195 } else { 196 klog.V(10).Infof("Adding %v %v to assume cache: %+v ", c.description, name, obj) 197 } 198} 199 200func (c *assumeCache) update(oldObj interface{}, newObj interface{}) { 201 c.add(newObj) 202} 203 204func (c *assumeCache) delete(obj interface{}) { 205 if obj == nil { 206 return 207 } 208 209 name, err := cache.MetaNamespaceKeyFunc(obj) 210 if err != nil { 211 klog.Errorf("delete failed: %v", &errObjectName{err}) 212 return 213 } 214 215 c.rwMutex.Lock() 216 defer c.rwMutex.Unlock() 217 218 objInfo := &objInfo{name: name} 219 err = c.store.Delete(objInfo) 220 if err != nil { 221 klog.Errorf("delete: failed to delete %v %v: %v", c.description, name, err) 222 } 223} 224 225func (c *assumeCache) getObjVersion(name string, obj interface{}) (int64, error) { 226 objAccessor, err := meta.Accessor(obj) 227 if err != nil { 228 return -1, err 229 } 230 231 objResourceVersion, err := strconv.ParseInt(objAccessor.GetResourceVersion(), 10, 64) 232 if err != nil { 233 return -1, fmt.Errorf("error parsing ResourceVersion %q for %v %q: %s", objAccessor.GetResourceVersion(), c.description, name, err) 234 } 235 return objResourceVersion, nil 236} 237 238func (c *assumeCache) getObjInfo(name string) (*objInfo, error) { 239 obj, ok, err := c.store.GetByKey(name) 240 if err != nil { 241 return nil, err 242 } 243 if !ok { 244 return nil, &errNotFound{c.description, name} 245 } 246 247 objInfo, ok := obj.(*objInfo) 248 if !ok { 249 return nil, &errWrongType{"objInfo", obj} 250 } 251 return objInfo, nil 252} 253 254func (c *assumeCache) Get(objName string) (interface{}, error) { 255 c.rwMutex.RLock() 256 defer c.rwMutex.RUnlock() 257 258 objInfo, err := c.getObjInfo(objName) 259 if err != nil { 260 return nil, err 261 } 262 return objInfo.latestObj, nil 263} 264 265func (c *assumeCache) GetAPIObj(objName string) (interface{}, error) { 266 c.rwMutex.RLock() 267 defer c.rwMutex.RUnlock() 268 269 objInfo, err := c.getObjInfo(objName) 270 if err != nil { 271 return nil, err 272 } 273 return objInfo.apiObj, nil 274} 275 276func (c *assumeCache) List(indexObj interface{}) []interface{} { 277 c.rwMutex.RLock() 278 defer c.rwMutex.RUnlock() 279 280 allObjs := []interface{}{} 281 objs, err := c.store.Index(c.indexName, &objInfo{latestObj: indexObj}) 282 if err != nil { 283 klog.Errorf("list index error: %v", err) 284 return nil 285 } 286 287 for _, obj := range objs { 288 objInfo, ok := obj.(*objInfo) 289 if !ok { 290 klog.Errorf("list error: %v", &errWrongType{"objInfo", obj}) 291 continue 292 } 293 allObjs = append(allObjs, objInfo.latestObj) 294 } 295 return allObjs 296} 297 298func (c *assumeCache) Assume(obj interface{}) error { 299 name, err := cache.MetaNamespaceKeyFunc(obj) 300 if err != nil { 301 return &errObjectName{err} 302 } 303 304 c.rwMutex.Lock() 305 defer c.rwMutex.Unlock() 306 307 objInfo, err := c.getObjInfo(name) 308 if err != nil { 309 return err 310 } 311 312 newVersion, err := c.getObjVersion(name, obj) 313 if err != nil { 314 return err 315 } 316 317 storedVersion, err := c.getObjVersion(name, objInfo.latestObj) 318 if err != nil { 319 return err 320 } 321 322 if newVersion < storedVersion { 323 return fmt.Errorf("%v %q is out of sync (stored: %d, assume: %d)", c.description, name, storedVersion, newVersion) 324 } 325 326 // Only update the cached object 327 objInfo.latestObj = obj 328 klog.V(4).Infof("Assumed %v %q, version %v", c.description, name, newVersion) 329 return nil 330} 331 332func (c *assumeCache) Restore(objName string) { 333 c.rwMutex.Lock() 334 defer c.rwMutex.Unlock() 335 336 objInfo, err := c.getObjInfo(objName) 337 if err != nil { 338 // This could be expected if object got deleted 339 klog.V(5).Infof("Restore %v %q warning: %v", c.description, objName, err) 340 } else { 341 objInfo.latestObj = objInfo.apiObj 342 klog.V(4).Infof("Restored %v %q", c.description, objName) 343 } 344} 345 346// PVAssumeCache is a AssumeCache for PersistentVolume objects 347type PVAssumeCache interface { 348 AssumeCache 349 350 GetPV(pvName string) (*v1.PersistentVolume, error) 351 GetAPIPV(pvName string) (*v1.PersistentVolume, error) 352 ListPVs(storageClassName string) []*v1.PersistentVolume 353} 354 355type pvAssumeCache struct { 356 AssumeCache 357} 358 359func pvStorageClassIndexFunc(obj interface{}) ([]string, error) { 360 if pv, ok := obj.(*v1.PersistentVolume); ok { 361 return []string{storagehelpers.GetPersistentVolumeClass(pv)}, nil 362 } 363 return []string{""}, fmt.Errorf("object is not a v1.PersistentVolume: %v", obj) 364} 365 366// NewPVAssumeCache creates a PV assume cache. 367func NewPVAssumeCache(informer cache.SharedIndexInformer) PVAssumeCache { 368 return &pvAssumeCache{NewAssumeCache(informer, "v1.PersistentVolume", "storageclass", pvStorageClassIndexFunc)} 369} 370 371func (c *pvAssumeCache) GetPV(pvName string) (*v1.PersistentVolume, error) { 372 obj, err := c.Get(pvName) 373 if err != nil { 374 return nil, err 375 } 376 377 pv, ok := obj.(*v1.PersistentVolume) 378 if !ok { 379 return nil, &errWrongType{"v1.PersistentVolume", obj} 380 } 381 return pv, nil 382} 383 384func (c *pvAssumeCache) GetAPIPV(pvName string) (*v1.PersistentVolume, error) { 385 obj, err := c.GetAPIObj(pvName) 386 if err != nil { 387 return nil, err 388 } 389 pv, ok := obj.(*v1.PersistentVolume) 390 if !ok { 391 return nil, &errWrongType{"v1.PersistentVolume", obj} 392 } 393 return pv, nil 394} 395 396func (c *pvAssumeCache) ListPVs(storageClassName string) []*v1.PersistentVolume { 397 objs := c.List(&v1.PersistentVolume{ 398 Spec: v1.PersistentVolumeSpec{ 399 StorageClassName: storageClassName, 400 }, 401 }) 402 pvs := []*v1.PersistentVolume{} 403 for _, obj := range objs { 404 pv, ok := obj.(*v1.PersistentVolume) 405 if !ok { 406 klog.Errorf("ListPVs: %v", &errWrongType{"v1.PersistentVolume", obj}) 407 continue 408 } 409 pvs = append(pvs, pv) 410 } 411 return pvs 412} 413 414// PVCAssumeCache is a AssumeCache for PersistentVolumeClaim objects 415type PVCAssumeCache interface { 416 AssumeCache 417 418 // GetPVC returns the PVC from the cache with given pvcKey. 419 // pvcKey is the result of MetaNamespaceKeyFunc on PVC obj 420 GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) 421 GetAPIPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) 422} 423 424type pvcAssumeCache struct { 425 AssumeCache 426} 427 428// NewPVCAssumeCache creates a PVC assume cache. 429func NewPVCAssumeCache(informer cache.SharedIndexInformer) PVCAssumeCache { 430 return &pvcAssumeCache{NewAssumeCache(informer, "v1.PersistentVolumeClaim", "", nil)} 431} 432 433func (c *pvcAssumeCache) GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) { 434 obj, err := c.Get(pvcKey) 435 if err != nil { 436 return nil, err 437 } 438 439 pvc, ok := obj.(*v1.PersistentVolumeClaim) 440 if !ok { 441 return nil, &errWrongType{"v1.PersistentVolumeClaim", obj} 442 } 443 return pvc, nil 444} 445 446func (c *pvcAssumeCache) GetAPIPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) { 447 obj, err := c.GetAPIObj(pvcKey) 448 if err != nil { 449 return nil, err 450 } 451 pvc, ok := obj.(*v1.PersistentVolumeClaim) 452 if !ok { 453 return nil, &errWrongType{"v1.PersistentVolumeClaim", obj} 454 } 455 return pvc, nil 456} 457