// Copyright 2014 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package local contains the local time series storage used by Prometheus.
package local

import (
	"container/list"
	"errors"
	"fmt"
	"math/rand"
	"runtime"
	"sort"
	"sync"
	"sync/atomic"
	"time"

	opentracing "github.com/opentracing/opentracing-go"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/log"
	"github.com/prometheus/common/model"
	"golang.org/x/net/context"

	"github.com/prometheus/prometheus/storage/local/chunk"
	"github.com/prometheus/prometheus/storage/metric"
)

const (
	evictRequestsCap      = 1024
	quarantineRequestsCap = 1024

	// See waitForNextFP.
	fpMaxSweepTime    = 6 * time.Hour
	fpMaxWaitDuration = 10 * time.Second

	// See handleEvictList. This should be clearly shorter than the usual GC
	// interval. On the other hand, each evict check calls ReadMemStats,
	// which involves stopping the world (at least up to Go 1.8). Hence,
	// don't just set this to a very short interval.
	evictInterval = time.Second

	// Constants to control the hysteresis of entering and leaving "rushed
	// mode". In rushed mode, the dirty series count is ignored for
	// checkpointing, series are maintained as frequently as possible, and
	// series files are not synced if the adaptive sync strategy is used.
	persintenceUrgencyScoreForEnteringRushedMode = 0.8
	persintenceUrgencyScoreForLeavingRushedMode  = 0.7

	// This factor times -storage.local.memory-chunks is the number of
	// memory chunks we tolerate before throttling the storage. It is also a
	// basis for calculating the persistenceUrgencyScore.
	toleranceFactorMemChunks = 1.1
	// This factor times -storage.local.max-chunks-to-persist is the minimum
	// required number of chunks waiting for persistence before the number
	// of chunks in memory may influence the persistenceUrgencyScore. (In
	// other words: if there are no chunks to persist, it doesn't help chunk
	// eviction if we speed up persistence.)
	factorMinChunksToPersist = 0.2

	// Threshold for when to stop using LabelMatchers to retrieve and
	// intersect fingerprints. The rationale here is that looking up more
	// fingerprints has diminishing returns if we have already narrowed down
	// the possible fingerprints significantly. It is then easier to simply
	// look up the metrics for all the fingerprints and directly compare them
	// to the matchers. Since a fingerprint lookup for an Equal matcher is
	// much less expensive, there is a lower threshold for that case.
	// TODO(beorn7): These numbers need to be tweaked, probably a bit lower.
	// 5x higher numbers have resulted in slightly worse performance in a
	// real-life production scenario.
	fpEqualMatchThreshold = 1000
	fpOtherMatchThreshold = 10000

	selectorsTag = "selectors"
	fromTag      = "from"
	throughTag   = "through"
	tsTag        = "ts"
	numSeries    = "num_series"
)
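
// Illustrative sketch (not part of the original code): the two hysteresis
// constants above are meant to be applied roughly like this, with score being
// the current persistence urgency in [0, 1] and rushed the current mode flag;
// the variable names here are hypothetical.
//
//	if !rushed && score > persintenceUrgencyScoreForEnteringRushedMode {
//		rushed = true // Enter rushed mode once the score exceeds 0.8.
//	} else if rushed && score < persintenceUrgencyScoreForLeavingRushedMode {
//		rushed = false // Leave it only after the score has dropped below 0.7.
//	}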

type quarantineRequest struct {
	fp     model.Fingerprint
	metric model.Metric
	reason error
}

// SyncStrategy is an enum to select a sync strategy for series files.
type SyncStrategy int

// String implements flag.Value.
func (ss SyncStrategy) String() string {
	switch ss {
	case Adaptive:
		return "adaptive"
	case Always:
		return "always"
	case Never:
		return "never"
	}
	return "<unknown>"
}

// Set implements flag.Value.
func (ss *SyncStrategy) Set(s string) error {
	switch s {
	case "adaptive":
		*ss = Adaptive
	case "always":
		*ss = Always
	case "never":
		*ss = Never
	default:
		return fmt.Errorf("invalid sync strategy: %s", s)
	}
	return nil
}

// Possible values for SyncStrategy.
const (
	_ SyncStrategy = iota
	Never
	Always
	Adaptive
)
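
// Since *SyncStrategy implements flag.Value via String and Set above, it can
// be registered directly with a flag set. A minimal sketch (the flag name is
// illustrative, not necessarily the one registered by the Prometheus binary):
//
//	var strategy SyncStrategy = Adaptive
//	flag.Var(&strategy, "storage.local.series-sync-strategy",
//		"When to sync series files: never, always, or adaptive.")
//	flag.Parse()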

// A syncStrategy is a function that returns whether series files should be
// synced or not. It does not need to be goroutine safe.
type syncStrategy func() bool

// A MemorySeriesStorage manages series in memory over time, while also
// interfacing with a persistence layer to make time series data persistent
// across restarts and evictable from memory.
type MemorySeriesStorage struct {
	// archiveHighWatermark, numChunksToPersist, persistUrgency have to be aligned for atomic operations.
	archiveHighWatermark model.Time    // No archived series has samples after this time.
	numChunksToPersist   int64         // The number of chunks waiting for persistence.
	persistUrgency       int32         // Persistence urgency score * 1000, int32 allows atomic operations.
	rushed               bool          // Whether the storage is in rushed mode.
	rushedMtx            sync.Mutex    // Protects rushed.
	lastNumGC            uint32        // To detect if a GC cycle has run.
	throttled            chan struct{} // This chan is sent to whenever NeedsThrottling() returns true (for logging).

	fpLocker   *fingerprintLocker
	fpToSeries *seriesMap

	options *MemorySeriesStorageOptions

	loopStopping, loopStopped  chan struct{}
	logThrottlingStopped       chan struct{}
	targetHeapSize             uint64
	dropAfter                  time.Duration
	headChunkTimeout           time.Duration
	checkpointInterval         time.Duration
	checkpointDirtySeriesLimit int

	persistence *persistence
	mapper      *fpMapper

	evictList                   *list.List
	evictRequests               chan chunk.EvictRequest
	evictStopping, evictStopped chan struct{}

	quarantineRequests                    chan quarantineRequest
	quarantineStopping, quarantineStopped chan struct{}

	persistErrors            prometheus.Counter
	queuedChunksToPersist    prometheus.Counter
	chunksToPersist          prometheus.GaugeFunc
	memorySeries             prometheus.Gauge
	headChunks               prometheus.Gauge
	dirtySeries              prometheus.Gauge
	seriesOps                *prometheus.CounterVec
	ingestedSamples          prometheus.Counter
	discardedSamples         *prometheus.CounterVec
	nonExistentSeriesMatches prometheus.Counter
	memChunks                prometheus.GaugeFunc
	maintainSeriesDuration   *prometheus.SummaryVec
	persistenceUrgencyScore  prometheus.GaugeFunc
	rushedMode               prometheus.GaugeFunc
	targetHeapSizeBytes      prometheus.GaugeFunc
}

// MemorySeriesStorageOptions contains options needed by
// NewMemorySeriesStorage. It is not safe to leave any of those at their zero
// values.
type MemorySeriesStorageOptions struct {
	TargetHeapSize             uint64        // Desired maximum heap size.
	PersistenceStoragePath     string        // Location of persistence files.
	PersistenceRetentionPeriod time.Duration // Chunks at least that old are dropped.
	HeadChunkTimeout           time.Duration // Head chunks idle for at least that long may be closed.
	CheckpointInterval         time.Duration // How often to checkpoint the series map and head chunks.
	CheckpointDirtySeriesLimit int           // How many dirty series will trigger an early checkpoint.
	Dirty                      bool          // Force the storage to consider itself dirty on startup.
	PedanticChecks             bool          // If dirty, perform crash-recovery checks on each series file.
	SyncStrategy               SyncStrategy  // Which sync strategy to apply to series files.
	MinShrinkRatio             float64       // Minimum ratio a series file has to shrink during truncation.
	NumMutexes                 int           // Number of mutexes used for stochastic fingerprint locking.
}
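
// A rough lifecycle sketch for callers of this package (all values below are
// placeholders, not tuning recommendations):
//
//	o := &MemorySeriesStorageOptions{
//		TargetHeapSize:             2 << 30, // 2 GiB
//		PersistenceStoragePath:     "/var/lib/prometheus",
//		PersistenceRetentionPeriod: 15 * 24 * time.Hour,
//		HeadChunkTimeout:           5 * time.Minute,
//		CheckpointInterval:         5 * time.Minute,
//		CheckpointDirtySeriesLimit: 5000,
//		SyncStrategy:               Adaptive,
//		MinShrinkRatio:             0.1,
//		NumMutexes:                 4096,
//	}
//	storage := NewMemorySeriesStorage(o)
//	if err := storage.Start(); err != nil {
//		log.Fatal(err)
//	}
//	defer storage.Stop()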

// NewMemorySeriesStorage returns a newly allocated Storage. Storage.Start
// still has to be called to start the storage.
func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) *MemorySeriesStorage {
	s := &MemorySeriesStorage{
		fpLocker: newFingerprintLocker(o.NumMutexes),

		options: o,

		loopStopping:               make(chan struct{}),
		loopStopped:                make(chan struct{}),
		logThrottlingStopped:       make(chan struct{}),
		throttled:                  make(chan struct{}, 1),
		targetHeapSize:             o.TargetHeapSize,
		dropAfter:                  o.PersistenceRetentionPeriod,
		headChunkTimeout:           o.HeadChunkTimeout,
		checkpointInterval:         o.CheckpointInterval,
		checkpointDirtySeriesLimit: o.CheckpointDirtySeriesLimit,
		archiveHighWatermark:       model.Now().Add(-o.HeadChunkTimeout),

		evictList:     list.New(),
		evictRequests: make(chan chunk.EvictRequest, evictRequestsCap),
		evictStopping: make(chan struct{}),
		evictStopped:  make(chan struct{}),

		quarantineRequests: make(chan quarantineRequest, quarantineRequestsCap),
		quarantineStopping: make(chan struct{}),
		quarantineStopped:  make(chan struct{}),

		persistErrors: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "persist_errors_total",
			Help:      "The total number of errors while writing to the persistence layer.",
		}),
		queuedChunksToPersist: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "queued_chunks_to_persist_total",
			Help:      "The total number of chunks queued for persistence.",
		}),
		memorySeries: prometheus.NewGauge(prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "memory_series",
			Help:      "The current number of series in memory.",
		}),
		headChunks: prometheus.NewGauge(prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "open_head_chunks",
			Help:      "The current number of open head chunks.",
		}),
		dirtySeries: prometheus.NewGauge(prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "memory_dirty_series",
			Help:      "The current number of series that would require a disk seek during crash recovery.",
		}),
		seriesOps: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Namespace: namespace,
				Subsystem: subsystem,
				Name:      "series_ops_total",
				Help:      "The total number of series operations by their type.",
			},
			[]string{opTypeLabel},
		),
		ingestedSamples: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "ingested_samples_total",
			Help:      "The total number of samples ingested.",
		}),
		discardedSamples: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Namespace: namespace,
				Subsystem: subsystem,
				Name:      "out_of_order_samples_total",
				Help:      "The total number of samples that were discarded because their timestamps were at or before the last received sample for a series.",
			},
			[]string{discardReasonLabel},
		),
		nonExistentSeriesMatches: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "non_existent_series_matches_total",
			Help:      "How often a non-existent series was referred to during label matching or chunk preloading. This is an indication of outdated label indexes.",
		}),
		memChunks: prometheus.NewGaugeFunc(
			prometheus.GaugeOpts{
				Namespace: namespace,
				Subsystem: subsystem,
				Name:      "memory_chunks",
				Help:      "The current number of chunks in memory. The number does not include cloned chunks (i.e. chunks without a descriptor).",
			},
			func() float64 { return float64(atomic.LoadInt64(&chunk.NumMemChunks)) },
		),
		maintainSeriesDuration: prometheus.NewSummaryVec(
			prometheus.SummaryOpts{
				Namespace: namespace,
				Subsystem: subsystem,
				Name:      "maintain_series_duration_seconds",
				Help:      "The duration in seconds it took to perform maintenance on a series.",
			},
			[]string{seriesLocationLabel},
		),
	}

	s.chunksToPersist = prometheus.NewGaugeFunc(
		prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "chunks_to_persist",
			Help:      "The current number of chunks waiting for persistence.",
		},
		func() float64 {
			return float64(s.getNumChunksToPersist())
		},
	)
	s.rushedMode = prometheus.NewGaugeFunc(
		prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "rushed_mode",
			Help:      "1 if the storage is in rushed mode, 0 otherwise.",
		},
		func() float64 {
			s.rushedMtx.Lock()
			defer s.rushedMtx.Unlock()
			if s.rushed {
				return 1
			}
			return 0
		},
	)
	s.persistenceUrgencyScore = prometheus.NewGaugeFunc(
		prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "persistence_urgency_score",
			Help:      "A score of urgency to persist chunks, 0 is least urgent, 1 most.",
		},
		func() float64 {
			score, _ := s.getPersistenceUrgencyScore()
			return score
		},
	)
	s.targetHeapSizeBytes = prometheus.NewGaugeFunc(
		prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "target_heap_size_bytes",
			Help:      "The configured target heap size in bytes.",
		},
		func() float64 {
			return float64(s.targetHeapSize)
		},
	)

	// Initialize metric vectors.
	// TODO(beorn7): Rework once we have a utility function for it in client_golang.
	s.discardedSamples.WithLabelValues(outOfOrderTimestamp)
	s.discardedSamples.WithLabelValues(duplicateSample)
	s.maintainSeriesDuration.WithLabelValues(maintainInMemory)
	s.maintainSeriesDuration.WithLabelValues(maintainArchived)
	s.seriesOps.WithLabelValues(create)
	s.seriesOps.WithLabelValues(archive)
	s.seriesOps.WithLabelValues(unarchive)
	s.seriesOps.WithLabelValues(memoryPurge)
	s.seriesOps.WithLabelValues(archivePurge)
	s.seriesOps.WithLabelValues(requestedPurge)
	s.seriesOps.WithLabelValues(memoryMaintenance)
	s.seriesOps.WithLabelValues(archiveMaintenance)
	s.seriesOps.WithLabelValues(completedQurantine)
	s.seriesOps.WithLabelValues(droppedQuarantine)
	s.seriesOps.WithLabelValues(failedQuarantine)

	return s
}

// Start implements Storage.
func (s *MemorySeriesStorage) Start() (err error) {
	var syncStrategy syncStrategy
	switch s.options.SyncStrategy {
	case Never:
		syncStrategy = func() bool { return false }
	case Always:
		syncStrategy = func() bool { return true }
	case Adaptive:
		syncStrategy = func() bool {
			_, rushed := s.getPersistenceUrgencyScore()
			return !rushed
		}
	default:
		panic("unknown sync strategy")
	}

	var p *persistence
	p, err = newPersistence(
		s.options.PersistenceStoragePath,
		s.options.Dirty, s.options.PedanticChecks,
		syncStrategy,
		s.options.MinShrinkRatio,
	)
	if err != nil {
		return err
	}
	s.persistence = p
	// Persistence must start running before loadSeriesMapAndHeads() is called.
	go s.persistence.run()

	defer func() {
		if err != nil {
			if e := p.close(); e != nil {
				log.Errorln("Error closing persistence:", e)
			}
		}
	}()

	log.Info("Loading series map and head chunks...")
	s.fpToSeries, s.numChunksToPersist, err = p.loadSeriesMapAndHeads()
	for _, series := range s.fpToSeries.m {
		if !series.headChunkClosed {
			s.headChunks.Inc()
		}
	}

	if err != nil {
		return err
	}
	log.Infof("%d series loaded.", s.fpToSeries.length())
	s.memorySeries.Set(float64(s.fpToSeries.length()))

	s.mapper, err = newFPMapper(s.fpToSeries, p)
	if err != nil {
		return err
	}

	go s.handleEvictList()
	go s.handleQuarantine()
	go s.logThrottling()
	go s.loop()

	return nil
}

// Stop implements Storage.
func (s *MemorySeriesStorage) Stop() error {
	log.Info("Stopping local storage...")

	log.Info("Stopping maintenance loop...")
	close(s.loopStopping)
	<-s.loopStopped

	log.Info("Stopping series quarantining...")
	close(s.quarantineStopping)
	<-s.quarantineStopped

	log.Info("Stopping chunk eviction...")
	close(s.evictStopping)
	<-s.evictStopped

	// One final checkpoint of the series map and the head chunks.
	if err := s.persistence.checkpointSeriesMapAndHeads(
		context.Background(), s.fpToSeries, s.fpLocker,
	); err != nil {
		return err
	}
	if err := s.mapper.checkpoint(); err != nil {
		return err
	}

	if err := s.persistence.close(); err != nil {
		return err
	}
	log.Info("Local storage stopped.")
	return nil
}

type memorySeriesStorageQuerier struct {
	*MemorySeriesStorage
}

func (memorySeriesStorageQuerier) Close() error {
	return nil
}

// Querier implements the storage interface.
func (s *MemorySeriesStorage) Querier() (Querier, error) {
	return memorySeriesStorageQuerier{s}, nil
}

// WaitForIndexing implements Storage.
func (s *MemorySeriesStorage) WaitForIndexing() {
	s.persistence.waitForIndexing()
}

// LastSampleForLabelMatchers implements Storage.
func (s *MemorySeriesStorage) LastSampleForLabelMatchers(_ context.Context, cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error) {
	mergedFPs := map[model.Fingerprint]struct{}{}
	for _, matchers := range matcherSets {
		fps, err := s.fpsForLabelMatchers(cutoff, model.Latest, matchers...)
		if err != nil {
			return nil, err
		}
		for fp := range fps {
			mergedFPs[fp] = struct{}{}
		}
	}

	res := make(model.Vector, 0, len(mergedFPs))
	for fp := range mergedFPs {
		s.fpLocker.Lock(fp)

		series, ok := s.fpToSeries.get(fp)
		if !ok {
			// A series could have disappeared between resolving label matchers and here.
			s.fpLocker.Unlock(fp)
			continue
		}
		sp := series.lastSamplePair()
		res = append(res, &model.Sample{
			Metric:    series.metric,
			Value:     sp.Value,
			Timestamp: sp.Timestamp,
		})
		s.fpLocker.Unlock(fp)
	}
	return res, nil
}

// boundedIterator wraps a SeriesIterator and does not allow fetching
// data from earlier than the configured start time.
type boundedIterator struct {
	it    SeriesIterator
	start model.Time
}

// ValueAtOrBeforeTime implements the SeriesIterator interface.
func (bit *boundedIterator) ValueAtOrBeforeTime(ts model.Time) model.SamplePair {
	if ts < bit.start {
		return model.ZeroSamplePair
	}
	return bit.it.ValueAtOrBeforeTime(ts)
}

// RangeValues implements the SeriesIterator interface.
func (bit *boundedIterator) RangeValues(interval metric.Interval) []model.SamplePair {
	if interval.NewestInclusive < bit.start {
		return []model.SamplePair{}
	}
	if interval.OldestInclusive < bit.start {
		interval.OldestInclusive = bit.start
	}
	return bit.it.RangeValues(interval)
}

// Metric implements SeriesIterator.
func (bit *boundedIterator) Metric() metric.Metric {
	return bit.it.Metric()
}

// Close implements SeriesIterator.
func (bit *boundedIterator) Close() {
	bit.it.Close()
}
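
// Example of the clamping behavior: with start == 1000,
// ValueAtOrBeforeTime(999) returns model.ZeroSamplePair without consulting the
// wrapped iterator, and RangeValues for the interval [500, 2000] is forwarded
// as the clamped interval [1000, 2000].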

// QueryRange implements Storage.
func (s *MemorySeriesStorage) QueryRange(ctx context.Context, from, through model.Time, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) {
	span, _ := opentracing.StartSpanFromContext(ctx, "QueryRange")
	span.SetTag(selectorsTag, metric.LabelMatchers(matchers).String())
	span.SetTag(fromTag, int64(from))
	span.SetTag(throughTag, int64(through))
	defer span.Finish()

	if through.Before(from) {
		// In that case, nothing will match.
		return nil, nil
	}
	fpSeriesPairs, err := s.seriesForLabelMatchers(from, through, matchers...)
	if err != nil {
		return nil, err
	}
	span.SetTag(numSeries, len(fpSeriesPairs))
	iterators := make([]SeriesIterator, 0, len(fpSeriesPairs))
	for _, pair := range fpSeriesPairs {
		it := s.preloadChunksForRange(pair, from, through)
		iterators = append(iterators, it)
	}
	return iterators, nil
}

// QueryInstant implements Storage.
func (s *MemorySeriesStorage) QueryInstant(ctx context.Context, ts model.Time, stalenessDelta time.Duration, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) {
	span, _ := opentracing.StartSpanFromContext(ctx, "QueryInstant")
	span.SetTag(selectorsTag, metric.LabelMatchers(matchers).String())
	span.SetTag(tsTag, ts)
	defer span.Finish()

	if stalenessDelta < 0 {
		panic("negative staleness delta")
	}
	from := ts.Add(-stalenessDelta)
	through := ts

	fpSeriesPairs, err := s.seriesForLabelMatchers(from, through, matchers...)
	if err != nil {
		return nil, err
	}
	iterators := make([]SeriesIterator, 0, len(fpSeriesPairs))
	for _, pair := range fpSeriesPairs {
		it := s.preloadChunksForInstant(pair, from, through)
		iterators = append(iterators, it)
	}
	return iterators, nil
}

// fingerprintsForLabelPair returns the fingerprints with the given
// LabelPair. If intersectWith is non-nil, the method will only return
// fingerprints that are also contained in intersectWith. If mergeWith is
// non-nil, the found fingerprints are added to the given map. The returned map
// is the same as the given one.
func (s *MemorySeriesStorage) fingerprintsForLabelPair(
	pair model.LabelPair,
	mergeWith map[model.Fingerprint]struct{},
	intersectWith map[model.Fingerprint]struct{},
) map[model.Fingerprint]struct{} {
	if mergeWith == nil {
		mergeWith = map[model.Fingerprint]struct{}{}
	}
	for _, fp := range s.persistence.fingerprintsForLabelPair(pair) {
		if intersectWith == nil {
			mergeWith[fp] = struct{}{}
			continue
		}
		if _, ok := intersectWith[fp]; ok {
			mergeWith[fp] = struct{}{}
		}
	}
	return mergeWith
}
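
// Illustration of the merge/intersect semantics (hypothetical label values):
//
//	// All fingerprints with job="api":
//	fps := s.fingerprintsForLabelPair(
//		model.LabelPair{Name: "job", Value: "api"}, nil, nil,
//	)
//	// Of those, keep only the ones that also carry env="prod", by passing
//	// the previous result as intersectWith and starting a fresh mergeWith:
//	fps = s.fingerprintsForLabelPair(
//		model.LabelPair{Name: "env", Value: "prod"}, nil, fps,
//	)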

// MetricsForLabelMatchers implements Storage.
func (s *MemorySeriesStorage) MetricsForLabelMatchers(
	_ context.Context,
	from, through model.Time,
	matcherSets ...metric.LabelMatchers,
) ([]metric.Metric, error) {
	fpToMetric := map[model.Fingerprint]metric.Metric{}
	for _, matchers := range matcherSets {
		metrics, err := s.metricsForLabelMatchers(from, through, matchers...)
		if err != nil {
			return nil, err
		}
		for fp, m := range metrics {
			fpToMetric[fp] = m
		}
	}

	metrics := make([]metric.Metric, 0, len(fpToMetric))
	for _, m := range fpToMetric {
		metrics = append(metrics, m)
	}
	return metrics, nil
}

// candidateFPsForLabelMatchers returns candidate FPs for the given matchers
// together with the remaining matchers that still have to be checked against
// the candidates.
func (s *MemorySeriesStorage) candidateFPsForLabelMatchers(
	matchers ...*metric.LabelMatcher,
) (map[model.Fingerprint]struct{}, []*metric.LabelMatcher, error) {
	sort.Sort(metric.LabelMatchers(matchers))

	if len(matchers) == 0 || matchers[0].MatchesEmptyString() {
		// No matchers at all or even the best matcher matches the empty string.
		return nil, nil, nil
	}

	var (
		matcherIdx   int
		candidateFPs map[model.Fingerprint]struct{}
	)

	// Equal matchers.
	for ; matcherIdx < len(matchers) && (candidateFPs == nil || len(candidateFPs) > fpEqualMatchThreshold); matcherIdx++ {
		m := matchers[matcherIdx]
		if m.Type != metric.Equal || m.MatchesEmptyString() {
			break
		}
		candidateFPs = s.fingerprintsForLabelPair(
			model.LabelPair{
				Name:  m.Name,
				Value: m.Value,
			},
			nil,
			candidateFPs,
		)
		if len(candidateFPs) == 0 {
			return nil, nil, nil
		}
	}

	// Other matchers.
	for ; matcherIdx < len(matchers) && (candidateFPs == nil || len(candidateFPs) > fpOtherMatchThreshold); matcherIdx++ {
		m := matchers[matcherIdx]
		if m.MatchesEmptyString() {
			break
		}

		lvs, err := s.LabelValuesForLabelName(context.TODO(), m.Name)
		if err != nil {
			return nil, nil, err
		}
		lvs = m.Filter(lvs)
		if len(lvs) == 0 {
			return nil, nil, nil
		}
		fps := map[model.Fingerprint]struct{}{}
		for _, lv := range lvs {
			s.fingerprintsForLabelPair(
				model.LabelPair{
					Name:  m.Name,
					Value: lv,
				},
				fps,
				candidateFPs,
			)
		}
		candidateFPs = fps
		if len(candidateFPs) == 0 {
			return nil, nil, nil
		}
	}
	return candidateFPs, matchers[matcherIdx:], nil
}

func (s *MemorySeriesStorage) seriesForLabelMatchers(
	from, through model.Time,
	matchers ...*metric.LabelMatcher,
) ([]fingerprintSeriesPair, error) {
	candidateFPs, matchersToCheck, err := s.candidateFPsForLabelMatchers(matchers...)
	if err != nil {
		return nil, err
	}

	result := []fingerprintSeriesPair{}
FPLoop:
	for fp := range candidateFPs {
		s.fpLocker.Lock(fp)
		series := s.seriesForRange(fp, from, through)
		s.fpLocker.Unlock(fp)

		if series == nil {
			continue FPLoop
		}

		for _, m := range matchersToCheck {
			if !m.Match(series.metric[m.Name]) {
				continue FPLoop
			}
		}
		result = append(result, fingerprintSeriesPair{fp, series})
	}
	return result, nil
}

func (s *MemorySeriesStorage) fpsForLabelMatchers(
	from, through model.Time,
	matchers ...*metric.LabelMatcher,
) (map[model.Fingerprint]struct{}, error) {
	candidateFPs, matchersToCheck, err := s.candidateFPsForLabelMatchers(matchers...)
	if err != nil {
		return nil, err
	}

FPLoop:
	for fp := range candidateFPs {
		s.fpLocker.Lock(fp)
		met, _, ok := s.metricForRange(fp, from, through)
		s.fpLocker.Unlock(fp)

		if !ok {
			delete(candidateFPs, fp)
			continue FPLoop
		}

		for _, m := range matchersToCheck {
			if !m.Match(met[m.Name]) {
				delete(candidateFPs, fp)
				continue FPLoop
			}
		}
	}
	return candidateFPs, nil
}

func (s *MemorySeriesStorage) metricsForLabelMatchers(
	from, through model.Time,
	matchers ...*metric.LabelMatcher,
) (map[model.Fingerprint]metric.Metric, error) {

	candidateFPs, matchersToCheck, err := s.candidateFPsForLabelMatchers(matchers...)
	if err != nil {
		return nil, err
	}

	result := map[model.Fingerprint]metric.Metric{}
FPLoop:
	for fp := range candidateFPs {
		s.fpLocker.Lock(fp)
		met, _, ok := s.metricForRange(fp, from, through)
		s.fpLocker.Unlock(fp)

		if !ok {
			continue FPLoop
		}

		for _, m := range matchersToCheck {
			if !m.Match(met[m.Name]) {
				continue FPLoop
			}
		}
		result[fp] = metric.Metric{Metric: met}
	}
	return result, nil
}

// metricForRange returns the metric for the given fingerprint if the
// corresponding time series has samples between 'from' and 'through', together
// with a pointer to the series if it is in memory already. For a series that
// does not have samples between 'from' and 'through', the returned bool is
// false. For an archived series that does contain samples between 'from' and
// 'through', it returns (metric, nil, true).
//
// The caller must have locked the fp.
func (s *MemorySeriesStorage) metricForRange(
	fp model.Fingerprint,
	from, through model.Time,
) (model.Metric, *memorySeries, bool) {
	series, ok := s.fpToSeries.get(fp)
	if ok {
		if series.lastTime.Before(from) || series.firstTime().After(through) {
			return nil, nil, false
		}
		return series.metric, series, true
	}
	// From here on, we are only concerned with archived metrics.
	// If the high watermark of archived series is before 'from', we are done.
	watermark := model.Time(atomic.LoadInt64((*int64)(&s.archiveHighWatermark)))
	if watermark < from {
		return nil, nil, false
	}
	if from.After(model.Earliest) || through.Before(model.Latest) {
		// The range lookup is relatively cheap, so let's do it first if
		// we have a chance the archived metric is not in the range.
		has, first, last := s.persistence.hasArchivedMetric(fp)
		if !has {
			s.nonExistentSeriesMatches.Inc()
			return nil, nil, false
		}
		if first.After(through) || last.Before(from) {
			return nil, nil, false
		}
	}

	metric, err := s.persistence.archivedMetric(fp)
	if err != nil {
		// archivedMetric has already flagged the storage as dirty in this case.
		return nil, nil, false
	}
	return metric, nil, true
}

// LabelValuesForLabelName implements Storage.
func (s *MemorySeriesStorage) LabelValuesForLabelName(_ context.Context, labelName model.LabelName) (model.LabelValues, error) {
	return s.persistence.labelValuesForLabelName(labelName)
}

// DropMetricsForLabelMatchers implements Storage.
func (s *MemorySeriesStorage) DropMetricsForLabelMatchers(_ context.Context, matchers ...*metric.LabelMatcher) (int, error) {
	fps, err := s.fpsForLabelMatchers(model.Earliest, model.Latest, matchers...)
	if err != nil {
		return 0, err
	}
	for fp := range fps {
		s.purgeSeries(fp, nil, nil)
	}
	return len(fps), nil
}

var (
	// ErrOutOfOrderSample is returned if a sample has a timestamp before the latest
	// timestamp in the series it is appended to.
	ErrOutOfOrderSample = errors.New("sample timestamp out of order")
	// ErrDuplicateSampleForTimestamp is returned if a sample has the same
	// timestamp as the latest sample in the series it is appended to but a
	// different value. (Appending an identical sample is a no-op and does
	// not cause an error.)
	ErrDuplicateSampleForTimestamp = errors.New("sample with repeated timestamp but different value")
)
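
// Callers of Append typically treat these two errors as problems with the
// offending sample rather than with the storage. A sketch of such handling
// (the logging is illustrative):
//
//	switch err := storage.Append(sample); err {
//	case nil:
//		// Sample ingested.
//	case ErrOutOfOrderSample, ErrDuplicateSampleForTimestamp:
//		log.With("sample", sample).Warn("Sample discarded: ", err)
//	default:
//		log.With("sample", sample).Error("Append failed: ", err)
//	}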

// Append implements Storage.
func (s *MemorySeriesStorage) Append(sample *model.Sample) error {
	for ln, lv := range sample.Metric {
		if len(lv) == 0 {
			delete(sample.Metric, ln)
		}
	}
	rawFP := sample.Metric.FastFingerprint()
	s.fpLocker.Lock(rawFP)
	fp := s.mapper.mapFP(rawFP, sample.Metric)
	defer func() {
		s.fpLocker.Unlock(fp)
	}() // Func wrapper because fp might change below.
	if fp != rawFP {
		// Switch locks.
		s.fpLocker.Unlock(rawFP)
		s.fpLocker.Lock(fp)
	}
	series, err := s.getOrCreateSeries(fp, sample.Metric)
	if err != nil {
		return err // getOrCreateSeries took care of quarantining already.
	}

	if sample.Timestamp == series.lastTime {
		// Don't report "no-op appends", i.e. where timestamp and sample
		// value are the same as for the last append, as they are a
		// common occurrence when using client-side timestamps
		// (e.g. Pushgateway or federation).
		if series.lastSampleValueSet &&
			sample.Value.Equal(series.lastSampleValue) {
			return nil
		}
		s.discardedSamples.WithLabelValues(duplicateSample).Inc()
		return ErrDuplicateSampleForTimestamp // Caused by the caller.
	}
	if sample.Timestamp < series.lastTime {
		s.discardedSamples.WithLabelValues(outOfOrderTimestamp).Inc()
		return ErrOutOfOrderSample // Caused by the caller.
	}
	headChunkWasClosed := series.headChunkClosed
	completedChunksCount, err := series.add(model.SamplePair{
		Value:     sample.Value,
		Timestamp: sample.Timestamp,
	})
	if err != nil {
		s.quarantineSeries(fp, sample.Metric, err)
		return err
	}
	if headChunkWasClosed {
		// Appending to a series with a closed head chunk creates an
		// additional open head chunk.
		s.headChunks.Inc()
	}
	s.ingestedSamples.Inc()
	s.incNumChunksToPersist(completedChunksCount)

	return nil
}

// NeedsThrottling implements Storage.
func (s *MemorySeriesStorage) NeedsThrottling() bool {
	if score, _ := s.getPersistenceUrgencyScore(); score >= 1 {
		select {
		case s.throttled <- struct{}{}:
		default: // Do nothing, signal already pending.
		}
		return true
	}
	return false
}
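
// A scrape or rule-evaluation loop would consult NeedsThrottling before doing
// more work, e.g. (sketch):
//
//	if storage.NeedsThrottling() {
//		// Skip this iteration; the storage is busy persisting chunks and
//		// ingesting more samples now would only make things worse.
//		return
//	}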

// logThrottling handles logging of throttled events and has to be started as a
// goroutine. It stops once s.loopStopping is closed.
//
// Logging strategy: Whenever NeedsThrottling() is called and returns true, a
// signal is sent to s.throttled. If that happens for the first time, an Error
// is logged that the storage is now throttled. As long as signals continue to
// be sent via s.throttled at least once per minute, nothing else is logged.
// Once no signal has arrived for a minute, an Info is logged that the storage
// is not throttled anymore. This resets things to the initial state, i.e. once
// a signal arrives again, the Error will be logged again.
func (s *MemorySeriesStorage) logThrottling() {
	timer := time.NewTimer(time.Minute)
	timer.Stop()

	// Signal exit of the goroutine. Currently only needed by test code.
	defer close(s.logThrottlingStopped)

	for {
		select {
		case <-s.throttled:
			if !timer.Stop() {
				select {
				case <-timer.C:
				default:
				}
				score, _ := s.getPersistenceUrgencyScore()
				log.
					With("urgencyScore", score).
					With("chunksToPersist", s.getNumChunksToPersist()).
					With("memoryChunks", atomic.LoadInt64(&chunk.NumMemChunks)).
					Error("Storage needs throttling. Scrapes and rule evaluations will be skipped.")
			}
			timer.Reset(time.Minute)
		case <-timer.C:
			score, _ := s.getPersistenceUrgencyScore()
			log.
				With("urgencyScore", score).
				With("chunksToPersist", s.getNumChunksToPersist()).
				With("memoryChunks", atomic.LoadInt64(&chunk.NumMemChunks)).
				Info("Storage does not need throttling anymore.")
		case <-s.loopStopping:
			return
		}
	}
}

func (s *MemorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Metric) (*memorySeries, error) {
	series, ok := s.fpToSeries.get(fp)
	if !ok {
		var cds []*chunk.Desc
		var modTime time.Time
		unarchived, err := s.persistence.unarchiveMetric(fp)
		if err != nil {
			log.Errorf("Error unarchiving fingerprint %v (metric %v): %v", fp, m, err)
			return nil, err
		}
		if unarchived {
			s.seriesOps.WithLabelValues(unarchive).Inc()
			// We have to load chunk.Descs anyway to do anything with
			// the series, so let's do it right now so that we don't
			// end up with a series without any chunk.Descs for a
			// while (which is confusing as it makes the series
			// appear as archived or purged).
			cds, err = s.loadChunkDescs(fp, 0)
			if err == nil && len(cds) == 0 {
				err = fmt.Errorf("unarchived fingerprint %v (metric %v) has no chunks on disk", fp, m)
			}
			if err != nil {
				s.quarantineSeries(fp, m, err)
				return nil, err
			}
			modTime = s.persistence.seriesFileModTime(fp)
		} else {
			// This was a genuinely new series, so index the metric.
			s.persistence.indexMetric(fp, m)
			s.seriesOps.WithLabelValues(create).Inc()
		}
		series, err = newMemorySeries(m, cds, modTime)
		if err != nil {
			s.quarantineSeries(fp, m, err)
			return nil, err
		}
		s.fpToSeries.put(fp, series)
		s.memorySeries.Inc()
		if !series.headChunkClosed {
			s.headChunks.Inc()
		}
	}
	return series, nil
}

// seriesForRange is a helper method for seriesForLabelMatchers.
//
// The caller must have locked the fp.
func (s *MemorySeriesStorage) seriesForRange(
	fp model.Fingerprint,
	from model.Time, through model.Time,
) *memorySeries {
	metric, series, ok := s.metricForRange(fp, from, through)
	if !ok {
		return nil
	}
	if series == nil {
		series, _ = s.getOrCreateSeries(fp, metric)
		// getOrCreateSeries took care of quarantining already, so ignore the error.
	}
	return series
}

func (s *MemorySeriesStorage) preloadChunksForRange(
	pair fingerprintSeriesPair,
	from model.Time, through model.Time,
) SeriesIterator {
	fp, series := pair.fp, pair.series
	if series == nil {
		return nopIter
	}

	s.fpLocker.Lock(fp)
	defer s.fpLocker.Unlock(fp)

	iter, err := series.preloadChunksForRange(fp, from, through, s)
	if err != nil {
		s.quarantineSeries(fp, series.metric, err)
		return nopIter
	}
	return iter
}

func (s *MemorySeriesStorage) preloadChunksForInstant(
	pair fingerprintSeriesPair,
	from model.Time, through model.Time,
) SeriesIterator {
	fp, series := pair.fp, pair.series
	if series == nil {
		return nopIter
	}

	s.fpLocker.Lock(fp)
	defer s.fpLocker.Unlock(fp)

	iter, err := series.preloadChunksForInstant(fp, from, through, s)
	if err != nil {
		s.quarantineSeries(fp, series.metric, err)
		return nopIter
	}
	return iter
}

func (s *MemorySeriesStorage) handleEvictList() {
	// This ticker is supposed to tick at least once per GC cycle. Ideally,
	// we would handle the evict list after each finished GC cycle, but I
	// don't know of a way to "subscribe" to that kind of event.
	ticker := time.NewTicker(evictInterval)

	for {
		select {
		case req := <-s.evictRequests:
			if req.Evict {
				req.Desc.EvictListElement = s.evictList.PushBack(req.Desc)
			} else {
				if req.Desc.EvictListElement != nil {
					s.evictList.Remove(req.Desc.EvictListElement)
					req.Desc.EvictListElement = nil
				}
			}
		case <-ticker.C:
			s.maybeEvict()
		case <-s.evictStopping:
			// Drain evictRequests forever in a goroutine to not let
			// requesters hang.
			go func() {
				for {
					<-s.evictRequests
				}
			}()
			ticker.Stop()
			log.Info("Chunk eviction stopped.")
			close(s.evictStopped)
			return
		}
	}
}

// maybeEvict is a local helper method. Must only be called by handleEvictList.
func (s *MemorySeriesStorage) maybeEvict() {
	ms := runtime.MemStats{}
	runtime.ReadMemStats(&ms)
	numChunksToEvict := s.calculatePersistUrgency(&ms)

	if numChunksToEvict <= 0 {
		return
	}

	chunkDescsToEvict := make([]*chunk.Desc, numChunksToEvict)
	for i := range chunkDescsToEvict {
		e := s.evictList.Front()
		if e == nil {
			break
		}
		cd := e.Value.(*chunk.Desc)
		cd.EvictListElement = nil
		chunkDescsToEvict[i] = cd
		s.evictList.Remove(e)
	}
	// Do the actual eviction in a goroutine as we might otherwise deadlock,
	// in the following way: A chunk was Unpinned completely and therefore
	// scheduled for eviction. At the time we actually try to evict it,
	// another goroutine is pinning the chunk. The pinning goroutine has
	// currently locked the chunk and tries to send the evict request (to
	// remove the chunk from the evict list) to the evictRequests
	// channel. The send blocks because evictRequests is full. However, the
	// goroutine that is supposed to empty the channel is waiting for the
	// Chunk.Desc lock to try to evict the chunk.
	go func() {
		for _, cd := range chunkDescsToEvict {
			if cd == nil {
				break
			}
			cd.MaybeEvict()
			// We don't care if the eviction succeeds. If the chunk
			// was pinned in the meantime, it will be added to the
			// evict list once it gets Unpinned again.
		}
	}()
}

// calculatePersistUrgency calculates and sets s.persistUrgency. Based on the
// calculation, it returns the number of chunks to evict. The runtime.MemStats
// are passed in here for testability.
//
// The persist urgency is calculated by the following formula:
//
//                      n(toPersist)           MAX( h(nextGC), h(current) )
//   p = MIN( 1, --------------------------- * ---------------------------- )
//               n(toPersist) + n(evictable)             h(target)
//
// where:
//
//    n(toPersist): Number of chunks waiting for persistence.
//    n(evictable): Number of evictable chunks.
//    h(nextGC):    Heap size at which the next GC will kick in (ms.NextGC).
//    h(current):   Current heap size (ms.HeapAlloc).
//    h(target):    Configured target heap size.
//
// Note that the actual value stored in s.persistUrgency is 1000 times the value
// calculated as above to allow using an int32, which supports atomic
// operations.
//
// If no GC has run after the last call of this method, it will always return 0
// (no reason to try to evict any more chunks before we have seen the effect of
// the previous eviction). It will also not decrease the persist urgency in this
// case (but it will increase the persist urgency if a higher value was calculated).
//
// If a GC has run after the last call of this method, the following cases apply:
//
// - If MAX( h(nextGC), h(current) ) < h(target), simply return 0. Nothing to
//   evict if the heap is still small enough.
//
// - Otherwise, if n(evictable) is 0, also return 0, but set the urgency score
//   to 1 to signal that we want to evict chunks but have no evictable chunks
//   available.
//
// - Otherwise, calculate the number of chunks to evict and return it:
//
//                                   MAX( h(nextGC), h(current) ) - h(target)
//   n(toEvict) = MIN( n(evictable), ---------------------------------------- )
//                                                        c
//
//   where c is the size of a chunk.
//
// - In the latter case, the persist urgency might be increased. The final value
//   is the following:
//
//            n(toEvict)
//   MAX( p, ------------ )
//           n(evictable)
//
// Broadly speaking, the persist urgency is based on the ratio of the number of
// chunks we want to evict and the number of chunks that are actually
// evictable. However, in particular for the case where we don't need to evict
// chunks yet, it also takes into account how close the heap has already grown
// to the configured target size, and how big the pool of chunks to persist is
// compared to the number of chunks already evictable.
//
// This is a helper method only to be called by MemorySeriesStorage.maybeEvict.
func (s *MemorySeriesStorage) calculatePersistUrgency(ms *runtime.MemStats) int {
	var (
		oldUrgency         = atomic.LoadInt32(&s.persistUrgency)
		newUrgency         int32
		numChunksToPersist = s.getNumChunksToPersist()
	)
	defer func() {
		if newUrgency > 1000 {
			newUrgency = 1000
		}
		atomic.StoreInt32(&s.persistUrgency, newUrgency)
	}()

	// Take the NextGC as the relevant heap size because the heap will grow
	// to that size before GC kicks in. However, at times the current heap
	// is already larger than NextGC, in which case we take that worse case.
	heapSize := ms.NextGC
	if ms.HeapAlloc > ms.NextGC {
		heapSize = ms.HeapAlloc
	}

	if numChunksToPersist > 0 {
		newUrgency = int32(1000 * uint64(numChunksToPersist) / uint64(numChunksToPersist+s.evictList.Len()) * heapSize / s.targetHeapSize)
	}

	// Only continue if a GC has happened since we were here last time.
	if ms.NumGC == s.lastNumGC {
		if oldUrgency > newUrgency {
			// Never reduce urgency without a GC run.
			newUrgency = oldUrgency
		}
		return 0
	}
	s.lastNumGC = ms.NumGC

	if heapSize <= s.targetHeapSize {
		return 0 // Heap still small enough, don't evict.
	}
	if s.evictList.Len() == 0 {
		// We want to reduce heap size but there is nothing to evict.
		newUrgency = 1000
		return 0
	}
	numChunksToEvict := int((heapSize - s.targetHeapSize) / chunk.ChunkLen)
	if numChunksToEvict > s.evictList.Len() {
		numChunksToEvict = s.evictList.Len()
	}
	if u := int32(numChunksToEvict * 1000 / s.evictList.Len()); u > newUrgency {
		newUrgency = u
	}
	return numChunksToEvict
}
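
// Worked example for calculatePersistUrgency (all numbers invented): assume
// h(target) = 2 GiB, ms.NextGC = 2.5 GiB, ms.HeapAlloc = 2.2 GiB,
// n(toPersist) = 10000, n(evictable) = 40000, and a GC has run since the
// previous call. Then:
//
//	heapSize   = MAX(2.5 GiB, 2.2 GiB)                 = 2.5 GiB
//	p          = 1000 * 10000/50000 * 2.5 GiB / 2 GiB  = 250 (score 0.25)
//	n(toEvict) = MIN(40000, 0.5 GiB / chunk.ChunkLen)  = 40000
//	urgency    = MAX(250, 40000*1000/40000)            = 1000 (score 1.0)
//
// so the method returns 40000 chunks to evict and stores the capped urgency of
// 1000.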

// waitForNextFP waits an estimated duration, after which we want to process
// another fingerprint so that we will process all fingerprints in a tenth of
// s.dropAfter assuming that the system is doing nothing else, e.g. if we want
// to drop chunks after 40h, we want to cycle through all fingerprints within
// 4h. The estimation is based on the total number of fingerprints as passed
// in. However, the maximum sweep time is capped at fpMaxSweepTime. Also, the
// method will never wait for longer than fpMaxWaitDuration.
//
// The maxWaitDurationFactor can be used to reduce the waiting time if a faster
// processing is required (for example because unpersisted chunks pile up too
// much).
//
// Normally, the method returns true once the wait duration has passed. However,
// if s.loopStopping is closed, it will return false immediately.
func (s *MemorySeriesStorage) waitForNextFP(numberOfFPs int, maxWaitDurationFactor float64) bool {
	d := fpMaxWaitDuration
	if numberOfFPs != 0 {
		sweepTime := s.dropAfter / 10
		if sweepTime > fpMaxSweepTime {
			sweepTime = fpMaxSweepTime
		}
		calculatedWait := time.Duration(float64(sweepTime) / float64(numberOfFPs) * maxWaitDurationFactor)
		if calculatedWait < d {
			d = calculatedWait
		}
	}
	if d == 0 {
		return true
	}
	t := time.NewTimer(d)
	select {
	case <-t.C:
		return true
	case <-s.loopStopping:
		return false
	}
}
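
// Example of the timing math above (hypothetical figures): with s.dropAfter =
// 40h, the sweep time is MIN(40h/10, fpMaxSweepTime) = 4h. For 100000
// fingerprints and maxWaitDurationFactor = 1, the per-fingerprint wait is
// 4h/100000 = 144ms. With only 1000 fingerprints, the calculated 14.4s would
// exceed fpMaxWaitDuration, so the wait is capped at 10s.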

// cycleThroughMemoryFingerprints returns a channel that emits fingerprints for
// series in memory in a throttled fashion. It continues to cycle through all
// fingerprints in memory until s.loopStopping is closed.
func (s *MemorySeriesStorage) cycleThroughMemoryFingerprints() chan model.Fingerprint {
	memoryFingerprints := make(chan model.Fingerprint)
	go func() {
		defer close(memoryFingerprints)
		firstPass := true

		for {
			// Initial wait, also important if there are no FPs yet.
			if !s.waitForNextFP(s.fpToSeries.length(), 1) {
				return
			}
			begin := time.Now()
			fps := s.fpToSeries.sortedFPs()
			if firstPass && len(fps) > 0 {
				// Start first pass at a random location in the
				// key space to cover the whole key space even
				// in the case of frequent restarts.
				fps = fps[rand.Intn(len(fps)):]
			}
			count := 0
			for _, fp := range fps {
				select {
				case memoryFingerprints <- fp:
				case <-s.loopStopping:
					return
				}
				// Reduce the wait time according to the urgency score.
				score, rushed := s.getPersistenceUrgencyScore()
				if rushed {
					score = 1
				}
				s.waitForNextFP(s.fpToSeries.length(), 1-score)
				count++
			}
			if count > 0 {
				msg := "full"
				if firstPass {
					msg = "initial partial"
				}
				log.Infof(
					"Completed %s maintenance sweep through %d in-memory fingerprints in %v.",
					msg, count, time.Since(begin),
				)
			}
			firstPass = false
		}
	}()

	return memoryFingerprints
}

// cycleThroughArchivedFingerprints returns a channel that emits fingerprints
// for archived series in a throttled fashion. It continues to cycle through all
// archived fingerprints until s.loopStopping is closed.
func (s *MemorySeriesStorage) cycleThroughArchivedFingerprints() chan model.Fingerprint {
	archivedFingerprints := make(chan model.Fingerprint)
	go func() {
		defer close(archivedFingerprints)

		for {
			archivedFPs, err := s.persistence.fingerprintsModifiedBefore(
				model.Now().Add(-s.dropAfter),
			)
			if err != nil {
				log.Error("Failed to lookup archived fingerprint ranges: ", err)
				s.waitForNextFP(0, 1)
				continue
			}
			// Initial wait, also important if there are no FPs yet.
			if !s.waitForNextFP(len(archivedFPs), 1) {
				return
			}
			begin := time.Now()
			for _, fp := range archivedFPs {
				select {
				case archivedFingerprints <- fp:
				case <-s.loopStopping:
					return
				}
				// Never speed up maintenance of archived FPs.
				s.waitForNextFP(len(archivedFPs), 1)
			}
			if len(archivedFPs) > 0 {
				log.Infof(
					"Completed maintenance sweep through %d archived fingerprints in %v.",
					len(archivedFPs), time.Since(begin),
				)
			}
		}
	}()
	return archivedFingerprints
}

func (s *MemorySeriesStorage) loop() {
	checkpointTimer := time.NewTimer(s.checkpointInterval)
	checkpointMinTimer := time.NewTimer(0)

	var dirtySeriesCount int64

	defer func() {
		checkpointTimer.Stop()
		checkpointMinTimer.Stop()
		log.Info("Maintenance loop stopped.")
		close(s.loopStopped)
	}()

	memoryFingerprints := s.cycleThroughMemoryFingerprints()
	archivedFingerprints := s.cycleThroughArchivedFingerprints()

	checkpointCtx, checkpointCancel := context.WithCancel(context.Background())
	checkpointNow := make(chan struct{}, 1)

	doCheckpoint := func() time.Duration {
		start := time.Now()
		// We clear this before the checkpoint so that dirtySeriesCount
		// is an upper bound.
		atomic.StoreInt64(&dirtySeriesCount, 0)
		s.dirtySeries.Set(0)
		select {
		case <-checkpointNow:
			// Signal cleared.
		default:
			// No signal pending.
		}
		err := s.persistence.checkpointSeriesMapAndHeads(
			checkpointCtx, s.fpToSeries, s.fpLocker,
		)
		if err == context.Canceled {
			log.Info("Checkpoint canceled.")
		} else if err != nil {
			s.persistErrors.Inc()
			log.Errorln("Error while checkpointing:", err)
		}
		return time.Since(start)
	}

	// Checkpoints can happen concurrently with maintenance so even with heavy
	// checkpointing there will still be sufficient progress on maintenance.
	checkpointLoopStopped := make(chan struct{})
	go func() {
		for {
			select {
			case <-checkpointCtx.Done():
				checkpointLoopStopped <- struct{}{}
				return
			case <-checkpointMinTimer.C:
				var took time.Duration
				select {
				case <-checkpointCtx.Done():
					checkpointLoopStopped <- struct{}{}
					return
				case <-checkpointTimer.C:
					took = doCheckpoint()
				case <-checkpointNow:
					if !checkpointTimer.Stop() {
						<-checkpointTimer.C
					}
					took = doCheckpoint()
				}
				checkpointMinTimer.Reset(took)
				checkpointTimer.Reset(s.checkpointInterval)
			}
		}
	}()

loop:
	for {
		select {
		case <-s.loopStopping:
			checkpointCancel()
			break loop
		case fp := <-memoryFingerprints:
			if s.maintainMemorySeries(fp, model.Now().Add(-s.dropAfter)) {
				dirty := atomic.AddInt64(&dirtySeriesCount, 1)
				s.dirtySeries.Set(float64(dirty))
				// Check if we have enough "dirty" series so that we need an early checkpoint.
				// However, if we are already behind persisting chunks, creating a checkpoint
				// would be counterproductive, as it would slow down chunk persisting even more,
				// while in a situation like that, where we are clearly lacking speed of disk
				// maintenance, the best we can do for crash recovery is to persist chunks as
				// quickly as possible. So only checkpoint if we are not in rushed mode.
				if _, rushed := s.getPersistenceUrgencyScore(); !rushed &&
					dirty >= int64(s.checkpointDirtySeriesLimit) {
					select {
					case checkpointNow <- struct{}{}:
						// Signal sent.
					default:
						// Signal already pending.
					}
				}
			}
		case fp := <-archivedFingerprints:
			s.maintainArchivedSeries(fp, model.Now().Add(-s.dropAfter))
		}
	}
	// Wait until both channels are closed.
	for range memoryFingerprints {
	}
	for range archivedFingerprints {
	}
	<-checkpointLoopStopped
}
1563
1564// maintainMemorySeries maintains a series that is in memory (i.e. not
1565// archived). It returns true if the method has changed from clean to dirty
1566// (i.e. it is inconsistent with the latest checkpoint now so that in case of a
1567// crash a recovery operation that requires a disk seek needed to be applied).
1568//
1569// The method first closes the head chunk if it was not touched for the duration
1570// of headChunkTimeout.
1571//
1572// Then it determines the chunks that need to be purged and the chunks that need
1573// to be persisted. Depending on the result, it does the following:
1574//
1575// - If all chunks of a series need to be purged, the whole series is deleted
1576// for good and the method returns false. (Detecting non-existence of a series
1577// file does not require a disk seek.)
1578//
1579// - If any chunks need to be purged (but not all of them), it purges those
1580// chunks from memory and rewrites the series file on disk, leaving out the
1581// purged chunks and appending all chunks not yet persisted (with the exception
1582// of a still open head chunk).
1583//
1584// - If no chunks on disk need to be purged, but chunks need to be persisted,
1585// those chunks are simply appended to the existing series file (or the file is
1586// created if it does not exist yet).
1587//
1588// - If no chunks need to be purged and no chunks need to be persisted, nothing
1589// happens in this step.
1590//
1591// Next, the method checks if all chunks in the series are evicted. In that
1592// case, it archives the series and returns true.
1593//
1594// Finally, it evicts chunk.Descs if there are too many.
1595func (s *MemorySeriesStorage) maintainMemorySeries(
1596	fp model.Fingerprint, beforeTime model.Time,
1597) (becameDirty bool) {
1598	defer func(begin time.Time) {
1599		s.maintainSeriesDuration.WithLabelValues(maintainInMemory).Observe(
1600			time.Since(begin).Seconds(),
1601		)
1602	}(time.Now())
1603
1604	s.fpLocker.Lock(fp)
1605	defer s.fpLocker.Unlock(fp)
1606
1607	series, ok := s.fpToSeries.get(fp)
1608	if !ok {
1609		// Series is actually not in memory, perhaps archived or dropped in the meantime.
1610		return false
1611	}
1612
1613	defer s.seriesOps.WithLabelValues(memoryMaintenance).Inc()
1614
1615	closed, err := series.maybeCloseHeadChunk(s.headChunkTimeout)
1616	if err != nil {
1617		s.quarantineSeries(fp, series.metric, err)
1618		s.persistErrors.Inc()
1619	}
1620	if closed {
1621		s.incNumChunksToPersist(1)
1622		s.headChunks.Dec()
1623	}
1624
1625	seriesWasDirty := series.dirty
1626
1627	if s.writeMemorySeries(fp, series, beforeTime) {
1628		// Series is gone now, we are done.
1629		return false
1630	}
1631
1632	iOldestNotEvicted := -1
1633	for i, cd := range series.chunkDescs {
1634		if !cd.IsEvicted() {
1635			iOldestNotEvicted = i
1636			break
1637		}
1638	}
1639
1640	// Archive if all chunks are evicted. Also make sure the last sample has
1641	// an age of at least headChunkTimeout (which is very likely anyway).
1642	if iOldestNotEvicted == -1 && model.Now().Sub(series.lastTime) > s.headChunkTimeout {
1643		s.fpToSeries.del(fp)
1644		s.memorySeries.Dec()
1645		s.persistence.archiveMetric(fp, series.metric, series.firstTime(), series.lastTime)
1646		s.seriesOps.WithLabelValues(archive).Inc()
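		// Raise the archive high watermark to this series' last sample
		// time if it is newer. The CAS below may only fail if something
		// else modified the watermark concurrently, which must never
		// happen (hence the panic).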
1647		oldWatermark := atomic.LoadInt64((*int64)(&s.archiveHighWatermark))
1648		if oldWatermark < int64(series.lastTime) {
1649			if !atomic.CompareAndSwapInt64(
1650				(*int64)(&s.archiveHighWatermark),
1651				oldWatermark, int64(series.lastTime),
1652			) {
1653				panic("s.archiveHighWatermark modified outside of maintainMemorySeries")
1654			}
1655		}
		return false
1657	}
1658	// If we are here, the series is not archived, so check for chunk.Desc
1659	// eviction next.
1660	series.evictChunkDescs(iOldestNotEvicted)
1661
1662	return series.dirty && !seriesWasDirty
1663}
1664
1665// writeMemorySeries (re-)writes a memory series file. While doing so, it drops
1666// chunks older than beforeTime from both the series file (if it exists) as well
1667// as from memory. The provided chunksToPersist are appended to the newly
1668// written series file. If no chunks need to be purged, but chunksToPersist is
1669// not empty, those chunks are simply appended to the series file. If the series
1670// contains no chunks after dropping old chunks, it is purged entirely. In that
1671// case, the method returns true.
1672//
1673// If a persist error is encountered, the series is queued for quarantine. In
1674// that case, the method returns true, too, because the series should not be
1675// processed anymore (even if it will only be gone for real once quarantining
1676// has been completed).
1677//
1678// The caller must have locked the fp.
1679func (s *MemorySeriesStorage) writeMemorySeries(
1680	fp model.Fingerprint, series *memorySeries, beforeTime model.Time,
1681) bool {
1682	var (
1683		persistErr error
1684		cds        = series.chunksToPersist()
1685	)
1686
1687	defer func() {
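		// If any of the persistence operations below have failed,
		// quarantine the series and count the error.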
1688		if persistErr != nil {
1689			s.quarantineSeries(fp, series.metric, persistErr)
1690			s.persistErrors.Inc()
1691		}
1692		// The following is done even in case of an error to ensure
1693		// correct counter bookkeeping and to not pin chunks in memory
1694		// that belong to a series that is scheduled for quarantine
1695		// anyway.
1696		for _, cd := range cds {
1697			cd.Unpin(s.evictRequests)
1698		}
1699		s.incNumChunksToPersist(-len(cds))
1700		chunk.Ops.WithLabelValues(chunk.PersistAndUnpin).Add(float64(len(cds)))
1701		series.modTime = s.persistence.seriesFileModTime(fp)
1702	}()
1703
1704	// Get the actual chunks from underneath the chunk.Descs.
1705	// No lock required as chunks still to persist cannot be evicted.
1706	chunks := make([]chunk.Chunk, len(cds))
1707	for i, cd := range cds {
1708		chunks[i] = cd.C
1709	}
1710
1711	if !series.firstTime().Before(beforeTime) {
1712		// Oldest sample not old enough, just append chunks, if any.
1713		if len(cds) == 0 {
1714			return false
1715		}
1716		var offset int
1717		offset, persistErr = s.persistence.persistChunks(fp, chunks)
1718		if persistErr != nil {
1719			return true
1720		}
1721		if series.chunkDescsOffset == -1 {
1722			// This is the first chunk persisted for a newly created
1723			// series that had prior chunks on disk. Finally, we can
1724			// set the chunkDescsOffset.
1725			series.chunkDescsOffset = offset
1726		}
1727		return false
1728	}
1729
1730	newFirstTime, offset, numDroppedFromPersistence, allDroppedFromPersistence, persistErr :=
1731		s.persistence.dropAndPersistChunks(fp, beforeTime, chunks)
1732	if persistErr != nil {
1733		return true
1734	}
1735	if persistErr = series.dropChunks(beforeTime); persistErr != nil {
1736		return true
1737	}
1738	if len(series.chunkDescs) == 0 && allDroppedFromPersistence {
1739		// All chunks dropped from both memory and persistence. Delete the series for good.
1740		s.fpToSeries.del(fp)
1741		s.memorySeries.Dec()
1742		s.seriesOps.WithLabelValues(memoryPurge).Inc()
1743		s.persistence.unindexMetric(fp, series.metric)
1744		return true
1745	}
1746	series.savedFirstTime = newFirstTime
1747	if series.chunkDescsOffset == -1 {
1748		series.chunkDescsOffset = offset
1749	} else {
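		// Adjust the offset of the first in-memory chunk desc within the
		// series file by the number of chunks just dropped from the
		// file. A negative result would mean the bookkeeping is
		// inconsistent, so set persistErr to get the series quarantined.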
1750		series.chunkDescsOffset -= numDroppedFromPersistence
1751		if series.chunkDescsOffset < 0 {
1752			persistErr = errors.New("dropped more chunks from persistence than from memory")
1753			series.chunkDescsOffset = 0
1754			return true
1755		}
1756	}
1757	return false
1758}
1759
1760// maintainArchivedSeries drops chunks older than beforeTime from an archived
1761// series. If the series contains no chunks after that, it is purged entirely.
1762func (s *MemorySeriesStorage) maintainArchivedSeries(fp model.Fingerprint, beforeTime model.Time) {
1763	defer func(begin time.Time) {
1764		s.maintainSeriesDuration.WithLabelValues(maintainArchived).Observe(
1765			time.Since(begin).Seconds(),
1766		)
1767	}(time.Now())
1768
1769	s.fpLocker.Lock(fp)
1770	defer s.fpLocker.Unlock(fp)
1771
1772	has, firstTime, lastTime := s.persistence.hasArchivedMetric(fp)
1773	if !has || !firstTime.Before(beforeTime) {
1774		// Oldest sample not old enough, or metric purged or unarchived in the meantime.
1775		return
1776	}
1777
1778	defer s.seriesOps.WithLabelValues(archiveMaintenance).Inc()
1779
1780	newFirstTime, _, _, allDropped, err := s.persistence.dropAndPersistChunks(fp, beforeTime, nil)
1781	if err != nil {
1782		// TODO(beorn7): Should quarantine the series.
1783		s.persistErrors.Inc()
1784		log.Error("Error dropping persisted chunks: ", err)
1785	}
1786	if allDropped {
1787		if err := s.persistence.purgeArchivedMetric(fp); err != nil {
1788			s.persistErrors.Inc()
1789			// purgeArchivedMetric logs the error already.
1790		}
1791		s.seriesOps.WithLabelValues(archivePurge).Inc()
1792		return
1793	}
1794	if err := s.persistence.updateArchivedTimeRange(fp, newFirstTime, lastTime); err != nil {
1795		s.persistErrors.Inc()
1796		log.Errorf("Error updating archived time range for fingerprint %v: %s", fp, err)
1797	}
1798}
1799
1800// See persistence.loadChunks for detailed explanation.
1801func (s *MemorySeriesStorage) loadChunks(fp model.Fingerprint, indexes []int, indexOffset int) ([]chunk.Chunk, error) {
1802	return s.persistence.loadChunks(fp, indexes, indexOffset)
1803}
1804
1805// See persistence.loadChunkDescs for detailed explanation.
1806func (s *MemorySeriesStorage) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([]*chunk.Desc, error) {
1807	return s.persistence.loadChunkDescs(fp, offsetFromEnd)
1808}
1809
// getNumChunksToPersist returns numChunksToPersist in a goroutine-safe way.
1811func (s *MemorySeriesStorage) getNumChunksToPersist() int {
1812	return int(atomic.LoadInt64(&s.numChunksToPersist))
1813}
1814
// incNumChunksToPersist increments numChunksToPersist in a goroutine-safe way.
// Use a negative 'by' to decrement.
1817func (s *MemorySeriesStorage) incNumChunksToPersist(by int) {
1818	atomic.AddInt64(&s.numChunksToPersist, int64(by))
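	// Only positive increments are counted towards the
	// queuedChunksToPersist metric; decrements merely reduce the internal
	// backlog tracked by numChunksToPersist.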
1819	if by > 0 {
1820		s.queuedChunksToPersist.Add(float64(by))
1821	}
1822}
1823
1824// getPersistenceUrgencyScore returns an urgency score for the speed of
1825// persisting chunks. The score is between 0 and 1, where 0 means no urgency at
// all and 1 means highest urgency. It also returns whether the storage is in
// "rushed mode".
1828//
1829// The storage enters "rushed mode" if the score exceeds
1830// persintenceUrgencyScoreForEnteringRushedMode at the time this method is
// called. It will leave "rushed mode" if, at a later call of this method, the
// score is below persintenceUrgencyScoreForLeavingRushedMode.
1833// "Rushed mode" plays a role for the adaptive series-sync-strategy. It also
1834// switches off early checkpointing (due to dirty series), and it makes series
1835// maintenance happen as quickly as possible.
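//
// For example, with the constants defined at the top of this file, a score
// rising above persintenceUrgencyScoreForEnteringRushedMode (0.8) switches the
// storage into rushed mode, and it stays there until the score has dropped
// below persintenceUrgencyScoreForLeavingRushedMode (0.7). The gap between the
// two thresholds prevents rapid oscillation around a single cutoff.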
1836//
1837// A score of 1 will trigger throttling of sample ingestion.
1838//
1839// It is safe to call this method concurrently.
1840func (s *MemorySeriesStorage) getPersistenceUrgencyScore() (float64, bool) {
1841	s.rushedMtx.Lock()
1842	defer s.rushedMtx.Unlock()
1843
1844	score := float64(atomic.LoadInt32(&s.persistUrgency)) / 1000
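	// persistUrgency is kept as an int32 scaled by 1000 so it can be read
	// and updated atomically; convert it back to a score in [0, 1],
	// capping it at 1 below.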
1845	if score > 1 {
1846		score = 1
1847	}
1848
1849	if s.rushed {
1850		// We are already in rushed mode. If the score is still above
1851		// persintenceUrgencyScoreForLeavingRushedMode, return the score
1852		// and leave things as they are.
1853		if score > persintenceUrgencyScoreForLeavingRushedMode {
1854			return score, true
1855		}
1856		// We are out of rushed mode!
1857		s.rushed = false
1858		log.
1859			With("urgencyScore", score).
1860			With("chunksToPersist", s.getNumChunksToPersist()).
1861			With("memoryChunks", atomic.LoadInt64(&chunk.NumMemChunks)).
1862			Info("Storage has left rushed mode.")
1863		return score, false
1864	}
1865	if score > persintenceUrgencyScoreForEnteringRushedMode {
1866		// Enter rushed mode.
1867		s.rushed = true
1868		log.
1869			With("urgencyScore", score).
1870			With("chunksToPersist", s.getNumChunksToPersist()).
1871			With("memoryChunks", atomic.LoadInt64(&chunk.NumMemChunks)).
1872			Warn("Storage has entered rushed mode.")
1873	}
1874	return score, s.rushed
1875}
1876
1877// quarantineSeries registers the provided fingerprint for quarantining. It
1878// always returns immediately. Quarantine requests are processed
1879// asynchronously. If there are too many requests queued, they are simply
1880// dropped.
1881//
1882// Quarantining means that the series file is moved to the orphaned directory,
1883// and all its traces are removed from indices. Call this method if an
1884// unrecoverable error is detected while dealing with a series, and pass in the
1885// encountered error. It will be saved as a hint in the orphaned directory.
1886func (s *MemorySeriesStorage) quarantineSeries(fp model.Fingerprint, metric model.Metric, err error) {
1887	req := quarantineRequest{fp: fp, metric: metric, reason: err}
1888	select {
1889	case s.quarantineRequests <- req:
1890		// Request submitted.
1891	default:
1892		log.
1893			With("fingerprint", fp).
1894			With("metric", metric).
1895			With("reason", err).
1896			Warn("Quarantine queue full. Dropped quarantine request.")
1897		s.seriesOps.WithLabelValues(droppedQuarantine).Inc()
1898	}
1899}
1900
1901func (s *MemorySeriesStorage) handleQuarantine() {
1902	for {
1903		select {
1904		case req := <-s.quarantineRequests:
1905			s.purgeSeries(req.fp, req.metric, req.reason)
1906			log.
1907				With("fingerprint", req.fp).
1908				With("metric", req.metric).
1909				With("reason", req.reason).
1910				Warn("Series quarantined.")
1911		case <-s.quarantineStopping:
1912			log.Info("Series quarantining stopped.")
1913			close(s.quarantineStopped)
1914			return
1915		}
1916	}
1917
1918}
1919
1920// purgeSeries removes all traces of a series. If a non-nil quarantine reason is
1921// provided, the series file will not be deleted completely, but moved to the
1922// orphaned directory with the reason and the metric in a hint file. The
1923// provided metric might be nil if unknown.
1924func (s *MemorySeriesStorage) purgeSeries(fp model.Fingerprint, m model.Metric, quarantineReason error) {
1925	s.fpLocker.Lock(fp)
1926
1927	var (
1928		series *memorySeries
1929		ok     bool
1930	)
1931
1932	if series, ok = s.fpToSeries.get(fp); ok {
1933		s.fpToSeries.del(fp)
1934		s.memorySeries.Dec()
1935		m = series.metric
1936
		// Adjust s.numChunksToPersist and chunk.NumMemChunks down by
1938		// the number of chunks in this series that are not
1939		// persisted yet. Persisted chunks will be deducted from
1940		// chunk.NumMemChunks upon eviction.
1941		numChunksNotYetPersisted := len(series.chunkDescs) - series.persistWatermark
1942		atomic.AddInt64(&chunk.NumMemChunks, int64(-numChunksNotYetPersisted))
1943		if !series.headChunkClosed {
1944			// Head chunk wasn't counted as waiting for persistence yet.
1945			// (But it was counted as a chunk in memory.)
1946			numChunksNotYetPersisted--
1947		}
1948		s.incNumChunksToPersist(-numChunksNotYetPersisted)
1949
1950	} else {
1951		s.persistence.purgeArchivedMetric(fp) // Ignoring error. There is nothing we can do.
1952	}
1953	if m != nil {
1954		// If we know a metric now, unindex it in any case.
1955		// purgeArchivedMetric might have done so already, but we cannot
		// be sure. Unindexing is idempotent, though.
1957		s.persistence.unindexMetric(fp, m)
1958	}
1959	// Attempt to delete/quarantine the series file in any case.
1960	if quarantineReason == nil {
1961		// No reason stated, simply delete the file.
1962		if _, err := s.persistence.deleteSeriesFile(fp); err != nil {
1963			log.
1964				With("fingerprint", fp).
1965				With("metric", m).
1966				With("error", err).
1967				Error("Error deleting series file.")
1968		}
1969		s.seriesOps.WithLabelValues(requestedPurge).Inc()
1970	} else {
1971		if err := s.persistence.quarantineSeriesFile(fp, quarantineReason, m); err == nil {
1972			s.seriesOps.WithLabelValues(completedQurantine).Inc()
1973		} else {
1974			s.seriesOps.WithLabelValues(failedQuarantine).Inc()
1975			log.
1976				With("fingerprint", fp).
1977				With("metric", m).
1978				With("reason", quarantineReason).
1979				With("error", err).
1980				Error("Error quarantining series file.")
1981		}
1982	}
1983
1984	s.fpLocker.Unlock(fp)
1985}
1986
1987// Describe implements prometheus.Collector.
1988func (s *MemorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
1989	s.persistence.Describe(ch)
1990	s.mapper.Describe(ch)
1991
1992	ch <- s.persistErrors.Desc()
1993	ch <- s.queuedChunksToPersist.Desc()
1994	ch <- s.chunksToPersist.Desc()
1995	ch <- s.memorySeries.Desc()
1996	ch <- s.headChunks.Desc()
1997	ch <- s.dirtySeries.Desc()
1998	s.seriesOps.Describe(ch)
1999	ch <- s.ingestedSamples.Desc()
2000	s.discardedSamples.Describe(ch)
2001	ch <- s.nonExistentSeriesMatches.Desc()
2002	ch <- s.memChunks.Desc()
2003	s.maintainSeriesDuration.Describe(ch)
2004	ch <- s.persistenceUrgencyScore.Desc()
2005	ch <- s.rushedMode.Desc()
2006	ch <- s.targetHeapSizeBytes.Desc()
2007}
2008
2009// Collect implements prometheus.Collector.
2010func (s *MemorySeriesStorage) Collect(ch chan<- prometheus.Metric) {
2011	s.persistence.Collect(ch)
2012	s.mapper.Collect(ch)
2013
2014	ch <- s.persistErrors
2015	ch <- s.queuedChunksToPersist
2016	ch <- s.chunksToPersist
2017	ch <- s.memorySeries
2018	ch <- s.headChunks
2019	ch <- s.dirtySeries
2020	s.seriesOps.Collect(ch)
2021	ch <- s.ingestedSamples
2022	s.discardedSamples.Collect(ch)
2023	ch <- s.nonExistentSeriesMatches
2024	ch <- s.memChunks
2025	s.maintainSeriesDuration.Collect(ch)
2026	ch <- s.persistenceUrgencyScore
2027	ch <- s.rushedMode
2028	ch <- s.targetHeapSizeBytes
2029}
2030