1// Copyright 2017 The Prometheus Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14package tsdb
15
16import (
17	"fmt"
18	"io"
19	"math"
20	"path/filepath"
21	"sync"
22	"time"
23
24	"github.com/go-kit/log"
25	"github.com/go-kit/log/level"
26	"github.com/oklog/ulid"
27	"github.com/pkg/errors"
28	"github.com/prometheus/client_golang/prometheus"
29	"go.uber.org/atomic"
30
31	"github.com/prometheus/prometheus/config"
32	"github.com/prometheus/prometheus/pkg/exemplar"
33	"github.com/prometheus/prometheus/pkg/labels"
34	"github.com/prometheus/prometheus/storage"
35	"github.com/prometheus/prometheus/tsdb/chunkenc"
36	"github.com/prometheus/prometheus/tsdb/chunks"
37	tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
38	"github.com/prometheus/prometheus/tsdb/index"
39	"github.com/prometheus/prometheus/tsdb/record"
40	"github.com/prometheus/prometheus/tsdb/tombstones"
41	"github.com/prometheus/prometheus/tsdb/tsdbutil"
42	"github.com/prometheus/prometheus/tsdb/wal"
43)
44
var (
	// ErrInvalidSample is returned if an appended sample is not valid and can't
	// be ingested.
	ErrInvalidSample = errors.New("invalid sample")
	// ErrInvalidExemplar is returned if an appended exemplar is not valid and can't
	// be ingested.
	ErrInvalidExemplar = errors.New("invalid exemplar")
	// ErrAppenderClosed is returned if an appender has already been successfully
	// rolled back or committed.
	ErrAppenderClosed = errors.New("appender closed")
)
56
// Head handles reads and writes of time series data within a time window.
// It holds the in-memory series, postings, tombstones and exemplars. It is
// backed by an optional WAL (wal may be nil) and by head chunks that are
// written to and read from disk through chunkDiskMapper.
type Head struct {
	// The atomic fields are kept together at the top of the struct, presumably
	// for the 64-bit alignment requirement of atomics on 32-bit platforms (see
	// the note on HeadOptions.MaxExemplars) — keep new 64-bit atomics here.
	// lastWALTruncationTime is the mint of the last effective WAL truncation;
	// truncateWAL ignores mints at or below it. lastMemoryTruncationTime is the
	// mint of the latest in-memory truncation and is read by
	// IsQuerierCollidingWithTruncation. numSeries presumably counts the series
	// in h.series (it backs the prometheus_tsdb_head_series metric via
	// NumSeries) — TODO confirm.
	chunkRange               atomic.Int64
	numSeries                atomic.Uint64
	minTime, maxTime         atomic.Int64 // Current min and max of the samples included in the head.
	minValidTime             atomic.Int64 // Mint allowed to be added to the head. It shouldn't be lower than the maxt of the last persisted block.
	lastWALTruncationTime    atomic.Int64
	lastMemoryTruncationTime atomic.Int64
	lastSeriesID             atomic.Uint64

	// Metrics, options, the WAL (nil when running without one), exemplar
	// storage and logging. The sync.Pool fields are object pools used to reuse
	// allocations; memChunkPool hands out *memChunk values (see NewHead).
	metrics         *headMetrics
	opts            *HeadOptions
	wal             *wal.WAL
	exemplarMetrics *ExemplarMetrics
	exemplars       ExemplarStorage
	logger          log.Logger
	appendPool      sync.Pool
	exemplarsPool   sync.Pool
	seriesPool      sync.Pool
	bytesPool       sync.Pool
	memChunkPool    sync.Pool

	// All series addressable by their ID or hash.
	series *stripeSeries

	// deletedMtx guards deleted.
	deletedMtx sync.Mutex
	deleted    map[uint64]int // Deleted series, and what WAL segment they must be kept until.

	postings *index.MemPostings // Postings lists for terms.

	tombstones *tombstones.MemTombstones

	iso *isolation

	// cardinalityMutex guards the cached result of PostingsCardinalityStats
	// and the time it was computed.
	cardinalityMutex      sync.Mutex
	cardinalityCache      *index.PostingsStats // Posting stats cache which will expire after 30sec.
	lastPostingsStatsCall time.Duration        // Last posting stats call (PostingsCardinalityStats()) time for caching.

	// chunkDiskMapper is used to write and read Head chunks to/from disk.
	chunkDiskMapper *chunks.ChunkDiskMapper

	// chunkSnapshotMtx is held for the whole of truncateMemory and truncateWAL,
	// presumably to keep them from running concurrently with chunk snapshots.
	chunkSnapshotMtx sync.Mutex

	// closedMtx presumably guards closed — TODO confirm against Close().
	closedMtx sync.Mutex
	closed    bool

	// stats holds replay status for the DB; reg is the registerer passed to
	// NewHead and is reused when exemplar metrics are created.
	stats *HeadStats
	reg   prometheus.Registerer

	// memTruncationInProcess is true while truncateMemory runs past its early
	// exit; it is read by IsQuerierCollidingWithTruncation.
	memTruncationInProcess atomic.Bool
}
108
109type ExemplarStorage interface {
110	storage.ExemplarQueryable
111	AddExemplar(labels.Labels, exemplar.Exemplar) error
112	ValidateExemplar(labels.Labels, exemplar.Exemplar) error
113	IterateExemplars(f func(seriesLabels labels.Labels, e exemplar.Exemplar) error) error
114}
115
// HeadOptions are parameters for the Head block. NewHead fills in defaults for
// some of them, modifying the struct in place.
type HeadOptions struct {
	// Runtime reloadable option. At the top of the struct for 32 bit OS:
	// https://pkg.go.dev/sync/atomic#pkg-note-BUG
	// NewHead forces it to 0 when EnableExemplarStorage is false, and
	// Head.ApplyConfig updates it from the exemplars configuration.
	MaxExemplars atomic.Int64

	// ChunkRange must be at least 1; NewHead rejects smaller values.
	ChunkRange int64
	// ChunkDirRoot is the parent directory of the chunks directory.
	// ChunkPool is the chunk pool passed to the chunk disk mapper; NewHead
	// creates one when it is nil.
	// ChunkWriteBufferSize is the write buffer size passed to the chunk disk
	// mapper.
	ChunkDirRoot         string
	ChunkPool            chunkenc.Pool
	ChunkWriteBufferSize int
	// StripeSize sets the number of entries in the hash map, it must be a power of 2.
	// A larger StripeSize will allocate more memory up-front, but will increase performance when handling a large number of series.
	// A smaller StripeSize reduces the memory allocated, but can decrease performance with large number of series.
	// SeriesCallback receives series lifecycle events; NewHead substitutes a
	// no-op callback when it is nil.
	// EnableExemplarStorage gates exemplar storage; when false, ApplyConfig
	// does nothing.
	// EnableMemorySnapshotOnShutdown makes Init replay from the chunk snapshot
	// before replaying the WAL.
	StripeSize                     int
	SeriesCallback                 SeriesLifecycleCallback
	EnableExemplarStorage          bool
	EnableMemorySnapshotOnShutdown bool
}
135
136func DefaultHeadOptions() *HeadOptions {
137	return &HeadOptions{
138		ChunkRange:           DefaultBlockDuration,
139		ChunkDirRoot:         "",
140		ChunkPool:            chunkenc.NewPool(),
141		ChunkWriteBufferSize: chunks.DefaultWriteBufferSize,
142		StripeSize:           DefaultStripeSize,
143		SeriesCallback:       &noopSeriesLifecycleCallback{},
144	}
145}
146
147// SeriesLifecycleCallback specifies a list of callbacks that will be called during a lifecycle of a series.
148// It is always a no-op in Prometheus and mainly meant for external users who import TSDB.
149// All the callbacks should be safe to be called concurrently.
150// It is up to the user to implement soft or hard consistency by making the callbacks
151// atomic or non-atomic. Atomic callbacks can cause degradation performance.
152type SeriesLifecycleCallback interface {
153	// PreCreation is called before creating a series to indicate if the series can be created.
154	// A non nil error means the series should not be created.
155	PreCreation(labels.Labels) error
156	// PostCreation is called after creating a series to indicate a creation of series.
157	PostCreation(labels.Labels)
158	// PostDeletion is called after deletion of series.
159	PostDeletion(...labels.Labels)
160}
161
162// NewHead opens the head block in dir.
163func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, opts *HeadOptions, stats *HeadStats) (*Head, error) {
164	var err error
165	if l == nil {
166		l = log.NewNopLogger()
167	}
168	if opts.ChunkRange < 1 {
169		return nil, errors.Errorf("invalid chunk range %d", opts.ChunkRange)
170	}
171	if opts.SeriesCallback == nil {
172		opts.SeriesCallback = &noopSeriesLifecycleCallback{}
173	}
174
175	if stats == nil {
176		stats = NewHeadStats()
177	}
178
179	if !opts.EnableExemplarStorage {
180		opts.MaxExemplars.Store(0)
181	}
182
183	h := &Head{
184		wal:    wal,
185		logger: l,
186		opts:   opts,
187		memChunkPool: sync.Pool{
188			New: func() interface{} {
189				return &memChunk{}
190			},
191		},
192		stats: stats,
193		reg:   r,
194	}
195	if err := h.resetInMemoryState(); err != nil {
196		return nil, err
197	}
198	h.metrics = newHeadMetrics(h, r)
199
200	if opts.ChunkPool == nil {
201		opts.ChunkPool = chunkenc.NewPool()
202	}
203
204	h.chunkDiskMapper, err = chunks.NewChunkDiskMapper(
205		mmappedChunksDir(opts.ChunkDirRoot),
206		opts.ChunkPool,
207		opts.ChunkWriteBufferSize,
208	)
209	if err != nil {
210		return nil, err
211	}
212
213	return h, nil
214}
215
216func (h *Head) resetInMemoryState() error {
217	var err error
218	var em *ExemplarMetrics
219	if h.exemplars != nil {
220		ce, ok := h.exemplars.(*CircularExemplarStorage)
221		if ok {
222			em = ce.metrics
223		}
224	}
225	if em == nil {
226		em = NewExemplarMetrics(h.reg)
227	}
228	es, err := NewCircularExemplarStorage(h.opts.MaxExemplars.Load(), em)
229	if err != nil {
230		return err
231	}
232
233	h.exemplarMetrics = em
234	h.exemplars = es
235	h.series = newStripeSeries(h.opts.StripeSize, h.opts.SeriesCallback)
236	h.postings = index.NewUnorderedMemPostings()
237	h.tombstones = tombstones.NewMemTombstones()
238	h.iso = newIsolation()
239	h.deleted = map[uint64]int{}
240	h.chunkRange.Store(h.opts.ChunkRange)
241	h.minTime.Store(math.MaxInt64)
242	h.maxTime.Store(math.MinInt64)
243	h.lastWALTruncationTime.Store(math.MinInt64)
244	h.lastMemoryTruncationTime.Store(math.MinInt64)
245	return nil
246}
247
// headMetrics holds the Prometheus metrics exported by a Head. They are
// created by newHeadMetrics, which also registers them when given a non-nil
// registerer.
type headMetrics struct {
	// Gauges, counters and summaries covering appenders, series, chunks,
	// samples, GC, WAL/checkpoint maintenance and replay failures. See
	// newHeadMetrics for each metric's name and help text.
	activeAppenders          prometheus.Gauge
	series                   prometheus.GaugeFunc
	seriesCreated            prometheus.Counter
	seriesRemoved            prometheus.Counter
	seriesNotFound           prometheus.Counter
	chunks                   prometheus.Gauge
	chunksCreated            prometheus.Counter
	chunksRemoved            prometheus.Counter
	gcDuration               prometheus.Summary
	samplesAppended          prometheus.Counter
	outOfBoundSamples        prometheus.Counter
	outOfOrderSamples        prometheus.Counter
	walTruncateDuration      prometheus.Summary
	walCorruptionsTotal      prometheus.Counter
	walTotalReplayDuration   prometheus.Gauge
	headTruncateFail         prometheus.Counter
	headTruncateTotal        prometheus.Counter
	checkpointDeleteFail     prometheus.Counter
	checkpointDeleteTotal    prometheus.Counter
	checkpointCreationFail   prometheus.Counter
	checkpointCreationTotal  prometheus.Counter
	mmapChunkCorruptionTotal prometheus.Counter
	snapshotReplayErrorTotal prometheus.Counter // Will be either 0 or 1.
}
273
274func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
275	m := &headMetrics{
276		activeAppenders: prometheus.NewGauge(prometheus.GaugeOpts{
277			Name: "prometheus_tsdb_head_active_appenders",
278			Help: "Number of currently active appender transactions",
279		}),
280		series: prometheus.NewGaugeFunc(prometheus.GaugeOpts{
281			Name: "prometheus_tsdb_head_series",
282			Help: "Total number of series in the head block.",
283		}, func() float64 {
284			return float64(h.NumSeries())
285		}),
286		seriesCreated: prometheus.NewCounter(prometheus.CounterOpts{
287			Name: "prometheus_tsdb_head_series_created_total",
288			Help: "Total number of series created in the head",
289		}),
290		seriesRemoved: prometheus.NewCounter(prometheus.CounterOpts{
291			Name: "prometheus_tsdb_head_series_removed_total",
292			Help: "Total number of series removed in the head",
293		}),
294		seriesNotFound: prometheus.NewCounter(prometheus.CounterOpts{
295			Name: "prometheus_tsdb_head_series_not_found_total",
296			Help: "Total number of requests for series that were not found.",
297		}),
298		chunks: prometheus.NewGauge(prometheus.GaugeOpts{
299			Name: "prometheus_tsdb_head_chunks",
300			Help: "Total number of chunks in the head block.",
301		}),
302		chunksCreated: prometheus.NewCounter(prometheus.CounterOpts{
303			Name: "prometheus_tsdb_head_chunks_created_total",
304			Help: "Total number of chunks created in the head",
305		}),
306		chunksRemoved: prometheus.NewCounter(prometheus.CounterOpts{
307			Name: "prometheus_tsdb_head_chunks_removed_total",
308			Help: "Total number of chunks removed in the head",
309		}),
310		gcDuration: prometheus.NewSummary(prometheus.SummaryOpts{
311			Name: "prometheus_tsdb_head_gc_duration_seconds",
312			Help: "Runtime of garbage collection in the head block.",
313		}),
314		walTruncateDuration: prometheus.NewSummary(prometheus.SummaryOpts{
315			Name: "prometheus_tsdb_wal_truncate_duration_seconds",
316			Help: "Duration of WAL truncation.",
317		}),
318		walCorruptionsTotal: prometheus.NewCounter(prometheus.CounterOpts{
319			Name: "prometheus_tsdb_wal_corruptions_total",
320			Help: "Total number of WAL corruptions.",
321		}),
322		walTotalReplayDuration: prometheus.NewGauge(prometheus.GaugeOpts{
323			Name: "prometheus_tsdb_data_replay_duration_seconds",
324			Help: "Time taken to replay the data on disk.",
325		}),
326		samplesAppended: prometheus.NewCounter(prometheus.CounterOpts{
327			Name: "prometheus_tsdb_head_samples_appended_total",
328			Help: "Total number of appended samples.",
329		}),
330		outOfBoundSamples: prometheus.NewCounter(prometheus.CounterOpts{
331			Name: "prometheus_tsdb_out_of_bound_samples_total",
332			Help: "Total number of out of bound samples ingestion failed attempts.",
333		}),
334		outOfOrderSamples: prometheus.NewCounter(prometheus.CounterOpts{
335			Name: "prometheus_tsdb_out_of_order_samples_total",
336			Help: "Total number of out of order samples ingestion failed attempts.",
337		}),
338		headTruncateFail: prometheus.NewCounter(prometheus.CounterOpts{
339			Name: "prometheus_tsdb_head_truncations_failed_total",
340			Help: "Total number of head truncations that failed.",
341		}),
342		headTruncateTotal: prometheus.NewCounter(prometheus.CounterOpts{
343			Name: "prometheus_tsdb_head_truncations_total",
344			Help: "Total number of head truncations attempted.",
345		}),
346		checkpointDeleteFail: prometheus.NewCounter(prometheus.CounterOpts{
347			Name: "prometheus_tsdb_checkpoint_deletions_failed_total",
348			Help: "Total number of checkpoint deletions that failed.",
349		}),
350		checkpointDeleteTotal: prometheus.NewCounter(prometheus.CounterOpts{
351			Name: "prometheus_tsdb_checkpoint_deletions_total",
352			Help: "Total number of checkpoint deletions attempted.",
353		}),
354		checkpointCreationFail: prometheus.NewCounter(prometheus.CounterOpts{
355			Name: "prometheus_tsdb_checkpoint_creations_failed_total",
356			Help: "Total number of checkpoint creations that failed.",
357		}),
358		checkpointCreationTotal: prometheus.NewCounter(prometheus.CounterOpts{
359			Name: "prometheus_tsdb_checkpoint_creations_total",
360			Help: "Total number of checkpoint creations attempted.",
361		}),
362		mmapChunkCorruptionTotal: prometheus.NewCounter(prometheus.CounterOpts{
363			Name: "prometheus_tsdb_mmap_chunk_corruptions_total",
364			Help: "Total number of memory-mapped chunk corruptions.",
365		}),
366		snapshotReplayErrorTotal: prometheus.NewCounter(prometheus.CounterOpts{
367			Name: "prometheus_tsdb_snapshot_replay_error_total",
368			Help: "Total number snapshot replays that failed.",
369		}),
370	}
371
372	if r != nil {
373		r.MustRegister(
374			m.activeAppenders,
375			m.series,
376			m.chunks,
377			m.chunksCreated,
378			m.chunksRemoved,
379			m.seriesCreated,
380			m.seriesRemoved,
381			m.seriesNotFound,
382			m.gcDuration,
383			m.walTruncateDuration,
384			m.walCorruptionsTotal,
385			m.walTotalReplayDuration,
386			m.samplesAppended,
387			m.outOfBoundSamples,
388			m.outOfOrderSamples,
389			m.headTruncateFail,
390			m.headTruncateTotal,
391			m.checkpointDeleteFail,
392			m.checkpointDeleteTotal,
393			m.checkpointCreationFail,
394			m.checkpointCreationTotal,
395			m.mmapChunkCorruptionTotal,
396			m.snapshotReplayErrorTotal,
397			// Metrics bound to functions and not needed in tests
398			// can be created and registered on the spot.
399			prometheus.NewGaugeFunc(prometheus.GaugeOpts{
400				Name: "prometheus_tsdb_head_max_time",
401				Help: "Maximum timestamp of the head block. The unit is decided by the library consumer.",
402			}, func() float64 {
403				return float64(h.MaxTime())
404			}),
405			prometheus.NewGaugeFunc(prometheus.GaugeOpts{
406				Name: "prometheus_tsdb_head_min_time",
407				Help: "Minimum time bound of the head block. The unit is decided by the library consumer.",
408			}, func() float64 {
409				return float64(h.MinTime())
410			}),
411			prometheus.NewGaugeFunc(prometheus.GaugeOpts{
412				Name: "prometheus_tsdb_isolation_low_watermark",
413				Help: "The lowest TSDB append ID that is still referenced.",
414			}, func() float64 {
415				return float64(h.iso.lowWatermark())
416			}),
417			prometheus.NewGaugeFunc(prometheus.GaugeOpts{
418				Name: "prometheus_tsdb_isolation_high_watermark",
419				Help: "The highest TSDB append ID that has been given out.",
420			}, func() float64 {
421				return float64(h.iso.lastAppendID())
422			}),
423		)
424	}
425	return m
426}
427
// mmappedChunksDir returns the path of the "chunks_head" sub-directory of
// dir, where the head's memory-mapped chunk files are stored. The path is
// built (and cleaned) with filepath.Join.
func mmappedChunksDir(dir string) string {
	const headChunksSubdir = "chunks_head"
	return filepath.Join(dir, headChunksSubdir)
}
429
// HeadStats are the statistics for the head component of the DB.
type HeadStats struct {
	WALReplayStatus *WALReplayStatus
}

// NewHeadStats returns a new HeadStats object whose WAL replay status is
// initialised to its zero value.
func NewHeadStats() *HeadStats {
	return &HeadStats{WALReplayStatus: new(WALReplayStatus)}
}

// WALReplayStatus contains status information about the WAL replay. The
// embedded RWMutex guards Min, Max and Current.
type WALReplayStatus struct {
	sync.RWMutex
	Min     int
	Max     int
	Current int
}

// GetWALReplayStatus returns a snapshot of the WAL replay status taken under
// the read lock. The returned value carries its own, unlocked mutex.
func (s *WALReplayStatus) GetWALReplayStatus() WALReplayStatus {
	s.RLock()
	lo, hi, cur := s.Min, s.Max, s.Current
	s.RUnlock()

	return WALReplayStatus{Min: lo, Max: hi, Current: cur}
}
461
// cardinalityCacheExpirationTime is how long a PostingsCardinalityStats result
// is served from cache before it is recomputed.
const cardinalityCacheExpirationTime = 30 * time.Second
463
464// Init loads data from the write ahead log and prepares the head for writes.
465// It should be called before using an appender so that it
466// limits the ingested samples to the head min valid time.
467func (h *Head) Init(minValidTime int64) error {
468	h.minValidTime.Store(minValidTime)
469	defer h.postings.EnsureOrder()
470	defer h.gc() // After loading the wal remove the obsolete data from the head.
471	defer func() {
472		// Loading of m-mapped chunks and snapshot can make the mint of the Head
473		// to go below minValidTime.
474		if h.MinTime() < h.minValidTime.Load() {
475			h.minTime.Store(h.minValidTime.Load())
476		}
477	}()
478
479	level.Info(h.logger).Log("msg", "Replaying on-disk memory mappable chunks if any")
480	start := time.Now()
481
482	snapIdx, snapOffset := -1, 0
483	refSeries := make(map[uint64]*memSeries)
484
485	if h.opts.EnableMemorySnapshotOnShutdown {
486		level.Info(h.logger).Log("msg", "Chunk snapshot is enabled, replaying from the snapshot")
487		var err error
488		snapIdx, snapOffset, refSeries, err = h.loadChunkSnapshot()
489		if err != nil {
490			snapIdx, snapOffset = -1, 0
491			h.metrics.snapshotReplayErrorTotal.Inc()
492			level.Error(h.logger).Log("msg", "Failed to load chunk snapshot", "err", err)
493			// We clear the partially loaded data to replay fresh from the WAL.
494			if err := h.resetInMemoryState(); err != nil {
495				return err
496			}
497		}
498		level.Info(h.logger).Log("msg", "Chunk snapshot loading time", "duration", time.Since(start).String())
499	}
500
501	mmapChunkReplayStart := time.Now()
502	mmappedChunks, err := h.loadMmappedChunks(refSeries)
503	if err != nil {
504		level.Error(h.logger).Log("msg", "Loading on-disk chunks failed", "err", err)
505		if _, ok := errors.Cause(err).(*chunks.CorruptionErr); ok {
506			h.metrics.mmapChunkCorruptionTotal.Inc()
507		}
508		// If this fails, data will be recovered from WAL.
509		// Hence we wont lose any data (given WAL is not corrupt).
510		mmappedChunks = h.removeCorruptedMmappedChunks(err, refSeries)
511	}
512
513	level.Info(h.logger).Log("msg", "On-disk memory mappable chunks replay completed", "duration", time.Since(mmapChunkReplayStart).String())
514	if h.wal == nil {
515		level.Info(h.logger).Log("msg", "WAL not found")
516		return nil
517	}
518
519	level.Info(h.logger).Log("msg", "Replaying WAL, this may take a while")
520
521	checkpointReplayStart := time.Now()
522	// Backfill the checkpoint first if it exists.
523	dir, startFrom, err := wal.LastCheckpoint(h.wal.Dir())
524	if err != nil && err != record.ErrNotFound {
525		return errors.Wrap(err, "find last checkpoint")
526	}
527
528	// Find the last segment.
529	_, endAt, e := wal.Segments(h.wal.Dir())
530	if e != nil {
531		return errors.Wrap(e, "finding WAL segments")
532	}
533
534	h.startWALReplayStatus(startFrom, endAt)
535
536	multiRef := map[uint64]uint64{}
537	if err == nil && startFrom >= snapIdx {
538		sr, err := wal.NewSegmentsReader(dir)
539		if err != nil {
540			return errors.Wrap(err, "open checkpoint")
541		}
542		defer func() {
543			if err := sr.Close(); err != nil {
544				level.Warn(h.logger).Log("msg", "Error while closing the wal segments reader", "err", err)
545			}
546		}()
547
548		// A corrupted checkpoint is a hard error for now and requires user
549		// intervention. There's likely little data that can be recovered anyway.
550		if err := h.loadWAL(wal.NewReader(sr), multiRef, mmappedChunks); err != nil {
551			return errors.Wrap(err, "backfill checkpoint")
552		}
553		h.updateWALReplayStatusRead(startFrom)
554		startFrom++
555		level.Info(h.logger).Log("msg", "WAL checkpoint loaded")
556	}
557	checkpointReplayDuration := time.Since(checkpointReplayStart)
558
559	walReplayStart := time.Now()
560
561	if snapIdx > startFrom {
562		startFrom = snapIdx
563	}
564	// Backfill segments from the most recent checkpoint onwards.
565	for i := startFrom; i <= endAt; i++ {
566		s, err := wal.OpenReadSegment(wal.SegmentName(h.wal.Dir(), i))
567		if err != nil {
568			return errors.Wrap(err, fmt.Sprintf("open WAL segment: %d", i))
569		}
570
571		offset := 0
572		if i == snapIdx {
573			offset = snapOffset
574		}
575		sr, err := wal.NewSegmentBufReaderWithOffset(offset, s)
576		if errors.Cause(err) == io.EOF {
577			// File does not exist.
578			continue
579		}
580		if err != nil {
581			return errors.Wrapf(err, "segment reader (offset=%d)", offset)
582		}
583		err = h.loadWAL(wal.NewReader(sr), multiRef, mmappedChunks)
584		if err := sr.Close(); err != nil {
585			level.Warn(h.logger).Log("msg", "Error while closing the wal segments reader", "err", err)
586		}
587		if err != nil {
588			return err
589		}
590		level.Info(h.logger).Log("msg", "WAL segment loaded", "segment", i, "maxSegment", endAt)
591		h.updateWALReplayStatusRead(i)
592	}
593
594	walReplayDuration := time.Since(start)
595	h.metrics.walTotalReplayDuration.Set(walReplayDuration.Seconds())
596	level.Info(h.logger).Log(
597		"msg", "WAL replay completed",
598		"checkpoint_replay_duration", checkpointReplayDuration.String(),
599		"wal_replay_duration", time.Since(walReplayStart).String(),
600		"total_replay_duration", walReplayDuration.String(),
601	)
602
603	return nil
604}
605
606func (h *Head) loadMmappedChunks(refSeries map[uint64]*memSeries) (map[uint64][]*mmappedChunk, error) {
607	mmappedChunks := map[uint64][]*mmappedChunk{}
608	if err := h.chunkDiskMapper.IterateAllChunks(func(seriesRef, chunkRef uint64, mint, maxt int64, numSamples uint16) error {
609		if maxt < h.minValidTime.Load() {
610			return nil
611		}
612		ms, ok := refSeries[seriesRef]
613		if !ok {
614			slice := mmappedChunks[seriesRef]
615			if len(slice) > 0 && slice[len(slice)-1].maxTime >= mint {
616				return errors.Errorf("out of sequence m-mapped chunk for series ref %d", seriesRef)
617			}
618
619			slice = append(slice, &mmappedChunk{
620				ref:        chunkRef,
621				minTime:    mint,
622				maxTime:    maxt,
623				numSamples: numSamples,
624			})
625			mmappedChunks[seriesRef] = slice
626			return nil
627		}
628
629		if len(ms.mmappedChunks) > 0 && ms.mmappedChunks[len(ms.mmappedChunks)-1].maxTime >= mint {
630			return errors.Errorf("out of sequence m-mapped chunk for series ref %d", seriesRef)
631		}
632
633		h.metrics.chunks.Inc()
634		h.metrics.chunksCreated.Inc()
635		ms.mmappedChunks = append(ms.mmappedChunks, &mmappedChunk{
636			ref:        chunkRef,
637			minTime:    mint,
638			maxTime:    maxt,
639			numSamples: numSamples,
640		})
641		h.updateMinMaxTime(mint, maxt)
642		if ms.headChunk != nil && maxt >= ms.headChunk.minTime {
643			// The head chunk was completed and was m-mapped after taking the snapshot.
644			// Hence remove this chunk.
645			ms.nextAt = 0
646			ms.headChunk = nil
647			ms.app = nil
648		}
649		return nil
650	}); err != nil {
651		return nil, errors.Wrap(err, "iterate on on-disk chunks")
652	}
653	return mmappedChunks, nil
654}
655
656// removeCorruptedMmappedChunks attempts to delete the corrupted mmapped chunks and if it fails, it clears all the previously
657// loaded mmapped chunks.
658func (h *Head) removeCorruptedMmappedChunks(err error, refSeries map[uint64]*memSeries) map[uint64][]*mmappedChunk {
659	level.Info(h.logger).Log("msg", "Deleting mmapped chunk files")
660
661	if err := h.chunkDiskMapper.DeleteCorrupted(err); err != nil {
662		level.Info(h.logger).Log("msg", "Deletion of mmap chunk files failed, discarding chunk files completely", "err", err)
663		return map[uint64][]*mmappedChunk{}
664	}
665
666	level.Info(h.logger).Log("msg", "Deletion of mmap chunk files successful, reattempting m-mapping the on-disk chunks")
667	mmappedChunks, err := h.loadMmappedChunks(refSeries)
668	if err != nil {
669		level.Error(h.logger).Log("msg", "Loading on-disk chunks failed, discarding chunk files completely", "err", err)
670		mmappedChunks = map[uint64][]*mmappedChunk{}
671	}
672
673	return mmappedChunks
674}
675
676func (h *Head) ApplyConfig(cfg *config.Config) error {
677	if !h.opts.EnableExemplarStorage {
678		return nil
679	}
680
681	// Head uses opts.MaxExemplars in combination with opts.EnableExemplarStorage
682	// to decide if it should pass exemplars along to it's exemplar storage, so we
683	// need to update opts.MaxExemplars here.
684	prevSize := h.opts.MaxExemplars.Load()
685	h.opts.MaxExemplars.Store(cfg.StorageConfig.ExemplarsConfig.MaxExemplars)
686
687	if prevSize == h.opts.MaxExemplars.Load() {
688		return nil
689	}
690
691	migrated := h.exemplars.(*CircularExemplarStorage).Resize(h.opts.MaxExemplars.Load())
692	level.Info(h.logger).Log("msg", "Exemplar storage resized", "from", prevSize, "to", h.opts.MaxExemplars, "migrated", migrated)
693	return nil
694}
695
696// PostingsCardinalityStats returns top 10 highest cardinality stats By label and value names.
697func (h *Head) PostingsCardinalityStats(statsByLabelName string) *index.PostingsStats {
698	h.cardinalityMutex.Lock()
699	defer h.cardinalityMutex.Unlock()
700	currentTime := time.Duration(time.Now().Unix()) * time.Second
701	seconds := currentTime - h.lastPostingsStatsCall
702	if seconds > cardinalityCacheExpirationTime {
703		h.cardinalityCache = nil
704	}
705	if h.cardinalityCache != nil {
706		return h.cardinalityCache
707	}
708	h.cardinalityCache = h.postings.Stats(statsByLabelName)
709	h.lastPostingsStatsCall = time.Duration(time.Now().Unix()) * time.Second
710
711	return h.cardinalityCache
712}
713
714func (h *Head) updateMinMaxTime(mint, maxt int64) {
715	for {
716		lt := h.MinTime()
717		if mint >= lt {
718			break
719		}
720		if h.minTime.CAS(lt, mint) {
721			break
722		}
723	}
724	for {
725		ht := h.MaxTime()
726		if maxt <= ht {
727			break
728		}
729		if h.maxTime.CAS(ht, maxt) {
730			break
731		}
732	}
733}
734
735// SetMinValidTime sets the minimum timestamp the head can ingest.
736func (h *Head) SetMinValidTime(minValidTime int64) {
737	h.minValidTime.Store(minValidTime)
738}
739
740// Truncate removes old data before mint from the head and WAL.
741func (h *Head) Truncate(mint int64) (err error) {
742	initialize := h.MinTime() == math.MaxInt64
743	if err := h.truncateMemory(mint); err != nil {
744		return err
745	}
746	if initialize {
747		return nil
748	}
749	return h.truncateWAL(mint)
750}
751
752// OverlapsClosedInterval returns true if the head overlaps [mint, maxt].
753func (h *Head) OverlapsClosedInterval(mint, maxt int64) bool {
754	return h.MinTime() <= maxt && mint <= h.MaxTime()
755}
756
// truncateMemory removes old data before mint from the head.
//
// It holds chunkSnapshotMtx for its whole duration. When it returns an error,
// the failure is counted in headTruncateFail.
//
// The head is "uninitialised" when MinTime() is still math.MaxInt64, the
// value set by resetInMemoryState. The function is a no-op when the head's
// min time is already at or above mint, unless the head is uninitialised.
// For an uninitialised head it only moves the time bounds to mint.
//
// Otherwise it:
//   - waits for overlapping readers;
//   - garbage-collects the head;
//   - raises minTime/minValidTime if GC reports a higher actual mint (capped
//     at appendableMinValidTime());
//   - truncates the m-mapped chunk files.
func (h *Head) truncateMemory(mint int64) (err error) {
	h.chunkSnapshotMtx.Lock()
	defer h.chunkSnapshotMtx.Unlock()

	// Count failures through the named result so every error path is covered.
	defer func() {
		if err != nil {
			h.metrics.headTruncateFail.Inc()
		}
	}()

	initialize := h.MinTime() == math.MaxInt64

	if h.MinTime() >= mint && !initialize {
		return nil
	}

	// The order of these two Store() should not be changed,
	// i.e. truncation time is set before in-process boolean.
	// IsQuerierCollidingWithTruncation reads the boolean first and the
	// truncation time second, so it must not see the flag with a stale time.
	h.lastMemoryTruncationTime.Store(mint)
	h.memTruncationInProcess.Store(true)
	defer h.memTruncationInProcess.Store(false)

	// We wait for pending queries to end that overlap with this truncation.
	if !initialize {
		h.WaitForPendingReadersInTimeRange(h.MinTime(), mint)
	}

	h.minTime.Store(mint)
	h.minValidTime.Store(mint)

	// Ensure that max time is at least as high as min time.
	for h.MaxTime() < mint {
		h.maxTime.CAS(h.MaxTime(), mint)
	}

	// This was an initial call to Truncate after loading blocks on startup.
	// We haven't read back the WAL yet, so do not attempt to truncate it.
	if initialize {
		return nil
	}

	h.metrics.headTruncateTotal.Inc()
	start := time.Now()

	actualMint := h.gc()
	level.Info(h.logger).Log("msg", "Head GC completed", "duration", time.Since(start))
	h.metrics.gcDuration.Observe(time.Since(start).Seconds())
	if actualMint > h.minTime.Load() {
		// The actual mint of the Head is higher than the one asked to truncate.
		appendableMinValidTime := h.appendableMinValidTime()
		if actualMint < appendableMinValidTime {
			h.minTime.Store(actualMint)
			h.minValidTime.Store(actualMint)
		} else {
			// The actual min time is in the appendable window.
			// So we set the mint to the appendableMinValidTime.
			h.minTime.Store(appendableMinValidTime)
			h.minValidTime.Store(appendableMinValidTime)
		}
	}

	// Truncate the chunk m-mapper.
	if err := h.chunkDiskMapper.Truncate(mint); err != nil {
		return errors.Wrap(err, "truncate chunks.HeadReadWriter")
	}
	return nil
}
825
826// WaitForPendingReadersInTimeRange waits for queries overlapping with given range to finish querying.
827// The query timeout limits the max wait time of this function implicitly.
828// The mint is inclusive and maxt is the truncation time hence exclusive.
829func (h *Head) WaitForPendingReadersInTimeRange(mint, maxt int64) {
830	maxt-- // Making it inclusive before checking overlaps.
831	overlaps := func() bool {
832		o := false
833		h.iso.TraverseOpenReads(func(s *isolationState) bool {
834			if s.mint <= maxt && mint <= s.maxt {
835				// Overlaps with the truncation range.
836				o = true
837				return false
838			}
839			return true
840		})
841		return o
842	}
843	for overlaps() {
844		time.Sleep(500 * time.Millisecond)
845	}
846}
847
848// IsQuerierCollidingWithTruncation returns if the current querier needs to be closed and if a new querier
849// has to be created. In the latter case, the method also returns the new mint to be used for creating the
850// new range head and the new querier. This methods helps preventing races with the truncation of in-memory data.
851//
852// NOTE: The querier should already be taken before calling this.
853func (h *Head) IsQuerierCollidingWithTruncation(querierMint, querierMaxt int64) (shouldClose bool, getNew bool, newMint int64) {
854	if !h.memTruncationInProcess.Load() {
855		return false, false, 0
856	}
857	// Head truncation is in process. It also means that the block that was
858	// created for this truncation range is also available.
859	// Check if we took a querier that overlaps with this truncation.
860	memTruncTime := h.lastMemoryTruncationTime.Load()
861	if querierMaxt < memTruncTime {
862		// Head compaction has happened and this time range is being truncated.
863		// This query doesn't overlap with the Head any longer.
864		// We should close this querier to avoid races and the data would be
865		// available with the blocks below.
866		// Cases:
867		// 1.     |------truncation------|
868		//   |---query---|
869		// 2.     |------truncation------|
870		//              |---query---|
871		return true, false, 0
872	}
873	if querierMint < memTruncTime {
874		// The truncation time is not same as head mint that we saw above but the
875		// query still overlaps with the Head.
876		// The truncation started after we got the querier. So it is not safe
877		// to use this querier and/or might block truncation. We should get
878		// a new querier for the new Head range while remaining will be available
879		// in the blocks below.
880		// Case:
881		//      |------truncation------|
882		//                        |----query----|
883		// Turns into
884		//      |------truncation------|
885		//                             |---qu---|
886		return true, true, memTruncTime
887	}
888
889	// Other case is this, which is a no-op
890	//      |------truncation------|
891	//                              |---query---|
892	return false, false, 0
893}
894
// truncateWAL removes old data before mint from the WAL.
//
// It is a no-op if the head has no WAL or if mint is not newer than the last
// WAL truncation time. It holds chunkSnapshotMtx for its whole duration.
// After starting a new WAL segment, it writes a checkpoint of the lower two
// thirds of the existing segments (never including the last one). The
// checkpoint keeps series that are still in the head or still tracked in
// h.deleted. It then truncates the checkpointed segments, prunes h.deleted and
// deletes older checkpoints. A failure to truncate segments or to delete old
// checkpoints is only logged and does not make the call fail.
func (h *Head) truncateWAL(mint int64) error {
	h.chunkSnapshotMtx.Lock()
	defer h.chunkSnapshotMtx.Unlock()

	if h.wal == nil || mint <= h.lastWALTruncationTime.Load() {
		return nil
	}
	start := time.Now()
	// NOTE(review): the truncation time is recorded before the checkpoint is
	// attempted. If checkpointing fails below, a later call with the same mint
	// returns early at the check above.
	h.lastWALTruncationTime.Store(mint)

	first, last, err := wal.Segments(h.wal.Dir())
	if err != nil {
		return errors.Wrap(err, "get segment range")
	}
	// Start a new segment, so low ingestion volume TSDB don't have more WAL than
	// needed.
	if err := h.wal.NextSegment(); err != nil {
		return errors.Wrap(err, "next segment")
	}
	last-- // Never consider last segment for checkpoint.
	if last < 0 {
		return nil // no segments yet.
	}
	// The lower two thirds of segments should contain mostly obsolete samples.
	// If we have fewer than two segments, it's not worth checkpointing yet.
	// With the default 2h blocks, this will keep up to around 3h worth
	// of WAL segments.
	last = first + (last-first)*2/3
	if last <= first {
		return nil
	}

	// keep is handed to wal.Checkpoint. It reports true for series still in the
	// head and for garbage-collected series that are still tracked in h.deleted.
	keep := func(id uint64) bool {
		if h.series.getByID(id) != nil {
			return true
		}
		h.deletedMtx.Lock()
		_, ok := h.deleted[id]
		h.deletedMtx.Unlock()
		return ok
	}
	h.metrics.checkpointCreationTotal.Inc()
	if _, err = wal.Checkpoint(h.logger, h.wal, first, last, keep, mint); err != nil {
		h.metrics.checkpointCreationFail.Inc()
		if _, ok := errors.Cause(err).(*wal.CorruptionErr); ok {
			h.metrics.walCorruptionsTotal.Inc()
		}
		return errors.Wrap(err, "create checkpoint")
	}
	if err := h.wal.Truncate(last + 1); err != nil {
		// If truncating fails, we'll just try again at the next checkpoint.
		// Leftover segments will just be ignored in the future if there's a checkpoint
		// that supersedes them.
		level.Error(h.logger).Log("msg", "truncating segments failed", "err", err)
	}

	// The checkpoint is written and segments before it is truncated, so we no
	// longer need to track deleted series that are before it.
	// NOTE(review): only entries recorded for segments below `first` are
	// dropped here, although segments up to `last` were checkpointed. This is
	// conservative (entries live one cycle longer); confirm it is intended.
	h.deletedMtx.Lock()
	for ref, segment := range h.deleted {
		if segment < first {
			delete(h.deleted, ref)
		}
	}
	h.deletedMtx.Unlock()

	h.metrics.checkpointDeleteTotal.Inc()
	if err := wal.DeleteCheckpoints(h.wal.Dir(), last); err != nil {
		// Leftover old checkpoints do not cause problems down the line beyond
		// occupying disk space.
		// They will just be ignored since a higher checkpoint exists.
		level.Error(h.logger).Log("msg", "delete old checkpoints", "err", err)
		h.metrics.checkpointDeleteFail.Inc()
	}
	h.metrics.walTruncateDuration.Observe(time.Since(start).Seconds())

	level.Info(h.logger).Log("msg", "WAL checkpoint complete",
		"first", first, "last", last, "duration", time.Since(start))

	return nil
}
977
// Stats holds statistics about the head, as computed by Head.Stats: the number
// of series, the head's time bounds and postings cardinality statistics.
type Stats struct {
	NumSeries         uint64
	MinTime, MaxTime  int64
	IndexPostingStats *index.PostingsStats
}
983
984// Stats returns important current HEAD statistics. Note that it is expensive to
985// calculate these.
986func (h *Head) Stats(statsByLabelName string) *Stats {
987	return &Stats{
988		NumSeries:         h.NumSeries(),
989		MaxTime:           h.MaxTime(),
990		MinTime:           h.MinTime(),
991		IndexPostingStats: h.PostingsCardinalityStats(statsByLabelName),
992	}
993}
994
// RangeHead is a view over a Head restricted to the closed time range
// [mint, maxt]. Its index and chunk readers are limited to that range. Its
// tombstones and series count are those of the whole underlying head.
type RangeHead struct {
	head       *Head
	mint, maxt int64
}
999
1000// NewRangeHead returns a *RangeHead.
1001func NewRangeHead(head *Head, mint, maxt int64) *RangeHead {
1002	return &RangeHead{
1003		head: head,
1004		mint: mint,
1005		maxt: maxt,
1006	}
1007}
1008
1009func (h *RangeHead) Index() (IndexReader, error) {
1010	return h.head.indexRange(h.mint, h.maxt), nil
1011}
1012
1013func (h *RangeHead) Chunks() (ChunkReader, error) {
1014	return h.head.chunksRange(h.mint, h.maxt, h.head.iso.State(h.mint, h.maxt))
1015}
1016
1017func (h *RangeHead) Tombstones() (tombstones.Reader, error) {
1018	return h.head.tombstones, nil
1019}
1020
1021func (h *RangeHead) MinTime() int64 {
1022	return h.mint
1023}
1024
1025// MaxTime returns the max time of actual data fetch-able from the head.
1026// This controls the chunks time range which is closed [b.MinTime, b.MaxTime].
1027func (h *RangeHead) MaxTime() int64 {
1028	return h.maxt
1029}
1030
1031// BlockMaxTime returns the max time of the potential block created from this head.
1032// It's different to MaxTime as we need to add +1 millisecond to block maxt because block
1033// intervals are half-open: [b.MinTime, b.MaxTime). Block intervals are always +1 than the total samples it includes.
1034func (h *RangeHead) BlockMaxTime() int64 {
1035	return h.MaxTime() + 1
1036}
1037
1038func (h *RangeHead) NumSeries() uint64 {
1039	return h.head.NumSeries()
1040}
1041
1042func (h *RangeHead) Meta() BlockMeta {
1043	return BlockMeta{
1044		MinTime: h.MinTime(),
1045		MaxTime: h.MaxTime(),
1046		ULID:    h.head.Meta().ULID,
1047		Stats: BlockStats{
1048			NumSeries: h.NumSeries(),
1049		},
1050	}
1051}
1052
1053// String returns an human readable representation of the range head. It's important to
1054// keep this function in order to avoid the struct dump when the head is stringified in
1055// errors or logs.
1056func (h *RangeHead) String() string {
1057	return fmt.Sprintf("range head (mint: %d, maxt: %d)", h.MinTime(), h.MaxTime())
1058}
1059
1060// Delete all samples in the range of [mint, maxt] for series that satisfy the given
1061// label matchers.
1062func (h *Head) Delete(mint, maxt int64, ms ...*labels.Matcher) error {
1063	// Do not delete anything beyond the currently valid range.
1064	mint, maxt = clampInterval(mint, maxt, h.MinTime(), h.MaxTime())
1065
1066	ir := h.indexRange(mint, maxt)
1067
1068	p, err := PostingsForMatchers(ir, ms...)
1069	if err != nil {
1070		return errors.Wrap(err, "select series")
1071	}
1072
1073	var stones []tombstones.Stone
1074	for p.Next() {
1075		series := h.series.getByID(p.At())
1076
1077		series.RLock()
1078		t0, t1 := series.minTime(), series.maxTime()
1079		series.RUnlock()
1080		if t0 == math.MinInt64 || t1 == math.MinInt64 {
1081			continue
1082		}
1083		// Delete only until the current values and not beyond.
1084		t0, t1 = clampInterval(mint, maxt, t0, t1)
1085		stones = append(stones, tombstones.Stone{Ref: p.At(), Intervals: tombstones.Intervals{{Mint: t0, Maxt: t1}}})
1086	}
1087	if p.Err() != nil {
1088		return p.Err()
1089	}
1090	if h.wal != nil {
1091		var enc record.Encoder
1092		if err := h.wal.Log(enc.Tombstones(stones, nil)); err != nil {
1093			return err
1094		}
1095	}
1096	for _, s := range stones {
1097		h.tombstones.AddInterval(s.Ref, s.Intervals[0])
1098	}
1099
1100	return nil
1101}
1102
// gc removes data before the minimum timestamp from the head.
//
// It truncates chunks strictly before h.MinTime() in every series. It deletes
// series left without chunks (and without a pending commit), drops their
// postings and tombstones, and truncates tombstones before that time. It also
// updates the series/chunk metrics and the series count.
// When a WAL is present, each deleted series ref is recorded in h.deleted
// together with the WAL's current last segment. This lets WAL checkpoints keep
// the series record while the WAL may still hold samples for it.
//
// It returns the minimum minTime() over the series that survived, or the
// head's MinTime() if none did.
func (h *Head) gc() int64 {
	// Only data strictly lower than this timestamp must be deleted.
	mint := h.MinTime()

	// Drop old chunks and remember series IDs and hashes if they can be
	// deleted entirely.
	deleted, chunksRemoved, actualMint := h.series.gc(mint)
	seriesRemoved := len(deleted)

	h.metrics.seriesRemoved.Add(float64(seriesRemoved))
	h.metrics.chunksRemoved.Add(float64(chunksRemoved))
	h.metrics.chunks.Sub(float64(chunksRemoved))
	h.numSeries.Sub(uint64(seriesRemoved))

	// Remove deleted series IDs from the postings lists.
	h.postings.Delete(deleted)

	// Remove tombstones referring to the deleted series.
	h.tombstones.DeleteTombstones(deleted)
	h.tombstones.TruncateBefore(mint)

	if h.wal != nil {
		// NOTE(review): the error from wal.Segments is discarded. On failure,
		// `last` is whatever Segments returns alongside the error, and that
		// value is recorded below as-is. Confirm that this is safe.
		_, last, _ := wal.Segments(h.wal.Dir())
		h.deletedMtx.Lock()
		// Keep series records until we're past segment 'last'
		// because the WAL will still have samples records with
		// this ref ID. If we didn't keep these series records then
		// on start up when we replay the WAL, or any other code
		// that reads the WAL, wouldn't be able to use those
		// samples since we would have no labels for that ref ID.
		for ref := range deleted {
			h.deleted[ref] = last
		}
		h.deletedMtx.Unlock()
	}

	return actualMint
}
1143
1144// Tombstones returns a new reader over the head's tombstones
1145func (h *Head) Tombstones() (tombstones.Reader, error) {
1146	return h.tombstones, nil
1147}
1148
1149// NumSeries returns the number of active series in the head.
1150func (h *Head) NumSeries() uint64 {
1151	return h.numSeries.Load()
1152}
1153
1154// Meta returns meta information about the head.
1155// The head is dynamic so will return dynamic results.
1156func (h *Head) Meta() BlockMeta {
1157	var id [16]byte
1158	copy(id[:], "______head______")
1159	return BlockMeta{
1160		MinTime: h.MinTime(),
1161		MaxTime: h.MaxTime(),
1162		ULID:    ulid.ULID(id),
1163		Stats: BlockStats{
1164			NumSeries: h.NumSeries(),
1165		},
1166	}
1167}
1168
1169// MinTime returns the lowest time bound on visible data in the head.
1170func (h *Head) MinTime() int64 {
1171	return h.minTime.Load()
1172}
1173
1174// MaxTime returns the highest timestamp seen in data of the head.
1175func (h *Head) MaxTime() int64 {
1176	return h.maxTime.Load()
1177}
1178
1179// compactable returns whether the head has a compactable range.
1180// The head has a compactable range when the head time range is 1.5 times the chunk range.
1181// The 0.5 acts as a buffer of the appendable window.
1182func (h *Head) compactable() bool {
1183	return h.MaxTime()-h.MinTime() > h.chunkRange.Load()/2*3
1184}
1185
1186// Close flushes the WAL and closes the head.
1187// It also takes a snapshot of in-memory chunks if enabled.
1188func (h *Head) Close() error {
1189	h.closedMtx.Lock()
1190	defer h.closedMtx.Unlock()
1191	h.closed = true
1192	errs := tsdb_errors.NewMulti(h.chunkDiskMapper.Close())
1193	if h.wal != nil {
1194		errs.Add(h.wal.Close())
1195	}
1196	if errs.Err() == nil && h.opts.EnableMemorySnapshotOnShutdown {
1197		errs.Add(h.performChunkSnapshot())
1198	}
1199	return errs.Err()
1200
1201}
1202
1203// String returns an human readable representation of the TSDB head. It's important to
1204// keep this function in order to avoid the struct dump when the head is stringified in
1205// errors or logs.
1206func (h *Head) String() string {
1207	return "head"
1208}
1209
1210func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool, error) {
1211	// Just using `getOrCreateWithID` below would be semantically sufficient, but we'd create
1212	// a new series on every sample inserted via Add(), which causes allocations
1213	// and makes our series IDs rather random and harder to compress in postings.
1214	s := h.series.getByHash(hash, lset)
1215	if s != nil {
1216		return s, false, nil
1217	}
1218
1219	// Optimistically assume that we are the first one to create the series.
1220	id := h.lastSeriesID.Inc()
1221
1222	return h.getOrCreateWithID(id, hash, lset)
1223}
1224
1225func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSeries, bool, error) {
1226	s, created, err := h.series.getOrSet(hash, lset, func() *memSeries {
1227		return newMemSeries(lset, id, h.chunkRange.Load(), &h.memChunkPool)
1228	})
1229	if err != nil {
1230		return nil, false, err
1231	}
1232	if !created {
1233		return s, false, nil
1234	}
1235
1236	h.metrics.seriesCreated.Inc()
1237	h.numSeries.Inc()
1238
1239	h.postings.Add(id, lset)
1240	return s, true, nil
1241}
1242
// seriesHashmap is a simple hashmap for memSeries by their label set. It is built
// on top of a regular hashmap and holds a slice of series to resolve hash collisions.
// Its methods require the hash to be submitted with it to avoid re-computations throughout
// the code.
//
// It does no locking of its own. Within stripeSeries, each seriesHashmap is
// guarded by the stripe lock with the same index.
type seriesHashmap map[uint64][]*memSeries
1248
1249func (m seriesHashmap) get(hash uint64, lset labels.Labels) *memSeries {
1250	for _, s := range m[hash] {
1251		if labels.Equal(s.lset, lset) {
1252			return s
1253		}
1254	}
1255	return nil
1256}
1257
1258func (m seriesHashmap) set(hash uint64, s *memSeries) {
1259	l := m[hash]
1260	for i, prev := range l {
1261		if labels.Equal(prev.lset, s.lset) {
1262			l[i] = s
1263			return
1264		}
1265	}
1266	m[hash] = append(l, s)
1267}
1268
1269func (m seriesHashmap) del(hash uint64, lset labels.Labels) {
1270	var rem []*memSeries
1271	for _, s := range m[hash] {
1272		if !labels.Equal(s.lset, lset) {
1273			rem = append(rem, s)
1274		}
1275	}
1276	if len(rem) == 0 {
1277		delete(m, hash)
1278	} else {
1279		m[hash] = rem
1280	}
1281}
1282
// DefaultStripeSize is the default number of entries to allocate in the
// stripeSeries hash map (1 << 14 = 16384). stripeSeries picks a stripe by
// masking with size-1, so the value is a power of two.
const DefaultStripeSize = 1 << 14
1287
// stripeSeries locks modulo ranges of IDs and hashes to reduce lock contention.
// The locks are padded to not be on the same cache line. Filling the padded space
// with the maps was profiled to be slower – likely due to the additional pointer
// dereferences.
//
// Each series is indexed twice: by ref in series[ref&(size-1)], and by label
// hash in hashes[hash&(size-1)]. locks[i] guards both series[i] and hashes[i].
// Because stripe indexes are computed by masking with size-1, size is expected
// to be a power of two.
type stripeSeries struct {
	size                    int
	series                  []map[uint64]*memSeries
	hashes                  []seriesHashmap
	locks                   []stripeLock
	seriesLifecycleCallback SeriesLifecycleCallback
}

// stripeLock is a sync.RWMutex padded so that neighbouring locks in a slice do
// not share a cache line.
type stripeLock struct {
	sync.RWMutex
	// Padding to avoid multiple locks being on the same cache line.
	// NOTE(review): presumably sized so that the RWMutex plus padding fills a
	// 64-byte cache line on 64-bit platforms. Re-check if RWMutex changes size.
	_ [40]byte
}
1305
1306func newStripeSeries(stripeSize int, seriesCallback SeriesLifecycleCallback) *stripeSeries {
1307	s := &stripeSeries{
1308		size:                    stripeSize,
1309		series:                  make([]map[uint64]*memSeries, stripeSize),
1310		hashes:                  make([]seriesHashmap, stripeSize),
1311		locks:                   make([]stripeLock, stripeSize),
1312		seriesLifecycleCallback: seriesCallback,
1313	}
1314
1315	for i := range s.series {
1316		s.series[i] = map[uint64]*memSeries{}
1317	}
1318	for i := range s.hashes {
1319		s.hashes[i] = seriesHashmap{}
1320	}
1321	return s
1322}
1323
// gc garbage collects old chunks that are strictly before mint and removes
// series entirely that have no chunks left.
//
// A series is kept if it still has mmapped chunks, a head chunk, or a pending
// commit. Stripes are visited one at a time while holding that hash stripe's
// lock. After a stripe's lock is released, the lifecycle callback's
// PostDeletion is called with the label sets of the series deleted from that
// stripe.
//
// It returns the refs of the deleted series, the number of removed chunks,
// and the minimum minTime() over the kept series (or mint if none were kept).
func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int, int64) {
	var (
		deleted                  = map[uint64]struct{}{}
		deletedForCallback       = []labels.Labels{}
		rmChunks                 = 0
		actualMint         int64 = math.MaxInt64
	)
	// Run through all series and truncate old chunks. Mark those with no
	// chunks left as deleted and store their ID.
	for i := 0; i < s.size; i++ {
		s.locks[i].Lock()

		for hash, all := range s.hashes[i] {
			for _, series := range all {
				series.Lock()
				rmChunks += series.truncateChunksBefore(mint)

				if len(series.mmappedChunks) > 0 || series.headChunk != nil || series.pendingCommit {
					// The series survives: track the lowest min time still in the Head.
					// NOTE(review): a series kept only because of pendingCommit has no
					// chunks, so minTime() returns math.MinInt64 here.
					seriesMint := series.minTime()
					if seriesMint < actualMint {
						actualMint = seriesMint
					}
					series.Unlock()
					continue
				}

				// The series is gone entirely. We need to keep the series lock
				// and make sure we have acquired the stripe locks for hash and ID of the
				// series alike.
				// If we don't hold them all, there's a very small chance that a series receives
				// samples again while we are half-way into deleting it.
				// j is the series' stripe by ID; i is its stripe by hash.
				j := int(series.ref) & (s.size - 1)

				if i != j {
					s.locks[j].Lock()
				}

				deleted[series.ref] = struct{}{}
				// del builds a new slice for the bucket, so ranging over `all`
				// above stays valid.
				s.hashes[i].del(hash, series.lset)
				delete(s.series[j], series.ref)
				deletedForCallback = append(deletedForCallback, series.lset)

				if i != j {
					s.locks[j].Unlock()
				}

				series.Unlock()
			}
		}

		s.locks[i].Unlock()

		s.seriesLifecycleCallback.PostDeletion(deletedForCallback...)
		// Reuse the backing array for the next stripe.
		// NOTE(review): PostDeletion implementations must therefore not retain
		// the slice they receive.
		deletedForCallback = deletedForCallback[:0]
	}

	if actualMint == math.MaxInt64 {
		actualMint = mint
	}

	return deleted, rmChunks, actualMint
}
1388
1389func (s *stripeSeries) getByID(id uint64) *memSeries {
1390	i := id & uint64(s.size-1)
1391
1392	s.locks[i].RLock()
1393	series := s.series[i][id]
1394	s.locks[i].RUnlock()
1395
1396	return series
1397}
1398
1399func (s *stripeSeries) getByHash(hash uint64, lset labels.Labels) *memSeries {
1400	i := hash & uint64(s.size-1)
1401
1402	s.locks[i].RLock()
1403	series := s.hashes[i].get(hash, lset)
1404	s.locks[i].RUnlock()
1405
1406	return series
1407}
1408
// getOrSet returns the series with the given hash and label set if it already
// exists, with created=false. Otherwise it stores the series built by
// createSeries, first in the hash stripe and then in the ID stripe, and
// returns it with created=true.
//
// The lifecycle callback's PreCreation is consulted before any lock is taken.
// If it returns an error and the series does not exist yet, nothing is stored
// and that error is returned. An existing series is still returned with a nil
// error. createSeries runs before the lookup, so its result may be discarded
// when the series already exists. PostCreation is called after the series
// becomes visible by hash, but before it is registered by ID.
func (s *stripeSeries) getOrSet(hash uint64, lset labels.Labels, createSeries func() *memSeries) (*memSeries, bool, error) {
	// PreCreation is called here to avoid calling it inside the lock.
	// It is not necessary to call it just before creating a series,
	// rather it gives a 'hint' whether to create a series or not.
	preCreationErr := s.seriesLifecycleCallback.PreCreation(lset)

	// Create the series, unless the PreCreation() callback as failed.
	// If failed, we'll not allow to create a new series anyway.
	var series *memSeries
	if preCreationErr == nil {
		series = createSeries()
	}

	i := hash & uint64(s.size-1)
	s.locks[i].Lock()

	if prev := s.hashes[i].get(hash, lset); prev != nil {
		// Someone else created the series first; return theirs.
		s.locks[i].Unlock()
		return prev, false, nil
	}
	if preCreationErr == nil {
		s.hashes[i].set(hash, series)
	}
	s.locks[i].Unlock()

	if preCreationErr != nil {
		// The callback prevented creation of series.
		return nil, false, preCreationErr
	}
	// Setting the series in the s.hashes marks the creation of series
	// as any further calls to this methods would return that series.
	s.seriesLifecycleCallback.PostCreation(series.lset)

	// Register the series under its ref in the ID stripe, which may differ
	// from the hash stripe locked above.
	i = series.ref & uint64(s.size-1)

	s.locks[i].Lock()
	s.series[i][series.ref] = series
	s.locks[i].Unlock()

	return series, true, nil
}
1450
1451type sample struct {
1452	t int64
1453	v float64
1454}
1455
1456func newSample(t int64, v float64) tsdbutil.Sample { return sample{t, v} }
1457func (s sample) T() int64                          { return s.t }
1458func (s sample) V() float64                        { return s.v }
1459
// memSeries is the in-memory representation of a series. None of its methods
// are goroutine safe and it is the caller's responsibility to lock it.
//
// The series' chunks are mmappedChunks followed by headChunk, which may be nil.
// firstChunkID is advanced whenever chunks are truncated from the front, so
// the IDs of the remaining chunks stay the same.
type memSeries struct {
	sync.RWMutex

	ref           uint64
	lset          labels.Labels
	mmappedChunks []*mmappedChunk
	mmMaxTime     int64 // Max time of any mmapped chunk, only used during WAL replay.
	headChunk     *memChunk
	chunkRange    int64
	firstChunkID  int

	nextAt        int64 // Timestamp at which to cut the next chunk.
	sampleBuf     [4]sample
	pendingCommit bool // Whether there are samples waiting to be committed to this series.

	app chunkenc.Appender // Current appender for the chunk.

	memChunkPool *sync.Pool

	txs *txRing
}
1483
1484func newMemSeries(lset labels.Labels, id uint64, chunkRange int64, memChunkPool *sync.Pool) *memSeries {
1485	s := &memSeries{
1486		lset:         lset,
1487		ref:          id,
1488		chunkRange:   chunkRange,
1489		nextAt:       math.MinInt64,
1490		txs:          newTxRing(4),
1491		memChunkPool: memChunkPool,
1492	}
1493	return s
1494}
1495
1496func (s *memSeries) minTime() int64 {
1497	if len(s.mmappedChunks) > 0 {
1498		return s.mmappedChunks[0].minTime
1499	}
1500	if s.headChunk != nil {
1501		return s.headChunk.minTime
1502	}
1503	return math.MinInt64
1504}
1505
1506func (s *memSeries) maxTime() int64 {
1507	c := s.head()
1508	if c != nil {
1509		return c.maxTime
1510	}
1511	if len(s.mmappedChunks) > 0 {
1512		return s.mmappedChunks[len(s.mmappedChunks)-1].maxTime
1513	}
1514	return math.MinInt64
1515}
1516
1517// truncateChunksBefore removes all chunks from the series that
1518// have no timestamp at or after mint.
1519// Chunk IDs remain unchanged.
1520func (s *memSeries) truncateChunksBefore(mint int64) (removed int) {
1521	if s.headChunk != nil && s.headChunk.maxTime < mint {
1522		// If head chunk is truncated, we can truncate all mmapped chunks.
1523		removed = 1 + len(s.mmappedChunks)
1524		s.firstChunkID += removed
1525		s.headChunk = nil
1526		s.mmappedChunks = nil
1527		return removed
1528	}
1529	if len(s.mmappedChunks) > 0 {
1530		for i, c := range s.mmappedChunks {
1531			if c.maxTime >= mint {
1532				break
1533			}
1534			removed = i + 1
1535		}
1536		s.mmappedChunks = append(s.mmappedChunks[:0], s.mmappedChunks[removed:]...)
1537		s.firstChunkID += removed
1538	}
1539	return removed
1540}
1541
1542// cleanupAppendIDsBelow cleans up older appendIDs. Has to be called after
1543// acquiring lock.
1544func (s *memSeries) cleanupAppendIDsBelow(bound uint64) {
1545	s.txs.cleanupAppendIDsBelow(bound)
1546}
1547
1548func (s *memSeries) head() *memChunk {
1549	return s.headChunk
1550}
1551
// memChunk is an in-memory chunk together with the closed time range
// [minTime, maxTime] it covers.
type memChunk struct {
	chunk            chunkenc.Chunk
	minTime, maxTime int64
}

// OverlapsClosedInterval returns true if the chunk's closed time range
// [minTime, maxTime] overlaps [mint, maxt].
func (mc *memChunk) OverlapsClosedInterval(mint, maxt int64) bool {
	return overlapsClosedInterval(mc.minTime, mc.maxTime, mint, maxt)
}
1561
// overlapsClosedInterval reports whether the closed intervals [mint1, maxt1]
// and [mint2, maxt2] share at least one point. Touching endpoints count as an
// overlap.
func overlapsClosedInterval(mint1, maxt1, mint2, maxt2 int64) bool {
	return mint2 <= maxt1 && mint1 <= maxt2
}
1565
// mmappedChunk describes a memory-mapped chunk of a series: its reference, the
// number of samples it holds and its closed time range [minTime, maxTime].
// NOTE(review): ref is presumably a chunk disk mapper reference. Confirm.
type mmappedChunk struct {
	ref              uint64
	numSamples       uint16
	minTime, maxTime int64
}

// OverlapsClosedInterval returns true if the chunk overlaps [mint, maxt].
func (mc *mmappedChunk) OverlapsClosedInterval(mint, maxt int64) bool {
	return overlapsClosedInterval(mc.minTime, mc.maxTime, mint, maxt)
}
1576
1577type noopSeriesLifecycleCallback struct{}
1578
1579func (noopSeriesLifecycleCallback) PreCreation(labels.Labels) error { return nil }
1580func (noopSeriesLifecycleCallback) PostCreation(labels.Labels)      {}
1581func (noopSeriesLifecycleCallback) PostDeletion(...labels.Labels)   {}
1582
1583func (h *Head) Size() int64 {
1584	var walSize int64
1585	if h.wal != nil {
1586		walSize, _ = h.wal.Size()
1587	}
1588	cdmSize, _ := h.chunkDiskMapper.Size()
1589	return walSize + cdmSize
1590}
1591
1592func (h *RangeHead) Size() int64 {
1593	return h.head.Size()
1594}
1595
1596func (h *Head) startWALReplayStatus(startFrom, last int) {
1597	h.stats.WALReplayStatus.Lock()
1598	defer h.stats.WALReplayStatus.Unlock()
1599
1600	h.stats.WALReplayStatus.Min = startFrom
1601	h.stats.WALReplayStatus.Max = last
1602	h.stats.WALReplayStatus.Current = startFrom
1603}
1604
1605func (h *Head) updateWALReplayStatusRead(current int) {
1606	h.stats.WALReplayStatus.Lock()
1607	defer h.stats.WALReplayStatus.Unlock()
1608
1609	h.stats.WALReplayStatus.Current = current
1610}
1611