1// Copyright 2014 The Prometheus Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14package local
15
16import (
17	"bufio"
18	"context"
19	"encoding/binary"
20	"fmt"
21	"io"
22	"io/ioutil"
23	"math"
24	"os"
25	"path/filepath"
26	"strconv"
27	"strings"
28	"sync"
29	"sync/atomic"
30	"time"
31
32	"github.com/prometheus/client_golang/prometheus"
33	"github.com/prometheus/common/log"
34	"github.com/prometheus/common/model"
35
36	"github.com/prometheus/prometheus/storage/local/chunk"
37	"github.com/prometheus/prometheus/storage/local/codable"
38	"github.com/prometheus/prometheus/storage/local/index"
39	"github.com/prometheus/prometheus/util/flock"
40)
41
const (
	// Version of the storage as it can be found in the version file.
	// Increment to protect against incompatible changes.
	Version         = 1
	versionFileName = "VERSION"

	// File name suffixes of series files and their hint files.
	seriesFileSuffix     = ".db"
	seriesTempFileSuffix = ".db.tmp"
	seriesDirNameLen     = 2 // How many bytes of the fingerprint in dir name.
	hintFileSuffix       = ".hint"

	// File holding the fingerprint mappings, with its format identification.
	mappingsFileName      = "mappings.db"
	mappingsTempFileName  = "mappings.db.tmp"
	mappingsFormatVersion = 1
	mappingsMagicString   = "PrometheusMappings"

	// dirtyFileName is the file used for locking and to mark dirty state.
	dirtyFileName = "DIRTY"

	fileBufSize = 1 << 16 // 64kiB.

	// On-disk chunk layout: each chunk's data is preceded by a header of
	// chunkHeaderLen bytes, holding the chunk encoding (1 byte at
	// chunkHeaderTypeOffset), followed by the first and the last sample time
	// of the chunk (8 bytes each, little-endian, at chunkHeaderFirstTimeOffset
	// and chunkHeaderLastTimeOffset).
	chunkHeaderLen             = 17
	chunkHeaderTypeOffset      = 0
	chunkHeaderFirstTimeOffset = 1
	chunkHeaderLastTimeOffset  = 9
	chunkLenWithHeader         = chunk.ChunkLen + chunkHeaderLen

	// chunkMaxBatchSize is the maximum number of chunks to load or write in
	// one batch. Note that 62 is the largest number of chunks that fit into
	// 64kiB on disk because chunkHeaderLen is added to each 1k chunk.
	chunkMaxBatchSize = 62

	// Tuning of the indexing queue. indexingQueueCapacity is the buffer size
	// of persistence.indexingQueue.
	indexingMaxBatchSize  = 1024 * 1024
	indexingBatchTimeout  = 500 * time.Millisecond // Commit batch when idle for that long.
	indexingQueueCapacity = 1024 * 256
)
75
// fpLen is the length of a fingerprint in its string representation, as
// returned by model.Fingerprint.String.
var fpLen = len(model.Fingerprint(0).String()) // Length of a fingerprint as string.

// Series flags, stored as a bit set in the flag byte written per series into
// the heads file (see the format description of checkpointSeriesMapAndHeads).
// In heads format v1, the flags determine whether the head chunk of a series
// has been persisted; in v2 the flag byte is still written but left empty.
const (
	// flagHeadChunkPersisted presumably marks (in v1) that the head chunk is
	// already persisted to the series file — confirm against the loader.
	flagHeadChunkPersisted byte = 1 << iota
	// Add more flags here like:
	// flagFoo
	// flagBar
)

// indexingOpType is the kind of operation requested in an indexingOp.
type indexingOpType byte

const (
	add indexingOpType = iota
	remove
)

// indexingOp is one entry of persistence.indexingQueue: a metric (identified
// by its fingerprint) together with the operation to perform on the indexes
// for it.
type indexingOp struct {
	fingerprint model.Fingerprint
	metric      model.Metric
	opType      indexingOpType
}
97
// persistence is used by a Storage implementation to store samples
// persistently across restarts. The methods are only goroutine-safe if
// explicitly marked as such below. The chunk-related methods persistChunks,
// dropChunks, loadChunks, and loadChunkDescs can be called concurrently with
// each other if each call refers to a different fingerprint.
type persistence struct {
	basePath string // Directory holding all on-disk storage files.

	// Indexes for archived series, and the label indexes. The label indexes
	// are deleted (and re-created empty) when the persistence starts dirty.
	archivedFingerprintToMetrics   *index.FingerprintMetricIndex
	archivedFingerprintToTimeRange *index.FingerprintTimeRangeIndex
	labelPairToFingerprints        *index.LabelPairFingerprintIndex
	labelNameToLabelValues         *index.LabelNameLabelValuesIndex

	// indexingQueue buffers indexing operations (capacity
	// indexingQueueCapacity). NOTE(review): indexingStopped and indexingFlush
	// are used by code outside this view — presumably to signal that queue
	// processing has stopped and to request a flush; confirm.
	indexingQueue   chan indexingOp
	indexingStopped chan struct{}
	indexingFlush   chan chan int

	// Self-monitoring metrics, exposed via Describe and Collect.
	indexingQueueLength     prometheus.Gauge
	indexingQueueCapacity   prometheus.Metric
	indexingBatchSizes      prometheus.Summary
	indexingBatchDuration   prometheus.Summary
	checkpointDuration      prometheus.Summary
	checkpointLastDuration  prometheus.Gauge
	checkpointLastSize      prometheus.Gauge
	checkpointChunksWritten prometheus.Summary
	dirtyCounter            prometheus.Counter
	startedDirty            prometheus.Gauge
	checkpointing           prometheus.Gauge
	seriesChunksPersisted   prometheus.Histogram

	dirtyMtx       sync.Mutex     // Protects dirty and becameDirty.
	dirty          bool           // true if persistence was started in dirty state.
	becameDirty    bool           // true if an inconsistency came up during runtime.
	pedanticChecks bool           // true if crash recovery should check each series.
	dirtyFileName  string         // The file used for locking and to mark dirty state.
	fLock          flock.Releaser // The file lock to protect against concurrent usage.

	// shouldSync is the sync strategy passed to newPersistence; presumably
	// consulted when closing series files — confirm.
	shouldSync syncStrategy

	minShrinkRatio float64 // How much a series file has to shrink to justify dropping chunks.

	// bufPool hands out []byte buffers with a capacity of at least
	// 3*chunkLenWithHeader. Never put a smaller buffer back.
	bufPool sync.Pool
}
141
142// newPersistence returns a newly allocated persistence backed by local disk storage, ready to use.
143func newPersistence(
144	basePath string,
145	dirty, pedanticChecks bool,
146	shouldSync syncStrategy,
147	minShrinkRatio float64,
148) (*persistence, error) {
149	dirtyPath := filepath.Join(basePath, dirtyFileName)
150	versionPath := filepath.Join(basePath, versionFileName)
151
152	if versionData, err := ioutil.ReadFile(versionPath); err == nil {
153		if persistedVersion, err := strconv.Atoi(strings.TrimSpace(string(versionData))); err != nil {
154			return nil, fmt.Errorf("cannot parse content of %s: %s", versionPath, versionData)
155		} else if persistedVersion != Version {
156			return nil, fmt.Errorf("found storage version %d on disk, need version %d - please wipe storage or run a version of Prometheus compatible with storage version %d", persistedVersion, Version, persistedVersion)
157		}
158	} else if os.IsNotExist(err) {
159		// No version file found. Let's create the directory (in case
160		// it's not there yet) and then check if it is actually
161		// empty. If not, we have found an old storage directory without
162		// version file, so we have to bail out.
163		if err := os.MkdirAll(basePath, 0700); err != nil {
164			if abspath, e := filepath.Abs(basePath); e == nil {
165				return nil, fmt.Errorf("cannot create persistent directory %s: %s", abspath, err)
166			}
167			return nil, fmt.Errorf("cannot create persistent directory %s: %s", basePath, err)
168		}
169		fis, err := ioutil.ReadDir(basePath)
170		if err != nil {
171			return nil, err
172		}
173		filesPresent := len(fis)
174		for i := range fis {
175			switch {
176			case fis[i].Name() == "lost+found" && fis[i].IsDir():
177				filesPresent--
178			case strings.HasPrefix(fis[i].Name(), "."):
179				filesPresent--
180			}
181		}
182		if filesPresent > 0 {
183			return nil, fmt.Errorf("found existing files in storage path that do not look like storage files compatible with this version of Prometheus; please delete the files in the storage path or choose a different storage path")
184		}
185		// Finally we can write our own version into a new version file.
186		file, err := os.Create(versionPath)
187		if err != nil {
188			return nil, err
189		}
190		defer file.Close()
191		if _, err := fmt.Fprintf(file, "%d\n", Version); err != nil {
192			return nil, err
193		}
194	} else {
195		return nil, err
196	}
197
198	fLock, dirtyfileExisted, err := flock.New(dirtyPath)
199	if err != nil {
200		log.Errorf("Could not lock %s, Prometheus already running?", dirtyPath)
201		return nil, err
202	}
203	if dirtyfileExisted {
204		dirty = true
205	}
206
207	archivedFingerprintToMetrics, err := index.NewFingerprintMetricIndex(basePath)
208	if err != nil {
209		// At this point, we could simply blow away the archived
210		// fingerprint-to-metric index. However, then we would lose
211		// _all_ archived metrics. So better give the user an
212		// opportunity to repair the LevelDB with a 3rd party tool.
213		log.Errorf("Could not open the fingerprint-to-metric index for archived series. Please try a 3rd party tool to repair LevelDB in directory %q. If unsuccessful or undesired, delete the whole directory and restart Prometheus for crash recovery. You will lose all archived time series.", filepath.Join(basePath, index.FingerprintToMetricDir))
214		return nil, err
215	}
216	archivedFingerprintToTimeRange, err := index.NewFingerprintTimeRangeIndex(basePath)
217	if err != nil {
218		// We can recover the archived fingerprint-to-timerange index,
219		// so blow it away and set ourselves dirty. Then re-open the now
220		// empty index.
221		if err := index.DeleteFingerprintTimeRangeIndex(basePath); err != nil {
222			return nil, err
223		}
224		dirty = true
225		if archivedFingerprintToTimeRange, err = index.NewFingerprintTimeRangeIndex(basePath); err != nil {
226			return nil, err
227		}
228	}
229
230	p := &persistence{
231		basePath: basePath,
232
233		archivedFingerprintToMetrics:   archivedFingerprintToMetrics,
234		archivedFingerprintToTimeRange: archivedFingerprintToTimeRange,
235
236		indexingQueue:   make(chan indexingOp, indexingQueueCapacity),
237		indexingStopped: make(chan struct{}),
238		indexingFlush:   make(chan chan int),
239
240		indexingQueueLength: prometheus.NewGauge(prometheus.GaugeOpts{
241			Namespace: namespace,
242			Subsystem: subsystem,
243			Name:      "indexing_queue_length",
244			Help:      "The number of metrics waiting to be indexed.",
245		}),
246		indexingQueueCapacity: prometheus.MustNewConstMetric(
247			prometheus.NewDesc(
248				prometheus.BuildFQName(namespace, subsystem, "indexing_queue_capacity"),
249				"The capacity of the indexing queue.",
250				nil, nil,
251			),
252			prometheus.GaugeValue,
253			float64(indexingQueueCapacity),
254		),
255		indexingBatchSizes: prometheus.NewSummary(
256			prometheus.SummaryOpts{
257				Namespace: namespace,
258				Subsystem: subsystem,
259				Name:      "indexing_batch_sizes",
260				Help:      "Quantiles for indexing batch sizes (number of metrics per batch).",
261			},
262		),
263		indexingBatchDuration: prometheus.NewSummary(
264			prometheus.SummaryOpts{
265				Namespace: namespace,
266				Subsystem: subsystem,
267				Name:      "indexing_batch_duration_seconds",
268				Help:      "Quantiles for batch indexing duration in seconds.",
269			},
270		),
271		checkpointLastDuration: prometheus.NewGauge(prometheus.GaugeOpts{
272			Namespace: namespace,
273			Subsystem: subsystem,
274			Name:      "checkpoint_last_duration_seconds",
275			Help:      "The duration in seconds it took to last checkpoint open chunks and chunks yet to be persisted.",
276		}),
277		checkpointDuration: prometheus.NewSummary(prometheus.SummaryOpts{
278			Namespace:  namespace,
279			Subsystem:  subsystem,
280			Objectives: map[float64]float64{},
281			Name:       "checkpoint_duration_seconds",
282			Help:       "The duration in seconds taken for checkpointing open chunks and chunks yet to be persisted",
283		}),
284		checkpointLastSize: prometheus.NewGauge(prometheus.GaugeOpts{
285			Namespace: namespace,
286			Subsystem: subsystem,
287			Name:      "checkpoint_last_size_bytes",
288			Help:      "The size of the last checkpoint of open chunks and chunks yet to be persisted",
289		}),
290		checkpointChunksWritten: prometheus.NewSummary(prometheus.SummaryOpts{
291			Namespace:  namespace,
292			Subsystem:  subsystem,
293			Objectives: map[float64]float64{},
294			Name:       "checkpoint_series_chunks_written",
295			Help:       "The number of chunk written per series while checkpointing open chunks and chunks yet to be persisted.",
296		}),
297		dirtyCounter: prometheus.NewCounter(prometheus.CounterOpts{
298			Namespace: namespace,
299			Subsystem: subsystem,
300			Name:      "inconsistencies_total",
301			Help:      "A counter incremented each time an inconsistency in the local storage is detected. If this is greater zero, restart the server as soon as possible.",
302		}),
303		startedDirty: prometheus.NewGauge(prometheus.GaugeOpts{
304			Namespace: namespace,
305			Subsystem: subsystem,
306			Name:      "started_dirty",
307			Help:      "Whether the local storage was found to be dirty (and crash recovery occurred) during Prometheus startup.",
308		}),
309		checkpointing: prometheus.NewGauge(prometheus.GaugeOpts{
310			Namespace: namespace,
311			Subsystem: subsystem,
312			Name:      "checkpointing",
313			Help:      "1 if the storage is checkpointing, 0 otherwise.",
314		}),
315		seriesChunksPersisted: prometheus.NewHistogram(prometheus.HistogramOpts{
316			Namespace: namespace,
317			Subsystem: subsystem,
318			Name:      "series_chunks_persisted",
319			Help:      "The number of chunks persisted per series.",
320			// Even with 4 bytes per sample, you're not going to get more than 85
321			// chunks in 6 hours for a time series with 1s resolution.
322			Buckets: []float64{1, 2, 4, 8, 16, 32, 64, 128},
323		}),
324		dirty:          dirty,
325		pedanticChecks: pedanticChecks,
326		dirtyFileName:  dirtyPath,
327		fLock:          fLock,
328		shouldSync:     shouldSync,
329		minShrinkRatio: minShrinkRatio,
330		// Create buffers of length 3*chunkLenWithHeader by default because that is still reasonably small
331		// and at the same time enough for many uses. The contract is to never return buffer smaller than
332		// that to the pool so that callers can rely on a minimum buffer size.
333		bufPool: sync.Pool{New: func() interface{} { return make([]byte, 0, 3*chunkLenWithHeader) }},
334	}
335
336	if p.dirty {
337		// Blow away the label indexes. We'll rebuild them later.
338		if err := index.DeleteLabelPairFingerprintIndex(basePath); err != nil {
339			return nil, err
340		}
341		if err := index.DeleteLabelNameLabelValuesIndex(basePath); err != nil {
342			return nil, err
343		}
344	}
345	labelPairToFingerprints, err := index.NewLabelPairFingerprintIndex(basePath)
346	if err != nil {
347		return nil, err
348	}
349	labelNameToLabelValues, err := index.NewLabelNameLabelValuesIndex(basePath)
350	if err != nil {
351		return nil, err
352	}
353	p.labelPairToFingerprints = labelPairToFingerprints
354	p.labelNameToLabelValues = labelNameToLabelValues
355
356	return p, nil
357}
358
// run performs the persistence's background work, i.e. processing the
// indexing queue. It returns only once processIndexingQueue returns.
// NOTE(review): presumably meant to be started in its own goroutine — confirm
// with callers.
func (p *persistence) run() {
	p.processIndexingQueue()
}
362
363// Describe implements prometheus.Collector.
364func (p *persistence) Describe(ch chan<- *prometheus.Desc) {
365	ch <- p.indexingQueueLength.Desc()
366	ch <- p.indexingQueueCapacity.Desc()
367	p.indexingBatchSizes.Describe(ch)
368	p.indexingBatchDuration.Describe(ch)
369	ch <- p.checkpointDuration.Desc()
370	ch <- p.checkpointLastDuration.Desc()
371	ch <- p.checkpointLastSize.Desc()
372	ch <- p.checkpointChunksWritten.Desc()
373	ch <- p.checkpointing.Desc()
374	ch <- p.dirtyCounter.Desc()
375	ch <- p.startedDirty.Desc()
376	ch <- p.seriesChunksPersisted.Desc()
377}
378
379// Collect implements prometheus.Collector.
380func (p *persistence) Collect(ch chan<- prometheus.Metric) {
381	p.indexingQueueLength.Set(float64(len(p.indexingQueue)))
382
383	ch <- p.indexingQueueLength
384	ch <- p.indexingQueueCapacity
385	p.indexingBatchSizes.Collect(ch)
386	p.indexingBatchDuration.Collect(ch)
387	ch <- p.checkpointDuration
388	ch <- p.checkpointLastDuration
389	ch <- p.checkpointLastSize
390	ch <- p.checkpointChunksWritten
391	ch <- p.checkpointing
392	ch <- p.dirtyCounter
393	ch <- p.startedDirty
394	ch <- p.seriesChunksPersisted
395}
396
397// isDirty returns the dirty flag in a goroutine-safe way.
398func (p *persistence) isDirty() bool {
399	p.dirtyMtx.Lock()
400	defer p.dirtyMtx.Unlock()
401	return p.dirty
402}
403
404// setDirty flags the storage as dirty in a goroutine-safe way. The provided
405// error will be logged as a reason the first time the storage is flagged as dirty.
406func (p *persistence) setDirty(err error) {
407	p.dirtyCounter.Inc()
408	p.dirtyMtx.Lock()
409	defer p.dirtyMtx.Unlock()
410	if p.becameDirty {
411		return
412	}
413	p.dirty = true
414	p.becameDirty = true
415	log.With("error", err).Error("The storage is now inconsistent. Restart Prometheus ASAP to initiate recovery.")
416}
417
418// fingerprintsForLabelPair returns the fingerprints for the given label
419// pair. This method is goroutine-safe but take into account that metrics queued
420// for indexing with IndexMetric might not have made it into the index
421// yet. (Same applies correspondingly to UnindexMetric.)
422func (p *persistence) fingerprintsForLabelPair(lp model.LabelPair) model.Fingerprints {
423	fps, _, err := p.labelPairToFingerprints.Lookup(lp)
424	if err != nil {
425		p.setDirty(fmt.Errorf("error in method fingerprintsForLabelPair(%v): %s", lp, err))
426		return nil
427	}
428	return fps
429}
430
431// labelValuesForLabelName returns the label values for the given label
432// name. This method is goroutine-safe but take into account that metrics queued
433// for indexing with IndexMetric might not have made it into the index
434// yet. (Same applies correspondingly to UnindexMetric.)
435func (p *persistence) labelValuesForLabelName(ln model.LabelName) (model.LabelValues, error) {
436	lvs, _, err := p.labelNameToLabelValues.Lookup(ln)
437	if err != nil {
438		p.setDirty(fmt.Errorf("error in method labelValuesForLabelName(%v): %s", ln, err))
439		return nil, err
440	}
441	return lvs, nil
442}
443
444// persistChunks persists a number of consecutive chunks of a series. It is the
445// caller's responsibility to not modify the chunks concurrently and to not
446// persist or drop anything for the same fingerprint concurrently. It returns
447// the (zero-based) index of the first persisted chunk within the series
448// file. In case of an error, the returned index is -1 (to avoid the
449// misconception that the chunk was written at position 0).
450//
451// Returning an error signals problems with the series file. In this case, the
452// caller should quarantine the series.
453func (p *persistence) persistChunks(fp model.Fingerprint, chunks []chunk.Chunk) (index int, err error) {
454	f, err := p.openChunkFileForWriting(fp)
455	if err != nil {
456		return -1, err
457	}
458	defer p.closeChunkFile(f)
459
460	if err := p.writeChunks(f, chunks); err != nil {
461		return -1, err
462	}
463
464	// Determine index within the file.
465	offset, err := f.Seek(0, io.SeekCurrent)
466	if err != nil {
467		return -1, err
468	}
469	index, err = chunkIndexForOffset(offset)
470	if err != nil {
471		return -1, err
472	}
473
474	return index - len(chunks), err
475}
476
477// loadChunks loads a group of chunks of a timeseries by their index. The chunk
478// with the earliest time will have index 0, the following ones will have
479// incrementally larger indexes. The indexOffset denotes the offset to be added to
480// each index in indexes. It is the caller's responsibility to not persist or
481// drop anything for the same fingerprint concurrently.
482func (p *persistence) loadChunks(fp model.Fingerprint, indexes []int, indexOffset int) ([]chunk.Chunk, error) {
483	f, err := p.openChunkFileForReading(fp)
484	if err != nil {
485		return nil, err
486	}
487	defer f.Close()
488
489	chunks := make([]chunk.Chunk, 0, len(indexes))
490	buf := p.bufPool.Get().([]byte)
491	defer func() {
492		// buf may change below. An unwrapped 'defer p.bufPool.Put(buf)'
493		// would only put back the original buf.
494		p.bufPool.Put(buf)
495	}()
496
497	for i := 0; i < len(indexes); i++ {
498		// This loads chunks in batches. A batch is a streak of
499		// consecutive chunks, read from disk in one go.
500		batchSize := 1
501		if _, err := f.Seek(offsetForChunkIndex(indexes[i]+indexOffset), io.SeekStart); err != nil {
502			return nil, err
503		}
504
505		for ; batchSize < chunkMaxBatchSize &&
506			i+1 < len(indexes) &&
507			indexes[i]+1 == indexes[i+1]; i, batchSize = i+1, batchSize+1 {
508		}
509		readSize := batchSize * chunkLenWithHeader
510		if cap(buf) < readSize {
511			buf = make([]byte, readSize)
512		}
513		buf = buf[:readSize]
514
515		if _, err := io.ReadFull(f, buf); err != nil {
516			return nil, err
517		}
518		for c := 0; c < batchSize; c++ {
519			chunk, err := chunk.NewForEncoding(chunk.Encoding(buf[c*chunkLenWithHeader+chunkHeaderTypeOffset]))
520			if err != nil {
521				return nil, err
522			}
523			if err := chunk.UnmarshalFromBuf(buf[c*chunkLenWithHeader+chunkHeaderLen:]); err != nil {
524				return nil, err
525			}
526			chunks = append(chunks, chunk)
527		}
528	}
529	chunk.Ops.WithLabelValues(chunk.Load).Add(float64(len(chunks)))
530	atomic.AddInt64(&chunk.NumMemChunks, int64(len(chunks)))
531	return chunks, nil
532}
533
534// loadChunkDescs loads the chunk.Descs for a series from disk. offsetFromEnd is
535// the number of chunk.Descs to skip from the end of the series file. It is the
536// caller's responsibility to not persist or drop anything for the same
537// fingerprint concurrently.
538func (p *persistence) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([]*chunk.Desc, error) {
539	f, err := p.openChunkFileForReading(fp)
540	if os.IsNotExist(err) {
541		return nil, nil
542	}
543	if err != nil {
544		return nil, err
545	}
546	defer f.Close()
547
548	fi, err := f.Stat()
549	if err != nil {
550		return nil, err
551	}
552	if fi.Size()%int64(chunkLenWithHeader) != 0 {
553		// The returned error will bubble up and lead to quarantining of the whole series.
554		return nil, fmt.Errorf(
555			"size of series file for fingerprint %v is %d, which is not a multiple of the chunk length %d",
556			fp, fi.Size(), chunkLenWithHeader,
557		)
558	}
559
560	numChunks := int(fi.Size())/chunkLenWithHeader - offsetFromEnd
561	cds := make([]*chunk.Desc, numChunks)
562	chunkTimesBuf := make([]byte, 16)
563	for i := 0; i < numChunks; i++ {
564		_, err := f.Seek(offsetForChunkIndex(i)+chunkHeaderFirstTimeOffset, io.SeekStart)
565		if err != nil {
566			return nil, err
567		}
568
569		_, err = io.ReadAtLeast(f, chunkTimesBuf, 16)
570		if err != nil {
571			return nil, err
572		}
573		cds[i] = &chunk.Desc{
574			ChunkFirstTime: model.Time(binary.LittleEndian.Uint64(chunkTimesBuf)),
575			ChunkLastTime:  model.Time(binary.LittleEndian.Uint64(chunkTimesBuf[8:])),
576		}
577	}
578	chunk.DescOps.WithLabelValues(chunk.Load).Add(float64(len(cds)))
579	chunk.NumMemDescs.Add(float64(len(cds)))
580	return cds, nil
581}
582
583// checkpointSeriesMapAndHeads persists the fingerprint to memory-series mapping
584// and all non persisted chunks. Do not call concurrently with
585// loadSeriesMapAndHeads. This method will only write heads format v2, but
586// loadSeriesMapAndHeads can also understand v1.
587//
588// Description of the file format (for both, v1 and v2):
589//
590// (1) Magic string (const headsMagicString).
591//
592// (2) Varint-encoded format version (const headsFormatVersion).
593//
594// (3) Number of series in checkpoint as big-endian uint64.
595//
596// (4) Repeated once per series:
597//
598// (4.1) A flag byte, see flag constants above. (Present but unused in v2.)
599//
600// (4.2) The fingerprint as big-endian uint64.
601//
602// (4.3) The metric as defined by codable.Metric.
603//
604// (4.4) The varint-encoded persistWatermark. (Missing in v1.)
605//
606// (4.5) The modification time of the series file as nanoseconds elapsed since
607// January 1, 1970 UTC. -1 if the modification time is unknown or no series file
608// exists yet. (Missing in v1.)
609//
610// (4.6) The varint-encoded chunkDescsOffset.
611//
612// (4.6) The varint-encoded savedFirstTime.
613//
614// (4.7) The varint-encoded number of chunk descriptors.
615//
616// (4.8) Repeated once per chunk descriptor, oldest to most recent, either
617// variant 4.8.1 (if index < persistWatermark) or variant 4.8.2 (if index >=
618// persistWatermark). In v1, everything is variant 4.8.1 except for a
619// non-persisted head-chunk (determined by the flags).
620//
621// (4.8.1.1) The varint-encoded first time.
622// (4.8.1.2) The varint-encoded last time.
623//
624// (4.8.2.1) A byte defining the chunk type.
625// (4.8.2.2) The chunk itself, marshaled with the Marshal() method.
626//
627// NOTE: Above, varint encoding is used consistently although uvarint would have
628// made more sense in many cases. This was simply a glitch while designing the
629// format.
630func (p *persistence) checkpointSeriesMapAndHeads(
631	ctx context.Context, fingerprintToSeries *seriesMap, fpLocker *fingerprintLocker,
632) (err error) {
633	log.Info("Checkpointing in-memory metrics and chunks...")
634	p.checkpointing.Set(1)
635	defer p.checkpointing.Set(0)
636	begin := time.Now()
637	f, err := os.OpenFile(p.headsTempFileName(), os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640)
638	if err != nil {
639		return err
640	}
641
642	defer func() {
643		defer os.Remove(p.headsTempFileName()) // Just in case it was left behind.
644
645		if err != nil {
646			// If we already had an error, do not bother to sync,
647			// just close, ignoring any further error.
648			f.Close()
649			return
650		}
651		syncErr := f.Sync()
652		closeErr := f.Close()
653		err = syncErr
654		if err != nil {
655			return
656		}
657		err = closeErr
658		if err != nil {
659			return
660		}
661		err = os.Rename(p.headsTempFileName(), p.headsFileName())
662		duration := time.Since(begin)
663		p.checkpointDuration.Observe(duration.Seconds())
664		p.checkpointLastDuration.Set(duration.Seconds())
665		log.Infof("Done checkpointing in-memory metrics and chunks in %v.", duration)
666	}()
667
668	w := bufio.NewWriterSize(f, fileBufSize)
669
670	if _, err = w.WriteString(headsMagicString); err != nil {
671		return err
672	}
673	var numberOfSeriesOffset int
674	if numberOfSeriesOffset, err = codable.EncodeVarint(w, headsFormatVersion); err != nil {
675		return err
676	}
677	numberOfSeriesOffset += len(headsMagicString)
678	numberOfSeriesInHeader := uint64(fingerprintToSeries.length())
679	// We have to write the number of series as uint64 because we might need
680	// to overwrite it later, and a varint might change byte width then.
681	if err = codable.EncodeUint64(w, numberOfSeriesInHeader); err != nil {
682		return err
683	}
684
685	iter := fingerprintToSeries.iter()
686	defer func() {
687		// Consume the iterator in any case to not leak goroutines.
688		for range iter {
689		}
690	}()
691
692	var realNumberOfSeries uint64
693	for m := range iter {
694		select {
695		case <-ctx.Done():
696			return ctx.Err()
697		default:
698		}
699		func() { // Wrapped in function to use defer for unlocking the fp.
700			fpLocker.Lock(m.fp)
701			defer fpLocker.Unlock(m.fp)
702
703			chunksToPersist := len(m.series.chunkDescs) - m.series.persistWatermark
704			if len(m.series.chunkDescs) == 0 {
705				// This series was completely purged or archived
706				// in the meantime. Ignore.
707				return
708			}
709			realNumberOfSeries++
710
711			// Sanity checks.
712			if m.series.chunkDescsOffset < 0 && m.series.persistWatermark > 0 {
713				panic("encountered unknown chunk desc offset in combination with positive persist watermark")
714			}
715
716			// These are the values to save in the normal case.
717			var (
718				// persistWatermark is zero as we only checkpoint non-persisted chunks.
719				persistWatermark int64
720				// chunkDescsOffset is shifted by the original persistWatermark for the same reason.
721				chunkDescsOffset = int64(m.series.chunkDescsOffset + m.series.persistWatermark)
722				numChunkDescs    = int64(chunksToPersist)
723			)
724			// However, in the special case of a series being fully
725			// persisted but still in memory (i.e. not archived), we
726			// need to save a "placeholder", for which we use just
727			// the chunk desc of the last chunk. Values have to be
728			// adjusted accordingly. (The reason for doing it in
729			// this weird way is to keep the checkpoint format
730			// compatible with older versions.)
731			if chunksToPersist == 0 {
732				persistWatermark = 1
733				chunkDescsOffset-- // Save one chunk desc after all.
734				numChunkDescs = 1
735			}
736
737			// seriesFlags left empty in v2.
738			if err = w.WriteByte(0); err != nil {
739				return
740			}
741			if err = codable.EncodeUint64(w, uint64(m.fp)); err != nil {
742				return
743			}
744			var buf []byte
745			buf, err = codable.Metric(m.series.metric).MarshalBinary()
746			if err != nil {
747				return
748			}
749			if _, err = w.Write(buf); err != nil {
750				return
751			}
752			if _, err = codable.EncodeVarint(w, persistWatermark); err != nil {
753				return
754			}
755			if m.series.modTime.IsZero() {
756				if _, err = codable.EncodeVarint(w, -1); err != nil {
757					return
758				}
759			} else {
760				if _, err = codable.EncodeVarint(w, m.series.modTime.UnixNano()); err != nil {
761					return
762				}
763			}
764			if _, err = codable.EncodeVarint(w, chunkDescsOffset); err != nil {
765				return
766			}
767			if _, err = codable.EncodeVarint(w, int64(m.series.savedFirstTime)); err != nil {
768				return
769			}
770			if _, err = codable.EncodeVarint(w, numChunkDescs); err != nil {
771				return
772			}
773			if chunksToPersist == 0 {
774				// Save the one placeholder chunk desc for a fully persisted series.
775				chunkDesc := m.series.chunkDescs[len(m.series.chunkDescs)-1]
776				if _, err = codable.EncodeVarint(w, int64(chunkDesc.FirstTime())); err != nil {
777					return
778				}
779				lt, err := chunkDesc.LastTime()
780				if err != nil {
781					return
782				}
783				if _, err = codable.EncodeVarint(w, int64(lt)); err != nil {
784					return
785				}
786			} else {
787				// Save (only) the non-persisted chunks.
788				for _, chunkDesc := range m.series.chunkDescs[m.series.persistWatermark:] {
789					if err = w.WriteByte(byte(chunkDesc.C.Encoding())); err != nil {
790						return
791					}
792					if err = chunkDesc.C.Marshal(w); err != nil {
793						return
794					}
795					p.checkpointChunksWritten.Observe(float64(chunksToPersist))
796				}
797			}
798			// Series is checkpointed now, so declare it clean. In case the entire
799			// checkpoint fails later on, this is fine, as the storage's series
800			// maintenance will mark these series newly dirty again, continuously
801			// increasing the total number of dirty series as seen by the storage.
802			// This has the effect of triggering a new checkpoint attempt even
803			// earlier than if we hadn't incorrectly set "dirty" to "false" here
804			// already.
805			m.series.dirty = false
806		}()
807		if err != nil {
808			return err
809		}
810	}
811	if err = w.Flush(); err != nil {
812		return err
813	}
814	if realNumberOfSeries != numberOfSeriesInHeader {
815		// The number of series has changed in the meantime.
816		// Rewrite it in the header.
817		if _, err = f.Seek(int64(numberOfSeriesOffset), io.SeekStart); err != nil {
818			return err
819		}
820		if err = codable.EncodeUint64(f, realNumberOfSeries); err != nil {
821			return err
822		}
823	}
824	info, err := f.Stat()
825	if err != nil {
826		return err
827	}
828	p.checkpointLastSize.Set(float64(info.Size()))
829	return err
830}
831
832// loadSeriesMapAndHeads loads the fingerprint to memory-series mapping and all
833// the chunks contained in the checkpoint (and thus not yet persisted to series
834// files). The method is capable of loading the checkpoint format v1 and v2. If
835// recoverable corruption is detected, or if the dirty flag was set from the
836// beginning, crash recovery is run, which might take a while. If an
837// unrecoverable error is encountered, it is returned. Call this method during
838// start-up while nothing else is running in storage land. This method is
839// utterly goroutine-unsafe.
840func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist int64, err error) {
841	fingerprintToSeries := make(map[model.Fingerprint]*memorySeries)
842	sm = &seriesMap{m: fingerprintToSeries}
843
844	defer func() {
845		if p.dirty {
846			log.Warn("Persistence layer appears dirty.")
847			p.startedDirty.Set(1)
848			err = p.recoverFromCrash(fingerprintToSeries)
849			if err != nil {
850				sm = nil
851			}
852		} else {
853			p.startedDirty.Set(0)
854		}
855	}()
856
857	hs := newHeadsScanner(p.headsFileName())
858	defer hs.close()
859	for hs.scan() {
860		fingerprintToSeries[hs.fp] = hs.series
861	}
862	if os.IsNotExist(hs.err) {
863		return sm, 0, nil
864	}
865	if hs.err != nil {
866		p.dirty = true
867		log.
868			With("file", p.headsFileName()).
869			With("error", hs.err).
870			Error("Error reading heads file.")
871		return sm, 0, hs.err
872	}
873	return sm, hs.chunksToPersistTotal, nil
874}
875
876// dropAndPersistChunks deletes all chunks from a series file whose last sample
877// time is before beforeTime, and then appends the provided chunks, leaving out
878// those whose last sample time is before beforeTime. It returns the timestamp
879// of the first sample in the oldest chunk _not_ dropped, the chunk offset
880// within the series file of the first chunk persisted (out of the provided
881// chunks, or - if no chunks were provided - the chunk offset where chunks would
882// have been persisted, i.e. the end of the file), the number of deleted chunks,
883// and true if all chunks of the series have been deleted (in which case the
884// returned timestamp will be 0 and must be ignored).  It is the caller's
885// responsibility to make sure nothing is persisted or loaded for the same
886// fingerprint concurrently.
887//
888// Returning an error signals problems with the series file. In this case, the
889// caller should quarantine the series.
890func (p *persistence) dropAndPersistChunks(
891	fp model.Fingerprint, beforeTime model.Time, chunks []chunk.Chunk,
892) (
893	firstTimeNotDropped model.Time,
894	offset int,
895	numDropped int,
896	allDropped bool,
897	err error,
898) {
899	// Style note: With the many return values, it was decided to use naked
900	// returns in this method. They make the method more readable, but
901	// please handle with care!
902	if len(chunks) > 0 {
903		// We have chunks to persist. First check if those are already
904		// too old. If that's the case, the chunks in the series file
905		// are all too old, too.
906		i := 0
907		for ; i < len(chunks); i++ {
908			var lt model.Time
909			lt, err = chunks[i].NewIterator().LastTimestamp()
910			if err != nil {
911				return
912			}
913			if !lt.Before(beforeTime) {
914				break
915			}
916		}
917		if i < len(chunks) {
918			firstTimeNotDropped = chunks[i].FirstTime()
919		}
920		if i > 0 || firstTimeNotDropped.Before(beforeTime) {
921			// Series file has to go.
922			if numDropped, err = p.deleteSeriesFile(fp); err != nil {
923				return
924			}
925			numDropped += i
926			if i == len(chunks) {
927				allDropped = true
928				return
929			}
930			// Now simply persist what has to be persisted to a new file.
931			_, err = p.persistChunks(fp, chunks[i:])
932			return
933		}
934	}
935
936	// If we are here, we have to check the series file itself.
937	f, err := p.openChunkFileForReading(fp)
938	if os.IsNotExist(err) {
939		// No series file. Only need to create new file with chunks to
940		// persist, if there are any.
941		if len(chunks) == 0 {
942			allDropped = true
943			err = nil // Do not report not-exist err.
944			return
945		}
946		offset, err = p.persistChunks(fp, chunks)
947		return
948	}
949	if err != nil {
950		return
951	}
952	defer f.Close()
953
954	fi, err := f.Stat()
955	if err != nil {
956		return
957	}
958	chunksInFile := int(fi.Size()) / chunkLenWithHeader
959	totalChunks := chunksInFile + len(chunks)
960
961	// Calculate chunk index from minShrinkRatio, to skip unnecessary chunk header reading.
962	chunkIndexToStartSeek := 0
963	if p.minShrinkRatio < 1 {
964		chunkIndexToStartSeek = int(math.Floor(float64(totalChunks) * p.minShrinkRatio))
965	}
966	if chunkIndexToStartSeek >= chunksInFile {
967		chunkIndexToStartSeek = chunksInFile - 1
968	}
969	numDropped = chunkIndexToStartSeek
970
971	headerBuf := make([]byte, chunkHeaderLen)
972	// Find the first chunk in the file that should be kept.
973	for ; ; numDropped++ {
974		_, err = f.Seek(offsetForChunkIndex(numDropped), io.SeekStart)
975		if err != nil {
976			return
977		}
978		_, err = io.ReadFull(f, headerBuf)
979		if err == io.EOF {
980			// Close the file before trying to delete it. This is necessary on Windows
981			// (this will cause the defer f.Close to fail, but the error is silently ignored)
982			f.Close()
983			// We ran into the end of the file without finding any chunks that should
984			// be kept. Remove the whole file.
985			if numDropped, err = p.deleteSeriesFile(fp); err != nil {
986				return
987			}
988			if len(chunks) == 0 {
989				allDropped = true
990				return
991			}
992			offset, err = p.persistChunks(fp, chunks)
993			return
994		}
995		if err != nil {
996			return
997		}
998		lastTime := model.Time(
999			binary.LittleEndian.Uint64(headerBuf[chunkHeaderLastTimeOffset:]),
1000		)
1001		if !lastTime.Before(beforeTime) {
1002			break
1003		}
1004	}
1005
1006	// If numDropped isn't incremented, the minShrinkRatio condition isn't satisfied.
1007	if numDropped == chunkIndexToStartSeek {
1008		// Nothing to drop. Just adjust the return values and append the chunks (if any).
1009		numDropped = 0
1010		_, err = f.Seek(offsetForChunkIndex(0), io.SeekStart)
1011		if err != nil {
1012			return
1013		}
1014		_, err = io.ReadFull(f, headerBuf)
1015		if err != nil {
1016			return
1017		}
1018		firstTimeNotDropped = model.Time(
1019			binary.LittleEndian.Uint64(headerBuf[chunkHeaderFirstTimeOffset:]),
1020		)
1021		if len(chunks) > 0 {
1022			offset, err = p.persistChunks(fp, chunks)
1023		} else {
1024			offset = chunksInFile
1025		}
1026		return
1027	}
1028	// If we are here, we have to drop some chunks for real. So we need to
1029	// record firstTimeNotDropped from the last read header, seek backwards
1030	// to the beginning of its header, and start copying everything from
1031	// there into a new file. Then append the chunks to the new file.
1032	firstTimeNotDropped = model.Time(
1033		binary.LittleEndian.Uint64(headerBuf[chunkHeaderFirstTimeOffset:]),
1034	)
1035	chunk.Ops.WithLabelValues(chunk.Drop).Add(float64(numDropped))
1036	_, err = f.Seek(-chunkHeaderLen, io.SeekCurrent)
1037	if err != nil {
1038		return
1039	}
1040
1041	temp, err := os.OpenFile(p.tempFileNameForFingerprint(fp), os.O_WRONLY|os.O_CREATE, 0640)
1042	if err != nil {
1043		return
1044	}
1045	defer func() {
1046		// Close the file before trying to rename to it. This is necessary on Windows
1047		// (this will cause the defer f.Close to fail, but the error is silently ignored)
1048		f.Close()
1049		p.closeChunkFile(temp)
1050		if err == nil {
1051			err = os.Rename(p.tempFileNameForFingerprint(fp), p.fileNameForFingerprint(fp))
1052		}
1053	}()
1054
1055	written, err := io.Copy(temp, f)
1056	if err != nil {
1057		return
1058	}
1059	offset = int(written / chunkLenWithHeader)
1060
1061	if len(chunks) > 0 {
1062		if err = p.writeChunks(temp, chunks); err != nil {
1063			return
1064		}
1065	}
1066	return
1067}
1068
1069// deleteSeriesFile deletes a series file belonging to the provided
1070// fingerprint. It returns the number of chunks that were contained in the
1071// deleted file.
1072func (p *persistence) deleteSeriesFile(fp model.Fingerprint) (int, error) {
1073	fname := p.fileNameForFingerprint(fp)
1074	fi, err := os.Stat(fname)
1075	if os.IsNotExist(err) {
1076		// Great. The file is already gone.
1077		return 0, nil
1078	}
1079	if err != nil {
1080		return -1, err
1081	}
1082	numChunks := int(fi.Size() / chunkLenWithHeader)
1083	if err := os.Remove(fname); err != nil {
1084		return -1, err
1085	}
1086	chunk.Ops.WithLabelValues(chunk.Drop).Add(float64(numChunks))
1087	return numChunks, nil
1088}
1089
1090// quarantineSeriesFile moves a series file to the orphaned directory. It also
1091// writes a hint file with the provided quarantine reason and, if series is
1092// non-nil, the string representation of the metric.
1093func (p *persistence) quarantineSeriesFile(fp model.Fingerprint, quarantineReason error, metric model.Metric) error {
1094	var (
1095		oldName     = p.fileNameForFingerprint(fp)
1096		orphanedDir = filepath.Join(p.basePath, "orphaned", filepath.Base(filepath.Dir(oldName)))
1097		newName     = filepath.Join(orphanedDir, filepath.Base(oldName))
1098		hintName    = newName[:len(newName)-len(seriesFileSuffix)] + hintFileSuffix
1099	)
1100
1101	renameErr := os.MkdirAll(orphanedDir, 0700)
1102	if renameErr != nil {
1103		return renameErr
1104	}
1105	renameErr = os.Rename(oldName, newName)
1106	if os.IsNotExist(renameErr) {
1107		// Source file dosn't exist. That's normal.
1108		renameErr = nil
1109	}
1110	// Write hint file even if the rename ended in an error. At least try...
1111	// And ignore errors writing the hint file. It's best effort.
1112	if f, err := os.Create(hintName); err == nil {
1113		if metric != nil {
1114			f.WriteString(metric.String() + "\n")
1115		} else {
1116			f.WriteString("[UNKNOWN METRIC]\n")
1117		}
1118		if quarantineReason != nil {
1119			f.WriteString(quarantineReason.Error() + "\n")
1120		} else {
1121			f.WriteString("[UNKNOWN REASON]\n")
1122		}
1123		f.Close()
1124	}
1125	return renameErr
1126}
1127
1128// seriesFileModTime returns the modification time of the series file belonging
1129// to the provided fingerprint. In case of an error, the zero value of time.Time
1130// is returned.
1131func (p *persistence) seriesFileModTime(fp model.Fingerprint) time.Time {
1132	var modTime time.Time
1133	if fi, err := os.Stat(p.fileNameForFingerprint(fp)); err == nil {
1134		return fi.ModTime()
1135	}
1136	return modTime
1137}
1138
1139// indexMetric queues the given metric for addition to the indexes needed by
1140// fingerprintsForLabelPair, labelValuesForLabelName, and
1141// fingerprintsModifiedBefore.  If the queue is full, this method blocks until
1142// the metric can be queued.  This method is goroutine-safe.
1143func (p *persistence) indexMetric(fp model.Fingerprint, m model.Metric) {
1144	p.indexingQueue <- indexingOp{fp, m, add}
1145}
1146
1147// unindexMetric queues references to the given metric for removal from the
1148// indexes used for fingerprintsForLabelPair, labelValuesForLabelName, and
1149// fingerprintsModifiedBefore. The index of fingerprints to archived metrics is
1150// not affected by this removal. (In fact, never call this method for an
1151// archived metric. To purge an archived metric, call purgeArchivedMetric.)
1152// If the queue is full, this method blocks until the metric can be queued. This
1153// method is goroutine-safe.
1154func (p *persistence) unindexMetric(fp model.Fingerprint, m model.Metric) {
1155	p.indexingQueue <- indexingOp{fp, m, remove}
1156}
1157
1158// waitForIndexing waits until all items in the indexing queue are processed. If
1159// queue processing is currently on hold (to gather more ops for batching), this
1160// method will trigger an immediate start of processing. This method is
1161// goroutine-safe.
1162func (p *persistence) waitForIndexing() {
1163	wait := make(chan int)
1164	for {
1165		p.indexingFlush <- wait
1166		if <-wait == 0 {
1167			break
1168		}
1169	}
1170}
1171
1172// archiveMetric persists the mapping of the given fingerprint to the given
1173// metric, together with the first and last timestamp of the series belonging to
1174// the metric. The caller must have locked the fingerprint.
1175func (p *persistence) archiveMetric(
1176	fp model.Fingerprint, m model.Metric, first, last model.Time,
1177) {
1178	if err := p.archivedFingerprintToMetrics.Put(codable.Fingerprint(fp), codable.Metric(m)); err != nil {
1179		p.setDirty(fmt.Errorf("error in method archiveMetric inserting fingerprint %v into FingerprintToMetrics: %s", fp, err))
1180		return
1181	}
1182	if err := p.archivedFingerprintToTimeRange.Put(codable.Fingerprint(fp), codable.TimeRange{First: first, Last: last}); err != nil {
1183		p.setDirty(fmt.Errorf("error in method archiveMetric inserting fingerprint %v into FingerprintToTimeRange: %s", fp, err))
1184	}
1185}
1186
1187// hasArchivedMetric returns whether the archived metric for the given
1188// fingerprint exists and if yes, what the first and last timestamp in the
1189// corresponding series is. This method is goroutine-safe.
1190func (p *persistence) hasArchivedMetric(fp model.Fingerprint) (
1191	hasMetric bool, firstTime, lastTime model.Time,
1192) {
1193	firstTime, lastTime, hasMetric, err := p.archivedFingerprintToTimeRange.Lookup(fp)
1194	if err != nil {
1195		p.setDirty(fmt.Errorf("error in method hasArchivedMetric(%v): %s", fp, err))
1196		hasMetric = false
1197	}
1198	return hasMetric, firstTime, lastTime
1199}
1200
1201// updateArchivedTimeRange updates an archived time range. The caller must make
1202// sure that the fingerprint is currently archived (the time range will
1203// otherwise be added without the corresponding metric in the archive).
1204func (p *persistence) updateArchivedTimeRange(
1205	fp model.Fingerprint, first, last model.Time,
1206) error {
1207	return p.archivedFingerprintToTimeRange.Put(codable.Fingerprint(fp), codable.TimeRange{First: first, Last: last})
1208}
1209
1210// fingerprintsModifiedBefore returns the fingerprints of archived timeseries
1211// that have live samples before the provided timestamp. This method is
1212// goroutine-safe.
1213func (p *persistence) fingerprintsModifiedBefore(beforeTime model.Time) ([]model.Fingerprint, error) {
1214	var fp codable.Fingerprint
1215	var tr codable.TimeRange
1216	fps := []model.Fingerprint{}
1217	err := p.archivedFingerprintToTimeRange.ForEach(func(kv index.KeyValueAccessor) error {
1218		if err := kv.Value(&tr); err != nil {
1219			return err
1220		}
1221		if tr.First.Before(beforeTime) {
1222			if err := kv.Key(&fp); err != nil {
1223				return err
1224			}
1225			fps = append(fps, model.Fingerprint(fp))
1226		}
1227		return nil
1228	})
1229	return fps, err
1230}
1231
1232// archivedMetric retrieves the archived metric with the given fingerprint. This
1233// method is goroutine-safe.
1234func (p *persistence) archivedMetric(fp model.Fingerprint) (model.Metric, error) {
1235	metric, _, err := p.archivedFingerprintToMetrics.Lookup(fp)
1236	if err != nil {
1237		p.setDirty(fmt.Errorf("error in method archivedMetric(%v): %s", fp, err))
1238		return nil, err
1239	}
1240	return metric, nil
1241}
1242
1243// purgeArchivedMetric deletes an archived fingerprint and its corresponding
1244// metric entirely. It also queues the metric for un-indexing (no need to call
1245// unindexMetric for the deleted metric.) It does not touch the series file,
1246// though. The caller must have locked the fingerprint.
1247func (p *persistence) purgeArchivedMetric(fp model.Fingerprint) (err error) {
1248	defer func() {
1249		if err != nil {
1250			p.setDirty(fmt.Errorf("error in method purgeArchivedMetric(%v): %s", fp, err))
1251		}
1252	}()
1253
1254	metric, err := p.archivedMetric(fp)
1255	if err != nil || metric == nil {
1256		return err
1257	}
1258	deleted, err := p.archivedFingerprintToMetrics.Delete(codable.Fingerprint(fp))
1259	if err != nil {
1260		return err
1261	}
1262	if !deleted {
1263		log.Errorf("Tried to delete non-archived fingerprint %s from archivedFingerprintToMetrics index. This should never happen.", fp)
1264	}
1265	deleted, err = p.archivedFingerprintToTimeRange.Delete(codable.Fingerprint(fp))
1266	if err != nil {
1267		return err
1268	}
1269	if !deleted {
1270		log.Errorf("Tried to delete non-archived fingerprint %s from archivedFingerprintToTimeRange index. This should never happen.", fp)
1271	}
1272	p.unindexMetric(fp, metric)
1273	return nil
1274}
1275
1276// unarchiveMetric deletes an archived fingerprint and its metric, but (in
1277// contrast to purgeArchivedMetric) does not un-index the metric.  If a metric
1278// was actually deleted, the method returns true and the first time and last
1279// time of the deleted metric. The caller must have locked the fingerprint.
1280func (p *persistence) unarchiveMetric(fp model.Fingerprint) (deletedAnything bool, err error) {
1281	// An error returned here will bubble up and lead to quarantining of the
1282	// series, so no setDirty required.
1283	deleted, err := p.archivedFingerprintToMetrics.Delete(codable.Fingerprint(fp))
1284	if err != nil || !deleted {
1285		return false, err
1286	}
1287	deleted, err = p.archivedFingerprintToTimeRange.Delete(codable.Fingerprint(fp))
1288	if err != nil {
1289		return false, err
1290	}
1291	if !deleted {
1292		log.Errorf("Tried to delete non-archived fingerprint %s from archivedFingerprintToTimeRange index. This should never happen.", fp)
1293	}
1294	return true, nil
1295}
1296
1297// close flushes the indexing queue and other buffered data and releases any
1298// held resources. It also removes the dirty marker file if successful and if
1299// the persistence is currently not marked as dirty.
1300func (p *persistence) close() error {
1301	close(p.indexingQueue)
1302	<-p.indexingStopped
1303
1304	var lastError, dirtyFileRemoveError error
1305	if err := p.archivedFingerprintToMetrics.Close(); err != nil {
1306		lastError = err
1307		log.Error("Error closing archivedFingerprintToMetric index DB: ", err)
1308	}
1309	if err := p.archivedFingerprintToTimeRange.Close(); err != nil {
1310		lastError = err
1311		log.Error("Error closing archivedFingerprintToTimeRange index DB: ", err)
1312	}
1313	if err := p.labelPairToFingerprints.Close(); err != nil {
1314		lastError = err
1315		log.Error("Error closing labelPairToFingerprints index DB: ", err)
1316	}
1317	if err := p.labelNameToLabelValues.Close(); err != nil {
1318		lastError = err
1319		log.Error("Error closing labelNameToLabelValues index DB: ", err)
1320	}
1321	if lastError == nil && !p.isDirty() {
1322		dirtyFileRemoveError = os.Remove(p.dirtyFileName)
1323	}
1324	if err := p.fLock.Release(); err != nil {
1325		lastError = err
1326		log.Error("Error releasing file lock: ", err)
1327	}
1328	if dirtyFileRemoveError != nil {
1329		// On Windows, removing the dirty file before unlocking is not
1330		// possible.  So remove it here if it failed above.
1331		lastError = os.Remove(p.dirtyFileName)
1332	}
1333	return lastError
1334}
1335
1336func (p *persistence) dirNameForFingerprint(fp model.Fingerprint) string {
1337	fpStr := fp.String()
1338	return filepath.Join(p.basePath, fpStr[0:seriesDirNameLen])
1339}
1340
1341func (p *persistence) fileNameForFingerprint(fp model.Fingerprint) string {
1342	fpStr := fp.String()
1343	return filepath.Join(p.basePath, fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+seriesFileSuffix)
1344}
1345
1346func (p *persistence) tempFileNameForFingerprint(fp model.Fingerprint) string {
1347	fpStr := fp.String()
1348	return filepath.Join(p.basePath, fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+seriesTempFileSuffix)
1349}
1350
1351func (p *persistence) openChunkFileForWriting(fp model.Fingerprint) (*os.File, error) {
1352	if err := os.MkdirAll(p.dirNameForFingerprint(fp), 0700); err != nil {
1353		return nil, err
1354	}
1355	return os.OpenFile(p.fileNameForFingerprint(fp), os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0640)
1356	// NOTE: Although the file was opened for append,
1357	//     f.Seek(0, io.SeekCurrent)
1358	// would now return '0, nil', so we cannot check for a consistent file length right now.
1359	// However, the chunkIndexForOffset function is doing that check, so a wrong file length
1360	// would still be detected.
1361}
1362
1363// closeChunkFile first syncs the provided file if mandated so by the sync
1364// strategy. Then it closes the file. Errors are logged.
1365func (p *persistence) closeChunkFile(f *os.File) {
1366	if p.shouldSync() {
1367		if err := f.Sync(); err != nil {
1368			log.Error("Error syncing file:", err)
1369		}
1370	}
1371	if err := f.Close(); err != nil {
1372		log.Error("Error closing chunk file:", err)
1373	}
1374}
1375
1376func (p *persistence) openChunkFileForReading(fp model.Fingerprint) (*os.File, error) {
1377	return os.Open(p.fileNameForFingerprint(fp))
1378}
1379
1380func (p *persistence) headsFileName() string {
1381	return filepath.Join(p.basePath, headsFileName)
1382}
1383
1384func (p *persistence) headsTempFileName() string {
1385	return filepath.Join(p.basePath, headsTempFileName)
1386}
1387
1388func (p *persistence) mappingsFileName() string {
1389	return filepath.Join(p.basePath, mappingsFileName)
1390}
1391
1392func (p *persistence) mappingsTempFileName() string {
1393	return filepath.Join(p.basePath, mappingsTempFileName)
1394}
1395
// processIndexingQueue consumes indexingOps from p.indexingQueue and applies
// them in batches to the labelPairToFingerprints and labelNameToLabelValues
// indexes. A batch is committed when:
//   - it reaches indexingMaxBatchSize ops,
//   - indexingBatchTimeout fires while the queue is empty,
//   - a flush is requested via p.indexingFlush, or
//   - p.indexingQueue is closed.
// Commit errors are logged and mark the persistence dirty. Once the queue has
// been closed and the final batch committed, p.indexingStopped is closed.
func (p *persistence) processIndexingQueue() {
	batchSize := 0
	nameToValues := index.LabelNameLabelValuesMapping{}
	pairToFPs := index.LabelPairFingerprintsMapping{}
	batchTimeout := time.NewTimer(indexingBatchTimeout)
	defer batchTimeout.Stop()

	// commitBatch writes the accumulated mappings to both indexes, records
	// batch size and duration metrics, then starts an empty batch and
	// re-arms the batch timer.
	commitBatch := func() {
		p.indexingBatchSizes.Observe(float64(batchSize))
		defer func(begin time.Time) {
			p.indexingBatchDuration.Observe(time.Since(begin).Seconds())
		}(time.Now())

		if err := p.labelPairToFingerprints.IndexBatch(pairToFPs); err != nil {
			log.Error("Error indexing label pair to fingerprints batch: ", err)
			p.setDirty(err)
		}
		if err := p.labelNameToLabelValues.IndexBatch(nameToValues); err != nil {
			log.Error("Error indexing label name to label values batch: ", err)
			p.setDirty(err)
		}
		batchSize = 0
		nameToValues = index.LabelNameLabelValuesMapping{}
		pairToFPs = index.LabelPairFingerprintsMapping{}
		batchTimeout.Reset(indexingBatchTimeout)
	}

	// flush is nil while ops are still queued. Receiving from a nil channel
	// blocks forever, which disables the flush case of the select below
	// until the queue has drained.
	var flush chan chan int
loop:
	for {
		// Only process flush requests if the queue is currently empty.
		if len(p.indexingQueue) == 0 {
			flush = p.indexingFlush
		} else {
			flush = nil
		}
		select {
		case <-batchTimeout.C:
			// Only commit if we have something to commit _and_
			// nothing is waiting in the queue to be picked up. That
			// prevents a death spiral if the LookupSet calls below
			// are slow for some reason.
			if batchSize > 0 && len(p.indexingQueue) == 0 {
				commitBatch()
			} else {
				batchTimeout.Reset(indexingBatchTimeout)
			}
		case r := <-flush:
			if batchSize > 0 {
				commitBatch()
			}
			// Report how many ops are still queued; the requester
			// (waitForIndexing) retries until this is 0.
			r <- len(p.indexingQueue)
		case op, ok := <-p.indexingQueue:
			if !ok {
				// Queue closed: commit what is left and stop.
				if batchSize > 0 {
					commitBatch()
				}
				break loop
			}

			batchSize++
			for ln, lv := range op.metric {
				// The first time a label pair or name is seen in this
				// batch, its current set is loaded from the index and
				// cached in the batch maps. The op is then applied to
				// the cached set.
				lp := model.LabelPair{Name: ln, Value: lv}
				baseFPs, ok := pairToFPs[lp]
				if !ok {
					var err error
					baseFPs, _, err = p.labelPairToFingerprints.LookupSet(lp)
					if err != nil {
						// Skip this label; the op still counts
						// toward batchSize.
						log.Errorf("Error looking up label pair %v: %s", lp, err)
						continue
					}
					pairToFPs[lp] = baseFPs
				}
				baseValues, ok := nameToValues[ln]
				if !ok {
					var err error
					baseValues, _, err = p.labelNameToLabelValues.LookupSet(ln)
					if err != nil {
						log.Errorf("Error looking up label name %v: %s", ln, err)
						continue
					}
					nameToValues[ln] = baseValues
				}
				switch op.opType {
				case add:
					baseFPs[op.fingerprint] = struct{}{}
					baseValues[lv] = struct{}{}
				case remove:
					delete(baseFPs, op.fingerprint)
					// Drop the label value once no fingerprint
					// carries this label pair anymore.
					if len(baseFPs) == 0 {
						delete(baseValues, lv)
					}
				default:
					panic("unknown op type")
				}
			}

			if batchSize >= indexingMaxBatchSize {
				commitBatch()
			}
		}
	}
	close(p.indexingStopped)
}
1500
1501// checkpointFPMappings persists the fingerprint mappings. The caller has to
1502// ensure that the provided mappings are not changed concurrently. This method
1503// is only called upon shutdown or during crash recovery, when no samples are
1504// ingested.
1505//
1506// Description of the file format, v1:
1507//
1508// (1) Magic string (const mappingsMagicString).
1509//
1510// (2) Uvarint-encoded format version (const mappingsFormatVersion).
1511//
1512// (3) Uvarint-encoded number of mappings in fpMappings.
1513//
1514// (4) Repeated once per mapping:
1515//
1516// (4.1) The raw fingerprint as big-endian uint64.
1517//
1518// (4.2) The uvarint-encoded number of sub-mappings for the raw fingerprint.
1519//
1520// (4.3) Repeated once per sub-mapping:
1521//
1522// (4.3.1) The uvarint-encoded length of the unique metric string.
1523// (4.3.2) The unique metric string.
1524// (4.3.3) The mapped fingerprint as big-endian uint64.
1525func (p *persistence) checkpointFPMappings(fpm fpMappings) (err error) {
1526	log.Info("Checkpointing fingerprint mappings...")
1527	begin := time.Now()
1528	f, err := os.OpenFile(p.mappingsTempFileName(), os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640)
1529	if err != nil {
1530		return
1531	}
1532
1533	defer func() {
1534		syncErr := f.Sync()
1535		closeErr := f.Close()
1536		if err != nil {
1537			return
1538		}
1539		err = syncErr
1540		if err != nil {
1541			return
1542		}
1543		err = closeErr
1544		if err != nil {
1545			return
1546		}
1547		err = os.Rename(p.mappingsTempFileName(), p.mappingsFileName())
1548		duration := time.Since(begin)
1549		log.Infof("Done checkpointing fingerprint mappings in %v.", duration)
1550	}()
1551
1552	w := bufio.NewWriterSize(f, fileBufSize)
1553
1554	if _, err = w.WriteString(mappingsMagicString); err != nil {
1555		return
1556	}
1557	if _, err = codable.EncodeUvarint(w, mappingsFormatVersion); err != nil {
1558		return
1559	}
1560	if _, err = codable.EncodeUvarint(w, uint64(len(fpm))); err != nil {
1561		return
1562	}
1563
1564	for fp, mappings := range fpm {
1565		if err = codable.EncodeUint64(w, uint64(fp)); err != nil {
1566			return
1567		}
1568		if _, err = codable.EncodeUvarint(w, uint64(len(mappings))); err != nil {
1569			return
1570		}
1571		for ms, mappedFP := range mappings {
1572			if _, err = codable.EncodeUvarint(w, uint64(len(ms))); err != nil {
1573				return
1574			}
1575			if _, err = w.WriteString(ms); err != nil {
1576				return
1577			}
1578			if err = codable.EncodeUint64(w, uint64(mappedFP)); err != nil {
1579				return
1580			}
1581		}
1582	}
1583	err = w.Flush()
1584	return
1585}
1586
1587// loadFPMappings loads the fingerprint mappings. It also returns the highest
1588// mapped fingerprint and any error encountered. If p.mappingsFileName is not
1589// found, the method returns (fpMappings{}, 0, nil). Do not call concurrently
1590// with checkpointFPMappings.
1591func (p *persistence) loadFPMappings() (fpMappings, model.Fingerprint, error) {
1592	fpm := fpMappings{}
1593	var highestMappedFP model.Fingerprint
1594
1595	f, err := os.Open(p.mappingsFileName())
1596	if os.IsNotExist(err) {
1597		return fpm, 0, nil
1598	}
1599	if err != nil {
1600		return nil, 0, err
1601	}
1602	defer f.Close()
1603	r := bufio.NewReaderSize(f, fileBufSize)
1604
1605	buf := make([]byte, len(mappingsMagicString))
1606	if _, err := io.ReadFull(r, buf); err != nil {
1607		return nil, 0, err
1608	}
1609	magic := string(buf)
1610	if magic != mappingsMagicString {
1611		return nil, 0, fmt.Errorf(
1612			"unexpected magic string, want %q, got %q",
1613			mappingsMagicString, magic,
1614		)
1615	}
1616	version, err := binary.ReadUvarint(r)
1617	if version != mappingsFormatVersion || err != nil {
1618		return nil, 0, fmt.Errorf("unknown fingerprint mappings format version, want %d", mappingsFormatVersion)
1619	}
1620	numRawFPs, err := binary.ReadUvarint(r)
1621	if err != nil {
1622		return nil, 0, err
1623	}
1624	for ; numRawFPs > 0; numRawFPs-- {
1625		rawFP, err := codable.DecodeUint64(r)
1626		if err != nil {
1627			return nil, 0, err
1628		}
1629		numMappings, err := binary.ReadUvarint(r)
1630		if err != nil {
1631			return nil, 0, err
1632		}
1633		mappings := make(map[string]model.Fingerprint, numMappings)
1634		for ; numMappings > 0; numMappings-- {
1635			lenMS, err := binary.ReadUvarint(r)
1636			if err != nil {
1637				return nil, 0, err
1638			}
1639			buf := make([]byte, lenMS)
1640			if _, err := io.ReadFull(r, buf); err != nil {
1641				return nil, 0, err
1642			}
1643			fp, err := codable.DecodeUint64(r)
1644			if err != nil {
1645				return nil, 0, err
1646			}
1647			mappedFP := model.Fingerprint(fp)
1648			if mappedFP > highestMappedFP {
1649				highestMappedFP = mappedFP
1650			}
1651			mappings[string(buf)] = mappedFP
1652		}
1653		fpm[model.Fingerprint(rawFP)] = mappings
1654	}
1655	return fpm, highestMappedFP, nil
1656}
1657
1658func (p *persistence) writeChunks(w io.Writer, chunks []chunk.Chunk) error {
1659	b := p.bufPool.Get().([]byte)
1660	defer func() {
1661		// buf may change below. An unwrapped 'defer p.bufPool.Put(buf)'
1662		// would only put back the original buf.
1663		p.bufPool.Put(b)
1664	}()
1665	numChunks := len(chunks)
1666
1667	for batchSize := chunkMaxBatchSize; len(chunks) > 0; chunks = chunks[batchSize:] {
1668		if batchSize > len(chunks) {
1669			batchSize = len(chunks)
1670		}
1671		writeSize := batchSize * chunkLenWithHeader
1672		if cap(b) < writeSize {
1673			b = make([]byte, writeSize)
1674		}
1675		b = b[:writeSize]
1676
1677		for i, chunk := range chunks[:batchSize] {
1678			if err := writeChunkHeader(b[i*chunkLenWithHeader:], chunk); err != nil {
1679				return err
1680			}
1681			if err := chunk.MarshalToBuf(b[i*chunkLenWithHeader+chunkHeaderLen:]); err != nil {
1682				return err
1683			}
1684		}
1685		if _, err := w.Write(b); err != nil {
1686			return err
1687		}
1688	}
1689	p.seriesChunksPersisted.Observe(float64(numChunks))
1690	return nil
1691}
1692
1693func offsetForChunkIndex(i int) int64 {
1694	return int64(i * chunkLenWithHeader)
1695}
1696
1697func chunkIndexForOffset(offset int64) (int, error) {
1698	if int(offset)%chunkLenWithHeader != 0 {
1699		return -1, fmt.Errorf(
1700			"offset %d is not a multiple of on-disk chunk length %d",
1701			offset, chunkLenWithHeader,
1702		)
1703	}
1704	return int(offset) / chunkLenWithHeader, nil
1705}
1706
1707func writeChunkHeader(header []byte, c chunk.Chunk) error {
1708	header[chunkHeaderTypeOffset] = byte(c.Encoding())
1709	binary.LittleEndian.PutUint64(
1710		header[chunkHeaderFirstTimeOffset:],
1711		uint64(c.FirstTime()),
1712	)
1713	lt, err := c.NewIterator().LastTimestamp()
1714	if err != nil {
1715		return err
1716	}
1717	binary.LittleEndian.PutUint64(
1718		header[chunkHeaderLastTimeOffset:],
1719		uint64(lt),
1720	)
1721	return nil
1722}
1723