// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tsdb

import (
	"context"
	"fmt"
	"io"
	"math"
	"math/rand"
	"os"
	"path/filepath"
	"sort"
	"time"

	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
	"github.com/oklog/ulid"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
	"github.com/prometheus/prometheus/tsdb/chunks"
	tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
	"github.com/prometheus/prometheus/tsdb/fileutil"
	"github.com/prometheus/prometheus/tsdb/index"
	"github.com/prometheus/prometheus/tsdb/tombstones"
)

// ExponentialBlockRanges returns steps time ranges, starting at minSize and
// growing by a factor of stepSize for each subsequent range.
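// For example, with minSize of two hours in milliseconds, steps=3 and
// stepSize=5, it returns [7200000, 36000000, 180000000], i.e. ranges of
// 2h, 10h and 50h.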
func ExponentialBlockRanges(minSize int64, steps, stepSize int) []int64 {
	ranges := make([]int64, 0, steps)
	curRange := minSize
	for i := 0; i < steps; i++ {
		ranges = append(ranges, curRange)
		curRange = curRange * int64(stepSize)
	}

	return ranges
}

// Compactor provides compaction against an underlying storage
// of time series data.
type Compactor interface {
	// Plan returns a set of directories that can be compacted concurrently.
	// The directories can be overlapping.
	// Results returned when compactions are in progress are undefined.
	Plan(dir string) ([]string, error)

	// Write persists a Block into a directory.
	// No Block is written when the resulting Block has 0 samples; an empty
	// ulid.ULID{} is returned instead.
	Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error)

	// Compact runs compaction against the provided directories. Must
	// only be called concurrently with results of Plan().
	// Can optionally pass a list of already open blocks
	// to avoid having to reopen them.
	// When the resulting Block has 0 samples:
	//  * No block is written.
	//  * The source dirs are marked Deletable.
	//  * An empty ulid.ULID{} is returned.
	Compact(dest string, dirs []string, open []*Block) (ulid.ULID, error)
}

// LeveledCompactor implements the Compactor interface.
type LeveledCompactor struct {
	metrics   *compactorMetrics
	logger    log.Logger
	ranges    []int64
	chunkPool chunkenc.Pool
	ctx       context.Context
}

type compactorMetrics struct {
	ran               prometheus.Counter
	populatingBlocks  prometheus.Gauge
	overlappingBlocks prometheus.Counter
	duration          prometheus.Histogram
	chunkSize         prometheus.Histogram
	chunkSamples      prometheus.Histogram
	chunkRange        prometheus.Histogram
}

func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
	m := &compactorMetrics{}

	m.ran = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "prometheus_tsdb_compactions_total",
		Help: "Total number of compactions that were executed for the partition.",
	})
	m.populatingBlocks = prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "prometheus_tsdb_compaction_populating_block",
		Help: "Set to 1 when a block is currently being written to the disk.",
	})
	m.overlappingBlocks = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "prometheus_tsdb_vertical_compactions_total",
		Help: "Total number of compactions done on overlapping blocks.",
	})
	m.duration = prometheus.NewHistogram(prometheus.HistogramOpts{
		Name:    "prometheus_tsdb_compaction_duration_seconds",
		Help:    "Duration of compaction runs.",
		Buckets: prometheus.ExponentialBuckets(1, 2, 10),
	})
	m.chunkSize = prometheus.NewHistogram(prometheus.HistogramOpts{
		Name:    "prometheus_tsdb_compaction_chunk_size_bytes",
		Help:    "Final size of chunks on their first compaction.",
		Buckets: prometheus.ExponentialBuckets(32, 1.5, 12),
	})
	m.chunkSamples = prometheus.NewHistogram(prometheus.HistogramOpts{
		Name:    "prometheus_tsdb_compaction_chunk_samples",
		Help:    "Final number of samples on their first compaction.",
		Buckets: prometheus.ExponentialBuckets(4, 1.5, 12),
	})
	m.chunkRange = prometheus.NewHistogram(prometheus.HistogramOpts{
		Name:    "prometheus_tsdb_compaction_chunk_range_seconds",
		Help:    "Final time range of chunks on their first compaction.",
		Buckets: prometheus.ExponentialBuckets(100, 4, 10),
	})

	if r != nil {
		r.MustRegister(
			m.ran,
			m.populatingBlocks,
			m.overlappingBlocks,
			m.duration,
			m.chunkRange,
			m.chunkSamples,
			m.chunkSize,
		)
	}
	return m
}

// NewLeveledCompactor returns a LeveledCompactor.
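// A nil logger or pool is replaced with a default; with a nil Registerer the
// metrics are created but not registered. A hedged usage sketch (the values
// are illustrative only):
//
//	c, err := NewLeveledCompactor(context.Background(), nil, nil,
//		ExponentialBlockRanges(2*3600*1000, 3, 5), nil)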
func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool) (*LeveledCompactor, error) {
	if len(ranges) == 0 {
		return nil, errors.New("at least one range must be provided")
	}
	if pool == nil {
		pool = chunkenc.NewPool()
	}
	if l == nil {
		l = log.NewNopLogger()
	}
	return &LeveledCompactor{
		ranges:    ranges,
		chunkPool: pool,
		logger:    l,
		metrics:   newCompactorMetrics(r),
		ctx:       ctx,
	}, nil
}

type dirMeta struct {
	dir  string
	meta *BlockMeta
}

// Plan returns a list of compactable blocks in the provided directory.
func (c *LeveledCompactor) Plan(dir string) ([]string, error) {
	dirs, err := blockDirs(dir)
	if err != nil {
		return nil, err
	}
	if len(dirs) < 1 {
		return nil, nil
	}

	var dms []dirMeta
	for _, dir := range dirs {
		meta, _, err := readMetaFile(dir)
		if err != nil {
			return nil, err
		}
		dms = append(dms, dirMeta{dir, meta})
	}
	return c.plan(dms)
}

func (c *LeveledCompactor) plan(dms []dirMeta) ([]string, error) {
	sort.Slice(dms, func(i, j int) bool {
		return dms[i].meta.MinTime < dms[j].meta.MinTime
	})

	res := c.selectOverlappingDirs(dms)
	if len(res) > 0 {
		return res, nil
	}
	// No overlapping blocks, do compaction the usual way.
	// We do not include the most recent block with max(minTime), i.e. the
	// block that was just created from the WAL. This gives users a window
	// of a full block size to back up new data piece-wise without having
	// to care about data overlap.
	dms = dms[:len(dms)-1]

	for _, dm := range c.selectDirs(dms) {
		res = append(res, dm.dir)
	}
	if len(res) > 0 {
		return res, nil
	}

	// Compact any blocks with a big enough time range that have >5% tombstones.
	for i := len(dms) - 1; i >= 0; i-- {
		meta := dms[i].meta
		if meta.MaxTime-meta.MinTime < c.ranges[len(c.ranges)/2] {
			break
		}
		if float64(meta.Stats.NumTombstones)/float64(meta.Stats.NumSeries+1) > 0.05 {
			return []string{dms[i].dir}, nil
		}
	}

	return nil, nil
}

// selectDirs returns the dir metas that should be compacted into a single new block.
// If only a single block range is configured, the result is always nil.
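// For example, with ranges [20, 60] and sorted blocks covering [0-20],
// [20-40] and [40-60], all three blocks together span the full 60-unit
// range and are selected for compaction into a single block.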
func (c *LeveledCompactor) selectDirs(ds []dirMeta) []dirMeta {
	if len(c.ranges) < 2 || len(ds) < 1 {
		return nil
	}

	highTime := ds[len(ds)-1].meta.MinTime

	for _, iv := range c.ranges[1:] {
		parts := splitByRange(ds, iv)
		if len(parts) == 0 {
			continue
		}

	Outer:
		for _, p := range parts {
			// Do not select the range if it has a block whose compaction failed.
			for _, dm := range p {
				if dm.meta.Compaction.Failed {
					continue Outer
				}
			}

			mint := p[0].meta.MinTime
			maxt := p[len(p)-1].meta.MaxTime
			// Pick the range of blocks if it spans the full range (potentially with gaps)
			// or is before the most recent block.
			// This ensures we don't compact blocks prematurely when another one of the same
			// size still fits in the range.
			if (maxt-mint == iv || maxt <= highTime) && len(p) > 1 {
				return p
			}
		}
	}

	return nil
}

// selectOverlappingDirs returns all dirs with overlapping time ranges.
// It expects the input sorted by MinTime and returns the overlapping dirs in
// the same order as received.
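// For example, given blocks covering [0-10], [5-15], [12-20] and [25-30],
// the first three overlap transitively and are returned; iteration stops at
// the first gap after an overlapping group.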
func (c *LeveledCompactor) selectOverlappingDirs(ds []dirMeta) []string {
	if len(ds) < 2 {
		return nil
	}
	var overlappingDirs []string
	globalMaxt := ds[0].meta.MaxTime
	for i, d := range ds[1:] {
		if d.meta.MinTime < globalMaxt {
			if len(overlappingDirs) == 0 { // On the first overlap, add the preceding block as well.
				overlappingDirs = append(overlappingDirs, ds[i].dir)
			}
			overlappingDirs = append(overlappingDirs, d.dir)
		} else if len(overlappingDirs) > 0 {
			break
		}
		if d.meta.MaxTime > globalMaxt {
			globalMaxt = d.meta.MaxTime
		}
	}
	return overlappingDirs
}

// splitByRange splits the directories by the time range. The range sequence starts at 0.
//
// For example, if we have blocks [0-10, 10-20, 50-60, 90-100] and the split range tr is 30
// it returns [0-10, 10-20], [50-60], [90-100].
func splitByRange(ds []dirMeta, tr int64) [][]dirMeta {
	var splitDirs [][]dirMeta

	for i := 0; i < len(ds); {
		var (
			group []dirMeta
			t0    int64
			m     = ds[i].meta
		)
		// Compute start of aligned time range of size tr closest to the current block's start.
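		// Note: Go's integer division truncates toward zero, so for negative
		// times the else branch below emulates floor division, e.g. with
		// tr=30 and MinTime=-5: t0 = 30 * ((-5 - 30 + 1) / 30) = -30.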
		if m.MinTime >= 0 {
			t0 = tr * (m.MinTime / tr)
		} else {
			t0 = tr * ((m.MinTime - tr + 1) / tr)
		}
		// Skip blocks that don't fall into the range. This can happen through
		// misalignment or by being a multiple of the intended range.
		if m.MaxTime > t0+tr {
			i++
			continue
		}

		// Add all dirs to the current group that are within [t0, t0+tr].
		for ; i < len(ds); i++ {
			// Either the block falls into the next range or doesn't fit at all (checked above).
			if ds[i].meta.MaxTime > t0+tr {
				break
			}
			group = append(group, ds[i])
		}

		if len(group) > 0 {
			splitDirs = append(splitDirs, group)
		}
	}

	return splitDirs
}

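// compactBlockMetas constructs the meta for the compacted block: its time
// range covers all input blocks, its compaction level is one above the
// highest input level, and the inputs are recorded as parents with their
// sources deduplicated and sorted.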
func compactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta {
	res := &BlockMeta{
		ULID:    uid,
		MinTime: blocks[0].MinTime,
	}

	sources := map[ulid.ULID]struct{}{}
	// For overlapping blocks, the Maxt can be
	// in any block so we track it globally.
	maxt := int64(math.MinInt64)

	for _, b := range blocks {
		if b.MaxTime > maxt {
			maxt = b.MaxTime
		}
		if b.Compaction.Level > res.Compaction.Level {
			res.Compaction.Level = b.Compaction.Level
		}
		for _, s := range b.Compaction.Sources {
			sources[s] = struct{}{}
		}
		res.Compaction.Parents = append(res.Compaction.Parents, BlockDesc{
			ULID:    b.ULID,
			MinTime: b.MinTime,
			MaxTime: b.MaxTime,
		})
	}
	res.Compaction.Level++

	for s := range sources {
		res.Compaction.Sources = append(res.Compaction.Sources, s)
	}
	sort.Slice(res.Compaction.Sources, func(i, j int) bool {
		return res.Compaction.Sources[i].Compare(res.Compaction.Sources[j]) < 0
	})

	res.MaxTime = maxt
	return res
}

// Compact creates a new block in the compactor's directory from the blocks in the
// provided directories.
func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (uid ulid.ULID, err error) {
	var (
		blocks []BlockReader
		bs     []*Block
		metas  []*BlockMeta
		uids   []string
	)
	start := time.Now()

	for _, d := range dirs {
		meta, _, err := readMetaFile(d)
		if err != nil {
			return uid, err
		}

		var b *Block

		// Use already open blocks if we can, to avoid
		// having the index data in memory twice.
		for _, o := range open {
			if meta.ULID == o.Meta().ULID {
				b = o
				break
			}
		}

		if b == nil {
			var err error
			b, err = OpenBlock(c.logger, d, c.chunkPool)
			if err != nil {
				return uid, err
			}
			defer b.Close()
		}

		metas = append(metas, meta)
		blocks = append(blocks, b)
		bs = append(bs, b)
		uids = append(uids, meta.ULID.String())
	}

	entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
	uid = ulid.MustNew(ulid.Now(), entropy)

	meta := compactBlockMetas(uid, metas...)
	err = c.write(dest, meta, blocks...)
	if err == nil {
		if meta.Stats.NumSamples == 0 {
			for _, b := range bs {
				b.meta.Compaction.Deletable = true
				n, err := writeMetaFile(c.logger, b.dir, &b.meta)
				if err != nil {
					level.Error(c.logger).Log(
						"msg", "Failed to write 'Deletable' to meta file after compaction",
						"ulid", b.meta.ULID,
					)
				}
				b.numBytesMeta = n
			}
			uid = ulid.ULID{}
			level.Info(c.logger).Log(
				"msg", "compact blocks resulted in empty block",
				"count", len(blocks),
				"sources", fmt.Sprintf("%v", uids),
				"duration", time.Since(start),
			)
		} else {
			level.Info(c.logger).Log(
				"msg", "compact blocks",
				"count", len(blocks),
				"mint", meta.MinTime,
				"maxt", meta.MaxTime,
				"ulid", meta.ULID,
				"sources", fmt.Sprintf("%v", uids),
				"duration", time.Since(start),
			)
		}
		return uid, nil
	}

	var merr tsdb_errors.MultiError
	merr.Add(err)
	if err != context.Canceled {
		for _, b := range bs {
			if err := b.setCompactionFailed(); err != nil {
				merr.Add(errors.Wrapf(err, "setting compaction failed for block: %s", b.Dir()))
			}
		}
	}

	return uid, merr
}

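// Write creates a single new block from the provided BlockReader spanning
// [mint, maxt) and persists it under dest. See the Compactor interface
// documentation for the empty-block semantics.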
func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error) {
	start := time.Now()

	entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
	uid := ulid.MustNew(ulid.Now(), entropy)

	meta := &BlockMeta{
		ULID:    uid,
		MinTime: mint,
		MaxTime: maxt,
	}
	meta.Compaction.Level = 1
	meta.Compaction.Sources = []ulid.ULID{uid}

	if parent != nil {
		meta.Compaction.Parents = []BlockDesc{
			{ULID: parent.ULID, MinTime: parent.MinTime, MaxTime: parent.MaxTime},
		}
	}

	err := c.write(dest, meta, b)
	if err != nil {
		return uid, err
	}

	if meta.Stats.NumSamples == 0 {
		return ulid.ULID{}, nil
	}

	level.Info(c.logger).Log(
		"msg", "write block",
		"mint", meta.MinTime,
		"maxt", meta.MaxTime,
		"ulid", meta.ULID,
		"duration", time.Since(start),
	)
	return uid, nil
}

// instrumentedChunkWriter is used for level 1 compactions to record statistics
// about compacted chunks.
type instrumentedChunkWriter struct {
	ChunkWriter

	size    prometheus.Histogram
	samples prometheus.Histogram
	trange  prometheus.Histogram
}

func (w *instrumentedChunkWriter) WriteChunks(chunks ...chunks.Meta) error {
	for _, c := range chunks {
		w.size.Observe(float64(len(c.Chunk.Bytes())))
		w.samples.Observe(float64(c.Chunk.NumSamples()))
		w.trange.Observe(float64(c.MaxTime - c.MinTime))
	}
	return w.ChunkWriter.WriteChunks(chunks...)
}

// write creates a new block in dest that is the union of the provided blocks.
// The temporary directory it works in is cleaned up when the function returns.
func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) {
	dir := filepath.Join(dest, meta.ULID.String())
	tmp := dir + ".tmp"
	var closers []io.Closer
	defer func(t time.Time) {
		var merr tsdb_errors.MultiError
		merr.Add(err)
		merr.Add(closeAll(closers))
		err = merr.Err()

		// RemoveAll returns no error when tmp doesn't exist so it is safe to always run it.
		if err := os.RemoveAll(tmp); err != nil {
			level.Error(c.logger).Log("msg", "failed to remove tmp folder after failed compaction", "err", err.Error())
		}
		c.metrics.ran.Inc()
		c.metrics.duration.Observe(time.Since(t).Seconds())
	}(time.Now())

	if err = os.RemoveAll(tmp); err != nil {
		return err
	}

	if err = os.MkdirAll(tmp, 0777); err != nil {
		return err
	}

	// Populate chunk and index files into temporary directory with
	// data of all blocks.
	var chunkw ChunkWriter

	chunkw, err = chunks.NewWriter(chunkDir(tmp))
	if err != nil {
		return errors.Wrap(err, "open chunk writer")
	}
	closers = append(closers, chunkw)
	// Record written chunk sizes on level 1 compactions.
	if meta.Compaction.Level == 1 {
		chunkw = &instrumentedChunkWriter{
			ChunkWriter: chunkw,
			size:        c.metrics.chunkSize,
			samples:     c.metrics.chunkSamples,
			trange:      c.metrics.chunkRange,
		}
	}

	indexw, err := index.NewWriter(c.ctx, filepath.Join(tmp, indexFilename))
	if err != nil {
		return errors.Wrap(err, "open index writer")
	}
	closers = append(closers, indexw)

	if err := c.populateBlock(blocks, meta, indexw, chunkw); err != nil {
		return errors.Wrap(err, "write compaction")
	}

	select {
	case <-c.ctx.Done():
		return c.ctx.Err()
	default:
	}

	// We are explicitly closing the writers here to check for errors even
	// though they are also covered by the defer: on Windows the files cannot
	// be deleted unless they are closed, and the defer only ensures they are
	// closed if the function exits early due to an error above.
	var merr tsdb_errors.MultiError
	for _, w := range closers {
		merr.Add(w.Close())
	}
	closers = closers[:0] // Avoid closing the writers twice in the defer.
	if merr.Err() != nil {
		return merr.Err()
	}

	// Populated block is empty, so exit early.
	if meta.Stats.NumSamples == 0 {
		return nil
	}

	if _, err = writeMetaFile(c.logger, tmp, meta); err != nil {
		return errors.Wrap(err, "write merged meta")
	}

	// Create an empty tombstones file.
	if _, err := tombstones.WriteFile(c.logger, tmp, tombstones.NewMemTombstones()); err != nil {
		return errors.Wrap(err, "write new tombstones file")
	}

	df, err := fileutil.OpenDir(tmp)
	if err != nil {
		return errors.Wrap(err, "open temporary block dir")
	}
	defer func() {
		if df != nil {
			df.Close()
		}
	}()

	if err := df.Sync(); err != nil {
		return errors.Wrap(err, "sync temporary dir file")
	}

	// Close the temp dir before renaming the block dir (required on Windows).
	if err = df.Close(); err != nil {
		return errors.Wrap(err, "close temporary dir")
	}
	df = nil

	// Block successfully written, make it visible and remove old ones.
	if err := fileutil.Replace(tmp, dir); err != nil {
		return errors.Wrap(err, "rename block dir")
	}

	return nil
}

// populateBlock fills the index and chunk writers with new data gathered as the union
// of the provided blocks. It populates meta with the statistics of the new block.
// It expects the input blocks sorted by MinTime.
func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) (err error) {
	if len(blocks) == 0 {
		return errors.New("cannot populate block from no readers")
	}

	var (
		set         ChunkSeriesSet
		symbols     index.StringIter
		closers     = []io.Closer{}
		overlapping bool
	)
	defer func() {
		var merr tsdb_errors.MultiError
		merr.Add(err)
		merr.Add(closeAll(closers))
		err = merr.Err()
		c.metrics.populatingBlocks.Set(0)
	}()
	c.metrics.populatingBlocks.Set(1)

	globalMaxt := blocks[0].Meta().MaxTime
	for i, b := range blocks {
		select {
		case <-c.ctx.Done():
			return c.ctx.Err()
		default:
		}

		if !overlapping {
			if i > 0 && b.Meta().MinTime < globalMaxt {
				c.metrics.overlappingBlocks.Inc()
				overlapping = true
				level.Warn(c.logger).Log("msg", "found overlapping blocks during compaction", "ulid", meta.ULID)
			}
			if b.Meta().MaxTime > globalMaxt {
				globalMaxt = b.Meta().MaxTime
			}
		}

		indexr, err := b.Index()
		if err != nil {
			return errors.Wrapf(err, "open index reader for block %s", b)
		}
		closers = append(closers, indexr)

		chunkr, err := b.Chunks()
		if err != nil {
			return errors.Wrapf(err, "open chunk reader for block %s", b)
		}
		closers = append(closers, chunkr)

		tombsr, err := b.Tombstones()
		if err != nil {
			return errors.Wrapf(err, "open tombstone reader for block %s", b)
		}
		closers = append(closers, tombsr)

		k, v := index.AllPostingsKey()
		all, err := indexr.Postings(k, v)
		if err != nil {
			return err
		}
		all = indexr.SortedPostings(all)

		s := newCompactionSeriesSet(indexr, chunkr, tombsr, all)
		syms := indexr.Symbols()

		if i == 0 {
			set = s
			symbols = syms
			continue
		}
		set, err = newCompactionMerger(set, s)
		if err != nil {
			return err
		}
		symbols = newMergedStringIter(symbols, syms)
	}

	for symbols.Next() {
		if err := indexw.AddSymbol(symbols.At()); err != nil {
			return errors.Wrap(err, "add symbol")
		}
	}
	if symbols.Err() != nil {
		return errors.Wrap(symbols.Err(), "next symbol")
	}

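	// delIter is reused across series to re-encode chunks that overlap
	// deleted ranges; ref is the series reference within the new block's index.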
	delIter := &deletedIterator{}
	ref := uint64(0)
	for set.Next() {
		select {
		case <-c.ctx.Done():
			return c.ctx.Err()
		default:
		}

		lset, chks, dranges := set.At() // The chunks here are not fully deleted.
		if overlapping {
			// If blocks are overlapping, it is possible to have unsorted chunks.
			sort.Slice(chks, func(i, j int) bool {
				return chks[i].MinTime < chks[j].MinTime
			})
		}

		// Skip series whose chunks have all been deleted.
		if len(chks) == 0 {
			continue
		}

		for i, chk := range chks {
			// Re-encode head chunks that are still open (being appended to) or
			// outside the compacted MaxTime range.
			// The chunk.Bytes() method is not safe for open chunks hence the re-encoding.
			// This happens when snapshotting the head block.
			//
			// Block time range is half-open: [meta.MinTime, meta.MaxTime) and
			// chunks are closed hence the chk.MaxTime >= meta.MaxTime check.
			//
			// TODO: think about how to avoid the typecast to detect the head block.
			if _, isHeadChunk := chk.Chunk.(*safeChunk); isHeadChunk && chk.MaxTime >= meta.MaxTime {
				dranges = append(dranges, tombstones.Interval{Mint: meta.MaxTime, Maxt: math.MaxInt64})

			} else
			// Sanity check for disk blocks.
			// chk.MaxTime == meta.MaxTime shouldn't happen either, but checking
			// for it would break many users, so we don't.
			if chk.MinTime < meta.MinTime || chk.MaxTime > meta.MaxTime {
				return errors.Errorf("found chunk with minTime: %d maxTime: %d outside of compacted minTime: %d maxTime: %d",
					chk.MinTime, chk.MaxTime, meta.MinTime, meta.MaxTime)
			}

			if len(dranges) > 0 {
				// Re-encode the chunk to not have deleted values.
				if !chk.OverlapsClosedInterval(dranges[0].Mint, dranges[len(dranges)-1].Maxt) {
					continue
				}
				newChunk := chunkenc.NewXORChunk()
				app, err := newChunk.Appender()
				if err != nil {
					return err
				}

				delIter.it = chk.Chunk.Iterator(delIter.it)
				delIter.intervals = dranges

				var (
					t int64
					v float64
				)
				for delIter.Next() {
					t, v = delIter.At()
					app.Append(t, v)
				}
				if err := delIter.Err(); err != nil {
					return errors.Wrap(err, "iterate chunk while re-encoding")
				}

				chks[i].Chunk = newChunk
				chks[i].MaxTime = t
			}
		}

		mergedChks := chks
		if overlapping {
			mergedChks, err = chunks.MergeOverlappingChunks(chks)
			if err != nil {
				return errors.Wrap(err, "merge overlapping chunks")
			}
		}
		if err := chunkw.WriteChunks(mergedChks...); err != nil {
			return errors.Wrap(err, "write chunks")
		}

		if err := indexw.AddSeries(ref, lset, mergedChks...); err != nil {
			return errors.Wrap(err, "add series")
		}

		meta.Stats.NumChunks += uint64(len(mergedChks))
		meta.Stats.NumSeries++
		for _, chk := range mergedChks {
			meta.Stats.NumSamples += uint64(chk.Chunk.NumSamples())
		}

		for _, chk := range mergedChks {
			if err := c.chunkPool.Put(chk.Chunk); err != nil {
				return errors.Wrap(err, "put chunk")
			}
		}

		ref++
	}
	if set.Err() != nil {
		return errors.Wrap(set.Err(), "iterate compaction set")
	}

	return nil
}

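// compactionSeriesSet iterates over a block's series in postings order,
// exposing each series' labels, chunks, and tombstone intervals, and
// dropping chunks that are entirely deleted.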
type compactionSeriesSet struct {
	p          index.Postings
	index      IndexReader
	chunks     ChunkReader
	tombstones tombstones.Reader

	l         labels.Labels
	c         []chunks.Meta
	intervals tombstones.Intervals
	err       error
}

func newCompactionSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings) *compactionSeriesSet {
	return &compactionSeriesSet{
		index:      i,
		chunks:     c,
		tombstones: t,
		p:          p,
	}
}

func (c *compactionSeriesSet) Next() bool {
	if !c.p.Next() {
		return false
	}
	var err error

	c.intervals, err = c.tombstones.Get(c.p.At())
	if err != nil {
		c.err = errors.Wrap(err, "get tombstones")
		return false
	}

	if err = c.index.Series(c.p.At(), &c.l, &c.c); err != nil {
		c.err = errors.Wrapf(err, "get series %d", c.p.At())
		return false
	}

	// Remove completely deleted chunks.
	if len(c.intervals) > 0 {
		chks := make([]chunks.Meta, 0, len(c.c))
		for _, chk := range c.c {
			if !(tombstones.Interval{Mint: chk.MinTime, Maxt: chk.MaxTime}.IsSubrange(c.intervals)) {
				chks = append(chks, chk)
			}
		}

		c.c = chks
	}

	for i := range c.c {
		chk := &c.c[i]

		chk.Chunk, err = c.chunks.Chunk(chk.Ref)
		if err != nil {
			c.err = errors.Wrapf(err, "chunk %d not found", chk.Ref)
			return false
		}
	}

	return true
}

func (c *compactionSeriesSet) Err() error {
	if c.err != nil {
		return c.err
	}
	return c.p.Err()
}

func (c *compactionSeriesSet) At() (labels.Labels, []chunks.Meta, tombstones.Intervals) {
	return c.l, c.c, c.intervals
}

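// compactionMerger merges two ChunkSeriesSets sorted by labels into one;
// when both sets contain the same series, its chunks and tombstone
// intervals are combined.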
type compactionMerger struct {
	a, b ChunkSeriesSet

	aok, bok  bool
	l         labels.Labels
	c         []chunks.Meta
	intervals tombstones.Intervals
}

func newCompactionMerger(a, b ChunkSeriesSet) (*compactionMerger, error) {
	c := &compactionMerger{
		a: a,
		b: b,
	}
	// Initialize first elements of both sets as Next() needs
	// one element look-ahead.
	c.aok = c.a.Next()
	c.bok = c.b.Next()

	return c, c.Err()
}

func (c *compactionMerger) compare() int {
	if !c.aok {
		return 1
	}
	if !c.bok {
		return -1
	}
	a, _, _ := c.a.At()
	b, _, _ := c.b.At()
	return labels.Compare(a, b)
}

func (c *compactionMerger) Next() bool {
	if !c.aok && !c.bok || c.Err() != nil {
		return false
	}
	// While advancing child iterators the memory used for labels and chunks
	// may be reused. When picking a series we have to store the result.
	var lset labels.Labels
	var chks []chunks.Meta

	d := c.compare()
	if d > 0 {
		lset, chks, c.intervals = c.b.At()
		c.l = append(c.l[:0], lset...)
		c.c = append(c.c[:0], chks...)

		c.bok = c.b.Next()
	} else if d < 0 {
		lset, chks, c.intervals = c.a.At()
		c.l = append(c.l[:0], lset...)
		c.c = append(c.c[:0], chks...)

		c.aok = c.a.Next()
	} else {
		// Both sets contain the current series. Chain them into a single one.
		l, ca, ra := c.a.At()
		_, cb, rb := c.b.At()

		for _, r := range rb {
			ra = ra.Add(r)
		}

		c.l = append(c.l[:0], l...)
		c.c = append(append(c.c[:0], ca...), cb...)
		c.intervals = ra

		c.aok = c.a.Next()
		c.bok = c.b.Next()
	}

	return true
}

func (c *compactionMerger) Err() error {
	if c.a.Err() != nil {
		return c.a.Err()
	}
	return c.b.Err()
}

func (c *compactionMerger) At() (labels.Labels, []chunks.Meta, tombstones.Intervals) {
	return c.l, c.c, c.intervals
}

func newMergedStringIter(a index.StringIter, b index.StringIter) index.StringIter {
	return &mergedStringIter{a: a, b: b, aok: a.Next(), bok: b.Next()}
}

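// mergedStringIter is a sorted, deduplicating merge of two index.StringIters.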
type mergedStringIter struct {
	a        index.StringIter
	b        index.StringIter
	aok, bok bool
	cur      string
}

func (m *mergedStringIter) Next() bool {
	if (!m.aok && !m.bok) || (m.Err() != nil) {
		return false
	}

	if !m.aok {
		m.cur = m.b.At()
		m.bok = m.b.Next()
	} else if !m.bok {
		m.cur = m.a.At()
		m.aok = m.a.Next()
	} else if m.b.At() > m.a.At() {
		m.cur = m.a.At()
		m.aok = m.a.Next()
	} else if m.a.At() > m.b.At() {
		m.cur = m.b.At()
		m.bok = m.b.Next()
	} else { // Equal.
		m.cur = m.b.At()
		m.aok = m.a.Next()
		m.bok = m.b.Next()
	}

	return true
}

func (m mergedStringIter) At() string { return m.cur }

func (m mergedStringIter) Err() error {
	if m.a.Err() != nil {
		return m.a.Err()
	}
	return m.b.Err()
}