1/*
2Copyright 2014 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package cache
18
import (
	"errors"
	"fmt"
	"io"
	"math/rand"
	"net"
	"net/url"
	"os"
	"reflect"
	"strings"
	"sync"
	"syscall"
	"time"

	apierrs "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/util/clock"
	"k8s.io/apimachinery/pkg/util/naming"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/klog"
	"k8s.io/utils/trace"
)
44
// Reflector watches a specified resource and causes all changes to be reflected in the given store.
//
// Each ListAndWatch call lists the resource to (re)populate the store and then
// watches it starting from the listed resource version; Run repeats that cycle
// until its stop channel is closed.
type Reflector struct {
	// name identifies this reflector. By default it will be a file:line if possible.
	// It is used as the prefix of the reflector's log messages and errors.
	name string
	// metrics tracks basic metric information about the reflector.
	// NOTE(review): not set by NewNamedReflector and not referenced in the
	// visible code of this file — confirm whether it is still used.
	metrics *reflectorMetrics

	// The type of object we expect to place in the store. It is nil when the
	// constructor was given a nil expectedType, in which case watch events of
	// any type are accepted.
	expectedType reflect.Type
	// The destination to sync up with the watch source
	store Store
	// listerWatcher is used to perform lists and watches.
	listerWatcher ListerWatcher
	// period is the interval Run hands to wait.Until, i.e. the pause between
	// one ListAndWatch call returning and the next one starting.
	// resyncPeriod is how often store.Resync is triggered while watching;
	// zero disables periodic resync.
	// ShouldResync, when non-nil, is consulted each time the resync timer
	// fires and the resync only happens if it returns true; nil means always.
	period       time.Duration
	resyncPeriod time.Duration
	ShouldResync func() bool
	// clock allows tests to manipulate time
	clock clock.Clock
	// lastSyncResourceVersion is the resource version token last
	// observed when doing a sync with the underlying store (after a full list
	// and after every applied watch event).
	// Access is guarded by lastSyncResourceVersionMutex, but the value is not
	// synchronized with the contents of the underlying store.
	lastSyncResourceVersion string
	// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
	lastSyncResourceVersionMutex sync.RWMutex
}
72
73var (
74	// We try to spread the load on apiserver by setting timeouts for
75	// watch requests - it is random in [minWatchTimeout, 2*minWatchTimeout].
76	minWatchTimeout = 5 * time.Minute
77)
78
79// NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector
80// The indexer is configured to key on namespace
81func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interface{}, resyncPeriod time.Duration) (indexer Indexer, reflector *Reflector) {
82	indexer = NewIndexer(MetaNamespaceKeyFunc, Indexers{"namespace": MetaNamespaceIndexFunc})
83	reflector = NewReflector(lw, expectedType, indexer, resyncPeriod)
84	return indexer, reflector
85}
86
87// NewReflector creates a new Reflector object which will keep the given store up to
88// date with the server's contents for the given resource. Reflector promises to
89// only put things in the store that have the type of expectedType, unless expectedType
90// is nil. If resyncPeriod is non-zero, then lists will be executed after every
91// resyncPeriod, so that you can use reflectors to periodically process everything as
92// well as incrementally processing the things that change.
93func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
94	return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
95}
96
97// NewNamedReflector same as NewReflector, but with a specified name for logging
98func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
99	r := &Reflector{
100		name:          name,
101		listerWatcher: lw,
102		store:         store,
103		expectedType:  reflect.TypeOf(expectedType),
104		period:        time.Second,
105		resyncPeriod:  resyncPeriod,
106		clock:         &clock.RealClock{},
107	}
108	return r
109}
110
// makeValidPrometheusMetricLabel rewrites every '/', '.', '-' and ':' byte of in
// to '_'. It is a best-effort cleanup of the separators common in reflector
// names, not a complete Prometheus label validator.
func makeValidPrometheusMetricLabel(in string) string {
	var b strings.Builder
	b.Grow(len(in))
	for i := 0; i < len(in); i++ {
		c := in[i]
		switch c {
		case '/', '.', '-', ':':
			c = '_'
		}
		b.WriteByte(c)
	}
	return b.String()
}
115
// internalPackages lists package path fragments that are skipped when a default
// reflector name is derived from the call site. These packages are on the
// common call chains into NewReflector, so naming reflectors after frames inside
// them would produce low-entropy (nearly identical) names.
var internalPackages = []string{
	"client-go/tools/cache/",
}
119
120// Run starts a watch and handles watch events. Will restart the watch if it is closed.
121// Run will exit when stopCh is closed.
122func (r *Reflector) Run(stopCh <-chan struct{}) {
123	klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
124	wait.Until(func() {
125		if err := r.ListAndWatch(stopCh); err != nil {
126			utilruntime.HandleError(err)
127		}
128	}, r.period, stopCh)
129}
130
// neverExitWatch is a channel on which nothing is ever sent; resyncChan hands
// it out when periodic resync is disabled.
var neverExitWatch <-chan time.Time = make(chan time.Time)

// errorResyncRequested signals that watching stopped so that a resync could happen.
var errorResyncRequested = errors.New("resync channel fired")

// errorStopRequested signals that watching stopped because the stop channel
// supplied by the reflector's client fired.
var errorStopRequested = errors.New("Stop requested")
142
143// resyncChan returns a channel which will receive something when a resync is
144// required, and a cleanup function.
145func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
146	if r.resyncPeriod == 0 {
147		return neverExitWatch, func() bool { return false }
148	}
149	// The cleanup function is required: imagine the scenario where watches
150	// always fail so we end up listing frequently. Then, if we don't
151	// manually stop the timer, we could end up with many timers active
152	// concurrently.
153	t := r.clock.NewTimer(r.resyncPeriod)
154	return t.C(), t.Stop
155}
156
157// ListAndWatch first lists all items and get the resource version at the moment of call,
158// and then use the resource version to watch.
159// It returns error if ListAndWatch didn't even try to initialize watch.
160func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
161	klog.V(3).Infof("Listing and watching %v from %s", r.expectedType, r.name)
162	var resourceVersion string
163
164	// Explicitly set "0" as resource version - it's fine for the List()
165	// to be served from cache and potentially be delayed relative to
166	// etcd contents. Reflector framework will catch up via Watch() eventually.
167	options := metav1.ListOptions{ResourceVersion: "0"}
168
169	if err := func() error {
170		initTrace := trace.New("Reflector " + r.name + " ListAndWatch")
171		defer initTrace.LogIfLong(10 * time.Second)
172		var list runtime.Object
173		var err error
174		listCh := make(chan struct{}, 1)
175		panicCh := make(chan interface{}, 1)
176		go func() {
177			defer func() {
178				if r := recover(); r != nil {
179					panicCh <- r
180				}
181			}()
182			list, err = r.listerWatcher.List(options)
183			close(listCh)
184		}()
185		select {
186		case <-stopCh:
187			return nil
188		case r := <-panicCh:
189			panic(r)
190		case <-listCh:
191		}
192		if err != nil {
193			return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)
194		}
195		initTrace.Step("Objects listed")
196		listMetaInterface, err := meta.ListAccessor(list)
197		if err != nil {
198			return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
199		}
200		resourceVersion = listMetaInterface.GetResourceVersion()
201		initTrace.Step("Resource version extracted")
202		items, err := meta.ExtractList(list)
203		if err != nil {
204			return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
205		}
206		initTrace.Step("Objects extracted")
207		if err := r.syncWith(items, resourceVersion); err != nil {
208			return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
209		}
210		initTrace.Step("SyncWith done")
211		r.setLastSyncResourceVersion(resourceVersion)
212		initTrace.Step("Resource version updated")
213		return nil
214	}(); err != nil {
215		return err
216	}
217
218	resyncerrc := make(chan error, 1)
219	cancelCh := make(chan struct{})
220	defer close(cancelCh)
221	go func() {
222		resyncCh, cleanup := r.resyncChan()
223		defer func() {
224			cleanup() // Call the last one written into cleanup
225		}()
226		for {
227			select {
228			case <-resyncCh:
229			case <-stopCh:
230				return
231			case <-cancelCh:
232				return
233			}
234			if r.ShouldResync == nil || r.ShouldResync() {
235				klog.V(4).Infof("%s: forcing resync", r.name)
236				if err := r.store.Resync(); err != nil {
237					resyncerrc <- err
238					return
239				}
240			}
241			cleanup()
242			resyncCh, cleanup = r.resyncChan()
243		}
244	}()
245
246	for {
247		// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
248		select {
249		case <-stopCh:
250			return nil
251		default:
252		}
253
254		timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
255		options = metav1.ListOptions{
256			ResourceVersion: resourceVersion,
257			// We want to avoid situations of hanging watchers. Stop any wachers that do not
258			// receive any events within the timeout window.
259			TimeoutSeconds: &timeoutSeconds,
260		}
261
262		w, err := r.listerWatcher.Watch(options)
263		if err != nil {
264			switch err {
265			case io.EOF:
266				// watch closed normally
267			case io.ErrUnexpectedEOF:
268				klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedType, err)
269			default:
270				utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err))
271			}
272			// If this is "connection refused" error, it means that most likely apiserver is not responsive.
273			// It doesn't make sense to re-list all objects because most likely we will be able to restart
274			// watch where we ended.
275			// If that's the case wait and resend watch request.
276			if urlError, ok := err.(*url.Error); ok {
277				if opError, ok := urlError.Err.(*net.OpError); ok {
278					if errno, ok := opError.Err.(syscall.Errno); ok && errno == syscall.ECONNREFUSED {
279						time.Sleep(time.Second)
280						continue
281					}
282				}
283			}
284			return nil
285		}
286
287		if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
288			if err != errorStopRequested {
289				klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
290			}
291			return nil
292		}
293	}
294}
295
296// syncWith replaces the store's items with the given list.
297func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
298	found := make([]interface{}, 0, len(items))
299	for _, item := range items {
300		found = append(found, item)
301	}
302	return r.store.Replace(found, resourceVersion)
303}
304
305// watchHandler watches w and keeps *resourceVersion up to date.
306func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
307	start := r.clock.Now()
308	eventCount := 0
309
310	// Stopping the watcher should be idempotent and if we return from this function there's no way
311	// we're coming back in with the same watch interface.
312	defer w.Stop()
313
314loop:
315	for {
316		select {
317		case <-stopCh:
318			return errorStopRequested
319		case err := <-errc:
320			return err
321		case event, ok := <-w.ResultChan():
322			if !ok {
323				break loop
324			}
325			if event.Type == watch.Error {
326				return apierrs.FromObject(event.Object)
327			}
328			if e, a := r.expectedType, reflect.TypeOf(event.Object); e != nil && e != a {
329				utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
330				continue
331			}
332			meta, err := meta.Accessor(event.Object)
333			if err != nil {
334				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
335				continue
336			}
337			newResourceVersion := meta.GetResourceVersion()
338			switch event.Type {
339			case watch.Added:
340				err := r.store.Add(event.Object)
341				if err != nil {
342					utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
343				}
344			case watch.Modified:
345				err := r.store.Update(event.Object)
346				if err != nil {
347					utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
348				}
349			case watch.Deleted:
350				// TODO: Will any consumers need access to the "last known
351				// state", which is passed in event.Object? If so, may need
352				// to change this.
353				err := r.store.Delete(event.Object)
354				if err != nil {
355					utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
356				}
357			default:
358				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
359			}
360			*resourceVersion = newResourceVersion
361			r.setLastSyncResourceVersion(newResourceVersion)
362			eventCount++
363		}
364	}
365
366	watchDuration := r.clock.Now().Sub(start)
367	if watchDuration < 1*time.Second && eventCount == 0 {
368		return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
369	}
370	klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedType, eventCount)
371	return nil
372}
373
374// LastSyncResourceVersion is the resource version observed when last sync with the underlying store
375// The value returned is not synchronized with access to the underlying store and is not thread-safe
376func (r *Reflector) LastSyncResourceVersion() string {
377	r.lastSyncResourceVersionMutex.RLock()
378	defer r.lastSyncResourceVersionMutex.RUnlock()
379	return r.lastSyncResourceVersion
380}
381
382func (r *Reflector) setLastSyncResourceVersion(v string) {
383	r.lastSyncResourceVersionMutex.Lock()
384	defer r.lastSyncResourceVersionMutex.Unlock()
385	r.lastSyncResourceVersion = v
386}
387