1package tsm1 2 3import ( 4 "fmt" 5 "math" 6 "os" 7 "sync" 8 "sync/atomic" 9 "time" 10 11 "github.com/influxdata/influxdb/models" 12 "github.com/influxdata/influxdb/tsdb" 13 "github.com/influxdata/influxql" 14 "go.uber.org/zap" 15) 16 17// ringShards specifies the number of partitions that the hash ring used to 18// store the entry mappings contains. It must be a power of 2. From empirical 19// testing, a value above the number of cores on the machine does not provide 20// any additional benefit. For now we'll set it to the number of cores on the 21// largest box we could imagine running influx. 22const ringShards = 16 23 24var ( 25 // ErrSnapshotInProgress is returned if a snapshot is attempted while one is already running. 26 ErrSnapshotInProgress = fmt.Errorf("snapshot in progress") 27) 28 29// ErrCacheMemorySizeLimitExceeded returns an error indicating an operation 30// could not be completed due to exceeding the cache-max-memory-size setting. 31func ErrCacheMemorySizeLimitExceeded(n, limit uint64) error { 32 return fmt.Errorf("cache-max-memory-size exceeded: (%d/%d)", n, limit) 33} 34 35// entry is a set of values and some metadata. 36type entry struct { 37 mu sync.RWMutex 38 values Values // All stored values. 39 40 // The type of values stored. Read only so doesn't need to be protected by 41 // mu. 42 vtype byte 43} 44 45// newEntryValues returns a new instance of entry with the given values. If the 46// values are not valid, an error is returned. 47func newEntryValues(values []Value) (*entry, error) { 48 e := &entry{} 49 e.values = make(Values, 0, len(values)) 50 e.values = append(e.values, values...) 51 52 // No values, don't check types and ordering 53 if len(values) == 0 { 54 return e, nil 55 } 56 57 et := valueType(values[0]) 58 for _, v := range values { 59 // Make sure all the values are the same type 60 if et != valueType(v) { 61 return nil, tsdb.ErrFieldTypeConflict 62 } 63 } 64 65 // Set the type of values stored. 66 e.vtype = et 67 68 return e, nil 69} 70 71// add adds the given values to the entry. 72func (e *entry) add(values []Value) error { 73 if len(values) == 0 { 74 return nil // Nothing to do. 75 } 76 77 // Are any of the new values the wrong type? 78 if e.vtype != 0 { 79 for _, v := range values { 80 if e.vtype != valueType(v) { 81 return tsdb.ErrFieldTypeConflict 82 } 83 } 84 } 85 86 // entry currently has no values, so add the new ones and we're done. 87 e.mu.Lock() 88 if len(e.values) == 0 { 89 e.values = values 90 e.vtype = valueType(values[0]) 91 e.mu.Unlock() 92 return nil 93 } 94 95 // Append the new values to the existing ones... 96 e.values = append(e.values, values...) 97 e.mu.Unlock() 98 return nil 99} 100 101// deduplicate sorts and orders the entry's values. If values are already deduped and sorted, 102// the function does no work and simply returns. 103func (e *entry) deduplicate() { 104 e.mu.Lock() 105 defer e.mu.Unlock() 106 107 if len(e.values) <= 1 { 108 return 109 } 110 e.values = e.values.Deduplicate() 111} 112 113// count returns the number of values in this entry. 114func (e *entry) count() int { 115 e.mu.RLock() 116 n := len(e.values) 117 e.mu.RUnlock() 118 return n 119} 120 121// filter removes all values with timestamps between min and max inclusive. 122func (e *entry) filter(min, max int64) { 123 e.mu.Lock() 124 if len(e.values) > 1 { 125 e.values = e.values.Deduplicate() 126 } 127 e.values = e.values.Exclude(min, max) 128 e.mu.Unlock() 129} 130 131// size returns the size of this entry in bytes. 132func (e *entry) size() int { 133 e.mu.RLock() 134 sz := e.values.Size() 135 e.mu.RUnlock() 136 return sz 137} 138 139// InfluxQLType returns for the entry the data type of its values. 140func (e *entry) InfluxQLType() (influxql.DataType, error) { 141 e.mu.RLock() 142 defer e.mu.RUnlock() 143 return e.values.InfluxQLType() 144} 145 146// Statistics gathered by the Cache. 147const ( 148 // levels - point in time measures 149 150 statCacheMemoryBytes = "memBytes" // level: Size of in-memory cache in bytes 151 statCacheDiskBytes = "diskBytes" // level: Size of on-disk snapshots in bytes 152 statSnapshots = "snapshotCount" // level: Number of active snapshots. 153 statCacheAgeMs = "cacheAgeMs" // level: Number of milliseconds since cache was last snapshoted at sample time 154 155 // counters - accumulative measures 156 157 statCachedBytes = "cachedBytes" // counter: Total number of bytes written into snapshots. 158 statWALCompactionTimeMs = "WALCompactionTimeMs" // counter: Total number of milliseconds spent compacting snapshots 159 160 statCacheWriteOK = "writeOk" 161 statCacheWriteErr = "writeErr" 162 statCacheWriteDropped = "writeDropped" 163) 164 165// storer is the interface that descibes a cache's store. 166type storer interface { 167 entry(key []byte) *entry // Get an entry by its key. 168 write(key []byte, values Values) (bool, error) // Write an entry to the store. 169 add(key []byte, entry *entry) // Add a new entry to the store. 170 remove(key []byte) // Remove an entry from the store. 171 keys(sorted bool) [][]byte // Return an optionally sorted slice of entry keys. 172 apply(f func([]byte, *entry) error) error // Apply f to all entries in the store in parallel. 173 applySerial(f func([]byte, *entry) error) error // Apply f to all entries in serial. 174 reset() // Reset the store to an initial unused state. 175 split(n int) []storer // Split splits the store into n stores 176 count() int // Count returns the number of keys in the store 177} 178 179// Cache maintains an in-memory store of Values for a set of keys. 180type Cache struct { 181 // Due to a bug in atomic size needs to be the first word in the struct, as 182 // that's the only place where you're guaranteed to be 64-bit aligned on a 183 // 32 bit system. See: https://golang.org/pkg/sync/atomic/#pkg-note-BUG 184 size uint64 185 snapshotSize uint64 186 187 mu sync.RWMutex 188 store storer 189 maxSize uint64 190 191 // snapshots are the cache objects that are currently being written to tsm files 192 // they're kept in memory while flushing so they can be queried along with the cache. 193 // they are read only and should never be modified 194 snapshot *Cache 195 snapshotting bool 196 197 // This number is the number of pending or failed WriteSnaphot attempts since the last successful one. 198 snapshotAttempts int 199 200 stats *CacheStatistics 201 lastSnapshot time.Time 202 lastWriteTime time.Time 203 204 // A one time synchronization used to initial the cache with a store. Since the store can allocate a 205 // a large amount memory across shards, we lazily create it. 206 initialize atomic.Value 207 initializedCount uint32 208} 209 210// NewCache returns an instance of a cache which will use a maximum of maxSize bytes of memory. 211// Only used for engine caches, never for snapshots. 212func NewCache(maxSize uint64) *Cache { 213 c := &Cache{ 214 maxSize: maxSize, 215 store: emptyStore{}, 216 stats: &CacheStatistics{}, 217 lastSnapshot: time.Now(), 218 } 219 c.initialize.Store(&sync.Once{}) 220 c.UpdateAge() 221 c.UpdateCompactTime(0) 222 c.updateCachedBytes(0) 223 c.updateMemSize(0) 224 c.updateSnapshots() 225 return c 226} 227 228// CacheStatistics hold statistics related to the cache. 229type CacheStatistics struct { 230 MemSizeBytes int64 231 DiskSizeBytes int64 232 SnapshotCount int64 233 CacheAgeMs int64 234 CachedBytes int64 235 WALCompactionTimeMs int64 236 WriteOK int64 237 WriteErr int64 238 WriteDropped int64 239} 240 241// Statistics returns statistics for periodic monitoring. 242func (c *Cache) Statistics(tags map[string]string) []models.Statistic { 243 return []models.Statistic{{ 244 Name: "tsm1_cache", 245 Tags: tags, 246 Values: map[string]interface{}{ 247 statCacheMemoryBytes: atomic.LoadInt64(&c.stats.MemSizeBytes), 248 statCacheDiskBytes: atomic.LoadInt64(&c.stats.DiskSizeBytes), 249 statSnapshots: atomic.LoadInt64(&c.stats.SnapshotCount), 250 statCacheAgeMs: atomic.LoadInt64(&c.stats.CacheAgeMs), 251 statCachedBytes: atomic.LoadInt64(&c.stats.CachedBytes), 252 statWALCompactionTimeMs: atomic.LoadInt64(&c.stats.WALCompactionTimeMs), 253 statCacheWriteOK: atomic.LoadInt64(&c.stats.WriteOK), 254 statCacheWriteErr: atomic.LoadInt64(&c.stats.WriteErr), 255 statCacheWriteDropped: atomic.LoadInt64(&c.stats.WriteDropped), 256 }, 257 }} 258} 259 260// init initializes the cache and allocates the underlying store. Once initialized, 261// the store re-used until Freed. 262func (c *Cache) init() { 263 if !atomic.CompareAndSwapUint32(&c.initializedCount, 0, 1) { 264 return 265 } 266 267 c.mu.Lock() 268 c.store, _ = newring(ringShards) 269 c.mu.Unlock() 270} 271 272// Free releases the underlying store and memory held by the Cache. 273func (c *Cache) Free() { 274 if !atomic.CompareAndSwapUint32(&c.initializedCount, 1, 0) { 275 return 276 } 277 278 c.mu.Lock() 279 c.store = emptyStore{} 280 c.mu.Unlock() 281} 282 283// Write writes the set of values for the key to the cache. This function is goroutine-safe. 284// It returns an error if the cache will exceed its max size by adding the new values. 285func (c *Cache) Write(key []byte, values []Value) error { 286 c.init() 287 addedSize := uint64(Values(values).Size()) 288 289 // Enough room in the cache? 290 limit := c.maxSize 291 n := c.Size() + addedSize 292 293 if limit > 0 && n > limit { 294 atomic.AddInt64(&c.stats.WriteErr, 1) 295 return ErrCacheMemorySizeLimitExceeded(n, limit) 296 } 297 298 newKey, err := c.store.write(key, values) 299 if err != nil { 300 atomic.AddInt64(&c.stats.WriteErr, 1) 301 return err 302 } 303 304 if newKey { 305 addedSize += uint64(len(key)) 306 } 307 // Update the cache size and the memory size stat. 308 c.increaseSize(addedSize) 309 c.updateMemSize(int64(addedSize)) 310 atomic.AddInt64(&c.stats.WriteOK, 1) 311 312 return nil 313} 314 315// WriteMulti writes the map of keys and associated values to the cache. This 316// function is goroutine-safe. It returns an error if the cache will exceeded 317// its max size by adding the new values. The write attempts to write as many 318// values as possible. If one key fails, the others can still succeed and an 319// error will be returned. 320func (c *Cache) WriteMulti(values map[string][]Value) error { 321 c.init() 322 var addedSize uint64 323 for _, v := range values { 324 addedSize += uint64(Values(v).Size()) 325 } 326 327 // Enough room in the cache? 328 limit := c.maxSize // maxSize is safe for reading without a lock. 329 n := c.Size() + addedSize 330 if limit > 0 && n > limit { 331 atomic.AddInt64(&c.stats.WriteErr, 1) 332 return ErrCacheMemorySizeLimitExceeded(n, limit) 333 } 334 335 var werr error 336 c.mu.RLock() 337 store := c.store 338 c.mu.RUnlock() 339 340 // We'll optimistially set size here, and then decrement it for write errors. 341 c.increaseSize(addedSize) 342 for k, v := range values { 343 newKey, err := store.write([]byte(k), v) 344 if err != nil { 345 // The write failed, hold onto the error and adjust the size delta. 346 werr = err 347 addedSize -= uint64(Values(v).Size()) 348 c.decreaseSize(uint64(Values(v).Size())) 349 } 350 if newKey { 351 addedSize += uint64(len(k)) 352 c.increaseSize(uint64(len(k))) 353 } 354 } 355 356 // Some points in the batch were dropped. An error is returned so 357 // error stat is incremented as well. 358 if werr != nil { 359 atomic.AddInt64(&c.stats.WriteDropped, 1) 360 atomic.AddInt64(&c.stats.WriteErr, 1) 361 } 362 363 // Update the memory size stat 364 c.updateMemSize(int64(addedSize)) 365 atomic.AddInt64(&c.stats.WriteOK, 1) 366 367 c.mu.Lock() 368 c.lastWriteTime = time.Now() 369 c.mu.Unlock() 370 371 return werr 372} 373 374// Snapshot takes a snapshot of the current cache, adds it to the slice of caches that 375// are being flushed, and resets the current cache with new values. 376func (c *Cache) Snapshot() (*Cache, error) { 377 c.init() 378 379 c.mu.Lock() 380 defer c.mu.Unlock() 381 382 if c.snapshotting { 383 return nil, ErrSnapshotInProgress 384 } 385 386 c.snapshotting = true 387 c.snapshotAttempts++ // increment the number of times we tried to do this 388 389 // If no snapshot exists, create a new one, otherwise update the existing snapshot 390 if c.snapshot == nil { 391 store, err := newring(ringShards) 392 if err != nil { 393 return nil, err 394 } 395 396 c.snapshot = &Cache{ 397 store: store, 398 } 399 } 400 401 // Did a prior snapshot exist that failed? If so, return the existing 402 // snapshot to retry. 403 if c.snapshot.Size() > 0 { 404 return c.snapshot, nil 405 } 406 407 c.snapshot.store, c.store = c.store, c.snapshot.store 408 snapshotSize := c.Size() 409 410 // Save the size of the snapshot on the snapshot cache 411 atomic.StoreUint64(&c.snapshot.size, snapshotSize) 412 // Save the size of the snapshot on the live cache 413 atomic.StoreUint64(&c.snapshotSize, snapshotSize) 414 415 // Reset the cache's store. 416 c.store.reset() 417 atomic.StoreUint64(&c.size, 0) 418 c.lastSnapshot = time.Now() 419 420 c.updateCachedBytes(snapshotSize) // increment the number of bytes added to the snapshot 421 c.updateSnapshots() 422 423 return c.snapshot, nil 424} 425 426// Deduplicate sorts the snapshot before returning it. The compactor and any queries 427// coming in while it writes will need the values sorted. 428func (c *Cache) Deduplicate() { 429 c.mu.RLock() 430 store := c.store 431 c.mu.RUnlock() 432 433 // Apply a function that simply calls deduplicate on each entry in the ring. 434 // apply cannot return an error in this invocation. 435 _ = store.apply(func(_ []byte, e *entry) error { e.deduplicate(); return nil }) 436} 437 438// ClearSnapshot removes the snapshot cache from the list of flushing caches and 439// adjusts the size. 440func (c *Cache) ClearSnapshot(success bool) { 441 c.init() 442 443 c.mu.RLock() 444 snapStore := c.snapshot.store 445 c.mu.RUnlock() 446 447 // reset the snapshot store outside of the write lock 448 if success { 449 snapStore.reset() 450 } 451 452 c.mu.Lock() 453 defer c.mu.Unlock() 454 455 c.snapshotting = false 456 457 if success { 458 c.snapshotAttempts = 0 459 c.updateMemSize(-int64(atomic.LoadUint64(&c.snapshotSize))) // decrement the number of bytes in cache 460 461 // Reset the snapshot to a fresh Cache. 462 c.snapshot = &Cache{ 463 store: c.snapshot.store, 464 } 465 466 atomic.StoreUint64(&c.snapshotSize, 0) 467 c.updateSnapshots() 468 } 469} 470 471// Size returns the number of point-calcuated bytes the cache currently uses. 472func (c *Cache) Size() uint64 { 473 return atomic.LoadUint64(&c.size) + atomic.LoadUint64(&c.snapshotSize) 474} 475 476// increaseSize increases size by delta. 477func (c *Cache) increaseSize(delta uint64) { 478 atomic.AddUint64(&c.size, delta) 479} 480 481// decreaseSize decreases size by delta. 482func (c *Cache) decreaseSize(delta uint64) { 483 // Per sync/atomic docs, bit-flip delta minus one to perform subtraction within AddUint64. 484 atomic.AddUint64(&c.size, ^(delta - 1)) 485} 486 487// MaxSize returns the maximum number of bytes the cache may consume. 488func (c *Cache) MaxSize() uint64 { 489 return c.maxSize 490} 491 492func (c *Cache) Count() int { 493 c.mu.RLock() 494 n := c.store.count() 495 c.mu.RUnlock() 496 return n 497} 498 499// Keys returns a sorted slice of all keys under management by the cache. 500func (c *Cache) Keys() [][]byte { 501 c.mu.RLock() 502 store := c.store 503 c.mu.RUnlock() 504 return store.keys(true) 505} 506 507func (c *Cache) Split(n int) []*Cache { 508 if n == 1 { 509 return []*Cache{c} 510 } 511 512 caches := make([]*Cache, n) 513 storers := c.store.split(n) 514 for i := 0; i < n; i++ { 515 caches[i] = &Cache{ 516 store: storers[i], 517 } 518 } 519 return caches 520} 521 522// Type returns the series type for a key. 523func (c *Cache) Type(key []byte) (models.FieldType, error) { 524 c.mu.RLock() 525 e := c.store.entry(key) 526 if e == nil && c.snapshot != nil { 527 e = c.snapshot.store.entry(key) 528 } 529 c.mu.RUnlock() 530 531 if e != nil { 532 typ, err := e.InfluxQLType() 533 if err != nil { 534 return models.Empty, tsdb.ErrUnknownFieldType 535 } 536 537 switch typ { 538 case influxql.Float: 539 return models.Float, nil 540 case influxql.Integer: 541 return models.Integer, nil 542 case influxql.Unsigned: 543 return models.Unsigned, nil 544 case influxql.Boolean: 545 return models.Boolean, nil 546 case influxql.String: 547 return models.String, nil 548 } 549 } 550 551 return models.Empty, tsdb.ErrUnknownFieldType 552} 553 554// Values returns a copy of all values, deduped and sorted, for the given key. 555func (c *Cache) Values(key []byte) Values { 556 var snapshotEntries *entry 557 558 c.mu.RLock() 559 e := c.store.entry(key) 560 if c.snapshot != nil { 561 snapshotEntries = c.snapshot.store.entry(key) 562 } 563 c.mu.RUnlock() 564 565 if e == nil { 566 if snapshotEntries == nil { 567 // No values in hot cache or snapshots. 568 return nil 569 } 570 } else { 571 e.deduplicate() 572 } 573 574 // Build the sequence of entries that will be returned, in the correct order. 575 // Calculate the required size of the destination buffer. 576 var entries []*entry 577 sz := 0 578 579 if snapshotEntries != nil { 580 snapshotEntries.deduplicate() // guarantee we are deduplicated 581 entries = append(entries, snapshotEntries) 582 sz += snapshotEntries.count() 583 } 584 585 if e != nil { 586 entries = append(entries, e) 587 sz += e.count() 588 } 589 590 // Any entries? If not, return. 591 if sz == 0 { 592 return nil 593 } 594 595 // Create the buffer, and copy all hot values and snapshots. Individual 596 // entries are sorted at this point, so now the code has to check if the 597 // resultant buffer will be sorted from start to finish. 598 values := make(Values, sz) 599 n := 0 600 for _, e := range entries { 601 e.mu.RLock() 602 n += copy(values[n:], e.values) 603 e.mu.RUnlock() 604 } 605 values = values[:n] 606 values = values.Deduplicate() 607 608 return values 609} 610 611// Delete removes all values for the given keys from the cache. 612func (c *Cache) Delete(keys [][]byte) { 613 c.DeleteRange(keys, math.MinInt64, math.MaxInt64) 614} 615 616// DeleteRange removes the values for all keys containing points 617// with timestamps between between min and max from the cache. 618// 619// TODO(edd): Lock usage could possibly be optimised if necessary. 620func (c *Cache) DeleteRange(keys [][]byte, min, max int64) { 621 c.init() 622 623 c.mu.Lock() 624 defer c.mu.Unlock() 625 626 for _, k := range keys { 627 // Make sure key exist in the cache, skip if it does not 628 e := c.store.entry(k) 629 if e == nil { 630 continue 631 } 632 633 origSize := uint64(e.size()) 634 if min == math.MinInt64 && max == math.MaxInt64 { 635 c.decreaseSize(origSize + uint64(len(k))) 636 c.store.remove(k) 637 continue 638 } 639 640 e.filter(min, max) 641 if e.count() == 0 { 642 c.store.remove(k) 643 c.decreaseSize(origSize + uint64(len(k))) 644 continue 645 } 646 647 c.decreaseSize(origSize - uint64(e.size())) 648 } 649 atomic.StoreInt64(&c.stats.MemSizeBytes, int64(c.Size())) 650} 651 652// SetMaxSize updates the memory limit of the cache. 653func (c *Cache) SetMaxSize(size uint64) { 654 c.mu.Lock() 655 c.maxSize = size 656 c.mu.Unlock() 657} 658 659// values returns the values for the key. It assumes the data is already sorted. 660// It doesn't lock the cache but it does read-lock the entry if there is one for the key. 661// values should only be used in compact.go in the CacheKeyIterator. 662func (c *Cache) values(key []byte) Values { 663 e := c.store.entry(key) 664 if e == nil { 665 return nil 666 } 667 e.mu.RLock() 668 v := e.values 669 e.mu.RUnlock() 670 return v 671} 672 673// ApplyEntryFn applies the function f to each entry in the Cache. 674// ApplyEntryFn calls f on each entry in turn, within the same goroutine. 675// It is safe for use by multiple goroutines. 676func (c *Cache) ApplyEntryFn(f func(key []byte, entry *entry) error) error { 677 c.mu.RLock() 678 store := c.store 679 c.mu.RUnlock() 680 return store.applySerial(f) 681} 682 683// CacheLoader processes a set of WAL segment files, and loads a cache with the data 684// contained within those files. Processing of the supplied files take place in the 685// order they exist in the files slice. 686type CacheLoader struct { 687 files []string 688 689 Logger *zap.Logger 690} 691 692// NewCacheLoader returns a new instance of a CacheLoader. 693func NewCacheLoader(files []string) *CacheLoader { 694 return &CacheLoader{ 695 files: files, 696 Logger: zap.NewNop(), 697 } 698} 699 700// Load returns a cache loaded with the data contained within the segment files. 701// If, during reading of a segment file, corruption is encountered, that segment 702// file is truncated up to and including the last valid byte, and processing 703// continues with the next segment file. 704func (cl *CacheLoader) Load(cache *Cache) error { 705 706 var r *WALSegmentReader 707 for _, fn := range cl.files { 708 if err := func() error { 709 f, err := os.OpenFile(fn, os.O_CREATE|os.O_RDWR, 0666) 710 if err != nil { 711 return err 712 } 713 defer f.Close() 714 715 // Log some information about the segments. 716 stat, err := os.Stat(f.Name()) 717 if err != nil { 718 return err 719 } 720 cl.Logger.Info("Reading file", zap.String("path", f.Name()), zap.Int64("size", stat.Size())) 721 722 // Nothing to read, skip it 723 if stat.Size() == 0 { 724 return nil 725 } 726 727 if r == nil { 728 r = NewWALSegmentReader(f) 729 defer r.Close() 730 } else { 731 r.Reset(f) 732 } 733 734 for r.Next() { 735 entry, err := r.Read() 736 if err != nil { 737 n := r.Count() 738 cl.Logger.Info("File corrupt", zap.Error(err), zap.String("path", f.Name()), zap.Int64("pos", n)) 739 if err := f.Truncate(n); err != nil { 740 return err 741 } 742 break 743 } 744 745 switch t := entry.(type) { 746 case *WriteWALEntry: 747 if err := cache.WriteMulti(t.Values); err != nil { 748 return err 749 } 750 case *DeleteRangeWALEntry: 751 cache.DeleteRange(t.Keys, t.Min, t.Max) 752 case *DeleteWALEntry: 753 cache.Delete(t.Keys) 754 } 755 } 756 757 return r.Close() 758 }(); err != nil { 759 return err 760 } 761 } 762 return nil 763} 764 765// WithLogger sets the logger on the CacheLoader. 766func (cl *CacheLoader) WithLogger(log *zap.Logger) { 767 cl.Logger = log.With(zap.String("service", "cacheloader")) 768} 769 770func (c *Cache) LastWriteTime() time.Time { 771 c.mu.RLock() 772 defer c.mu.RUnlock() 773 return c.lastWriteTime 774} 775 776// UpdateAge updates the age statistic based on the current time. 777func (c *Cache) UpdateAge() { 778 c.mu.RLock() 779 defer c.mu.RUnlock() 780 ageStat := int64(time.Since(c.lastSnapshot) / time.Millisecond) 781 atomic.StoreInt64(&c.stats.CacheAgeMs, ageStat) 782} 783 784// UpdateCompactTime updates WAL compaction time statistic based on d. 785func (c *Cache) UpdateCompactTime(d time.Duration) { 786 atomic.AddInt64(&c.stats.WALCompactionTimeMs, int64(d/time.Millisecond)) 787} 788 789// updateCachedBytes increases the cachedBytes counter by b. 790func (c *Cache) updateCachedBytes(b uint64) { 791 atomic.AddInt64(&c.stats.CachedBytes, int64(b)) 792} 793 794// updateMemSize updates the memSize level by b. 795func (c *Cache) updateMemSize(b int64) { 796 atomic.AddInt64(&c.stats.MemSizeBytes, b) 797} 798 799func valueType(v Value) byte { 800 switch v.(type) { 801 case FloatValue: 802 return 1 803 case IntegerValue: 804 return 2 805 case StringValue: 806 return 3 807 case BooleanValue: 808 return 4 809 default: 810 return 0 811 } 812} 813 814// updateSnapshots updates the snapshotsCount and the diskSize levels. 815func (c *Cache) updateSnapshots() { 816 // Update disk stats 817 atomic.StoreInt64(&c.stats.DiskSizeBytes, int64(atomic.LoadUint64(&c.snapshotSize))) 818 atomic.StoreInt64(&c.stats.SnapshotCount, int64(c.snapshotAttempts)) 819} 820 821type emptyStore struct{} 822 823func (e emptyStore) entry(key []byte) *entry { return nil } 824func (e emptyStore) write(key []byte, values Values) (bool, error) { return false, nil } 825func (e emptyStore) add(key []byte, entry *entry) {} 826func (e emptyStore) remove(key []byte) {} 827func (e emptyStore) keys(sorted bool) [][]byte { return nil } 828func (e emptyStore) apply(f func([]byte, *entry) error) error { return nil } 829func (e emptyStore) applySerial(f func([]byte, *entry) error) error { return nil } 830func (e emptyStore) reset() {} 831func (e emptyStore) split(n int) []storer { return nil } 832func (e emptyStore) count() int { return 0 } 833