// Package cache provides caching features for data from a Consul server.
//
// While this is similar in some ways to the "agent/ae" package, a key
// difference is that with anti-entropy, the agent is the authoritative
// source so it resolves differences the server may have. With caching (this
// package), the server is the authoritative source and we do our best to
// balance performance and correctness, depending on the type of data being
// requested.
//
// The types of data that can be cached are configurable via the Type interface.
// This allows specialized behavior for certain types of data. Each type of
// Consul data (CA roots, leaf certs, intentions, KV, catalog, etc.) will
// have to be manually implemented. This usually is not much work, see
// the "agent/cache-types" package.
package cache

import (
	"container/heap"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/armon/go-metrics"
	"github.com/hashicorp/consul/lib"
)

//go:generate mockery -all -inpkg

// Constants related to refresh backoff. We probably don't ever need to
// make these configurable knobs since they primarily exist to lower load.
const (
	CacheRefreshBackoffMin = 3               // 3 attempts before backing off
	CacheRefreshMaxWait    = 1 * time.Minute // maximum backoff wait time
)

// Cache is an agent-local cache of Consul data. Create a Cache using the
// New function. A zero-value Cache is not ready for usage and will result
// in a panic.
//
// The types of data to be cached must be registered via RegisterType. Then,
// calls to Get specify the type and a Request implementation. The
// implementation of Request is usually done directly on the standard RPC
// struct in agent/structs. This API makes cache usage a mostly drop-in
// replacement for non-cached RPC calls.
//
// The cache is partitioned by ACL and datacenter. This allows the cache
// to be safe for multi-DC queries and for queries where the data is modified
// due to ACLs all without the cache having to have any clever logic, at
// the slight expense of a less perfect cache.
//
// The Cache exposes various metrics via go-metrics. Please view the source
// searching for "metrics." to see the various metrics exposed. These can be
// used to explore the performance of the cache.
type Cache struct {
	// types stores the list of data types that the cache knows how to service.
	// These can be dynamically registered with RegisterType.
	typesLock sync.RWMutex
	types     map[string]typeEntry

	// entries contains the actual cache data. Access to entries and
	// entriesExpiryHeap must be protected by entriesLock.
	//
	// entriesExpiryHeap is a heap of *cacheEntryExpiry values ordered by
	// expiry, with the soonest to expire being first in the list (index 0).
	//
	// NOTE(mitchellh): The entry map key is currently a string in the format
	// of "<Type>/<DC>/<ACL token>/<Request key>" in order to properly partition
	// requests to different datacenters and ACL tokens. This format has some
	// big drawbacks: we can't evict by datacenter, ACL token, etc. For an
	// initial implementation this works and the tests are agnostic to the
	// internal storage format so changing this should be possible safely.
	entriesLock       sync.RWMutex
	entries           map[string]cacheEntry
	entriesExpiryHeap *expiryHeap

	// stopped is used as an atomic flag to signal that the Cache has been
	// discarded so background fetches and expiry processing should stop.
	stopped uint32
	// stopCh is closed when Close is called
	stopCh chan struct{}
}
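
// A minimal usage sketch of the RegisterType/Get flow described above. MyType
// and MyRequest are hypothetical stand-ins for a Type implementation and a
// Request implementation; they are not part of this package:
//
//	c := New(nil)
//	c.RegisterType("my-type", &MyType{}, &RegisterOptions{
//		Refresh:        true,
//		RefreshTimeout: 10 * time.Minute,
//	})
//	val, meta, err := c.Get("my-type", &MyRequest{Datacenter: "dc1"})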

// typeEntry is a single type that is registered with a Cache.
type typeEntry struct {
	Type Type
	Opts *RegisterOptions
}

// ResultMeta is returned from Get calls along with the value and can be used
// to expose information about the cache status for debugging or testing.
type ResultMeta struct {
	// Hit indicates whether or not the request was a cache hit
	Hit bool

	// Age identifies how "stale" the result is. Its semantics differ based on
	// whether or not the cache type performs background refresh, as defined
	// in https://www.consul.io/api/index.html#agent-caching.
	//
	// For background refresh types, Age is 0 unless the background blocking query
	// is currently in a failed state and so not keeping up with the server's
	// values. If it is non-zero it represents the time since the first failure to
	// connect during background refresh, and is reset after a background request
	// does manage to reconnect and either return successfully, or block for at
	// least the yamux keepalive timeout of 30 seconds (which indicates the
	// connection is OK but blocked as expected).
	//
	// For simple cache types, Age is the time since the result being returned was
	// fetched from the servers.
	Age time.Duration

	// Index is the internal ModifyIndex for the cache entry. Not all types
	// support blocking and all that do will likely have this in their result type
	// already, but this allows generic code to reason about whether cache values
	// have changed.
	Index uint64
}

// Options are options for the Cache.
type Options struct {
	// Nothing currently, reserved.
}

// New creates a new cache with the given options and reasonable defaults.
// Further settings can be tweaked on the returned value.
func New(*Options) *Cache {
	// Initialize the heap. The buffer of 1 is really important because
	// it's possible for the expiry loop to trigger the heap to update
	// itself and it'd block forever otherwise.
	h := &expiryHeap{NotifyCh: make(chan struct{}, 1)}
	heap.Init(h)

	c := &Cache{
		types:             make(map[string]typeEntry),
		entries:           make(map[string]cacheEntry),
		entriesExpiryHeap: h,
		stopCh:            make(chan struct{}),
	}

	// Start the expiry watcher
	go c.runExpiryLoop()

	return c
}

// RegisterOptions are options that can be associated with a type being
// registered for the cache. This changes the behavior of the cache for
// this type.
type RegisterOptions struct {
	// LastGetTTL is the time that the values returned by this type remain
	// in the cache after the last get operation. If a value isn't accessed
	// within this duration, the value is purged from the cache and
	// background refreshing will cease.
	LastGetTTL time.Duration

	// Refresh configures whether the data is actively refreshed or if
	// the data is only refreshed on an explicit Get. The default (false)
	// is to only request data on explicit Get.
	Refresh bool

	// RefreshTimer is the time between attempting to refresh data.
	// If this is zero, then data is refreshed immediately when a fetch
	// is returned.
	//
	// RefreshTimeout determines the maximum query time for a refresh
	// operation. This is specified as part of the query options and is
	// expected to be implemented by the Type itself.
	//
	// Using these values, various "refresh" mechanisms can be implemented:
	//
	//   * With a high timer duration and a low timeout, a timer-based
	//     refresh can be set that minimizes load on the Consul servers.
	//
	//   * With a low timer and high timeout duration, a blocking-query-based
	//     refresh can be set so that changes in server data are recognized
	//     within the cache very quickly.
	//
	RefreshTimer   time.Duration
	RefreshTimeout time.Duration
}
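
// The two "refresh" mechanisms described above might be configured roughly
// like this (a sketch only; typ and the type names are placeholders):
//
//	// Timer-based refresh: short queries on a fixed interval, minimal load.
//	c.RegisterType("my-polled-type", typ, &RegisterOptions{
//		Refresh:        true,
//		RefreshTimer:   30 * time.Second,
//		RefreshTimeout: 5 * time.Second,
//	})
//
//	// Blocking-query-based refresh: long blocking queries re-issued
//	// immediately, so server-side changes show up in the cache quickly.
//	c.RegisterType("my-watched-type", typ, &RegisterOptions{
//		Refresh:        true,
//		RefreshTimer:   0,
//		RefreshTimeout: 10 * time.Minute,
//	})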

// RegisterType registers a cacheable type.
//
// This makes the type available for Get but does not automatically perform
// any prefetching. In order to populate the cache, Get must be called.
func (c *Cache) RegisterType(n string, typ Type, opts *RegisterOptions) {
	if opts == nil {
		opts = &RegisterOptions{}
	}
	if opts.LastGetTTL == 0 {
		opts.LastGetTTL = 72 * time.Hour // reasonable default is days
	}

	c.typesLock.Lock()
	defer c.typesLock.Unlock()
	c.types[n] = typeEntry{Type: typ, Opts: opts}
}

// Get loads the data for the given type and request. If data satisfying the
// minimum index is present in the cache, it is returned immediately. Otherwise,
// this will block until the data is available or the request timeout is
// reached.
//
// Multiple Get calls for the same Request (matching CacheInfo Key value) will
// block on a single network request.
//
// The timeout specified by the Request will be the timeout on the cache
// Get, and does not correspond to the timeout of any background data
// fetching. If the timeout is reached before data satisfying the minimum
// index is retrieved, the last known value (maybe nil) is returned. No
// error is returned on timeout. This matches the behavior of Consul blocking
// queries.
func (c *Cache) Get(t string, r Request) (interface{}, ResultMeta, error) {
	return c.getWithIndex(t, r, r.CacheInfo().MinIndex)
}
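
// A sketch of a blocking "watch" loop built on Get. MyRequest is hypothetical
// and is assumed to surface the MinIndex and Timeout it was built with through
// its CacheInfo result:
//
//	var index uint64
//	for {
//		val, meta, err := c.Get("my-type", &MyRequest{
//			Datacenter: "dc1",
//			MinIndex:   index,
//			Timeout:    10 * time.Minute,
//		})
//		if err != nil {
//			// back off and retry
//			continue
//		}
//		index = meta.Index
//		// ... use val ...
//	}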

// getEntryLocked retrieves a cache entry and checks if it is ready to be
// returned given the other parameters. It reads from entries, so the caller
// must hold at least a read lock on entriesLock.
func (c *Cache) getEntryLocked(tEntry typeEntry, key string, maxAge time.Duration, revalidate bool, minIndex uint64) (bool, bool, cacheEntry) {
	entry, ok := c.entries[key]
	cacheHit := false

	if !ok {
		return ok, cacheHit, entry
	}

	// Check if we have a hit
	cacheHit = ok && entry.Valid

	supportsBlocking := tEntry.Type.SupportsBlocking()

	// The entry only counts as a hit if no minimum index was specified, the
	// type doesn't support blocking, or the cached index is newer than the
	// requested minimum.
	if cacheHit && supportsBlocking &&
		minIndex > 0 && minIndex >= entry.Index {
		// MinIndex was given and matches or is higher than the current value so we
		// ignore the cache and fall through to blocking on a new value below.
		cacheHit = false
	}

	// Check MaxAge is not exceeded if this is not a background refreshing type
	// and MaxAge was specified.
	if cacheHit && !tEntry.Opts.Refresh && maxAge > 0 &&
		!entry.FetchedAt.IsZero() && maxAge < time.Since(entry.FetchedAt) {
		cacheHit = false
	}

	// Check if we are requested to revalidate. If so, the first time round the
	// loop is not a hit but subsequent ones should be treated normally.
	if cacheHit && !tEntry.Opts.Refresh && revalidate {
		cacheHit = false
	}

	return ok, cacheHit, entry
}

// getWithIndex implements the main Get functionality but allows internal
// callers (Watch) to manipulate the blocking index separately from the actual
// request object.
func (c *Cache) getWithIndex(t string, r Request, minIndex uint64) (interface{}, ResultMeta, error) {
	info := r.CacheInfo()
	if info.Key == "" {
		metrics.IncrCounter([]string{"consul", "cache", "bypass"}, 1)

		// If no key is specified, then we do not cache this request.
		// Pass directly through to the backend.
		return c.fetchDirect(t, r, minIndex)
	}

	// Get the actual key for our entry
	key := c.entryKey(t, &info)

	// First time through
	first := true

	// timeoutCh for watching our timeout
	var timeoutCh <-chan time.Time

RETRY_GET:
	// Get the type that we're fetching
	c.typesLock.RLock()
	tEntry, ok := c.types[t]
	c.typesLock.RUnlock()
	if !ok {
		// Shouldn't happen given that we successfully fetched this at least
		// once. But be robust against panics.
		return nil, ResultMeta{}, fmt.Errorf("unknown type in cache: %s", t)
	}

	// Get the current value
	c.entriesLock.RLock()
	_, cacheHit, entry := c.getEntryLocked(tEntry, key, info.MaxAge, info.MustRevalidate && first, minIndex)
	c.entriesLock.RUnlock()

	if cacheHit {
		meta := ResultMeta{Index: entry.Index}
		if first {
			metrics.IncrCounter([]string{"consul", "cache", t, "hit"}, 1)
			meta.Hit = true
		}

		// If refresh is enabled, calculate age based on whether the background
		// routine is still connected.
		if tEntry.Opts.Refresh {
			meta.Age = time.Duration(0)
			if !entry.RefreshLostContact.IsZero() {
				meta.Age = time.Since(entry.RefreshLostContact)
			}
		} else {
			// For non-background refresh types, the age is just how long since we
			// fetched it last.
			if !entry.FetchedAt.IsZero() {
				meta.Age = time.Since(entry.FetchedAt)
			}
		}

		// Touch the expiration and fix the heap.
		c.entriesLock.Lock()
		entry.Expiry.Reset()
		c.entriesExpiryHeap.Fix(entry.Expiry)
		c.entriesLock.Unlock()

		// We purposely do not return an error here since the cache only works with
		// fetching values that either have a value or have an error, but not both.
		// The Error may be non-nil in the entry in the case that an error has
		// occurred _since_ the last good value, but we still want to return the
		// good value to clients that are not requesting a specific version. The
		// effect of this is that blocking clients will all see an error immediately
		// without waiting a whole timeout to see it, but clients that just look up
		// cache with an older index than the last valid result will still see the
		// result and not the error here. I.e. the error is not "cached" without a
		// new fetch attempt occurring, but the last good value can still be fetched
		// from cache.
		return entry.Value, meta, nil
	}

	// If this isn't our first time through and our last value has an error, then
	// we return the error. This has the behavior that we don't sit in a retry
	// loop getting the same error for the entire duration of the timeout.
	// Instead, we make one effort to fetch a new value, and if there was an
	// error, we return. Note that the invariant is that if both entry.Value AND
	// entry.Error are non-nil, the error _must_ be more recent than the Value. In
	// other words valid fetches should reset the error. See
	// https://github.com/hashicorp/consul/issues/4480.
	if !first && entry.Error != nil {
		return entry.Value, ResultMeta{Index: entry.Index}, entry.Error
	}

	if first {
		// We increment two different counters for cache misses depending on
		// whether we're missing because we didn't have the data at all,
		// or if we're missing because we're blocking on a set index.
		if minIndex == 0 {
			metrics.IncrCounter([]string{"consul", "cache", t, "miss_new"}, 1)
		} else {
			metrics.IncrCounter([]string{"consul", "cache", t, "miss_block"}, 1)
		}
	}

	// Set our timeout channel if we must
	if info.Timeout > 0 && timeoutCh == nil {
		timeoutCh = time.After(info.Timeout)
	}

	// At this point, we know we either don't have a value at all or the
	// value we have is too old. We need to wait for new data.
	waiterCh, err := c.fetch(t, key, r, true, 0, minIndex, false, !first)
	if err != nil {
		return nil, ResultMeta{Index: entry.Index}, err
	}

	// No longer our first time through
	first = false

	select {
	case <-waiterCh:
		// Our fetch returned, retry the get from the cache.
		goto RETRY_GET

	case <-timeoutCh:
		// Timeout on the cache read, just return whatever we have.
		return entry.Value, ResultMeta{Index: entry.Index}, nil
	}
}

// entryKey returns the key for the entry in the cache. See the note
// about the entry key format in the structure docs for Cache.
func (c *Cache) entryKey(t string, r *RequestInfo) string {
	return makeEntryKey(t, r.Datacenter, r.Token, r.Key)
}

func makeEntryKey(t, dc, token, key string) string {
	return fmt.Sprintf("%s/%s/%s/%s", t, dc, token, key)
}
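
// As an illustration (the values are made up), a request for the "web" key of
// a hypothetical "my-type" cache type in dc1 with an ACL token would map to:
//
//	makeEntryKey("my-type", "dc1", "token1", "web")
//	// => "my-type/dc1/token1/web"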

// fetch triggers a new background fetch for the given Request. If a
// background fetch is already running for a matching Request, the waiter
// channel for that request is returned. The effect of this is that there
// is only ever one blocking query for any matching requests.
//
// If allowNew is true then the fetch should create the cache entry
// if it doesn't exist. If this is false, then fetch will do nothing
// if the entry doesn't exist. This latter case is to support refreshing.
func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint, minIndex uint64, ignoreExisting bool, ignoreRevalidation bool) (<-chan struct{}, error) {
	// Get the type that we're fetching
	c.typesLock.RLock()
	tEntry, ok := c.types[t]
	c.typesLock.RUnlock()
	if !ok {
		return nil, fmt.Errorf("unknown type in cache: %s", t)
	}

	info := r.CacheInfo()

	// We acquire a write lock because we may have to set Fetching to true.
	c.entriesLock.Lock()
	defer c.entriesLock.Unlock()
	ok, cacheHit, entry := c.getEntryLocked(tEntry, key, info.MaxAge, info.MustRevalidate && !ignoreRevalidation, minIndex)

	// This handles the case where a fetch succeeded after checking for its
	// existence in getWithIndex. This ensures that we don't miss updates.
	if ok && cacheHit && !ignoreExisting {
		ch := make(chan struct{})
		close(ch)
		return ch, nil
	}

	// If we aren't allowing new values and we don't have an existing value,
	// return immediately. We return an immediately-closed channel so nothing
	// blocks.
	if !ok && !allowNew {
		ch := make(chan struct{})
		close(ch)
		return ch, nil
	}

	// If we already have an entry and it is actively fetching, then return
	// the currently active waiter.
	if ok && entry.Fetching {
		return entry.Waiter, nil
	}

	// If we don't have an entry, then create it. The entry must be marked
	// as invalid so that it isn't returned as a valid value for a zero index.
	if !ok {
		entry = cacheEntry{Valid: false, Waiter: make(chan struct{})}
	}

	// Set that we're fetching to true, which makes it so that future
	// identical calls to fetch will return the same waiter rather than
	// perform multiple fetches.
	entry.Fetching = true
	c.entries[key] = entry
	metrics.SetGauge([]string{"consul", "cache", "entries_count"}, float32(len(c.entries)))

	// The actual Fetch must be performed in a goroutine.
	go func() {
		// If we have background refresh and currently are in "disconnected" state,
		// waiting for a response might mean we mark our results as stale for up to
		// 10 minutes (max blocking timeout) after connection is restored. To reduce
		// that window, we assume that if the fetch takes more than 31 seconds then
		// the query is correctly blocking. We choose 31 seconds because yamux
		// keepalives are every 30 seconds so the RPC should fail if the packets are
		// being blackholed for more than 30 seconds.
		var connectedTimer *time.Timer
		if tEntry.Opts.Refresh && entry.Index > 0 &&
			tEntry.Opts.RefreshTimeout > (31*time.Second) {
			connectedTimer = time.AfterFunc(31*time.Second, func() {
				c.entriesLock.Lock()
				defer c.entriesLock.Unlock()
				entry, ok := c.entries[key]
				if !ok || entry.RefreshLostContact.IsZero() {
					return
				}
				entry.RefreshLostContact = time.Time{}
				c.entries[key] = entry
			})
		}

		fOpts := FetchOptions{}
		if tEntry.Type.SupportsBlocking() {
			fOpts.MinIndex = entry.Index
			fOpts.Timeout = tEntry.Opts.RefreshTimeout
		}
		if entry.Valid {
			fOpts.LastResult = &FetchResult{
				Value: entry.Value,
				State: entry.State,
				Index: entry.Index,
			}
		}

		// Start building the new entry by blocking on the fetch.
		result, err := tEntry.Type.Fetch(fOpts, r)
		if connectedTimer != nil {
			connectedTimer.Stop()
		}

		// Copy the existing entry to start.
		newEntry := entry
		newEntry.Fetching = false

		// Importantly, always reset the Error. Having both Error and a Value that
		// are non-nil is allowed in the cache entry but it indicates that the Error
		// is _newer_ than the last good value. So if the err is nil then we need to
		// reset to replace any _older_ errors and avoid them bubbling up. If the
		// error is non-nil then we need to set it anyway (this used to be done in
		// the error-handling code below). See
		// https://github.com/hashicorp/consul/issues/4480.
		newEntry.Error = err

		if result.Value != nil {
			// A new value was given, so we create a brand new entry.
			newEntry.Value = result.Value
			newEntry.State = result.State
			newEntry.Index = result.Index
			newEntry.FetchedAt = time.Now()
			if newEntry.Index < 1 {
				// Less than one is invalid unless there was an error and in this case
				// there wasn't since a value was returned. If a badly behaved RPC
				// returns 0 when it has no data, we might get into a busy loop here. We
				// set this to a minimum of 1, which is safe because no valid user data
				// can ever be written at raft index 1 due to the bootstrap process for
				// raft. This ensures that any subsequent background refresh request will
				// always block, but allows the initial request to return immediately
				// even if there is no data.
				newEntry.Index = 1
			}

			// This is a valid entry with a result
			newEntry.Valid = true
		} else if result.State != nil && err == nil {
			// Also set state if it's non-nil but Value is nil. This is important in the
			// case we are returning nil due to a timeout or a transient error like rate
			// limiting that we want to mask from the user - there is no result yet but
			// we want to manage retrying internally before we return an error to user.
			// The retrying state is in State so we need to still update that in the
			// entry even if we don't have an actual result yet (e.g. hit a rate limit
			// on first request for a leaf certificate).
			newEntry.State = result.State
		}

		// Error handling
		if err == nil {
			metrics.IncrCounter([]string{"consul", "cache", "fetch_success"}, 1)
			metrics.IncrCounter([]string{"consul", "cache", t, "fetch_success"}, 1)

			if result.Index > 0 {
				// Reset the attempts counter so we don't have any backoff
				attempt = 0
			} else {
				// Result having a zero index is an implicit error case. There was no
				// actual error but it implies the RPC found a zero index (nothing written
				// yet for that type) but didn't take care to return a safe "1" index. We
				// don't want to actually treat it like an error by setting
				// newEntry.Error to something non-nil, but we should guard against 100%
				// CPU burn hot loops caused by that case which will never block but
				// also won't backoff either. So we treat it as a failed attempt so that
				// at least the failure backoff will save our CPU while still
				// periodically refreshing so normal service can resume when the servers
				// actually have something to return from the RPC. If we get in this
				// state it can be considered a bug in the RPC implementation (to ever
				// return a zero index) however since it can happen this is a safety net
				// for the future.
				attempt++
			}

			// If we have refresh active, this successful response means cache is now
			// "connected" and should not be stale. Reset the lost contact timer.
			if tEntry.Opts.Refresh {
				newEntry.RefreshLostContact = time.Time{}
			}
		} else {
			metrics.IncrCounter([]string{"consul", "cache", "fetch_error"}, 1)
			metrics.IncrCounter([]string{"consul", "cache", t, "fetch_error"}, 1)

			// Increment attempt counter
			attempt++

			// If we are refreshing and just failed, update the lost contact time as
			// our cache will be stale until we get successfully reconnected. We only
			// set this on the first failure (if it's zero) so we can track how long
			// it's been since we had a valid connection/up-to-date view of the state.
			if tEntry.Opts.Refresh && newEntry.RefreshLostContact.IsZero() {
				newEntry.RefreshLostContact = time.Now()
			}
		}

		// Create a new waiter that will be used for the next fetch.
		newEntry.Waiter = make(chan struct{})

		// Set our entry
		c.entriesLock.Lock()

		// If this is a new entry (not in the heap yet), then setup the
		// initial expiry information and insert. If we're already in
		// the heap we do nothing since we're reusing the same entry.
		if newEntry.Expiry == nil || newEntry.Expiry.HeapIndex == -1 {
			newEntry.Expiry = &cacheEntryExpiry{
				Key: key,
				TTL: tEntry.Opts.LastGetTTL,
			}
			newEntry.Expiry.Reset()
			heap.Push(c.entriesExpiryHeap, newEntry.Expiry)
		}

		c.entries[key] = newEntry
		c.entriesLock.Unlock()

		// Trigger the old waiter
		close(entry.Waiter)

		// If refresh is enabled, run the refresh in due time. The refresh
		// below might block, but saves us from spawning another goroutine.
		if tEntry.Opts.Refresh {
			c.refresh(tEntry.Opts, attempt, t, key, r)
		}
	}()

	return entry.Waiter, nil
}

// fetchDirect fetches the given request with no caching. Because this
// bypasses the caching entirely, multiple matching requests will result
// in multiple actual RPC calls (unlike fetch).
func (c *Cache) fetchDirect(t string, r Request, minIndex uint64) (interface{}, ResultMeta, error) {
	// Get the type that we're fetching
	c.typesLock.RLock()
	tEntry, ok := c.types[t]
	c.typesLock.RUnlock()
	if !ok {
		return nil, ResultMeta{}, fmt.Errorf("unknown type in cache: %s", t)
	}

	// Fetch it with the min index specified directly by the request.
	result, err := tEntry.Type.Fetch(FetchOptions{
		MinIndex: minIndex,
	}, r)
	if err != nil {
		return nil, ResultMeta{}, err
	}

	// Return the result and ignore the rest
	return result.Value, ResultMeta{}, nil
}

func backOffWait(failures uint) time.Duration {
	if failures > CacheRefreshBackoffMin {
		shift := failures - CacheRefreshBackoffMin
		waitTime := CacheRefreshMaxWait
		if shift < 31 {
			waitTime = (1 << shift) * time.Second
		}
		if waitTime > CacheRefreshMaxWait {
			waitTime = CacheRefreshMaxWait
		}
		return waitTime + lib.RandomStagger(waitTime)
	}
	return 0
}
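
// With the constants above (CacheRefreshBackoffMin = 3, CacheRefreshMaxWait =
// 1 minute), the base wait grows roughly as follows, and lib.RandomStagger
// then adds a random jitter of up to the same amount again:
//
//	failures 1-3: 0s (no backoff)
//	failures 4, 5, 6, 7, 8: 2s, 4s, 8s, 16s, 32s
//	failures 9+: capped at 60s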

// refresh triggers a fetch for a specific Request according to the
// registration options.
func (c *Cache) refresh(opts *RegisterOptions, attempt uint, t string, key string, r Request) {
	// Sanity-check, we should not schedule anything that has refresh disabled
	if !opts.Refresh {
		return
	}
	// Check if cache was stopped
	if atomic.LoadUint32(&c.stopped) == 1 {
		return
	}

	// If we're over the attempt minimum, start an exponential backoff.
	if wait := backOffWait(attempt); wait > 0 {
		time.Sleep(wait)
	}

	// If we have a timer, wait for it
	if opts.RefreshTimer > 0 {
		time.Sleep(opts.RefreshTimer)
	}

	// Trigger. The "allowNew" field is false because in the time we were
	// waiting to refresh we may have expired and got evicted. If that
	// happened, we don't want to create a new entry.
	c.fetch(t, key, r, false, attempt, 0, true, true)
}

// runExpiryLoop is a blocking function that watches the expiration
// heap and invalidates entries that have expired.
func (c *Cache) runExpiryLoop() {
	var expiryTimer *time.Timer
	for {
		// If we have a previous timer, stop it.
		if expiryTimer != nil {
			expiryTimer.Stop()
		}

		// Get the entry expiring soonest
		var entry *cacheEntryExpiry
		var expiryCh <-chan time.Time
		c.entriesLock.RLock()
		if len(c.entriesExpiryHeap.Entries) > 0 {
			entry = c.entriesExpiryHeap.Entries[0]
			expiryTimer = time.NewTimer(time.Until(entry.Expires))
			expiryCh = expiryTimer.C
		}
		c.entriesLock.RUnlock()

		select {
		case <-c.stopCh:
			return
		case <-c.entriesExpiryHeap.NotifyCh:
			// Entries changed, so the heap may have changed. Restart loop.

		case <-expiryCh:
			c.entriesLock.Lock()

			// Entry expired! Remove it.
			delete(c.entries, entry.Key)
			heap.Remove(c.entriesExpiryHeap, entry.HeapIndex)

			// This is subtle but important: if we race and simultaneously
			// evict and fetch a new value, then we set this to -1 to
			// have it treated as a new value so that the TTL is extended.
			entry.HeapIndex = -1

			// Set some metrics
			metrics.IncrCounter([]string{"consul", "cache", "evict_expired"}, 1)
			metrics.SetGauge([]string{"consul", "cache", "entries_count"}, float32(len(c.entries)))

			c.entriesLock.Unlock()
		}
	}
}

// Close stops any background work and frees all resources for the cache.
// Current Fetch requests are allowed to continue to completion and callers may
// still access the current cache values, so coordination isn't needed with
// callers; however, no background activity will continue. It's intended that
// the cache be closed at agent shutdown, so no further requests should be
// made, but concurrent or in-flight ones won't break.
func (c *Cache) Close() error {
	wasStopped := atomic.SwapUint32(&c.stopped, 1)
	if wasStopped == 0 {
		// First time only, close stop chan
		close(c.stopCh)
	}
	return nil
}

// Prepopulate puts something in the cache manually. This is useful when the
// correct initial value is known and the cache shouldn't refetch the same thing
// on startup. It is used to set the ConnectRootCA and AgentLeafCert when
// AutoEncrypt.TLS is turned on. The cache itself cannot fetch that the first
// time because it requires a special RPCType. Subsequent runs are fine though.
func (c *Cache) Prepopulate(t string, res FetchResult, dc, token, k string) error {
	// Check the type that we're prepopulating
	c.typesLock.RLock()
	tEntry, ok := c.types[t]
	c.typesLock.RUnlock()
	if !ok {
		return fmt.Errorf("unknown type in cache: %s", t)
	}
	key := makeEntryKey(t, dc, token, k)
	newEntry := cacheEntry{
		Valid: true, Value: res.Value, State: res.State, Index: res.Index,
		FetchedAt: time.Now(), Waiter: make(chan struct{}),
		Expiry: &cacheEntryExpiry{Key: key, TTL: tEntry.Opts.LastGetTTL},
	}
	c.entriesLock.Lock()
	c.entries[key] = newEntry
	c.entriesLock.Unlock()
	return nil
}
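
// A sketch of manual prepopulation (the type name, key, token, and value below
// are hypothetical; in Consul this is done for the CA root and leaf cert types
// when AutoEncrypt.TLS is enabled, as noted above):
//
//	res := FetchResult{Value: initialValue, Index: 1}
//	if err := c.Prepopulate("my-type", res, "dc1", "token1", "my-key"); err != nil {
//		// handle unknown type
//	}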