1/*
2Copyright 2017 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package scheduling
18
19import (
20	"fmt"
21	storagehelpers "k8s.io/component-helpers/storage/volume"
22	"strconv"
23	"sync"
24
25	"k8s.io/klog/v2"
26
27	v1 "k8s.io/api/core/v1"
28	"k8s.io/apimachinery/pkg/api/meta"
29	"k8s.io/client-go/tools/cache"
30)
31
// AssumeCache layers an in-memory "assumed" view on top of an informer
// cache. An object can be overwritten in memory outside of informer events
// and later reverted to the version the informer last delivered. Stored
// objects are expected to be Kubernetes API objects whose metadata can be
// read through meta.Accessor.
type AssumeCache interface {
	// Assume replaces the cached object in memory only; the informer's
	// version of the object is left untouched.
	Assume(obj interface{}) error

	// Restore reverts the named object to the informer cache's version.
	Restore(objName string)

	// Get returns the latest (possibly assumed) object stored under objName.
	Get(objName string) (interface{}, error)

	// GetAPIObj returns the informer's version of the object stored under
	// objName.
	GetAPIObj(objName string) (interface{}, error)

	// List returns the latest version of the cached objects; indexObj is
	// the object used to look them up through the cache's index.
	List(indexObj interface{}) []interface{}
}
52
// errWrongType reports that a value could not be type-asserted to the type
// named by typeName.
type errWrongType struct {
	// typeName is the type the conversion was aiming for.
	typeName string
	// object is the value that failed the conversion.
	object interface{}
}

// Error implements the error interface.
func (e *errWrongType) Error() string {
	const format = "could not convert object to type %v: %+v"
	return fmt.Sprintf(format, e.typeName, e.object)
}
61
// errNotFound reports that no object with the given name is stored.
type errNotFound struct {
	// typeName describes the kind of object that was looked up;
	// objectName is the key that was not found.
	typeName, objectName string
}

// Error implements the error interface.
func (e *errNotFound) Error() string {
	const format = "could not find %v %q"
	return fmt.Sprintf(format, e.typeName, e.objectName)
}
70
// errObjectName reports that the cache key (name) of an object could not be
// determined. It wraps the underlying error.
type errObjectName struct {
	// detailedErr is the error returned while computing the object's name.
	detailedErr error
}

// Error implements the error interface.
func (e *errObjectName) Error() string {
	return fmt.Sprintf("failed to get object name: %v", e.detailedErr)
}

// Unwrap exposes the underlying error so that errors.Is and errors.As can
// inspect the cause.
func (e *errObjectName) Unwrap() error {
	return e.detailedErr
}
78
79// assumeCache stores two pointers to represent a single object:
80// * The pointer to the informer object.
81// * The pointer to the latest object, which could be the same as
82//   the informer object, or an in-memory object.
83//
84// An informer update always overrides the latest object pointer.
85//
86// Assume() only updates the latest object pointer.
87// Restore() sets the latest object pointer back to the informer object.
88// Get/List() always returns the latest object pointer.
89type assumeCache struct {
90	// Synchronizes updates to store
91	rwMutex sync.RWMutex
92
93	// describes the object stored
94	description string
95
96	// Stores objInfo pointers
97	store cache.Indexer
98
99	// Index function for object
100	indexFunc cache.IndexFunc
101	indexName string
102}
103
// objInfo is the entry kept in the store for a single object.
type objInfo struct {
	// name is the key of the object in the store.
	name string

	// latestObj is the newest known version of the object; it may exist
	// only in memory or be the same object as apiObj.
	latestObj interface{}

	// apiObj is the version of the object last accepted from the informer.
	apiObj interface{}
}
114
115func objInfoKeyFunc(obj interface{}) (string, error) {
116	objInfo, ok := obj.(*objInfo)
117	if !ok {
118		return "", &errWrongType{"objInfo", obj}
119	}
120	return objInfo.name, nil
121}
122
123func (c *assumeCache) objInfoIndexFunc(obj interface{}) ([]string, error) {
124	objInfo, ok := obj.(*objInfo)
125	if !ok {
126		return []string{""}, &errWrongType{"objInfo", obj}
127	}
128	return c.indexFunc(objInfo.latestObj)
129}
130
131// NewAssumeCache creates an assume cache for general objects.
132func NewAssumeCache(informer cache.SharedIndexInformer, description, indexName string, indexFunc cache.IndexFunc) AssumeCache {
133	c := &assumeCache{
134		description: description,
135		indexFunc:   indexFunc,
136		indexName:   indexName,
137	}
138	indexers := cache.Indexers{}
139	if indexName != "" && indexFunc != nil {
140		indexers[indexName] = c.objInfoIndexFunc
141	}
142	c.store = cache.NewIndexer(objInfoKeyFunc, indexers)
143
144	// Unit tests don't use informers
145	if informer != nil {
146		informer.AddEventHandler(
147			cache.ResourceEventHandlerFuncs{
148				AddFunc:    c.add,
149				UpdateFunc: c.update,
150				DeleteFunc: c.delete,
151			},
152		)
153	}
154	return c
155}
156
157func (c *assumeCache) add(obj interface{}) {
158	if obj == nil {
159		return
160	}
161
162	name, err := cache.MetaNamespaceKeyFunc(obj)
163	if err != nil {
164		klog.Errorf("add failed: %v", &errObjectName{err})
165		return
166	}
167
168	c.rwMutex.Lock()
169	defer c.rwMutex.Unlock()
170
171	if objInfo, _ := c.getObjInfo(name); objInfo != nil {
172		newVersion, err := c.getObjVersion(name, obj)
173		if err != nil {
174			klog.Errorf("add: couldn't get object version: %v", err)
175			return
176		}
177
178		storedVersion, err := c.getObjVersion(name, objInfo.latestObj)
179		if err != nil {
180			klog.Errorf("add: couldn't get stored object version: %v", err)
181			return
182		}
183
184		// Only update object if version is newer.
185		// This is so we don't override assumed objects due to informer resync.
186		if newVersion <= storedVersion {
187			klog.V(10).Infof("Skip adding %v %v to assume cache because version %v is not newer than %v", c.description, name, newVersion, storedVersion)
188			return
189		}
190	}
191
192	objInfo := &objInfo{name: name, latestObj: obj, apiObj: obj}
193	if err = c.store.Update(objInfo); err != nil {
194		klog.Warningf("got error when updating stored object : %v", err)
195	} else {
196		klog.V(10).Infof("Adding %v %v to assume cache: %+v ", c.description, name, obj)
197	}
198}
199
200func (c *assumeCache) update(oldObj interface{}, newObj interface{}) {
201	c.add(newObj)
202}
203
204func (c *assumeCache) delete(obj interface{}) {
205	if obj == nil {
206		return
207	}
208
209	name, err := cache.MetaNamespaceKeyFunc(obj)
210	if err != nil {
211		klog.Errorf("delete failed: %v", &errObjectName{err})
212		return
213	}
214
215	c.rwMutex.Lock()
216	defer c.rwMutex.Unlock()
217
218	objInfo := &objInfo{name: name}
219	err = c.store.Delete(objInfo)
220	if err != nil {
221		klog.Errorf("delete: failed to delete %v %v: %v", c.description, name, err)
222	}
223}
224
225func (c *assumeCache) getObjVersion(name string, obj interface{}) (int64, error) {
226	objAccessor, err := meta.Accessor(obj)
227	if err != nil {
228		return -1, err
229	}
230
231	objResourceVersion, err := strconv.ParseInt(objAccessor.GetResourceVersion(), 10, 64)
232	if err != nil {
233		return -1, fmt.Errorf("error parsing ResourceVersion %q for %v %q: %s", objAccessor.GetResourceVersion(), c.description, name, err)
234	}
235	return objResourceVersion, nil
236}
237
238func (c *assumeCache) getObjInfo(name string) (*objInfo, error) {
239	obj, ok, err := c.store.GetByKey(name)
240	if err != nil {
241		return nil, err
242	}
243	if !ok {
244		return nil, &errNotFound{c.description, name}
245	}
246
247	objInfo, ok := obj.(*objInfo)
248	if !ok {
249		return nil, &errWrongType{"objInfo", obj}
250	}
251	return objInfo, nil
252}
253
254func (c *assumeCache) Get(objName string) (interface{}, error) {
255	c.rwMutex.RLock()
256	defer c.rwMutex.RUnlock()
257
258	objInfo, err := c.getObjInfo(objName)
259	if err != nil {
260		return nil, err
261	}
262	return objInfo.latestObj, nil
263}
264
265func (c *assumeCache) GetAPIObj(objName string) (interface{}, error) {
266	c.rwMutex.RLock()
267	defer c.rwMutex.RUnlock()
268
269	objInfo, err := c.getObjInfo(objName)
270	if err != nil {
271		return nil, err
272	}
273	return objInfo.apiObj, nil
274}
275
276func (c *assumeCache) List(indexObj interface{}) []interface{} {
277	c.rwMutex.RLock()
278	defer c.rwMutex.RUnlock()
279
280	allObjs := []interface{}{}
281	objs, err := c.store.Index(c.indexName, &objInfo{latestObj: indexObj})
282	if err != nil {
283		klog.Errorf("list index error: %v", err)
284		return nil
285	}
286
287	for _, obj := range objs {
288		objInfo, ok := obj.(*objInfo)
289		if !ok {
290			klog.Errorf("list error: %v", &errWrongType{"objInfo", obj})
291			continue
292		}
293		allObjs = append(allObjs, objInfo.latestObj)
294	}
295	return allObjs
296}
297
298func (c *assumeCache) Assume(obj interface{}) error {
299	name, err := cache.MetaNamespaceKeyFunc(obj)
300	if err != nil {
301		return &errObjectName{err}
302	}
303
304	c.rwMutex.Lock()
305	defer c.rwMutex.Unlock()
306
307	objInfo, err := c.getObjInfo(name)
308	if err != nil {
309		return err
310	}
311
312	newVersion, err := c.getObjVersion(name, obj)
313	if err != nil {
314		return err
315	}
316
317	storedVersion, err := c.getObjVersion(name, objInfo.latestObj)
318	if err != nil {
319		return err
320	}
321
322	if newVersion < storedVersion {
323		return fmt.Errorf("%v %q is out of sync (stored: %d, assume: %d)", c.description, name, storedVersion, newVersion)
324	}
325
326	// Only update the cached object
327	objInfo.latestObj = obj
328	klog.V(4).Infof("Assumed %v %q, version %v", c.description, name, newVersion)
329	return nil
330}
331
332func (c *assumeCache) Restore(objName string) {
333	c.rwMutex.Lock()
334	defer c.rwMutex.Unlock()
335
336	objInfo, err := c.getObjInfo(objName)
337	if err != nil {
338		// This could be expected if object got deleted
339		klog.V(5).Infof("Restore %v %q warning: %v", c.description, objName, err)
340	} else {
341		objInfo.latestObj = objInfo.apiObj
342		klog.V(4).Infof("Restored %v %q", c.description, objName)
343	}
344}
345
346// PVAssumeCache is a AssumeCache for PersistentVolume objects
347type PVAssumeCache interface {
348	AssumeCache
349
350	GetPV(pvName string) (*v1.PersistentVolume, error)
351	GetAPIPV(pvName string) (*v1.PersistentVolume, error)
352	ListPVs(storageClassName string) []*v1.PersistentVolume
353}
354
355type pvAssumeCache struct {
356	AssumeCache
357}
358
359func pvStorageClassIndexFunc(obj interface{}) ([]string, error) {
360	if pv, ok := obj.(*v1.PersistentVolume); ok {
361		return []string{storagehelpers.GetPersistentVolumeClass(pv)}, nil
362	}
363	return []string{""}, fmt.Errorf("object is not a v1.PersistentVolume: %v", obj)
364}
365
366// NewPVAssumeCache creates a PV assume cache.
367func NewPVAssumeCache(informer cache.SharedIndexInformer) PVAssumeCache {
368	return &pvAssumeCache{NewAssumeCache(informer, "v1.PersistentVolume", "storageclass", pvStorageClassIndexFunc)}
369}
370
371func (c *pvAssumeCache) GetPV(pvName string) (*v1.PersistentVolume, error) {
372	obj, err := c.Get(pvName)
373	if err != nil {
374		return nil, err
375	}
376
377	pv, ok := obj.(*v1.PersistentVolume)
378	if !ok {
379		return nil, &errWrongType{"v1.PersistentVolume", obj}
380	}
381	return pv, nil
382}
383
384func (c *pvAssumeCache) GetAPIPV(pvName string) (*v1.PersistentVolume, error) {
385	obj, err := c.GetAPIObj(pvName)
386	if err != nil {
387		return nil, err
388	}
389	pv, ok := obj.(*v1.PersistentVolume)
390	if !ok {
391		return nil, &errWrongType{"v1.PersistentVolume", obj}
392	}
393	return pv, nil
394}
395
396func (c *pvAssumeCache) ListPVs(storageClassName string) []*v1.PersistentVolume {
397	objs := c.List(&v1.PersistentVolume{
398		Spec: v1.PersistentVolumeSpec{
399			StorageClassName: storageClassName,
400		},
401	})
402	pvs := []*v1.PersistentVolume{}
403	for _, obj := range objs {
404		pv, ok := obj.(*v1.PersistentVolume)
405		if !ok {
406			klog.Errorf("ListPVs: %v", &errWrongType{"v1.PersistentVolume", obj})
407			continue
408		}
409		pvs = append(pvs, pv)
410	}
411	return pvs
412}
413
414// PVCAssumeCache is a AssumeCache for PersistentVolumeClaim objects
415type PVCAssumeCache interface {
416	AssumeCache
417
418	// GetPVC returns the PVC from the cache with given pvcKey.
419	// pvcKey is the result of MetaNamespaceKeyFunc on PVC obj
420	GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error)
421	GetAPIPVC(pvcKey string) (*v1.PersistentVolumeClaim, error)
422}
423
424type pvcAssumeCache struct {
425	AssumeCache
426}
427
428// NewPVCAssumeCache creates a PVC assume cache.
429func NewPVCAssumeCache(informer cache.SharedIndexInformer) PVCAssumeCache {
430	return &pvcAssumeCache{NewAssumeCache(informer, "v1.PersistentVolumeClaim", "", nil)}
431}
432
433func (c *pvcAssumeCache) GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) {
434	obj, err := c.Get(pvcKey)
435	if err != nil {
436		return nil, err
437	}
438
439	pvc, ok := obj.(*v1.PersistentVolumeClaim)
440	if !ok {
441		return nil, &errWrongType{"v1.PersistentVolumeClaim", obj}
442	}
443	return pvc, nil
444}
445
446func (c *pvcAssumeCache) GetAPIPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) {
447	obj, err := c.GetAPIObj(pvcKey)
448	if err != nil {
449		return nil, err
450	}
451	pvc, ok := obj.(*v1.PersistentVolumeClaim)
452	if !ok {
453		return nil, &errWrongType{"v1.PersistentVolumeClaim", obj}
454	}
455	return pvc, nil
456}
457