package tsm1

import (
	"fmt"
	"math"
	"os"
	"sync"
	"sync/atomic"
	"time"

	"github.com/influxdata/influxdb/models"
	"github.com/influxdata/influxdb/tsdb"
	"github.com/influxdata/influxql"
	"go.uber.org/zap"
)

// ringShards specifies the number of partitions that the hash ring used to
// store the entry mappings contains. It must be a power of 2. From empirical
// testing, a value above the number of cores on the machine does not provide
// any additional benefit. For now we'll set it to the number of cores on the
// largest box we could imagine running influx.
const ringShards = 16

var (
	// ErrSnapshotInProgress is returned if a snapshot is attempted while one is already running.
	ErrSnapshotInProgress = fmt.Errorf("snapshot in progress")
)

// ErrCacheMemorySizeLimitExceeded returns an error indicating an operation
// could not be completed due to exceeding the cache-max-memory-size setting.
func ErrCacheMemorySizeLimitExceeded(n, limit uint64) error {
	return fmt.Errorf("cache-max-memory-size exceeded: (%d/%d)", n, limit)
}

// entry is a set of values and some metadata.
type entry struct {
	mu     sync.RWMutex
	values Values // All stored values.

	// The type of values stored. Read only so doesn't need to be protected by
	// mu.
	vtype byte
}

// newEntryValues returns a new instance of entry with the given values.  If the
// values are not valid, an error is returned.
func newEntryValues(values []Value) (*entry, error) {
	e := &entry{}
	e.values = make(Values, 0, len(values))
	e.values = append(e.values, values...)

	// No values, don't check types and ordering
	if len(values) == 0 {
		return e, nil
	}

	et := valueType(values[0])
	for _, v := range values {
		// Make sure all the values are the same type
		if et != valueType(v) {
			return nil, tsdb.ErrFieldTypeConflict
		}
	}

	// Set the type of values stored.
	e.vtype = et

	return e, nil
}

// add adds the given values to the entry.
func (e *entry) add(values []Value) error {
	if len(values) == 0 {
		return nil // Nothing to do.
	}

	// Are any of the new values the wrong type?
	if e.vtype != 0 {
		for _, v := range values {
			if e.vtype != valueType(v) {
				return tsdb.ErrFieldTypeConflict
			}
		}
	}

	// entry currently has no values, so add the new ones and we're done.
	e.mu.Lock()
	if len(e.values) == 0 {
		e.values = values
		e.vtype = valueType(values[0])
		e.mu.Unlock()
		return nil
	}

	// Append the new values to the existing ones...
	e.values = append(e.values, values...)
	e.mu.Unlock()
	return nil
}

// deduplicate sorts and orders the entry's values. If values are already deduped and sorted,
// the function does no work and simply returns.
func (e *entry) deduplicate() {
	e.mu.Lock()
	defer e.mu.Unlock()

	if len(e.values) <= 1 {
		return
	}
	e.values = e.values.Deduplicate()
}

// count returns the number of values in this entry.
func (e *entry) count() int {
	e.mu.RLock()
	n := len(e.values)
	e.mu.RUnlock()
	return n
}

// filter removes all values with timestamps between min and max inclusive.
func (e *entry) filter(min, max int64) {
	e.mu.Lock()
	if len(e.values) > 1 {
		e.values = e.values.Deduplicate()
	}
	e.values = e.values.Exclude(min, max)
	e.mu.Unlock()
}

// size returns the size of this entry in bytes.
func (e *entry) size() int {
	e.mu.RLock()
	sz := e.values.Size()
	e.mu.RUnlock()
	return sz
}

// InfluxQLType returns the data type of the entry's values.
func (e *entry) InfluxQLType() (influxql.DataType, error) {
	e.mu.RLock()
	defer e.mu.RUnlock()
	return e.values.InfluxQLType()
}

// Statistics gathered by the Cache.
const (
	// levels - point in time measures

	statCacheMemoryBytes = "memBytes"      // level: Size of in-memory cache in bytes
	statCacheDiskBytes   = "diskBytes"     // level: Size of on-disk snapshots in bytes
	statSnapshots        = "snapshotCount" // level: Number of active snapshots.
	statCacheAgeMs       = "cacheAgeMs"    // level: Number of milliseconds since the cache was last snapshotted at sample time

	// counters - accumulative measures

	statCachedBytes         = "cachedBytes"         // counter: Total number of bytes written into snapshots.
	statWALCompactionTimeMs = "WALCompactionTimeMs" // counter: Total number of milliseconds spent compacting snapshots

	statCacheWriteOK      = "writeOk"
	statCacheWriteErr     = "writeErr"
	statCacheWriteDropped = "writeDropped"
)

// storer is the interface that describes a cache's store.
type storer interface {
	entry(key []byte) *entry                        // Get an entry by its key.
	write(key []byte, values Values) (bool, error)  // Write an entry to the store.
	add(key []byte, entry *entry)                   // Add a new entry to the store.
	remove(key []byte)                              // Remove an entry from the store.
	keys(sorted bool) [][]byte                      // Return an optionally sorted slice of entry keys.
	apply(f func([]byte, *entry) error) error       // Apply f to all entries in the store in parallel.
	applySerial(f func([]byte, *entry) error) error // Apply f to all entries in serial.
	reset()                                         // Reset the store to an initial unused state.
	split(n int) []storer                           // Split splits the store into n stores
	count() int                                     // Count returns the number of keys in the store
}

// Cache maintains an in-memory store of Values for a set of keys.
type Cache struct {
	// Due to a bug in atomic, size needs to be the first field in the struct:
	// that's the only place where it is guaranteed to be 64-bit aligned on a
	// 32-bit system. See: https://golang.org/pkg/sync/atomic/#pkg-note-BUG
	size         uint64
	snapshotSize uint64

	mu      sync.RWMutex
	store   storer
	maxSize uint64

	// snapshot holds the cache entries that are currently being written to tsm files.
	// They're kept in memory while flushing so they can be queried along with the cache.
	// They are read-only and should never be modified.
	snapshot     *Cache
	snapshotting bool

	// This number is the number of pending or failed WriteSnapshot attempts since the last successful one.
	snapshotAttempts int

	stats         *CacheStatistics
	lastSnapshot  time.Time
	lastWriteTime time.Time

	// A one-time synchronization used to initialize the cache with a store.  Since the store can
	// allocate a large amount of memory across shards, we lazily create it.
	initialize       atomic.Value
	initializedCount uint32
}

// NewCache returns an instance of a cache which will use a maximum of maxSize bytes of memory.
// Only used for engine caches, never for snapshots.
func NewCache(maxSize uint64) *Cache {
	c := &Cache{
		maxSize:      maxSize,
		store:        emptyStore{},
		stats:        &CacheStatistics{},
		lastSnapshot: time.Now(),
	}
	c.initialize.Store(&sync.Once{})
	c.UpdateAge()
	c.UpdateCompactTime(0)
	c.updateCachedBytes(0)
	c.updateMemSize(0)
	c.updateSnapshots()
	return c
}

// CacheStatistics holds statistics related to the cache.
type CacheStatistics struct {
	MemSizeBytes        int64
	DiskSizeBytes       int64
	SnapshotCount       int64
	CacheAgeMs          int64
	CachedBytes         int64
	WALCompactionTimeMs int64
	WriteOK             int64
	WriteErr            int64
	WriteDropped        int64
}

// Statistics returns statistics for periodic monitoring.
func (c *Cache) Statistics(tags map[string]string) []models.Statistic {
	return []models.Statistic{{
		Name: "tsm1_cache",
		Tags: tags,
		Values: map[string]interface{}{
			statCacheMemoryBytes:    atomic.LoadInt64(&c.stats.MemSizeBytes),
			statCacheDiskBytes:      atomic.LoadInt64(&c.stats.DiskSizeBytes),
			statSnapshots:           atomic.LoadInt64(&c.stats.SnapshotCount),
			statCacheAgeMs:          atomic.LoadInt64(&c.stats.CacheAgeMs),
			statCachedBytes:         atomic.LoadInt64(&c.stats.CachedBytes),
			statWALCompactionTimeMs: atomic.LoadInt64(&c.stats.WALCompactionTimeMs),
			statCacheWriteOK:        atomic.LoadInt64(&c.stats.WriteOK),
			statCacheWriteErr:       atomic.LoadInt64(&c.stats.WriteErr),
			statCacheWriteDropped:   atomic.LoadInt64(&c.stats.WriteDropped),
		},
	}}
}

// init initializes the cache and allocates the underlying store.  Once initialized,
// the store is reused until Free is called.
func (c *Cache) init() {
	if !atomic.CompareAndSwapUint32(&c.initializedCount, 0, 1) {
		return
	}

	c.mu.Lock()
	c.store, _ = newring(ringShards)
	c.mu.Unlock()
}

// Free releases the underlying store and memory held by the Cache.
func (c *Cache) Free() {
	if !atomic.CompareAndSwapUint32(&c.initializedCount, 1, 0) {
		return
	}

	c.mu.Lock()
	c.store = emptyStore{}
	c.mu.Unlock()
}

// Write writes the set of values for the key to the cache. This function is goroutine-safe.
// It returns an error if the cache would exceed its max size by adding the new values.
func (c *Cache) Write(key []byte, values []Value) error {
	c.init()
	addedSize := uint64(Values(values).Size())

	// Enough room in the cache?
	limit := c.maxSize
	n := c.Size() + addedSize

	if limit > 0 && n > limit {
		atomic.AddInt64(&c.stats.WriteErr, 1)
		return ErrCacheMemorySizeLimitExceeded(n, limit)
	}

	newKey, err := c.store.write(key, values)
	if err != nil {
		atomic.AddInt64(&c.stats.WriteErr, 1)
		return err
	}

	if newKey {
		addedSize += uint64(len(key))
	}
	// Update the cache size and the memory size stat.
	c.increaseSize(addedSize)
	c.updateMemSize(int64(addedSize))
	atomic.AddInt64(&c.stats.WriteOK, 1)

	return nil
}
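
// exampleCacheWrite is an illustrative sketch (not part of the original file)
// showing a minimal Write call against a freshly created cache. The key layout
// ("measurement,tags#!~#field"), the NewValue constructor from elsewhere in
// the tsm1 package, and the 30 MB limit are assumptions made for the example.
func exampleCacheWrite() error {
	c := NewCache(30 * 1024 * 1024)
	vals := Values{NewValue(1, 1.5), NewValue(2, 3.0)}
	if err := c.Write([]byte("cpu,host=server01#!~#value"), vals); err != nil {
		// A full cache surfaces ErrCacheMemorySizeLimitExceeded here.
		return err
	}
	return nil
}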

// WriteMulti writes the map of keys and associated values to the cache. This
// function is goroutine-safe. It returns an error if the cache would exceed
// its max size by adding the new values.  The write attempts to write as many
// values as possible: if one key fails, the others can still succeed and an
// error will be returned.
func (c *Cache) WriteMulti(values map[string][]Value) error {
	c.init()
	var addedSize uint64
	for _, v := range values {
		addedSize += uint64(Values(v).Size())
	}

	// Enough room in the cache?
	limit := c.maxSize // maxSize is safe for reading without a lock.
	n := c.Size() + addedSize
	if limit > 0 && n > limit {
		atomic.AddInt64(&c.stats.WriteErr, 1)
		return ErrCacheMemorySizeLimitExceeded(n, limit)
	}

	var werr error
	c.mu.RLock()
	store := c.store
	c.mu.RUnlock()

	// We'll optimistically set size here, and then decrement it for write errors.
	c.increaseSize(addedSize)
	for k, v := range values {
		newKey, err := store.write([]byte(k), v)
		if err != nil {
			// The write failed, hold onto the error and adjust the size delta.
			werr = err
			addedSize -= uint64(Values(v).Size())
			c.decreaseSize(uint64(Values(v).Size()))
		}
		if newKey {
			addedSize += uint64(len(k))
			c.increaseSize(uint64(len(k)))
		}
	}

	// Some points in the batch were dropped.  An error is returned, so the
	// error stat is incremented as well.
	if werr != nil {
		atomic.AddInt64(&c.stats.WriteDropped, 1)
		atomic.AddInt64(&c.stats.WriteErr, 1)
	}

	// Update the memory size stat
	c.updateMemSize(int64(addedSize))
	atomic.AddInt64(&c.stats.WriteOK, 1)

	c.mu.Lock()
	c.lastWriteTime = time.Now()
	c.mu.Unlock()

	return werr
}
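
// exampleCacheWriteMulti is an illustrative sketch (not part of the original
// file) of batching several series into one WriteMulti call. A partial failure
// (for example a field type conflict on one key) still leaves the other keys
// written; the returned error signals that some points were dropped. The keys
// and values shown are assumptions made for the example.
func exampleCacheWriteMulti(c *Cache) error {
	batch := map[string][]Value{
		"cpu,host=server01#!~#value": {NewValue(10, 42.0)},
		"mem,host=server01#!~#used":  {NewValue(10, int64(1024))},
	}
	return c.WriteMulti(batch)
}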

// Snapshot takes a snapshot of the current cache, adds it to the slice of caches that
// are being flushed, and resets the current cache with new values.
func (c *Cache) Snapshot() (*Cache, error) {
	c.init()

	c.mu.Lock()
	defer c.mu.Unlock()

	if c.snapshotting {
		return nil, ErrSnapshotInProgress
	}

	c.snapshotting = true
	c.snapshotAttempts++ // increment the number of times we tried to do this

	// If no snapshot exists, create a new one, otherwise update the existing snapshot
	if c.snapshot == nil {
		store, err := newring(ringShards)
		if err != nil {
			return nil, err
		}

		c.snapshot = &Cache{
			store: store,
		}
	}

	// Did a prior snapshot exist that failed?  If so, return the existing
	// snapshot to retry.
	if c.snapshot.Size() > 0 {
		return c.snapshot, nil
	}

	c.snapshot.store, c.store = c.store, c.snapshot.store
	snapshotSize := c.Size()

	// Save the size of the snapshot on the snapshot cache
	atomic.StoreUint64(&c.snapshot.size, snapshotSize)
	// Save the size of the snapshot on the live cache
	atomic.StoreUint64(&c.snapshotSize, snapshotSize)

	// Reset the cache's store.
	c.store.reset()
	atomic.StoreUint64(&c.size, 0)
	c.lastSnapshot = time.Now()

	c.updateCachedBytes(snapshotSize) // increment the number of bytes added to the snapshot
	c.updateSnapshots()

	return c.snapshot, nil
}
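
// exampleCacheSnapshot is an illustrative sketch (not part of the original
// file) of the snapshot lifecycle as driven by the engine: take a snapshot,
// persist it, then clear it. writeSnapshotToTSM is a hypothetical placeholder
// for the engine's TSM-writing step, not a function in this package.
func exampleCacheSnapshot(c *Cache, writeSnapshotToTSM func(*Cache) error) error {
	snap, err := c.Snapshot()
	if err != nil {
		return err // e.g. ErrSnapshotInProgress
	}

	// Queries continue to see the snapshot's values while it is written out.
	err = writeSnapshotToTSM(snap)

	// On success the snapshot store is reset; on failure it is retained so a
	// later Snapshot call can retry with the same data.
	c.ClearSnapshot(err == nil)
	return err
}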

// Deduplicate sorts the snapshot before returning it. The compactor and any queries
// coming in while it writes will need the values sorted.
func (c *Cache) Deduplicate() {
	c.mu.RLock()
	store := c.store
	c.mu.RUnlock()

	// Apply a function that simply calls deduplicate on each entry in the ring.
	// apply cannot return an error in this invocation.
	_ = store.apply(func(_ []byte, e *entry) error { e.deduplicate(); return nil })
}

// ClearSnapshot removes the snapshot cache from the list of flushing caches and
// adjusts the size.
func (c *Cache) ClearSnapshot(success bool) {
	c.init()

	c.mu.RLock()
	snapStore := c.snapshot.store
	c.mu.RUnlock()

	// reset the snapshot store outside of the write lock
	if success {
		snapStore.reset()
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	c.snapshotting = false

	if success {
		c.snapshotAttempts = 0
		c.updateMemSize(-int64(atomic.LoadUint64(&c.snapshotSize))) // decrement the number of bytes in cache

		// Reset the snapshot to a fresh Cache.
		c.snapshot = &Cache{
			store: c.snapshot.store,
		}

		atomic.StoreUint64(&c.snapshotSize, 0)
		c.updateSnapshots()
	}
}

// Size returns the number of point-calculated bytes the cache currently uses.
func (c *Cache) Size() uint64 {
	return atomic.LoadUint64(&c.size) + atomic.LoadUint64(&c.snapshotSize)
}

// increaseSize increases size by delta.
func (c *Cache) increaseSize(delta uint64) {
	atomic.AddUint64(&c.size, delta)
}

// decreaseSize decreases size by delta.
func (c *Cache) decreaseSize(delta uint64) {
	// Per sync/atomic docs, bit-flip delta minus one to perform subtraction within AddUint64.
	atomic.AddUint64(&c.size, ^(delta - 1))
}
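
// exampleUnsignedSubtract is an illustrative sketch (not part of the original
// file) of the unsigned subtraction used by decreaseSize: in two's-complement
// arithmetic, adding ^(delta-1) is the same as adding -delta, so AddUint64 can
// be used to subtract.
func exampleUnsignedSubtract() uint64 {
	var size uint64 = 100
	var delta uint64 = 30
	atomic.AddUint64(&size, ^(delta - 1)) // size is now 70
	return size
}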

// MaxSize returns the maximum number of bytes the cache may consume.
func (c *Cache) MaxSize() uint64 {
	return c.maxSize
}

// Count returns the number of keys in the cache.
func (c *Cache) Count() int {
	c.mu.RLock()
	n := c.store.count()
	c.mu.RUnlock()
	return n
}

// Keys returns a sorted slice of all keys under management by the cache.
func (c *Cache) Keys() [][]byte {
	c.mu.RLock()
	store := c.store
	c.mu.RUnlock()
	return store.keys(true)
}

// Split splits the cache's store into n smaller caches, each holding a
// disjoint subset of the keys.
func (c *Cache) Split(n int) []*Cache {
	if n == 1 {
		return []*Cache{c}
	}

	caches := make([]*Cache, n)
	storers := c.store.split(n)
	for i := 0; i < n; i++ {
		caches[i] = &Cache{
			store: storers[i],
		}
	}
	return caches
}
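
// exampleCacheSplit is an illustrative sketch (not part of the original file):
// splitting the cache so its entries can be processed concurrently, roughly as
// the compactor does when it builds per-partition iterators. The worker body
// here is only a stand-in.
func exampleCacheSplit(c *Cache) {
	var wg sync.WaitGroup
	for _, part := range c.Split(4) {
		wg.Add(1)
		go func(p *Cache) {
			defer wg.Done()
			// Each part holds a disjoint subset of the keys.
			_ = p.ApplyEntryFn(func(key []byte, e *entry) error { return nil })
		}(part)
	}
	wg.Wait()
}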

// Type returns the series type for a key.
func (c *Cache) Type(key []byte) (models.FieldType, error) {
	c.mu.RLock()
	e := c.store.entry(key)
	if e == nil && c.snapshot != nil {
		e = c.snapshot.store.entry(key)
	}
	c.mu.RUnlock()

	if e != nil {
		typ, err := e.InfluxQLType()
		if err != nil {
			return models.Empty, tsdb.ErrUnknownFieldType
		}

		switch typ {
		case influxql.Float:
			return models.Float, nil
		case influxql.Integer:
			return models.Integer, nil
		case influxql.Unsigned:
			return models.Unsigned, nil
		case influxql.Boolean:
			return models.Boolean, nil
		case influxql.String:
			return models.String, nil
		}
	}

	return models.Empty, tsdb.ErrUnknownFieldType
}
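
// exampleCacheType is an illustrative sketch (not part of the original file):
// inspecting the field type of a cached series key. The key layout is an
// assumption; an unknown or missing key yields tsdb.ErrUnknownFieldType.
func exampleCacheType(c *Cache) (models.FieldType, error) {
	return c.Type([]byte("cpu,host=server01#!~#value"))
}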

// Values returns a copy of all values, deduped and sorted, for the given key.
func (c *Cache) Values(key []byte) Values {
	var snapshotEntries *entry

	c.mu.RLock()
	e := c.store.entry(key)
	if c.snapshot != nil {
		snapshotEntries = c.snapshot.store.entry(key)
	}
	c.mu.RUnlock()

	if e == nil {
		if snapshotEntries == nil {
			// No values in hot cache or snapshots.
			return nil
		}
	} else {
		e.deduplicate()
	}

	// Build the sequence of entries that will be returned, in the correct order.
	// Calculate the required size of the destination buffer.
	var entries []*entry
	sz := 0

	if snapshotEntries != nil {
		snapshotEntries.deduplicate() // guarantee we are deduplicated
		entries = append(entries, snapshotEntries)
		sz += snapshotEntries.count()
	}

	if e != nil {
		entries = append(entries, e)
		sz += e.count()
	}

	// Any entries? If not, return.
	if sz == 0 {
		return nil
	}

	// Create the buffer, and copy all hot values and snapshots. Individual
	// entries are sorted at this point, but the concatenation of snapshot and
	// hot values may not be, so deduplicate and sort the merged buffer.
	values := make(Values, sz)
	n := 0
	for _, e := range entries {
		e.mu.RLock()
		n += copy(values[n:], e.values)
		e.mu.RUnlock()
	}
	values = values[:n]
	values = values.Deduplicate()

	return values
}
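
// exampleCacheValues is an illustrative sketch (not part of the original
// file): reading the merged, deduplicated values for a key from both the hot
// cache and any snapshot. The loop body is only a placeholder.
func exampleCacheValues(c *Cache, key []byte) {
	for _, v := range c.Values(key) {
		fmt.Println(v.UnixNano(), v.Value())
	}
}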

// Delete removes all values for the given keys from the cache.
func (c *Cache) Delete(keys [][]byte) {
	c.DeleteRange(keys, math.MinInt64, math.MaxInt64)
}

// DeleteRange removes the values for all keys containing points
// with timestamps between min and max from the cache.
//
// TODO(edd): Lock usage could possibly be optimised if necessary.
func (c *Cache) DeleteRange(keys [][]byte, min, max int64) {
	c.init()

	c.mu.Lock()
	defer c.mu.Unlock()

	for _, k := range keys {
		// Make sure the key exists in the cache; skip it if it does not.
		e := c.store.entry(k)
		if e == nil {
			continue
		}

		origSize := uint64(e.size())
		if min == math.MinInt64 && max == math.MaxInt64 {
			c.decreaseSize(origSize + uint64(len(k)))
			c.store.remove(k)
			continue
		}

		e.filter(min, max)
		if e.count() == 0 {
			c.store.remove(k)
			c.decreaseSize(origSize + uint64(len(k)))
			continue
		}

		c.decreaseSize(origSize - uint64(e.size()))
	}
	atomic.StoreInt64(&c.stats.MemSizeBytes, int64(c.Size()))
}
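
// exampleCacheDeleteRange is an illustrative sketch (not part of the original
// file): dropping the last hour of points for two series keys. Timestamps are
// nanoseconds since the epoch; the keys shown are assumptions about the key
// layout.
func exampleCacheDeleteRange(c *Cache) {
	keys := [][]byte{
		[]byte("cpu,host=server01#!~#value"),
		[]byte("cpu,host=server02#!~#value"),
	}
	max := time.Now().UnixNano()
	min := max - int64(time.Hour)
	c.DeleteRange(keys, min, max)
}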

// SetMaxSize updates the memory limit of the cache.
func (c *Cache) SetMaxSize(size uint64) {
	c.mu.Lock()
	c.maxSize = size
	c.mu.Unlock()
}

// values returns the values for the key. It assumes the data is already sorted.
// It doesn't lock the cache but it does read-lock the entry if there is one for the key.
// values should only be used in compact.go in the CacheKeyIterator.
func (c *Cache) values(key []byte) Values {
	e := c.store.entry(key)
	if e == nil {
		return nil
	}
	e.mu.RLock()
	v := e.values
	e.mu.RUnlock()
	return v
}

// ApplyEntryFn applies the function f to each entry in the Cache.
// ApplyEntryFn calls f on each entry in turn, within the same goroutine.
// It is safe for use by multiple goroutines.
func (c *Cache) ApplyEntryFn(f func(key []byte, entry *entry) error) error {
	c.mu.RLock()
	store := c.store
	c.mu.RUnlock()
	return store.applySerial(f)
}
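
// exampleCacheCountValues is an illustrative sketch (not part of the original
// file): using ApplyEntryFn to total the number of values held in the cache.
// Entries are visited serially, so no extra locking is needed around the
// counter.
func exampleCacheCountValues(c *Cache) (int, error) {
	total := 0
	err := c.ApplyEntryFn(func(key []byte, e *entry) error {
		total += e.count()
		return nil
	})
	return total, err
}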

// CacheLoader processes a set of WAL segment files, and loads a cache with the data
// contained within those files.  Processing of the supplied files takes place in the
// order they exist in the files slice.
type CacheLoader struct {
	files []string

	Logger *zap.Logger
}

// NewCacheLoader returns a new instance of a CacheLoader.
func NewCacheLoader(files []string) *CacheLoader {
	return &CacheLoader{
		files:  files,
		Logger: zap.NewNop(),
	}
}

// Load returns a cache loaded with the data contained within the segment files.
// If, during reading of a segment file, corruption is encountered, that segment
// file is truncated up to and including the last valid byte, and processing
// continues with the next segment file.
func (cl *CacheLoader) Load(cache *Cache) error {

	var r *WALSegmentReader
	for _, fn := range cl.files {
		if err := func() error {
			f, err := os.OpenFile(fn, os.O_CREATE|os.O_RDWR, 0666)
			if err != nil {
				return err
			}
			defer f.Close()

			// Log some information about the segments.
			stat, err := os.Stat(f.Name())
			if err != nil {
				return err
			}
			cl.Logger.Info("Reading file", zap.String("path", f.Name()), zap.Int64("size", stat.Size()))

			// Nothing to read, skip it
			if stat.Size() == 0 {
				return nil
			}

			if r == nil {
				r = NewWALSegmentReader(f)
				defer r.Close()
			} else {
				r.Reset(f)
			}

			for r.Next() {
				entry, err := r.Read()
				if err != nil {
					n := r.Count()
					cl.Logger.Info("File corrupt", zap.Error(err), zap.String("path", f.Name()), zap.Int64("pos", n))
					if err := f.Truncate(n); err != nil {
						return err
					}
					break
				}

				switch t := entry.(type) {
				case *WriteWALEntry:
					if err := cache.WriteMulti(t.Values); err != nil {
						return err
					}
				case *DeleteRangeWALEntry:
					cache.DeleteRange(t.Keys, t.Min, t.Max)
				case *DeleteWALEntry:
					cache.Delete(t.Keys)
				}
			}

			return r.Close()
		}(); err != nil {
			return err
		}
	}
	return nil
}
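
// exampleCacheLoad is an illustrative sketch (not part of the original file):
// rebuilding a cache from WAL segments on startup, roughly as the engine does.
// The segment paths come from the caller and the 1 GB limit is an example
// value only.
func exampleCacheLoad(segmentFiles []string) (*Cache, error) {
	cache := NewCache(1024 * 1024 * 1024)
	loader := NewCacheLoader(segmentFiles)
	loader.WithLogger(zap.NewNop())
	if err := loader.Load(cache); err != nil {
		return nil, err
	}
	return cache, nil
}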

// WithLogger sets the logger on the CacheLoader.
func (cl *CacheLoader) WithLogger(log *zap.Logger) {
	cl.Logger = log.With(zap.String("service", "cacheloader"))
}

// LastWriteTime returns the time of the last write to the cache.
func (c *Cache) LastWriteTime() time.Time {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.lastWriteTime
}

// UpdateAge updates the age statistic based on the current time.
func (c *Cache) UpdateAge() {
	c.mu.RLock()
	defer c.mu.RUnlock()
	ageStat := int64(time.Since(c.lastSnapshot) / time.Millisecond)
	atomic.StoreInt64(&c.stats.CacheAgeMs, ageStat)
}

// UpdateCompactTime updates the WAL compaction time statistic based on d.
func (c *Cache) UpdateCompactTime(d time.Duration) {
	atomic.AddInt64(&c.stats.WALCompactionTimeMs, int64(d/time.Millisecond))
}

// updateCachedBytes increases the cachedBytes counter by b.
func (c *Cache) updateCachedBytes(b uint64) {
	atomic.AddInt64(&c.stats.CachedBytes, int64(b))
}

// updateMemSize updates the memSize level by b.
func (c *Cache) updateMemSize(b int64) {
	atomic.AddInt64(&c.stats.MemSizeBytes, b)
}

// valueType returns a byte code identifying the concrete type of v, or 0 if
// the type is unknown.
func valueType(v Value) byte {
	switch v.(type) {
	case FloatValue:
		return 1
	case IntegerValue:
		return 2
	case StringValue:
		return 3
	case BooleanValue:
		return 4
	default:
		return 0
	}
}

// updateSnapshots updates the snapshotsCount and the diskSize levels.
func (c *Cache) updateSnapshots() {
	// Update disk stats
	atomic.StoreInt64(&c.stats.DiskSizeBytes, int64(atomic.LoadUint64(&c.snapshotSize)))
	atomic.StoreInt64(&c.stats.SnapshotCount, int64(c.snapshotAttempts))
}

// emptyStore is a no-op storer used before the cache's ring is allocated and
// after the cache has been freed.
type emptyStore struct{}

func (e emptyStore) entry(key []byte) *entry                        { return nil }
func (e emptyStore) write(key []byte, values Values) (bool, error)  { return false, nil }
func (e emptyStore) add(key []byte, entry *entry)                   {}
func (e emptyStore) remove(key []byte)                              {}
func (e emptyStore) keys(sorted bool) [][]byte                      { return nil }
func (e emptyStore) apply(f func([]byte, *entry) error) error       { return nil }
func (e emptyStore) applySerial(f func([]byte, *entry) error) error { return nil }
func (e emptyStore) reset()                                         {}
func (e emptyStore) split(n int) []storer                           { return nil }
func (e emptyStore) count() int                                     { return 0 }