1// Package cache provides caching features for data from a Consul server.
2//
3// While this is similar in some ways to the "agent/ae" package, a key
4// difference is that with anti-entropy, the agent is the authoritative
5// source so it resolves differences the server may have. With caching (this
6// package), the server is the authoritative source and we do our best to
7// balance performance and correctness, depending on the type of data being
8// requested.
9//
// The types of data that can be cached are configurable via the Type interface.
11// This allows specialized behavior for certain types of data. Each type of
12// Consul data (CA roots, leaf certs, intentions, KV, catalog, etc.) will
13// have to be manually implemented. This usually is not much work, see
14// the "agent/cache-types" package.
15package cache
16
17import (
18	"container/heap"
19	"fmt"
20	"sync"
21	"sync/atomic"
22	"time"
23
24	"github.com/armon/go-metrics"
25	"github.com/hashicorp/consul/lib"
26)
27
28//go:generate mockery -all -inpkg
29
// Refresh backoff tuning. These are deliberately not exposed as configuration
// knobs since they primarily exist to lower load.
const (
	// CacheRefreshBackoffMin is the number of attempts tolerated before
	// backOffWait starts returning a non-zero wait.
	CacheRefreshBackoffMin = 3

	// CacheRefreshMaxWait is the upper bound applied to the backoff wait
	// before the random stagger is added.
	CacheRefreshMaxWait = time.Minute
)
36
// Cache is an agent-local cache of Consul data. Create a Cache using the
// New function. A zero-value Cache is not ready for usage and will result
// in a panic.
//
// The types of data to be cached must be registered via RegisterType. Then,
// calls to Get specify the type and a Request implementation. The
// implementation of Request is usually done directly on the standard RPC
// struct in agent/structs.  This API makes cache usage a mostly drop-in
// replacement for non-cached RPC calls.
//
// The cache is partitioned by ACL and datacenter. This allows the cache
// to be safe for multi-DC queries and for queries where the data is modified
// due to ACLs all without the cache having to have any clever logic, at
// the slight expense of a less perfect cache.
//
// The Cache exposes various metrics via go-metrics. Please view the source
// searching for "metrics." to see the various metrics exposed. These can be
// used to explore the performance of the cache.
type Cache struct {
	// types stores the data types that the cache knows how to service, keyed
	// by the name given to RegisterType. Access must be protected by
	// typesLock. Types can be dynamically registered with RegisterType.
	typesLock sync.RWMutex
	types     map[string]typeEntry

	// entries contains the actual cache data. Access to entries and
	// entriesExpiryHeap must be protected by entriesLock.
	//
	// entriesExpiryHeap is a heap of *cacheEntryExpiry values ordered by
	// expiry, with the soonest to expire being first in the list (index 0).
	//
	// NOTE(mitchellh): The entry map key is currently a string in the format
	// of "<type>/<DC>/<ACL token>/<Request key>" (see makeEntryKey) in order
	// to properly partition requests to different datacenters and ACL tokens.
	// This format has some big drawbacks: we can't evict by datacenter, ACL
	// token, etc. For an initial implementation this works and the tests are
	// agnostic to the internal storage format so changing this should be
	// possible safely.
	entriesLock       sync.RWMutex
	entries           map[string]cacheEntry
	entriesExpiryHeap *expiryHeap

	// stopped is used as an atomic flag to signal that the Cache has been
	// discarded so background fetches and expiry processing should stop.
	// It is set to 1 by Close and read by refresh.
	stopped uint32
	// stopCh is closed when Close is called. The expiry loop exits when it
	// observes the close.
	stopCh chan struct{}
}
83
// typeEntry is a single type that is registered with a Cache.
type typeEntry struct {
	// Type is the implementation the cache calls to fetch data and to ask
	// whether blocking queries are supported.
	Type Type
	// Opts are the options the type was registered with. RegisterType
	// substitutes an empty RegisterOptions when the caller passes nil, so
	// entries created through it never hold a nil pointer here.
	Opts *RegisterOptions
}
89
// ResultMeta is returned from Get calls along with the value and can be used
// to expose information about the cache status for debugging or testing.
type ResultMeta struct {
	// Hit indicates whether or not the request was a cache hit. It is only
	// set when the value was served from the cache on the first lookup, i.e.
	// without having waited on a fetch.
	Hit bool

	// Age identifies how "stale" the result is. Its semantics differ based on
	// whether or not the cache type performs background refresh or not as defined
	// in https://www.consul.io/api/index.html#agent-caching.
	//
	// For background refresh types, Age is 0 unless the background blocking query
	// is currently in a failed state and so not keeping up with the server's
	// values. If it is non-zero it represents the time since the first failure to
	// connect during background refresh, and is reset after a background request
	// does manage to reconnect and either return successfully, or block for at
	// least the yamux keepalive timeout of 30 seconds (which indicates the
	// connection is OK but blocked as expected).
	//
	// For simple cache types, Age is the time since the result being returned was
	// fetched from the servers.
	Age time.Duration

	// Index is the internal ModifyIndex for the cache entry. Not all types
	// support blocking and all that do will likely have this in their result type
	// already but this allows generic code to reason about whether cache values
	// have changed.
	Index uint64
}
118
// Options holds configuration for a Cache. It currently has no fields and is
// reserved for future use.
type Options struct {
	// Nothing currently, reserved.
}
123
124// New creates a new cache with the given RPC client and reasonable defaults.
125// Further settings can be tweaked on the returned value.
126func New(*Options) *Cache {
127	// Initialize the heap. The buffer of 1 is really important because
128	// its possible for the expiry loop to trigger the heap to update
129	// itself and it'd block forever otherwise.
130	h := &expiryHeap{NotifyCh: make(chan struct{}, 1)}
131	heap.Init(h)
132
133	c := &Cache{
134		types:             make(map[string]typeEntry),
135		entries:           make(map[string]cacheEntry),
136		entriesExpiryHeap: h,
137		stopCh:            make(chan struct{}),
138	}
139
140	// Start the expiry watcher
141	go c.runExpiryLoop()
142
143	return c
144}
145
// RegisterOptions are options that can be associated with a type being
// registered for the cache. This changes the behavior of the cache for
// this type.
type RegisterOptions struct {
	// LastGetTTL is the time that the values returned by this type remain
	// in the cache after the last get operation. If a value isn't accessed
	// within this duration, the value is purged from the cache and
	// background refreshing will cease. RegisterType applies a default of
	// 72 hours when this is zero.
	LastGetTTL time.Duration

	// Refresh configures whether the data is actively refreshed or if
	// the data is only refreshed on an explicit Get. The default (false)
	// is to only request data on explicit Get.
	Refresh bool

	// RefreshTimer is the time between attempting to refresh data.
	// If this is zero, then data is refreshed immediately when a fetch
	// is returned.
	//
	// RefreshTimeout determines the maximum query time for a refresh
	// operation. This is specified as part of the query options and is
	// expected to be implemented by the Type itself. It is only passed to
	// types that report support for blocking queries.
	//
	// Using these values, various "refresh" mechanisms can be implemented:
	//
	//   * With a high timer duration and a low timeout, a timer-based
	//     refresh can be set that minimizes load on the Consul servers.
	//
	//   * With a low timer and high timeout duration, a blocking-query-based
	//     refresh can be set so that changes in server data are recognized
	//     within the cache very quickly.
	//
	RefreshTimer   time.Duration
	RefreshTimeout time.Duration
}
181
182// RegisterType registers a cacheable type.
183//
184// This makes the type available for Get but does not automatically perform
185// any prefetching. In order to populate the cache, Get must be called.
186func (c *Cache) RegisterType(n string, typ Type, opts *RegisterOptions) {
187	if opts == nil {
188		opts = &RegisterOptions{}
189	}
190	if opts.LastGetTTL == 0 {
191		opts.LastGetTTL = 72 * time.Hour // reasonable default is days
192	}
193
194	c.typesLock.Lock()
195	defer c.typesLock.Unlock()
196	c.types[n] = typeEntry{Type: typ, Opts: opts}
197}
198
199// Get loads the data for the given type and request. If data satisfying the
200// minimum index is present in the cache, it is returned immediately. Otherwise,
201// this will block until the data is available or the request timeout is
202// reached.
203//
204// Multiple Get calls for the same Request (matching CacheKey value) will
205// block on a single network request.
206//
207// The timeout specified by the Request will be the timeout on the cache
208// Get, and does not correspond to the timeout of any background data
209// fetching. If the timeout is reached before data satisfying the minimum
210// index is retrieved, the last known value (maybe nil) is returned. No
211// error is returned on timeout. This matches the behavior of Consul blocking
212// queries.
213func (c *Cache) Get(t string, r Request) (interface{}, ResultMeta, error) {
214	return c.getWithIndex(t, r, r.CacheInfo().MinIndex)
215}
216
217// getEntryLocked retrieves a cache entry and checks if it is ready to be
218// returned given the other parameters. It reads from entries and the caller
219// has to issue a read lock if necessary.
220func (c *Cache) getEntryLocked(tEntry typeEntry, key string, maxAge time.Duration, revalidate bool, minIndex uint64) (bool, bool, cacheEntry) {
221	entry, ok := c.entries[key]
222	cacheHit := false
223
224	if !ok {
225		return ok, cacheHit, entry
226	}
227
228	// Check if we have a hit
229	cacheHit = ok && entry.Valid
230
231	supportsBlocking := tEntry.Type.SupportsBlocking()
232
233	// Check index is not specified or lower than value, or the type doesn't
234	// support blocking.
235	if cacheHit && supportsBlocking &&
236		minIndex > 0 && minIndex >= entry.Index {
237		// MinIndex was given and matches or is higher than current value so we
238		// ignore the cache and fallthrough to blocking on a new value below.
239		cacheHit = false
240	}
241
242	// Check MaxAge is not exceeded if this is not a background refreshing type
243	// and MaxAge was specified.
244	if cacheHit && !tEntry.Opts.Refresh && maxAge > 0 &&
245		!entry.FetchedAt.IsZero() && maxAge < time.Since(entry.FetchedAt) {
246		cacheHit = false
247	}
248
249	// Check if we are requested to revalidate. If so the first time round the
250	// loop is not a hit but subsequent ones should be treated normally.
251	if cacheHit && !tEntry.Opts.Refresh && revalidate {
252		cacheHit = false
253	}
254
255	return ok, cacheHit, entry
256}
257
// getWithIndex implements the main Get functionality but allows internal
// callers (Watch) to manipulate the blocking index separately from the actual
// request object.
//
// Requests whose CacheInfo has an empty Key bypass the cache entirely and are
// served by fetchDirect. Otherwise the function loops: it looks the entry up,
// returns it on a cache hit, and on a miss triggers (or joins) a fetch and
// waits for either the fetch to finish or the request timeout to elapse.
//
// It returns the value, metadata about the result, and an error. An error is
// returned when the type is unknown, when fetch fails to start, or when a
// fetch attempted on behalf of this call left an error on the entry. A
// timeout is not an error: the last value seen is returned instead.
func (c *Cache) getWithIndex(t string, r Request, minIndex uint64) (interface{}, ResultMeta, error) {
	info := r.CacheInfo()
	if info.Key == "" {
		metrics.IncrCounter([]string{"consul", "cache", "bypass"}, 1)

		// If no key is specified, then we do not cache this request.
		// Pass directly through to the backend.
		return c.fetchDirect(t, r, minIndex)
	}

	// Get the actual key for our entry
	key := c.entryKey(t, &info)

	// First time through
	first := true

	// timeoutCh for watching our timeout. It stays nil when the request has
	// no timeout, in which case the select below only waits on the fetch.
	var timeoutCh <-chan time.Time

RETRY_GET:
	// Get the type that we're fetching
	c.typesLock.RLock()
	tEntry, ok := c.types[t]
	c.typesLock.RUnlock()
	if !ok {
		// Shouldn't happen given that we successfully fetched this at least
		// once. But be robust against panics.
		return nil, ResultMeta{}, fmt.Errorf("unknown type in cache: %s", t)
	}

	// Get the current value. Revalidation is only forced on the first pass;
	// once we have waited on a fetch the entry is evaluated normally.
	c.entriesLock.RLock()
	_, cacheHit, entry := c.getEntryLocked(tEntry, key, info.MaxAge, info.MustRevalidate && first, minIndex)
	c.entriesLock.RUnlock()

	if cacheHit {
		meta := ResultMeta{Index: entry.Index}
		if first {
			// Only count (and report) a hit when we did not have to wait on
			// a fetch to get here.
			metrics.IncrCounter([]string{"consul", "cache", t, "hit"}, 1)
			meta.Hit = true
		}

		// If refresh is enabled, calculate age based on whether the background
		// routine is still connected.
		if tEntry.Opts.Refresh {
			meta.Age = time.Duration(0)
			if !entry.RefreshLostContact.IsZero() {
				meta.Age = time.Since(entry.RefreshLostContact)
			}
		} else {
			// For non-background refresh types, the age is just how long since we
			// fetched it last.
			if !entry.FetchedAt.IsZero() {
				meta.Age = time.Since(entry.FetchedAt)
			}
		}

		// Touch the expiration and fix the heap.
		c.entriesLock.Lock()
		entry.Expiry.Reset()
		c.entriesExpiryHeap.Fix(entry.Expiry)
		c.entriesLock.Unlock()

		// We purposely do not return an error here since the cache only works with
		// fetching values that either have a value or have an error, but not both.
		// The Error may be non-nil in the entry in the case that an error has
		// occurred _since_ the last good value, but we still want to return the
		// good value to clients that are not requesting a specific version. The
		// effect of this is that blocking clients will all see an error immediately
		// without waiting a whole timeout to see it, but clients that just look up
		// cache with an older index than the last valid result will still see the
		// result and not the error here. I.e. the error is not "cached" without a
		// new fetch attempt occurring, but the last good value can still be fetched
		// from cache.
		return entry.Value, meta, nil
	}

	// If this isn't our first time through and our last value has an error, then
	// we return the error. This has the behavior that we don't sit in a retry
	// loop getting the same error for the entire duration of the timeout.
	// Instead, we make one effort to fetch a new value, and if there was an
	// error, we return. Note that the invariant is that if both entry.Value AND
	// entry.Error are non-nil, the error _must_ be more recent than the Value. In
	// other words valid fetches should reset the error. See
	// https://github.com/hashicorp/consul/issues/4480.
	if !first && entry.Error != nil {
		return entry.Value, ResultMeta{Index: entry.Index}, entry.Error
	}

	if first {
		// We increment two different counters for cache misses depending on
		// whether we're missing because we didn't have the data at all,
		// or if we're missing because we're blocking on a set index.
		if minIndex == 0 {
			metrics.IncrCounter([]string{"consul", "cache", t, "miss_new"}, 1)
		} else {
			metrics.IncrCounter([]string{"consul", "cache", t, "miss_block"}, 1)
		}
	}

	// Set our timeout channel if we must. It is created once so that the
	// timeout covers the whole Get rather than each retry.
	if info.Timeout > 0 && timeoutCh == nil {
		timeoutCh = time.After(info.Timeout)
	}

	// At this point, we know we either don't have a value at all or the
	// value we have is too old. We need to wait for new data. Revalidation
	// is ignored by fetch on every pass after the first.
	waiterCh, err := c.fetch(t, key, r, true, 0, minIndex, false, !first)
	if err != nil {
		return nil, ResultMeta{Index: entry.Index}, err
	}

	// No longer our first time through
	first = false

	select {
	case <-waiterCh:
		// Our fetch returned, retry the get from the cache.
		goto RETRY_GET

	case <-timeoutCh:
		// Timeout on the cache read, just return whatever we have.
		return entry.Value, ResultMeta{Index: entry.Index}, nil
	}
}
386
387// entryKey returns the key for the entry in the cache. See the note
388// about the entry key format in the structure docs for Cache.
389func (c *Cache) entryKey(t string, r *RequestInfo) string {
390	return makeEntryKey(t, r.Datacenter, r.Token, r.Key)
391}
392
// makeEntryKey joins the type name, datacenter, ACL token and request key
// with "/" separators to form the key used in the entries map. The components
// are inserted verbatim; no escaping is applied.
func makeEntryKey(t, dc, token, key string) string {
	const layout = "%s/%s/%s/%s"
	parts := []interface{}{t, dc, token, key}
	return fmt.Sprintf(layout, parts...)
}
396
// fetch triggers a new background fetch for the given Request. If a
// background fetch is already running for a matching Request, the waiter
// channel for that request is returned. The effect of this is that there
// is only ever one blocking query for any matching requests.
//
// If allowNew is true then the fetch should create the cache entry
// if it doesn't exist. If this is false, then fetch will do nothing
// if the entry doesn't exist. This latter case is to support refreshing.
//
// attempt is the number of failed (or zero-index) attempts so far. The fetch
// goroutine resets or increments it depending on the outcome and hands it to
// refresh, where it drives the backoff.
//
// minIndex is only used to decide whether the existing entry already counts
// as a cache hit; the fetch itself blocks on the index stored in the entry.
//
// If ignoreExisting is false and the existing entry is a cache hit, no fetch
// is started and an already-closed channel is returned. If ignoreRevalidation
// is true, the request's MustRevalidate flag is not considered for that hit
// check.
//
// The returned channel is closed when the fetch completes. An error is only
// returned when the type t is not registered.
func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint, minIndex uint64, ignoreExisting bool, ignoreRevalidation bool) (<-chan struct{}, error) {
	// Get the type that we're fetching
	c.typesLock.RLock()
	tEntry, ok := c.types[t]
	c.typesLock.RUnlock()
	if !ok {
		return nil, fmt.Errorf("unknown type in cache: %s", t)
	}

	info := r.CacheInfo()

	// We acquire a write lock because we may have to set Fetching to true.
	c.entriesLock.Lock()
	defer c.entriesLock.Unlock()
	ok, cacheHit, entry := c.getEntryLocked(tEntry, key, info.MaxAge, info.MustRevalidate && !ignoreRevalidation, minIndex)

	// This handles the case where a fetch succeeded after checking for its existence in
	// getWithIndex. This ensures that we don't miss updates.
	if ok && cacheHit && !ignoreExisting {
		ch := make(chan struct{})
		close(ch)
		return ch, nil
	}

	// If we aren't allowing new values and we don't have an existing value,
	// return immediately. We return an immediately-closed channel so nothing
	// blocks.
	if !ok && !allowNew {
		ch := make(chan struct{})
		close(ch)
		return ch, nil
	}

	// If we already have an entry and it is actively fetching, then return
	// the currently active waiter.
	if ok && entry.Fetching {
		return entry.Waiter, nil
	}

	// If we don't have an entry, then create it. The entry must be marked
	// as invalid so that it isn't returned as a valid value for a zero index.
	if !ok {
		entry = cacheEntry{Valid: false, Waiter: make(chan struct{})}
	}

	// Set that we're fetching to true, which makes it so that future
	// identical calls to fetch will return the same waiter rather than
	// perform multiple fetches.
	entry.Fetching = true
	c.entries[key] = entry
	metrics.SetGauge([]string{"consul", "cache", "entries_count"}, float32(len(c.entries)))

	// The actual Fetch must be performed in a goroutine. The goroutine works
	// on the copy of entry captured here and writes the result back under
	// entriesLock once the fetch has returned.
	go func() {
		// If we have background refresh and currently are in "disconnected" state,
		// waiting for a response might mean we mark our results as stale for up to
		// 10 minutes (max blocking timeout) after connection is restored. To reduce
		// that window, we assume that if the fetch takes more than 31 seconds then
		// they are correctly blocking. We choose 31 seconds because yamux
		// keepalives are every 30 seconds so the RPC should fail if the packets are
		// being blackholed for more than 30 seconds.
		var connectedTimer *time.Timer
		if tEntry.Opts.Refresh && entry.Index > 0 &&
			tEntry.Opts.RefreshTimeout > (31*time.Second) {
			connectedTimer = time.AfterFunc(31*time.Second, func() {
				c.entriesLock.Lock()
				defer c.entriesLock.Unlock()
				// Re-read the stored entry; this intentionally shadows the
				// outer entry, which is only a snapshot.
				entry, ok := c.entries[key]
				if !ok || entry.RefreshLostContact.IsZero() {
					return
				}
				entry.RefreshLostContact = time.Time{}
				c.entries[key] = entry
			})
		}

		// Blocking options are only passed to types that support blocking.
		fOpts := FetchOptions{}
		if tEntry.Type.SupportsBlocking() {
			fOpts.MinIndex = entry.Index
			fOpts.Timeout = tEntry.Opts.RefreshTimeout
		}
		// Give the type access to the previous result, if there is one.
		if entry.Valid {
			fOpts.LastResult = &FetchResult{
				Value: entry.Value,
				State: entry.State,
				Index: entry.Index,
			}
		}

		// Start building the new entry by blocking on the fetch.
		result, err := tEntry.Type.Fetch(fOpts, r)
		if connectedTimer != nil {
			connectedTimer.Stop()
		}

		// Copy the existing entry to start.
		newEntry := entry
		newEntry.Fetching = false

		// Importantly, always reset the Error. Having both Error and a Value that
		// are non-nil is allowed in the cache entry but it indicates that the Error
		// is _newer_ than the last good value. So if the err is nil then we need to
		// reset to replace any _older_ errors and avoid them bubbling up. If the
		// error is non-nil then we need to set it anyway and used to do it in the
		// code below. See https://github.com/hashicorp/consul/issues/4480.
		newEntry.Error = err

		if result.Value != nil {
			// A new value was given, so we create a brand new entry.
			newEntry.Value = result.Value
			newEntry.State = result.State
			newEntry.Index = result.Index
			newEntry.FetchedAt = time.Now()
			if newEntry.Index < 1 {
				// Less than one is invalid unless there was an error and in this case
				// there wasn't since a value was returned. If a badly behaved RPC
				// returns 0 when it has no data, we might get into a busy loop here. We
				// set this to minimum of 1 which is safe because no valid user data can
				// ever be written at raft index 1 due to the bootstrap process for
				// raft. This ensures that any subsequent background refresh request will
				// always block, but allows the initial request to return immediately
				// even if there is no data.
				newEntry.Index = 1
			}

			// This is a valid entry with a result
			newEntry.Valid = true
		} else if result.State != nil && err == nil {
			// Also set state if it's non-nil but Value is nil. This is important in the
			// case we are returning nil due to a timeout or a transient error like rate
			// limiting that we want to mask from the user - there is no result yet but
			// we want to manage retrying internally before we return an error to user.
			// The retrying state is in State so we need to still update that in the
			// entry even if we don't have an actual result yet (e.g. hit a rate limit
			// on first request for a leaf certificate).
			newEntry.State = result.State
		}

		// Error handling
		if err == nil {
			metrics.IncrCounter([]string{"consul", "cache", "fetch_success"}, 1)
			metrics.IncrCounter([]string{"consul", "cache", t, "fetch_success"}, 1)

			if result.Index > 0 {
				// Reset the attempts counter so we don't have any backoff
				attempt = 0
			} else {
				// Result having a zero index is an implicit error case. There was no
				// actual error but it implies the RPC had no index to report (nothing
				// written yet for that type) but didn't take care to return safe "1"
				// index. We don't want to actually treat it like an error by setting
				// newEntry.Error to something non-nil, but we should guard against 100%
				// CPU burn hot loops caused by that case which will never block but
				// also won't backoff either. So we treat it as a failed attempt so that
				// at least the failure backoff will save our CPU while still
				// periodically refreshing so normal service can resume when the servers
				// actually have something to return from the RPC. If we get in this
				// state it can be considered a bug in the RPC implementation (to ever
				// return a zero index) however since it can happen this is a safety net
				// for the future.
				attempt++
			}

			// If we have refresh active, this successful response means cache is now
			// "connected" and should not be stale. Reset the lost contact timer.
			if tEntry.Opts.Refresh {
				newEntry.RefreshLostContact = time.Time{}
			}
		} else {
			metrics.IncrCounter([]string{"consul", "cache", "fetch_error"}, 1)
			metrics.IncrCounter([]string{"consul", "cache", t, "fetch_error"}, 1)

			// Increment attempt counter
			attempt++

			// If we are refreshing and just failed, update the lost contact time as
			// our cache will be stale until we get successfully reconnected. We only
			// set this on the first failure (if it's zero) so we can track how long
			// it's been since we had a valid connection/up-to-date view of the state.
			if tEntry.Opts.Refresh && newEntry.RefreshLostContact.IsZero() {
				newEntry.RefreshLostContact = time.Now()
			}
		}

		// Create a new waiter that will be used for the next fetch.
		newEntry.Waiter = make(chan struct{})

		// Set our entry
		c.entriesLock.Lock()

		// If this is a new entry (not in the heap yet), then setup the
		// initial expiry information and insert. If we're already in
		// the heap we do nothing since we're reusing the same entry.
		// A HeapIndex of -1 means the expiry loop evicted the entry while
		// this fetch was in flight.
		if newEntry.Expiry == nil || newEntry.Expiry.HeapIndex == -1 {
			newEntry.Expiry = &cacheEntryExpiry{
				Key: key,
				TTL: tEntry.Opts.LastGetTTL,
			}
			newEntry.Expiry.Reset()
			heap.Push(c.entriesExpiryHeap, newEntry.Expiry)
		}

		c.entries[key] = newEntry
		c.entriesLock.Unlock()

		// Trigger the old waiter. This must happen after the new entry is
		// stored so that woken callers observe the updated entry.
		close(entry.Waiter)

		// If refresh is enabled, run the refresh in due time. The refresh
		// below might block, but saves us from spawning another goroutine.
		if tEntry.Opts.Refresh {
			c.refresh(tEntry.Opts, attempt, t, key, r)
		}
	}()

	return entry.Waiter, nil
}
622
623// fetchDirect fetches the given request with no caching. Because this
624// bypasses the caching entirely, multiple matching requests will result
625// in multiple actual RPC calls (unlike fetch).
626func (c *Cache) fetchDirect(t string, r Request, minIndex uint64) (interface{}, ResultMeta, error) {
627	// Get the type that we're fetching
628	c.typesLock.RLock()
629	tEntry, ok := c.types[t]
630	c.typesLock.RUnlock()
631	if !ok {
632		return nil, ResultMeta{}, fmt.Errorf("unknown type in cache: %s", t)
633	}
634
635	// Fetch it with the min index specified directly by the request.
636	result, err := tEntry.Type.Fetch(FetchOptions{
637		MinIndex: minIndex,
638	}, r)
639	if err != nil {
640		return nil, ResultMeta{}, err
641	}
642
643	// Return the result and ignore the rest
644	return result.Value, ResultMeta{}, nil
645}
646
647func backOffWait(failures uint) time.Duration {
648	if failures > CacheRefreshBackoffMin {
649		shift := failures - CacheRefreshBackoffMin
650		waitTime := CacheRefreshMaxWait
651		if shift < 31 {
652			waitTime = (1 << shift) * time.Second
653		}
654		if waitTime > CacheRefreshMaxWait {
655			waitTime = CacheRefreshMaxWait
656		}
657		return waitTime + lib.RandomStagger(waitTime)
658	}
659	return 0
660}
661
662// refresh triggers a fetch for a specific Request according to the
663// registration options.
664func (c *Cache) refresh(opts *RegisterOptions, attempt uint, t string, key string, r Request) {
665	// Sanity-check, we should not schedule anything that has refresh disabled
666	if !opts.Refresh {
667		return
668	}
669	// Check if cache was stopped
670	if atomic.LoadUint32(&c.stopped) == 1 {
671		return
672	}
673
674	// If we're over the attempt minimum, start an exponential backoff.
675	if wait := backOffWait(attempt); wait > 0 {
676		time.Sleep(wait)
677	}
678
679	// If we have a timer, wait for it
680	if opts.RefreshTimer > 0 {
681		time.Sleep(opts.RefreshTimer)
682	}
683
684	// Trigger. The "allowNew" field is false because in the time we were
685	// waiting to refresh we may have expired and got evicted. If that
686	// happened, we don't want to create a new entry.
687	c.fetch(t, key, r, false, attempt, 0, true, true)
688}
689
690// runExpiryLoop is a blocking function that watches the expiration
691// heap and invalidates entries that have expired.
692func (c *Cache) runExpiryLoop() {
693	var expiryTimer *time.Timer
694	for {
695		// If we have a previous timer, stop it.
696		if expiryTimer != nil {
697			expiryTimer.Stop()
698		}
699
700		// Get the entry expiring soonest
701		var entry *cacheEntryExpiry
702		var expiryCh <-chan time.Time
703		c.entriesLock.RLock()
704		if len(c.entriesExpiryHeap.Entries) > 0 {
705			entry = c.entriesExpiryHeap.Entries[0]
706			expiryTimer = time.NewTimer(time.Until(entry.Expires))
707			expiryCh = expiryTimer.C
708		}
709		c.entriesLock.RUnlock()
710
711		select {
712		case <-c.stopCh:
713			return
714		case <-c.entriesExpiryHeap.NotifyCh:
715			// Entries changed, so the heap may have changed. Restart loop.
716
717		case <-expiryCh:
718			c.entriesLock.Lock()
719
720			// Entry expired! Remove it.
721			delete(c.entries, entry.Key)
722			heap.Remove(c.entriesExpiryHeap, entry.HeapIndex)
723
724			// This is subtle but important: if we race and simultaneously
725			// evict and fetch a new value, then we set this to -1 to
726			// have it treated as a new value so that the TTL is extended.
727			entry.HeapIndex = -1
728
729			// Set some metrics
730			metrics.IncrCounter([]string{"consul", "cache", "evict_expired"}, 1)
731			metrics.SetGauge([]string{"consul", "cache", "entries_count"}, float32(len(c.entries)))
732
733			c.entriesLock.Unlock()
734		}
735	}
736}
737
738// Close stops any background work and frees all resources for the cache.
739// Current Fetch requests are allowed to continue to completion and callers may
740// still access the current cache values so coordination isn't needed with
741// callers, however no background activity will continue. It's intended to close
742// the cache at agent shutdown so no further requests should be made, however
743// concurrent or in-flight ones won't break.
744func (c *Cache) Close() error {
745	wasStopped := atomic.SwapUint32(&c.stopped, 1)
746	if wasStopped == 0 {
747		// First time only, close stop chan
748		close(c.stopCh)
749	}
750	return nil
751}
752
753// Prepopulate puts something in the cache manually. This is useful when the
754// correct initial value is know and the cache shouldn't refetch the same thing
755// on startup. It is used to set the ConnectRootCA and AgentLeafCert when
756// AutoEncrypt.TLS is turned on. The cache itself cannot fetch that the first
757// time because it requires a special RPCType. Subsequent runs are fine though.
758func (c *Cache) Prepopulate(t string, res FetchResult, dc, token, k string) error {
759	// Check the type that we're prepolulating
760	c.typesLock.RLock()
761	tEntry, ok := c.types[t]
762	c.typesLock.RUnlock()
763	if !ok {
764		return fmt.Errorf("unknown type in cache: %s", t)
765	}
766	key := makeEntryKey(t, dc, token, k)
767	newEntry := cacheEntry{
768		Valid: true, Value: res.Value, State: res.State, Index: res.Index,
769		FetchedAt: time.Now(), Waiter: make(chan struct{}),
770		Expiry: &cacheEntryExpiry{Key: key, TTL: tEntry.Opts.LastGetTTL},
771	}
772	c.entriesLock.Lock()
773	c.entries[key] = newEntry
774	c.entriesLock.Unlock()
775	return nil
776}
777