1// Copyright 2017 The Prometheus Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14package tsdb
15
16import (
17	"context"
18	"fmt"
19	"io"
20	"math"
21	"math/rand"
22	"os"
23	"path/filepath"
24	"sort"
25	"time"
26
27	"github.com/go-kit/kit/log"
28	"github.com/go-kit/kit/log/level"
29	"github.com/oklog/ulid"
30	"github.com/pkg/errors"
31	"github.com/prometheus/client_golang/prometheus"
32	"github.com/prometheus/tsdb/chunkenc"
33	"github.com/prometheus/tsdb/chunks"
34	tsdb_errors "github.com/prometheus/tsdb/errors"
35	"github.com/prometheus/tsdb/fileutil"
36	"github.com/prometheus/tsdb/index"
37	"github.com/prometheus/tsdb/labels"
38)
39
// ExponentialBlockRanges returns `steps` time ranges, starting at minSize and
// growing geometrically by a factor of stepSize.
func ExponentialBlockRanges(minSize int64, steps, stepSize int) []int64 {
	ranges := make([]int64, steps)
	size := minSize
	for i := range ranges {
		ranges[i] = size
		size *= int64(stepSize)
	}
	return ranges
}
51
// Compactor provides compaction against an underlying storage
// of time series data.
type Compactor interface {
	// Plan returns a set of directories that can be compacted concurrently.
	// The directories can be overlapping.
	// Results returned when compactions are in progress are undefined.
	Plan(dir string) ([]string, error)

	// Write persists a Block into a directory.
	// No Block is written when resulting Block has 0 samples, and returns empty ulid.ULID{}.
	Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error)

	// Compact runs compaction against the provided directories. Must
	// only be called concurrently with results of Plan().
	// Can optionally pass a list of already open blocks,
	// to avoid having to reopen them.
	// When resulting Block has 0 samples
	//  * No block is written.
	//  * The source dirs are marked Deletable.
	//  * Returns empty ulid.ULID{}.
	Compact(dest string, dirs []string, open []*Block) (ulid.ULID, error)
}
74
// LeveledCompactor implements the Compactor interface. It compacts blocks
// into progressively larger time ranges as configured in ranges.
type LeveledCompactor struct {
	metrics *compactorMetrics
	logger  log.Logger
	// ranges holds the permitted block time ranges in ascending order
	// (see ExponentialBlockRanges).
	ranges    []int64
	chunkPool chunkenc.Pool
	// ctx cancels in-flight compactions; checked between processing steps.
	ctx context.Context
}
83
// compactorMetrics bundles the Prometheus instrumentation for compaction runs.
type compactorMetrics struct {
	ran               prometheus.Counter
	populatingBlocks  prometheus.Gauge
	failed            prometheus.Counter
	overlappingBlocks prometheus.Counter
	duration          prometheus.Histogram
	// chunkSize, chunkSamples, and chunkRange are only observed on
	// level-1 compactions (see instrumentedChunkWriter).
	chunkSize    prometheus.Histogram
	chunkSamples prometheus.Histogram
	chunkRange   prometheus.Histogram
}
94
95func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
96	m := &compactorMetrics{}
97
98	m.ran = prometheus.NewCounter(prometheus.CounterOpts{
99		Name: "prometheus_tsdb_compactions_total",
100		Help: "Total number of compactions that were executed for the partition.",
101	})
102	m.populatingBlocks = prometheus.NewGauge(prometheus.GaugeOpts{
103		Name: "prometheus_tsdb_compaction_populating_block",
104		Help: "Set to 1 when a block is currently being written to the disk.",
105	})
106	m.failed = prometheus.NewCounter(prometheus.CounterOpts{
107		Name: "prometheus_tsdb_compactions_failed_total",
108		Help: "Total number of compactions that failed for the partition.",
109	})
110	m.overlappingBlocks = prometheus.NewCounter(prometheus.CounterOpts{
111		Name: "prometheus_tsdb_vertical_compactions_total",
112		Help: "Total number of compactions done on overlapping blocks.",
113	})
114	m.duration = prometheus.NewHistogram(prometheus.HistogramOpts{
115		Name:    "prometheus_tsdb_compaction_duration_seconds",
116		Help:    "Duration of compaction runs",
117		Buckets: prometheus.ExponentialBuckets(1, 2, 10),
118	})
119	m.chunkSize = prometheus.NewHistogram(prometheus.HistogramOpts{
120		Name:    "prometheus_tsdb_compaction_chunk_size_bytes",
121		Help:    "Final size of chunks on their first compaction",
122		Buckets: prometheus.ExponentialBuckets(32, 1.5, 12),
123	})
124	m.chunkSamples = prometheus.NewHistogram(prometheus.HistogramOpts{
125		Name:    "prometheus_tsdb_compaction_chunk_samples",
126		Help:    "Final number of samples on their first compaction",
127		Buckets: prometheus.ExponentialBuckets(4, 1.5, 12),
128	})
129	m.chunkRange = prometheus.NewHistogram(prometheus.HistogramOpts{
130		Name:    "prometheus_tsdb_compaction_chunk_range_seconds",
131		Help:    "Final time range of chunks on their first compaction",
132		Buckets: prometheus.ExponentialBuckets(100, 4, 10),
133	})
134
135	if r != nil {
136		r.MustRegister(
137			m.ran,
138			m.populatingBlocks,
139			m.failed,
140			m.overlappingBlocks,
141			m.duration,
142			m.chunkRange,
143			m.chunkSamples,
144			m.chunkSize,
145		)
146	}
147	return m
148}
149
150// NewLeveledCompactor returns a LeveledCompactor.
151func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool) (*LeveledCompactor, error) {
152	if len(ranges) == 0 {
153		return nil, errors.Errorf("at least one range must be provided")
154	}
155	if pool == nil {
156		pool = chunkenc.NewPool()
157	}
158	if l == nil {
159		l = log.NewNopLogger()
160	}
161	return &LeveledCompactor{
162		ranges:    ranges,
163		chunkPool: pool,
164		logger:    l,
165		metrics:   newCompactorMetrics(r),
166		ctx:       ctx,
167	}, nil
168}
169
// dirMeta pairs a block directory with its decoded meta file.
type dirMeta struct {
	dir  string
	meta *BlockMeta
}
174
175// Plan returns a list of compactable blocks in the provided directory.
176func (c *LeveledCompactor) Plan(dir string) ([]string, error) {
177	dirs, err := blockDirs(dir)
178	if err != nil {
179		return nil, err
180	}
181	if len(dirs) < 1 {
182		return nil, nil
183	}
184
185	var dms []dirMeta
186	for _, dir := range dirs {
187		meta, err := readMetaFile(dir)
188		if err != nil {
189			return nil, err
190		}
191		dms = append(dms, dirMeta{dir, meta})
192	}
193	return c.plan(dms)
194}
195
196func (c *LeveledCompactor) plan(dms []dirMeta) ([]string, error) {
197	sort.Slice(dms, func(i, j int) bool {
198		return dms[i].meta.MinTime < dms[j].meta.MinTime
199	})
200
201	res := c.selectOverlappingDirs(dms)
202	if len(res) > 0 {
203		return res, nil
204	}
205	// No overlapping blocks, do compaction the usual way.
206	// We do not include a recently created block with max(minTime), so the block which was just created from WAL.
207	// This gives users a window of a full block size to piece-wise backup new data without having to care about data overlap.
208	dms = dms[:len(dms)-1]
209
210	for _, dm := range c.selectDirs(dms) {
211		res = append(res, dm.dir)
212	}
213	if len(res) > 0 {
214		return res, nil
215	}
216
217	// Compact any blocks with big enough time range that have >5% tombstones.
218	for i := len(dms) - 1; i >= 0; i-- {
219		meta := dms[i].meta
220		if meta.MaxTime-meta.MinTime < c.ranges[len(c.ranges)/2] {
221			break
222		}
223		if float64(meta.Stats.NumTombstones)/float64(meta.Stats.NumSeries+1) > 0.05 {
224			return []string{dms[i].dir}, nil
225		}
226	}
227
228	return nil, nil
229}
230
231// selectDirs returns the dir metas that should be compacted into a single new block.
232// If only a single block range is configured, the result is always nil.
233func (c *LeveledCompactor) selectDirs(ds []dirMeta) []dirMeta {
234	if len(c.ranges) < 2 || len(ds) < 1 {
235		return nil
236	}
237
238	highTime := ds[len(ds)-1].meta.MinTime
239
240	for _, iv := range c.ranges[1:] {
241		parts := splitByRange(ds, iv)
242		if len(parts) == 0 {
243			continue
244		}
245
246	Outer:
247		for _, p := range parts {
248			// Do not select the range if it has a block whose compaction failed.
249			for _, dm := range p {
250				if dm.meta.Compaction.Failed {
251					continue Outer
252				}
253			}
254
255			mint := p[0].meta.MinTime
256			maxt := p[len(p)-1].meta.MaxTime
257			// Pick the range of blocks if it spans the full range (potentially with gaps)
258			// or is before the most recent block.
259			// This ensures we don't compact blocks prematurely when another one of the same
260			// size still fits in the range.
261			if (maxt-mint == iv || maxt <= highTime) && len(p) > 1 {
262				return p
263			}
264		}
265	}
266
267	return nil
268}
269
270// selectOverlappingDirs returns all dirs with overlapping time ranges.
271// It expects sorted input by mint and returns the overlapping dirs in the same order as received.
272func (c *LeveledCompactor) selectOverlappingDirs(ds []dirMeta) []string {
273	if len(ds) < 2 {
274		return nil
275	}
276	var overlappingDirs []string
277	globalMaxt := ds[0].meta.MaxTime
278	for i, d := range ds[1:] {
279		if d.meta.MinTime < globalMaxt {
280			if len(overlappingDirs) == 0 { // When it is the first overlap, need to add the last one as well.
281				overlappingDirs = append(overlappingDirs, ds[i].dir)
282			}
283			overlappingDirs = append(overlappingDirs, d.dir)
284		} else if len(overlappingDirs) > 0 {
285			break
286		}
287		if d.meta.MaxTime > globalMaxt {
288			globalMaxt = d.meta.MaxTime
289		}
290	}
291	return overlappingDirs
292}
293
294// splitByRange splits the directories by the time range. The range sequence starts at 0.
295//
296// For example, if we have blocks [0-10, 10-20, 50-60, 90-100] and the split range tr is 30
297// it returns [0-10, 10-20], [50-60], [90-100].
298func splitByRange(ds []dirMeta, tr int64) [][]dirMeta {
299	var splitDirs [][]dirMeta
300
301	for i := 0; i < len(ds); {
302		var (
303			group []dirMeta
304			t0    int64
305			m     = ds[i].meta
306		)
307		// Compute start of aligned time range of size tr closest to the current block's start.
308		if m.MinTime >= 0 {
309			t0 = tr * (m.MinTime / tr)
310		} else {
311			t0 = tr * ((m.MinTime - tr + 1) / tr)
312		}
313		// Skip blocks that don't fall into the range. This can happen via mis-alignment or
314		// by being the multiple of the intended range.
315		if m.MaxTime > t0+tr {
316			i++
317			continue
318		}
319
320		// Add all dirs to the current group that are within [t0, t0+tr].
321		for ; i < len(ds); i++ {
322			// Either the block falls into the next range or doesn't fit at all (checked above).
323			if ds[i].meta.MaxTime > t0+tr {
324				break
325			}
326			group = append(group, ds[i])
327		}
328
329		if len(group) > 0 {
330			splitDirs = append(splitDirs, group)
331		}
332	}
333
334	return splitDirs
335}
336
337func compactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta {
338	res := &BlockMeta{
339		ULID:    uid,
340		MinTime: blocks[0].MinTime,
341	}
342
343	sources := map[ulid.ULID]struct{}{}
344	// For overlapping blocks, the Maxt can be
345	// in any block so we track it globally.
346	maxt := int64(math.MinInt64)
347
348	for _, b := range blocks {
349		if b.MaxTime > maxt {
350			maxt = b.MaxTime
351		}
352		if b.Compaction.Level > res.Compaction.Level {
353			res.Compaction.Level = b.Compaction.Level
354		}
355		for _, s := range b.Compaction.Sources {
356			sources[s] = struct{}{}
357		}
358		res.Compaction.Parents = append(res.Compaction.Parents, BlockDesc{
359			ULID:    b.ULID,
360			MinTime: b.MinTime,
361			MaxTime: b.MaxTime,
362		})
363	}
364	res.Compaction.Level++
365
366	for s := range sources {
367		res.Compaction.Sources = append(res.Compaction.Sources, s)
368	}
369	sort.Slice(res.Compaction.Sources, func(i, j int) bool {
370		return res.Compaction.Sources[i].Compare(res.Compaction.Sources[j]) < 0
371	})
372
373	res.MaxTime = maxt
374	return res
375}
376
// Compact creates a new block in the compactor's directory from the blocks in the
// provided directories. Already open blocks may be passed via open to avoid
// reopening them. On an empty result no block is written, the sources are
// marked Deletable, and a zero ULID is returned.
func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (uid ulid.ULID, err error) {
	var (
		blocks []BlockReader
		bs     []*Block
		metas  []*BlockMeta
		uids   []string
	)
	start := time.Now()

	for _, d := range dirs {
		meta, err := readMetaFile(d)
		if err != nil {
			return uid, err
		}

		var b *Block

		// Use already open blocks if we can, to avoid
		// having the index data in memory twice.
		for _, o := range open {
			if meta.ULID == o.Meta().ULID {
				b = o
				break
			}
		}

		if b == nil {
			var err error
			b, err = OpenBlock(c.logger, d, c.chunkPool)
			if err != nil {
				return uid, err
			}
			// Deliberately deferred to function exit: the block must remain
			// open until the write below has finished reading from it.
			defer b.Close()
		}

		metas = append(metas, meta)
		blocks = append(blocks, b)
		bs = append(bs, b)
		uids = append(uids, meta.ULID.String())
	}

	// Generate a fresh ULID for the resulting block.
	entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
	uid = ulid.MustNew(ulid.Now(), entropy)

	meta := compactBlockMetas(uid, metas...)
	err = c.write(dest, meta, blocks...)
	if err == nil {
		if meta.Stats.NumSamples == 0 {
			// The merged result had no samples, so no block was written.
			// Mark all sources deletable so they are cleaned up instead.
			for _, b := range bs {
				b.meta.Compaction.Deletable = true
				if err = writeMetaFile(b.dir, &b.meta); err != nil {
					level.Error(c.logger).Log(
						"msg", "Failed to write 'Deletable' to meta file after compaction",
						"ulid", b.meta.ULID,
					)
				}
			}
			uid = ulid.ULID{}
			level.Info(c.logger).Log(
				"msg", "compact blocks resulted in empty block",
				"count", len(blocks),
				"sources", fmt.Sprintf("%v", uids),
				"duration", time.Since(start),
			)
		} else {
			level.Info(c.logger).Log(
				"msg", "compact blocks",
				"count", len(blocks),
				"mint", meta.MinTime,
				"maxt", meta.MaxTime,
				"ulid", meta.ULID,
				"sources", fmt.Sprintf("%v", uids),
				"duration", time.Since(start),
			)
		}
		return uid, nil
	}

	// On failure, mark the source blocks' compaction as failed so the planner
	// avoids reselecting them — unless the run was merely cancelled.
	var merr tsdb_errors.MultiError
	merr.Add(err)
	if err != context.Canceled {
		for _, b := range bs {
			if err := b.setCompactionFailed(); err != nil {
				merr.Add(errors.Wrapf(err, "setting compaction failed for block: %s", b.Dir()))
			}
		}
	}

	return uid, merr
}
469
470func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error) {
471	start := time.Now()
472
473	entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
474	uid := ulid.MustNew(ulid.Now(), entropy)
475
476	meta := &BlockMeta{
477		ULID:    uid,
478		MinTime: mint,
479		MaxTime: maxt,
480	}
481	meta.Compaction.Level = 1
482	meta.Compaction.Sources = []ulid.ULID{uid}
483
484	if parent != nil {
485		meta.Compaction.Parents = []BlockDesc{
486			{ULID: parent.ULID, MinTime: parent.MinTime, MaxTime: parent.MaxTime},
487		}
488	}
489
490	err := c.write(dest, meta, b)
491	if err != nil {
492		return uid, err
493	}
494
495	if meta.Stats.NumSamples == 0 {
496		return ulid.ULID{}, nil
497	}
498
499	level.Info(c.logger).Log(
500		"msg", "write block",
501		"mint", meta.MinTime,
502		"maxt", meta.MaxTime,
503		"ulid", meta.ULID,
504		"duration", time.Since(start),
505	)
506	return uid, nil
507}
508
// instrumentedChunkWriter is used for level 1 compactions to record statistics
// about compacted chunks. It wraps an underlying ChunkWriter and observes
// each chunk's byte size, sample count, and time range before delegating.
type instrumentedChunkWriter struct {
	ChunkWriter

	size    prometheus.Histogram
	samples prometheus.Histogram
	trange  prometheus.Histogram
}
518
519func (w *instrumentedChunkWriter) WriteChunks(chunks ...chunks.Meta) error {
520	for _, c := range chunks {
521		w.size.Observe(float64(len(c.Chunk.Bytes())))
522		w.samples.Observe(float64(c.Chunk.NumSamples()))
523		w.trange.Observe(float64(c.MaxTime - c.MinTime))
524	}
525	return w.ChunkWriter.WriteChunks(chunks...)
526}
527
528// write creates a new block that is the union of the provided blocks into dir.
529// It cleans up all files of the old blocks after completing successfully.
530func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) {
531	dir := filepath.Join(dest, meta.ULID.String())
532	tmp := dir + ".tmp"
533	var closers []io.Closer
534	defer func(t time.Time) {
535		var merr tsdb_errors.MultiError
536		merr.Add(err)
537		merr.Add(closeAll(closers))
538		err = merr.Err()
539
540		// RemoveAll returns no error when tmp doesn't exist so it is safe to always run it.
541		if err := os.RemoveAll(tmp); err != nil {
542			level.Error(c.logger).Log("msg", "removed tmp folder after failed compaction", "err", err.Error())
543		}
544		if err != nil {
545			c.metrics.failed.Inc()
546		}
547		c.metrics.ran.Inc()
548		c.metrics.duration.Observe(time.Since(t).Seconds())
549	}(time.Now())
550
551	if err = os.RemoveAll(tmp); err != nil {
552		return err
553	}
554
555	if err = os.MkdirAll(tmp, 0777); err != nil {
556		return err
557	}
558
559	// Populate chunk and index files into temporary directory with
560	// data of all blocks.
561	var chunkw ChunkWriter
562
563	chunkw, err = chunks.NewWriter(chunkDir(tmp))
564	if err != nil {
565		return errors.Wrap(err, "open chunk writer")
566	}
567	closers = append(closers, chunkw)
568	// Record written chunk sizes on level 1 compactions.
569	if meta.Compaction.Level == 1 {
570		chunkw = &instrumentedChunkWriter{
571			ChunkWriter: chunkw,
572			size:        c.metrics.chunkSize,
573			samples:     c.metrics.chunkSamples,
574			trange:      c.metrics.chunkRange,
575		}
576	}
577
578	indexw, err := index.NewWriter(filepath.Join(tmp, indexFilename))
579	if err != nil {
580		return errors.Wrap(err, "open index writer")
581	}
582	closers = append(closers, indexw)
583
584	if err := c.populateBlock(blocks, meta, indexw, chunkw); err != nil {
585		return errors.Wrap(err, "write compaction")
586	}
587
588	select {
589	case <-c.ctx.Done():
590		return c.ctx.Err()
591	default:
592	}
593
594	// We are explicitly closing them here to check for error even
595	// though these are covered under defer. This is because in Windows,
596	// you cannot delete these unless they are closed and the defer is to
597	// make sure they are closed if the function exits due to an error above.
598	var merr tsdb_errors.MultiError
599	for _, w := range closers {
600		merr.Add(w.Close())
601	}
602	closers = closers[:0] // Avoid closing the writers twice in the defer.
603	if merr.Err() != nil {
604		return merr.Err()
605	}
606
607	// Populated block is empty, so exit early.
608	if meta.Stats.NumSamples == 0 {
609		return nil
610	}
611
612	if err = writeMetaFile(tmp, meta); err != nil {
613		return errors.Wrap(err, "write merged meta")
614	}
615
616	// Create an empty tombstones file.
617	if err := writeTombstoneFile(tmp, newMemTombstones()); err != nil {
618		return errors.Wrap(err, "write new tombstones file")
619	}
620
621	df, err := fileutil.OpenDir(tmp)
622	if err != nil {
623		return errors.Wrap(err, "open temporary block dir")
624	}
625	defer func() {
626		if df != nil {
627			df.Close()
628		}
629	}()
630
631	if err := df.Sync(); err != nil {
632		return errors.Wrap(err, "sync temporary dir file")
633	}
634
635	// Close temp dir before rename block dir (for windows platform).
636	if err = df.Close(); err != nil {
637		return errors.Wrap(err, "close temporary dir")
638	}
639	df = nil
640
641	// Block successfully written, make visible and remove old ones.
642	if err := renameFile(tmp, dir); err != nil {
643		return errors.Wrap(err, "rename block dir")
644	}
645
646	return nil
647}
648
// populateBlock fills the index and chunk writers with new data gathered as the union
// of the provided blocks. It returns meta information for the new block.
// It expects sorted blocks input by mint.
func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) (err error) {
	if len(blocks) == 0 {
		return errors.New("cannot populate block from no readers")
	}

	var (
		set         ChunkSeriesSet
		allSymbols  = make(map[string]struct{}, 1<<16)
		closers     = []io.Closer{}
		overlapping bool
	)
	// Close all per-block readers on exit and clear the populating gauge,
	// folding any close errors into the returned error.
	defer func() {
		var merr tsdb_errors.MultiError
		merr.Add(err)
		merr.Add(closeAll(closers))
		err = merr.Err()
		c.metrics.populatingBlocks.Set(0)
	}()
	c.metrics.populatingBlocks.Set(1)

	globalMaxt := blocks[0].MaxTime()
	for i, b := range blocks {
		select {
		case <-c.ctx.Done():
			return c.ctx.Err()
		default:
		}

		// Detect overlap once: since input is sorted by mint, a block starting
		// before the running max end time overlaps an earlier block.
		if !overlapping {
			if i > 0 && b.MinTime() < globalMaxt {
				c.metrics.overlappingBlocks.Inc()
				overlapping = true
				level.Warn(c.logger).Log("msg", "found overlapping blocks during compaction", "ulid", meta.ULID)
			}
			if b.MaxTime() > globalMaxt {
				globalMaxt = b.MaxTime()
			}
		}

		indexr, err := b.Index()
		if err != nil {
			return errors.Wrapf(err, "open index reader for block %s", b)
		}
		closers = append(closers, indexr)

		chunkr, err := b.Chunks()
		if err != nil {
			return errors.Wrapf(err, "open chunk reader for block %s", b)
		}
		closers = append(closers, chunkr)

		tombsr, err := b.Tombstones()
		if err != nil {
			return errors.Wrapf(err, "open tombstone reader for block %s", b)
		}
		closers = append(closers, tombsr)

		// Union of all blocks' symbol tables for the new index.
		symbols, err := indexr.Symbols()
		if err != nil {
			return errors.Wrap(err, "read symbols")
		}
		for s := range symbols {
			allSymbols[s] = struct{}{}
		}

		all, err := indexr.Postings(index.AllPostingsKey())
		if err != nil {
			return err
		}
		all = indexr.SortedPostings(all)

		s := newCompactionSeriesSet(indexr, chunkr, tombsr, all)

		// Chain the per-block series sets into one merged, sorted set.
		if i == 0 {
			set = s
			continue
		}
		set, err = newCompactionMerger(set, s)
		if err != nil {
			return err
		}
	}

	// We fully rebuild the postings list index from merged series.
	var (
		postings = index.NewMemPostings()
		values   = map[string]stringset{}
		i        = uint64(0)
	)

	if err := indexw.AddSymbols(allSymbols); err != nil {
		return errors.Wrap(err, "add symbols")
	}

	for set.Next() {
		select {
		case <-c.ctx.Done():
			return c.ctx.Err()
		default:
		}

		lset, chks, dranges := set.At() // The chunks here are not fully deleted.
		if overlapping {
			// If blocks are overlapping, it is possible to have unsorted chunks.
			sort.Slice(chks, func(i, j int) bool {
				return chks[i].MinTime < chks[j].MinTime
			})
		}

		// Skip the series with all deleted chunks.
		if len(chks) == 0 {
			continue
		}

		// NOTE: the loop variable i below intentionally shadows the outer
		// uint64 series ref i for the duration of this loop.
		for i, chk := range chks {
			if chk.MinTime < meta.MinTime || chk.MaxTime > meta.MaxTime {
				return errors.Errorf("found chunk with minTime: %d maxTime: %d outside of compacted minTime: %d maxTime: %d",
					chk.MinTime, chk.MaxTime, meta.MinTime, meta.MaxTime)
			}

			if len(dranges) > 0 {
				// Re-encode the chunk to not have deleted values.
				if !chk.OverlapsClosedInterval(dranges[0].Mint, dranges[len(dranges)-1].Maxt) {
					continue
				}
				newChunk := chunkenc.NewXORChunk()
				app, err := newChunk.Appender()
				if err != nil {
					return err
				}

				it := &deletedIterator{it: chk.Chunk.Iterator(), intervals: dranges}
				for it.Next() {
					ts, v := it.At()
					app.Append(ts, v)
				}

				chks[i].Chunk = newChunk
			}
		}

		mergedChks := chks
		if overlapping {
			mergedChks, err = chunks.MergeOverlappingChunks(chks)
			if err != nil {
				return errors.Wrap(err, "merge overlapping chunks")
			}
		}
		if err := chunkw.WriteChunks(mergedChks...); err != nil {
			return errors.Wrap(err, "write chunks")
		}

		if err := indexw.AddSeries(i, lset, mergedChks...); err != nil {
			return errors.Wrap(err, "add series")
		}

		meta.Stats.NumChunks += uint64(len(mergedChks))
		meta.Stats.NumSeries++
		for _, chk := range mergedChks {
			meta.Stats.NumSamples += uint64(chk.Chunk.NumSamples())
		}

		// Return chunks to the pool for reuse.
		for _, chk := range mergedChks {
			if err := c.chunkPool.Put(chk.Chunk); err != nil {
				return errors.Wrap(err, "put chunk")
			}
		}

		// Collect label values and postings for the rebuilt index.
		for _, l := range lset {
			valset, ok := values[l.Name]
			if !ok {
				valset = stringset{}
				values[l.Name] = valset
			}
			valset.set(l.Value)
		}
		postings.Add(i, lset)

		i++
	}
	if set.Err() != nil {
		return errors.Wrap(set.Err(), "iterate compaction set")
	}

	// Write the label value index for every label name.
	s := make([]string, 0, 256)
	for n, v := range values {
		s = s[:0]

		for x := range v {
			s = append(s, x)
		}
		if err := indexw.WriteLabelIndex([]string{n}, s); err != nil {
			return errors.Wrap(err, "write label index")
		}
	}

	for _, l := range postings.SortedKeys() {
		if err := indexw.WritePostings(l.Name, l.Value, postings.Get(l.Name, l.Value)); err != nil {
			return errors.Wrap(err, "write postings")
		}
	}
	return nil
}
855
// compactionSeriesSet iterates the series of a single block in postings
// order, resolving each series' chunks and tombstone intervals as it goes.
type compactionSeriesSet struct {
	p          index.Postings
	index      IndexReader
	chunks     ChunkReader
	tombstones TombstoneReader

	// Current element state; valid only after a successful Next().
	l         labels.Labels
	c         []chunks.Meta
	intervals Intervals
	err       error
}
867
868func newCompactionSeriesSet(i IndexReader, c ChunkReader, t TombstoneReader, p index.Postings) *compactionSeriesSet {
869	return &compactionSeriesSet{
870		index:      i,
871		chunks:     c,
872		tombstones: t,
873		p:          p,
874	}
875}
876
877func (c *compactionSeriesSet) Next() bool {
878	if !c.p.Next() {
879		return false
880	}
881	var err error
882
883	c.intervals, err = c.tombstones.Get(c.p.At())
884	if err != nil {
885		c.err = errors.Wrap(err, "get tombstones")
886		return false
887	}
888
889	if err = c.index.Series(c.p.At(), &c.l, &c.c); err != nil {
890		c.err = errors.Wrapf(err, "get series %d", c.p.At())
891		return false
892	}
893
894	// Remove completely deleted chunks.
895	if len(c.intervals) > 0 {
896		chks := make([]chunks.Meta, 0, len(c.c))
897		for _, chk := range c.c {
898			if !(Interval{chk.MinTime, chk.MaxTime}.isSubrange(c.intervals)) {
899				chks = append(chks, chk)
900			}
901		}
902
903		c.c = chks
904	}
905
906	for i := range c.c {
907		chk := &c.c[i]
908
909		chk.Chunk, err = c.chunks.Chunk(chk.Ref)
910		if err != nil {
911			c.err = errors.Wrapf(err, "chunk %d not found", chk.Ref)
912			return false
913		}
914	}
915
916	return true
917}
918
919func (c *compactionSeriesSet) Err() error {
920	if c.err != nil {
921		return c.err
922	}
923	return c.p.Err()
924}
925
// At returns the current series' labels, chunks, and deletion intervals.
// Valid only after a successful Next().
func (c *compactionSeriesSet) At() (labels.Labels, []chunks.Meta, Intervals) {
	return c.l, c.c, c.intervals
}
929
// compactionMerger merges two sorted ChunkSeriesSets into a single sorted
// set, concatenating chunks for series present in both inputs.
type compactionMerger struct {
	a, b ChunkSeriesSet

	// aok/bok report whether a/b currently hold a valid element
	// (one element of look-ahead).
	aok, bok  bool
	l         labels.Labels
	c         []chunks.Meta
	intervals Intervals
}
938
939func newCompactionMerger(a, b ChunkSeriesSet) (*compactionMerger, error) {
940	c := &compactionMerger{
941		a: a,
942		b: b,
943	}
944	// Initialize first elements of both sets as Next() needs
945	// one element look-ahead.
946	c.aok = c.a.Next()
947	c.bok = c.b.Next()
948
949	return c, c.Err()
950}
951
952func (c *compactionMerger) compare() int {
953	if !c.aok {
954		return 1
955	}
956	if !c.bok {
957		return -1
958	}
959	a, _, _ := c.a.At()
960	b, _, _ := c.b.At()
961	return labels.Compare(a, b)
962}
963
964func (c *compactionMerger) Next() bool {
965	if !c.aok && !c.bok || c.Err() != nil {
966		return false
967	}
968	// While advancing child iterators the memory used for labels and chunks
969	// may be reused. When picking a series we have to store the result.
970	var lset labels.Labels
971	var chks []chunks.Meta
972
973	d := c.compare()
974	if d > 0 {
975		lset, chks, c.intervals = c.b.At()
976		c.l = append(c.l[:0], lset...)
977		c.c = append(c.c[:0], chks...)
978
979		c.bok = c.b.Next()
980	} else if d < 0 {
981		lset, chks, c.intervals = c.a.At()
982		c.l = append(c.l[:0], lset...)
983		c.c = append(c.c[:0], chks...)
984
985		c.aok = c.a.Next()
986	} else {
987		// Both sets contain the current series. Chain them into a single one.
988		l, ca, ra := c.a.At()
989		_, cb, rb := c.b.At()
990
991		for _, r := range rb {
992			ra = ra.add(r)
993		}
994
995		c.l = append(c.l[:0], l...)
996		c.c = append(append(c.c[:0], ca...), cb...)
997		c.intervals = ra
998
999		c.aok = c.a.Next()
1000		c.bok = c.b.Next()
1001	}
1002
1003	return true
1004}
1005
1006func (c *compactionMerger) Err() error {
1007	if c.a.Err() != nil {
1008		return c.a.Err()
1009	}
1010	return c.b.Err()
1011}
1012
// At returns the current merged series: its labels, chunks, and deletion
// intervals. Valid only after a successful Next().
func (c *compactionMerger) At() (labels.Labels, []chunks.Meta, Intervals) {
	return c.l, c.c, c.intervals
}
1016
1017func renameFile(from, to string) error {
1018	if err := os.RemoveAll(to); err != nil {
1019		return err
1020	}
1021	if err := os.Rename(from, to); err != nil {
1022		return err
1023	}
1024
1025	// Directory was renamed; sync parent dir to persist rename.
1026	pdir, err := fileutil.OpenDir(filepath.Dir(to))
1027	if err != nil {
1028		return err
1029	}
1030
1031	if err = pdir.Sync(); err != nil {
1032		pdir.Close()
1033		return err
1034	}
1035	return pdir.Close()
1036}
1037