1/*
2Copyright 2016 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package cache
18
19import (
20	"fmt"
21	"os"
22	"reflect"
23	"strconv"
24	"sync"
25	"time"
26
27	"k8s.io/klog/v2"
28
29	"k8s.io/apimachinery/pkg/runtime"
30	"k8s.io/apimachinery/pkg/util/diff"
31)
32
33var mutationDetectionEnabled = false
34
35func init() {
36	mutationDetectionEnabled, _ = strconv.ParseBool(os.Getenv("KUBE_CACHE_MUTATION_DETECTOR"))
37}
38
39// MutationDetector is able to monitor objects for mutation within a limited window of time
40type MutationDetector interface {
41	// AddObject adds the given object to the set being monitored for a while from now
42	AddObject(obj interface{})
43
44	// Run starts the monitoring and does not return until the monitoring is stopped.
45	Run(stopCh <-chan struct{})
46}
47
48// NewCacheMutationDetector creates a new instance for the defaultCacheMutationDetector.
49func NewCacheMutationDetector(name string) MutationDetector {
50	if !mutationDetectionEnabled {
51		return dummyMutationDetector{}
52	}
53	klog.Warningln("Mutation detector is enabled, this will result in memory leakage.")
54	return &defaultCacheMutationDetector{name: name, period: 1 * time.Second, retainDuration: 2 * time.Minute}
55}
56
57type dummyMutationDetector struct{}
58
59func (dummyMutationDetector) Run(stopCh <-chan struct{}) {
60}
61func (dummyMutationDetector) AddObject(obj interface{}) {
62}
63
64// defaultCacheMutationDetector gives a way to detect if a cached object has been mutated
65// It has a list of cached objects and their copies.  I haven't thought of a way
66// to see WHO is mutating it, just that it's getting mutated.
67type defaultCacheMutationDetector struct {
68	name   string
69	period time.Duration
70
71	// compareLock ensures only a single call to CompareObjects runs at a time
72	compareObjectsLock sync.Mutex
73
74	// addLock guards addedObjs between AddObject and CompareObjects
75	addedObjsLock sync.Mutex
76	addedObjs     []cacheObj
77
78	cachedObjs []cacheObj
79
80	retainDuration     time.Duration
81	lastRotated        time.Time
82	retainedCachedObjs []cacheObj
83
84	// failureFunc is injectable for unit testing.  If you don't have it, the process will panic.
85	// This panic is intentional, since turning on this detection indicates you want a strong
86	// failure signal.  This failure is effectively a p0 bug and you can't trust process results
87	// after a mutation anyway.
88	failureFunc func(message string)
89}
90
91// cacheObj holds the actual object and a copy
92type cacheObj struct {
93	cached interface{}
94	copied interface{}
95}
96
97func (d *defaultCacheMutationDetector) Run(stopCh <-chan struct{}) {
98	// we DON'T want protection from panics.  If we're running this code, we want to die
99	for {
100		if d.lastRotated.IsZero() {
101			d.lastRotated = time.Now()
102		} else if time.Since(d.lastRotated) > d.retainDuration {
103			d.retainedCachedObjs = d.cachedObjs
104			d.cachedObjs = nil
105			d.lastRotated = time.Now()
106		}
107
108		d.CompareObjects()
109
110		select {
111		case <-stopCh:
112			return
113		case <-time.After(d.period):
114		}
115	}
116}
117
118// AddObject makes a deep copy of the object for later comparison.  It only works on runtime.Object
119// but that covers the vast majority of our cached objects
120func (d *defaultCacheMutationDetector) AddObject(obj interface{}) {
121	if _, ok := obj.(DeletedFinalStateUnknown); ok {
122		return
123	}
124	if obj, ok := obj.(runtime.Object); ok {
125		copiedObj := obj.DeepCopyObject()
126
127		d.addedObjsLock.Lock()
128		defer d.addedObjsLock.Unlock()
129		d.addedObjs = append(d.addedObjs, cacheObj{cached: obj, copied: copiedObj})
130	}
131}
132
133func (d *defaultCacheMutationDetector) CompareObjects() {
134	d.compareObjectsLock.Lock()
135	defer d.compareObjectsLock.Unlock()
136
137	// move addedObjs into cachedObjs under lock
138	// this keeps the critical section small to avoid blocking AddObject while we compare cachedObjs
139	d.addedObjsLock.Lock()
140	d.cachedObjs = append(d.cachedObjs, d.addedObjs...)
141	d.addedObjs = nil
142	d.addedObjsLock.Unlock()
143
144	altered := false
145	for i, obj := range d.cachedObjs {
146		if !reflect.DeepEqual(obj.cached, obj.copied) {
147			fmt.Printf("CACHE %s[%d] ALTERED!\n%v\n", d.name, i, diff.ObjectGoPrintSideBySide(obj.cached, obj.copied))
148			altered = true
149		}
150	}
151	for i, obj := range d.retainedCachedObjs {
152		if !reflect.DeepEqual(obj.cached, obj.copied) {
153			fmt.Printf("CACHE %s[%d] ALTERED!\n%v\n", d.name, i, diff.ObjectGoPrintSideBySide(obj.cached, obj.copied))
154			altered = true
155		}
156	}
157
158	if altered {
159		msg := fmt.Sprintf("cache %s modified", d.name)
160		if d.failureFunc != nil {
161			d.failureFunc(msg)
162			return
163		}
164		panic(msg)
165	}
166}
167