// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tsdb

import (
	"context"
	"fmt"
	"io/ioutil"
	"math"
	"os"
	"path"
	"path/filepath"
	"testing"
	"time"

	"github.com/go-kit/log"
	"github.com/pkg/errors"
	prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
	"github.com/stretchr/testify/require"

	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
	"github.com/prometheus/prometheus/tsdb/chunks"
	"github.com/prometheus/prometheus/tsdb/fileutil"
	"github.com/prometheus/prometheus/tsdb/tombstones"
)

func TestSplitByRange(t *testing.T) {
	cases := []struct {
		trange int64
		ranges [][2]int64
		output [][][2]int64
	}{
		{
			trange: 60,
			ranges: [][2]int64{{0, 10}},
			output: [][][2]int64{
				{{0, 10}},
			},
		},
		{
			trange: 60,
			ranges: [][2]int64{{0, 60}},
			output: [][][2]int64{
				{{0, 60}},
			},
		},
		{
			trange: 60,
			ranges: [][2]int64{{0, 10}, {9, 15}, {30, 60}},
			output: [][][2]int64{
				{{0, 10}, {9, 15}, {30, 60}},
			},
		},
		{
			trange: 60,
			ranges: [][2]int64{{70, 90}, {125, 130}, {130, 180}, {1000, 1001}},
			output: [][][2]int64{
				{{70, 90}},
				{{125, 130}, {130, 180}},
				{{1000, 1001}},
			},
		},
		// Mis-aligned or too-large blocks are ignored.
		{
			trange: 60,
			ranges: [][2]int64{{50, 70}, {70, 80}},
			output: [][][2]int64{
				{{70, 80}},
			},
		},
		{
			trange: 72,
			ranges: [][2]int64{{0, 144}, {144, 216}, {216, 288}},
			output: [][][2]int64{
				{{144, 216}},
				{{216, 288}},
			},
		},
		// Various awkward edge cases easy to hit with negative numbers.
		{
			trange: 60,
			ranges: [][2]int64{{-10, -5}},
			output: [][][2]int64{
				{{-10, -5}},
			},
		},
		{
			trange: 60,
			ranges: [][2]int64{{-60, -50}, {-10, -5}},
			output: [][][2]int64{
				{{-60, -50}, {-10, -5}},
			},
		},
		{
			trange: 60,
			ranges: [][2]int64{{-60, -50}, {-10, -5}, {0, 15}},
			output: [][][2]int64{
				{{-60, -50}, {-10, -5}},
				{{0, 15}},
			},
		},
	}

	for _, c := range cases {
		// Transform input range tuples into dirMetas.
		blocks := make([]dirMeta, 0, len(c.ranges))
		for _, r := range c.ranges {
			blocks = append(blocks, dirMeta{
				meta: &BlockMeta{
					MinTime: r[0],
					MaxTime: r[1],
				},
			})
		}

		// Transform output range tuples into dirMetas.
		exp := make([][]dirMeta, len(c.output))
		for i, group := range c.output {
			for _, r := range group {
				exp[i] = append(exp[i], dirMeta{
					meta: &BlockMeta{MinTime: r[0], MaxTime: r[1]},
				})
			}
		}

		require.Equal(t, exp, splitByRange(blocks, c.trange))
	}
}
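
// A rough illustration of the grouping exercised above, assuming splitByRange
// keeps the signature used in this test: blocks are bucketed into
// trange-aligned windows, and blocks that cross a window boundary or are
// larger than trange are dropped. For example:
//
//	groups := splitByRange([]dirMeta{
//		{meta: &BlockMeta{MinTime: 0, MaxTime: 20}},
//		{meta: &BlockMeta{MinTime: 20, MaxTime: 40}},
//		{meta: &BlockMeta{MinTime: 70, MaxTime: 80}},
//	}, 60)
//	// groups: [[{0,20}, {20,40}], [{70,80}]]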

// See https://github.com/prometheus/prometheus/issues/3064
func TestNoPanicFor0Tombstones(t *testing.T) {
	metas := []dirMeta{
		{
			dir: "1",
			meta: &BlockMeta{
				MinTime: 0,
				MaxTime: 100,
			},
		},
		{
			dir: "2",
			meta: &BlockMeta{
				MinTime: 101,
				MaxTime: 200,
			},
		},
	}

	c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{50}, nil, nil)
	require.NoError(t, err)

	c.plan(metas)
}

func TestLeveledCompactor_plan(t *testing.T) {
	// This mimics our default ExponentialBlockRanges with a minimum block size of 20.
	compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{
		20,
		60,
		180,
		540,
		1620,
	}, nil, nil)
	require.NoError(t, err)
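
	// The ranges above mirror the shape of ExponentialBlockRanges: each step
	// multiplies the previous range by 3 (20, 60, 180, 540, 1620). A minimal,
	// illustrative way to build such a slice (not the library call itself):
	//
	//	ranges := []int64{20}
	//	for len(ranges) < 5 {
	//		ranges = append(ranges, ranges[len(ranges)-1]*3)
	//	}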

	cases := map[string]struct {
		metas    []dirMeta
		expected []string
	}{
		"Outside Range": {
			metas: []dirMeta{
				metaRange("1", 0, 20, nil),
			},
			expected: nil,
		},
		"We should wait for four blocks of size 20 to appear before compacting.": {
			metas: []dirMeta{
				metaRange("1", 0, 20, nil),
				metaRange("2", 20, 40, nil),
			},
			expected: nil,
		},
		`We should wait for the next block of size 20 to appear before compacting
		the existing ones. We have three, but we ignore the fresh one from the WAL`: {
			metas: []dirMeta{
				metaRange("1", 0, 20, nil),
				metaRange("2", 20, 40, nil),
				metaRange("3", 40, 60, nil),
			},
			expected: nil,
		},
		"A block filling the entire parent range appeared, so it should be compacted": {
			metas: []dirMeta{
				metaRange("1", 0, 20, nil),
				metaRange("2", 20, 40, nil),
				metaRange("3", 40, 60, nil),
				metaRange("4", 60, 80, nil),
			},
			expected: []string{"1", "2", "3"},
		},
		`Block for the next parent range appeared with a gap of size 20. Nothing will happen in the first one
		anymore, but we still ignore the fresh one, so no compaction`: {
			metas: []dirMeta{
				metaRange("1", 0, 20, nil),
				metaRange("2", 20, 40, nil),
				metaRange("3", 60, 80, nil),
			},
			expected: nil,
		},
		`Block for the next parent range appeared, and we have a gap of size 20 between the second and third block.
		That gap will not be filled anymore, so we should compact just these two.`: {
			metas: []dirMeta{
				metaRange("1", 0, 20, nil),
				metaRange("2", 20, 40, nil),
				metaRange("3", 60, 80, nil),
				metaRange("4", 80, 100, nil),
			},
			expected: []string{"1", "2"},
		},
		"We have 20, 20, 20, 60, 60 range blocks. '5' is treated as the fresh one": {
			metas: []dirMeta{
				metaRange("1", 0, 20, nil),
				metaRange("2", 20, 40, nil),
				metaRange("3", 40, 60, nil),
				metaRange("4", 60, 120, nil),
				metaRange("5", 120, 180, nil),
			},
			expected: []string{"1", "2", "3"},
		},
		"We have 20, 60, 20, 60, 240 range blocks. We can compact 20 + 60 + 60": {
			metas: []dirMeta{
				metaRange("2", 20, 40, nil),
				metaRange("4", 60, 120, nil),
				metaRange("5", 960, 980, nil), // Fresh one.
				metaRange("6", 120, 180, nil),
				metaRange("7", 720, 960, nil),
			},
			expected: []string{"2", "4", "6"},
		},
		"Do not select large blocks that have many tombstones when there is no fresh block": {
			metas: []dirMeta{
				metaRange("1", 0, 540, &BlockStats{
					NumSeries:     10,
					NumTombstones: 3,
				}),
			},
			expected: nil,
		},
		"Select large blocks that have many tombstones when a fresh block appears": {
			metas: []dirMeta{
				metaRange("1", 0, 540, &BlockStats{
					NumSeries:     10,
					NumTombstones: 3,
				}),
				metaRange("2", 540, 560, nil),
			},
			expected: []string{"1"},
		},
		"For small blocks, do not compact tombstones, even when a fresh block appears.": {
			metas: []dirMeta{
				metaRange("1", 0, 60, &BlockStats{
					NumSeries:     10,
					NumTombstones: 3,
				}),
				metaRange("2", 60, 80, nil),
			},
			expected: nil,
		},
		`Regression test: we were stuck in a compaction loop where we always recompacted
		the same block when tombstones and series counts were zero`: {
			metas: []dirMeta{
				metaRange("1", 0, 540, &BlockStats{
					NumSeries:     0,
					NumTombstones: 0,
				}),
				metaRange("2", 540, 560, nil),
			},
			expected: nil,
		},
		`Regression test: we were wrongly assuming that a block is fresh from the WAL when its ULID is the newest.
		We need to actually look at the max time instead.

		With the previous, wrong approach block "8" was ignored, so we were wrongly compacting 5 and 7 and introducing
		block overlaps`: {
			metas: []dirMeta{
				metaRange("5", 0, 360, nil),
				metaRange("6", 540, 560, nil), // Fresh one.
				metaRange("7", 360, 420, nil),
				metaRange("8", 420, 540, nil),
			},
			expected: []string{"7", "8"},
		},
		// |--------------|
		//               |----------------|
		//                                |--------------|
		"Overlapping blocks 1": {
			metas: []dirMeta{
				metaRange("1", 0, 20, nil),
				metaRange("2", 19, 40, nil),
				metaRange("3", 40, 60, nil),
			},
			expected: []string{"1", "2"},
		},
		// |--------------|
		//                |--------------|
		//                        |--------------|
		"Overlapping blocks 2": {
			metas: []dirMeta{
				metaRange("1", 0, 20, nil),
				metaRange("2", 20, 40, nil),
				metaRange("3", 30, 50, nil),
			},
			expected: []string{"2", "3"},
		},
		// |--------------|
		//         |---------------------|
		//                       |--------------|
		"Overlapping blocks 3": {
			metas: []dirMeta{
				metaRange("1", 0, 20, nil),
				metaRange("2", 10, 40, nil),
				metaRange("3", 30, 50, nil),
			},
			expected: []string{"1", "2", "3"},
		},
		// |--------------|
		//               |--------------------------------|
		//                |--------------|
		//                               |--------------|
		"Overlapping blocks 4": {
			metas: []dirMeta{
				metaRange("5", 0, 360, nil),
				metaRange("6", 340, 560, nil),
				metaRange("7", 360, 420, nil),
				metaRange("8", 420, 540, nil),
			},
			expected: []string{"5", "6", "7", "8"},
		},
		// |--------------|
		//               |--------------|
		//                                            |--------------|
		//                                                          |--------------|
		"Overlapping blocks 5": {
			metas: []dirMeta{
				metaRange("1", 0, 10, nil),
				metaRange("2", 9, 20, nil),
				metaRange("3", 30, 40, nil),
				metaRange("4", 39, 50, nil),
			},
			expected: []string{"1", "2"},
		},
	}

	for title, c := range cases {
		if !t.Run(title, func(t *testing.T) {
			res, err := compactor.plan(c.metas)
			require.NoError(t, err)
			require.Equal(t, c.expected, res)
		}) {
			return
		}
	}
}

func TestRangeWithFailedCompactionWontGetSelected(t *testing.T) {
	compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{
		20,
		60,
		240,
		720,
		2160,
	}, nil, nil)
	require.NoError(t, err)

	cases := []struct {
		metas []dirMeta
	}{
		{
			metas: []dirMeta{
				metaRange("1", 0, 20, nil),
				metaRange("2", 20, 40, nil),
				metaRange("3", 40, 60, nil),
				metaRange("4", 60, 80, nil),
			},
		},
		{
			metas: []dirMeta{
				metaRange("1", 0, 20, nil),
				metaRange("2", 20, 40, nil),
				metaRange("3", 60, 80, nil),
				metaRange("4", 80, 100, nil),
			},
		},
		{
			metas: []dirMeta{
				metaRange("1", 0, 20, nil),
				metaRange("2", 20, 40, nil),
				metaRange("3", 40, 60, nil),
				metaRange("4", 60, 120, nil),
				metaRange("5", 120, 180, nil),
				metaRange("6", 180, 200, nil),
			},
		},
	}

	for _, c := range cases {
		c.metas[1].meta.Compaction.Failed = true
		res, err := compactor.plan(c.metas)
		require.NoError(t, err)

		require.Equal(t, []string(nil), res)
	}
}

func TestCompactionFailWillCleanUpTempDir(t *testing.T) {
	compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{
		20,
		60,
		240,
		720,
		2160,
	}, nil, nil)
	require.NoError(t, err)

	tmpdir, err := ioutil.TempDir("", "test")
	require.NoError(t, err)
	defer func() {
		require.NoError(t, os.RemoveAll(tmpdir))
	}()

	require.Error(t, compactor.write(tmpdir, &BlockMeta{}, erringBReader{}))
	_, err = os.Stat(filepath.Join(tmpdir, BlockMeta{}.ULID.String()) + tmpForCreationBlockDirSuffix)
	require.True(t, os.IsNotExist(err), "directory is not cleaned up")
}

func metaRange(name string, mint, maxt int64, stats *BlockStats) dirMeta {
	meta := &BlockMeta{MinTime: mint, MaxTime: maxt}
	if stats != nil {
		meta.Stats = *stats
	}
	return dirMeta{
		dir:  name,
		meta: meta,
	}
}

type erringBReader struct{}

func (erringBReader) Index() (IndexReader, error)            { return nil, errors.New("index") }
func (erringBReader) Chunks() (ChunkReader, error)           { return nil, errors.New("chunks") }
func (erringBReader) Tombstones() (tombstones.Reader, error) { return nil, errors.New("tombstones") }
func (erringBReader) Meta() BlockMeta                        { return BlockMeta{} }
func (erringBReader) Size() int64                            { return 0 }

type nopChunkWriter struct{}

func (nopChunkWriter) WriteChunks(chunks ...chunks.Meta) error { return nil }
func (nopChunkWriter) Close() error                            { return nil }

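// samplesForRange returns consecutive samples with timestamps minTime through
// maxTime (inclusive), split into chunks of at most maxSamplesPerChunk samples
// each. For example, samplesForRange(0, 659, 120) yields five full chunks of
// 120 samples plus a final chunk of 60 samples; several test cases below rely
// on that "five and a half chunks" shape.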
func samplesForRange(minTime, maxTime int64, maxSamplesPerChunk int) (ret [][]sample) {
	var curr []sample
	for i := minTime; i <= maxTime; i++ {
		curr = append(curr, sample{t: i})
		if len(curr) >= maxSamplesPerChunk {
			ret = append(ret, curr)
			curr = []sample{}
		}
	}
	if len(curr) > 0 {
		ret = append(ret, curr)
	}
	return ret
}

func TestCompaction_populateBlock(t *testing.T) {
	for _, tc := range []struct {
		title              string
		inputSeriesSamples [][]seriesSamples
		compactMinTime     int64
		compactMaxTime     int64 // When not defined, the test runner sets a default of math.MaxInt64.
		expSeriesSamples   []seriesSamples
		expErr             error
	}{
		{
			title:              "Populate block from empty input should return error.",
			inputSeriesSamples: [][]seriesSamples{},
			expErr:             errors.New("cannot populate block from no readers"),
		},
		{
			// Populate from a single block without chunks. We expect this kind of series to be ignored.
			inputSeriesSamples: [][]seriesSamples{
				{{lset: map[string]string{"a": "b"}}},
			},
		},
		{
			title: "Populate from a single block. We expect the same samples at the output.",
			inputSeriesSamples: [][]seriesSamples{
				{
					{
						lset:   map[string]string{"a": "b"},
						chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}},
					},
				},
			},
			expSeriesSamples: []seriesSamples{
				{
					lset:   map[string]string{"a": "b"},
					chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}},
				},
			},
		},
		{
			title: "Populate from two blocks.",
			inputSeriesSamples: [][]seriesSamples{
				{
					{
						lset:   map[string]string{"a": "b"},
						chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}},
					},
					{
						lset:   map[string]string{"a": "c"},
						chunks: [][]sample{{{t: 1}, {t: 9}}, {{t: 10}, {t: 19}}},
					},
					{
						// no-chunk series should be dropped.
						lset: map[string]string{"a": "empty"},
					},
				},
				{
					{
						lset:   map[string]string{"a": "b"},
						chunks: [][]sample{{{t: 21}, {t: 30}}},
					},
					{
						lset:   map[string]string{"a": "c"},
						chunks: [][]sample{{{t: 40}, {t: 45}}},
					},
				},
			},
			expSeriesSamples: []seriesSamples{
				{
					lset:   map[string]string{"a": "b"},
					chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}, {{t: 21}, {t: 30}}},
				},
				{
					lset:   map[string]string{"a": "c"},
					chunks: [][]sample{{{t: 1}, {t: 9}}, {{t: 10}, {t: 19}}, {{t: 40}, {t: 45}}},
				},
			},
		},
		{
			title: "Populate from two blocks; chunks with negative time.",
			inputSeriesSamples: [][]seriesSamples{
				{
					{
						lset:   map[string]string{"a": "b"},
						chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}},
					},
					{
						lset:   map[string]string{"a": "c"},
						chunks: [][]sample{{{t: -11}, {t: -9}}, {{t: 10}, {t: 19}}},
					},
					{
						// no-chunk series should be dropped.
						lset: map[string]string{"a": "empty"},
					},
				},
				{
					{
						lset:   map[string]string{"a": "b"},
						chunks: [][]sample{{{t: 21}, {t: 30}}},
					},
					{
						lset:   map[string]string{"a": "c"},
						chunks: [][]sample{{{t: 40}, {t: 45}}},
					},
				},
			},
			compactMinTime: -11,
			expSeriesSamples: []seriesSamples{
				{
					lset:   map[string]string{"a": "b"},
					chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}, {{t: 21}, {t: 30}}},
				},
				{
					lset:   map[string]string{"a": "c"},
					chunks: [][]sample{{{t: -11}, {t: -9}}, {{t: 10}, {t: 19}}, {{t: 40}, {t: 45}}},
				},
			},
		},
		{
			title: "Populate from two blocks showing that order is maintained.",
			inputSeriesSamples: [][]seriesSamples{
				{
					{
						lset:   map[string]string{"a": "b"},
						chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}},
					},
					{
						lset:   map[string]string{"a": "c"},
						chunks: [][]sample{{{t: 1}, {t: 9}}, {{t: 10}, {t: 19}}},
					},
				},
				{
					{
						lset:   map[string]string{"a": "b"},
						chunks: [][]sample{{{t: 21}, {t: 30}}},
					},
					{
						lset:   map[string]string{"a": "c"},
						chunks: [][]sample{{{t: 40}, {t: 45}}},
					},
				},
			},
			expSeriesSamples: []seriesSamples{
				{
					lset:   map[string]string{"a": "b"},
					chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}, {{t: 21}, {t: 30}}},
				},
				{
					lset:   map[string]string{"a": "c"},
					chunks: [][]sample{{{t: 1}, {t: 9}}, {{t: 10}, {t: 19}}, {{t: 40}, {t: 45}}},
				},
			},
		},
		{
			title: "Populate from two blocks showing that order of series is sorted.",
			inputSeriesSamples: [][]seriesSamples{
				{
					{
						lset:   map[string]string{"a": "4"},
						chunks: [][]sample{{{t: 5}, {t: 7}}},
					},
					{
						lset:   map[string]string{"a": "3"},
						chunks: [][]sample{{{t: 5}, {t: 6}}},
					},
					{
						lset:   map[string]string{"a": "same"},
						chunks: [][]sample{{{t: 1}, {t: 4}}},
					},
				},
				{
					{
						lset:   map[string]string{"a": "2"},
						chunks: [][]sample{{{t: 1}, {t: 3}}},
					},
					{
						lset:   map[string]string{"a": "1"},
						chunks: [][]sample{{{t: 1}, {t: 2}}},
					},
					{
						lset:   map[string]string{"a": "same"},
						chunks: [][]sample{{{t: 5}, {t: 8}}},
					},
				},
			},
			expSeriesSamples: []seriesSamples{
				{
					lset:   map[string]string{"a": "1"},
					chunks: [][]sample{{{t: 1}, {t: 2}}},
				},
				{
					lset:   map[string]string{"a": "2"},
					chunks: [][]sample{{{t: 1}, {t: 3}}},
				},
				{
					lset:   map[string]string{"a": "3"},
					chunks: [][]sample{{{t: 5}, {t: 6}}},
				},
				{
					lset:   map[string]string{"a": "4"},
					chunks: [][]sample{{{t: 5}, {t: 7}}},
				},
				{
					lset:   map[string]string{"a": "same"},
					chunks: [][]sample{{{t: 1}, {t: 4}}, {{t: 5}, {t: 8}}},
				},
			},
		},
		{
			title: "Populate from two blocks with 1:1 duplicated chunks and negative timestamps.",
			inputSeriesSamples: [][]seriesSamples{
				{
					{
						lset:   map[string]string{"a": "1"},
						chunks: [][]sample{{{t: 1}, {t: 2}}, {{t: 3}, {t: 4}}},
					},
					{
						lset:   map[string]string{"a": "2"},
						chunks: [][]sample{{{t: -3}, {t: -2}}, {{t: 1}, {t: 3}, {t: 4}}, {{t: 5}, {t: 6}}},
					},
				},
				{
					{
						lset:   map[string]string{"a": "1"},
						chunks: [][]sample{{{t: 3}, {t: 4}}},
					},
					{
						lset:   map[string]string{"a": "2"},
						chunks: [][]sample{{{t: 1}, {t: 3}, {t: 4}}, {{t: 7}, {t: 8}}},
					},
				},
			},
			compactMinTime: -3,
			expSeriesSamples: []seriesSamples{
				{
					lset:   map[string]string{"a": "1"},
					chunks: [][]sample{{{t: 1}, {t: 2}}, {{t: 3}, {t: 4}}},
				},
				{
					lset:   map[string]string{"a": "2"},
					chunks: [][]sample{{{t: -3}, {t: -2}}, {{t: 1}, {t: 3}, {t: 4}}, {{t: 5}, {t: 6}}, {{t: 7}, {t: 8}}},
				},
			},
		},
		{
			// This should not happen because the head block makes sure that chunks do not cross block boundaries.
			// We used to return an error here, but now the chunk is trimmed.
			title: "Populate from single block containing chunk outside of compact meta time range.",
			inputSeriesSamples: [][]seriesSamples{
				{
					{
						lset:   map[string]string{"a": "b"},
						chunks: [][]sample{{{t: 1}, {t: 2}}, {{t: 10}, {t: 30}}},
					},
				},
			},
			compactMinTime: 0,
			compactMaxTime: 20,
			expSeriesSamples: []seriesSamples{
				{
					lset:   map[string]string{"a": "b"},
					chunks: [][]sample{{{t: 1}, {t: 2}}, {{t: 10}}},
				},
			},
		},
		{
			// Introduced by https://github.com/prometheus/tsdb/issues/347. We used to return an error here, but now the chunk is trimmed.
			title: "Populate from single block containing extra chunk",
			inputSeriesSamples: [][]seriesSamples{
				{
					{
						lset:   map[string]string{"a": "issue347"},
						chunks: [][]sample{{{t: 1}, {t: 2}}, {{t: 10}, {t: 20}}},
					},
				},
			},
			compactMinTime: 0,
			compactMaxTime: 10,
			expSeriesSamples: []seriesSamples{
				{
					lset:   map[string]string{"a": "issue347"},
					chunks: [][]sample{{{t: 1}, {t: 2}}},
				},
			},
		},
		{
			// Deduplication expected.
			// Introduced by pull/370 and pull/539.
			title: "Populate from two blocks containing duplicated chunk.",
			inputSeriesSamples: [][]seriesSamples{
				{
					{
						lset:   map[string]string{"a": "b"},
						chunks: [][]sample{{{t: 1}, {t: 2}}, {{t: 10}, {t: 20}}},
					},
				},
				{
					{
						lset:   map[string]string{"a": "b"},
						chunks: [][]sample{{{t: 10}, {t: 20}}},
					},
				},
			},
			expSeriesSamples: []seriesSamples{
				{
					lset:   map[string]string{"a": "b"},
					chunks: [][]sample{{{t: 1}, {t: 2}}, {{t: 10}, {t: 20}}},
				},
			},
		},
		{
			// Introduced by https://github.com/prometheus/tsdb/pull/539.
			title: "Populate from three overlapping blocks.",
			inputSeriesSamples: [][]seriesSamples{
				{
					{
						lset:   map[string]string{"a": "overlap-all"},
						chunks: [][]sample{{{t: 19}, {t: 30}}},
					},
					{
						lset:   map[string]string{"a": "overlap-beginning"},
						chunks: [][]sample{{{t: 0}, {t: 5}}},
					},
					{
						lset:   map[string]string{"a": "overlap-ending"},
						chunks: [][]sample{{{t: 21}, {t: 30}}},
					},
				},
				{
					{
						lset:   map[string]string{"a": "overlap-all"},
						chunks: [][]sample{{{t: 0}, {t: 10}, {t: 11}, {t: 20}}},
					},
					{
						lset:   map[string]string{"a": "overlap-beginning"},
						chunks: [][]sample{{{t: 0}, {t: 10}, {t: 12}, {t: 20}}},
					},
					{
						lset:   map[string]string{"a": "overlap-ending"},
						chunks: [][]sample{{{t: 0}, {t: 10}, {t: 13}, {t: 20}}},
					},
				},
				{
					{
						lset:   map[string]string{"a": "overlap-all"},
						chunks: [][]sample{{{t: 27}, {t: 35}}},
					},
					{
						lset:   map[string]string{"a": "overlap-ending"},
						chunks: [][]sample{{{t: 27}, {t: 35}}},
					},
				},
			},
			expSeriesSamples: []seriesSamples{
				{
					lset:   map[string]string{"a": "overlap-all"},
					chunks: [][]sample{{{t: 0}, {t: 10}, {t: 11}, {t: 19}, {t: 20}, {t: 27}, {t: 30}, {t: 35}}},
				},
				{
					lset:   map[string]string{"a": "overlap-beginning"},
					chunks: [][]sample{{{t: 0}, {t: 5}, {t: 10}, {t: 12}, {t: 20}}},
				},
				{
					lset:   map[string]string{"a": "overlap-ending"},
					chunks: [][]sample{{{t: 0}, {t: 10}, {t: 13}, {t: 20}}, {{t: 21}, {t: 27}, {t: 30}, {t: 35}}},
				},
			},
		},
		{
			title: "Populate from three partially overlapping blocks with few full chunks.",
			inputSeriesSamples: [][]seriesSamples{
				{
					{
						lset:   map[string]string{"a": "1", "b": "1"},
						chunks: samplesForRange(0, 659, 120), // Five chunks and a half.
					},
					{
						lset:   map[string]string{"a": "1", "b": "2"},
						chunks: samplesForRange(0, 659, 120),
					},
				},
				{
					{
						lset:   map[string]string{"a": "1", "b": "2"},
						chunks: samplesForRange(480, 1199, 120), // Two chunks overlapping with the previous block, two non-overlapping and two overlapping with the next block.
					},
					{
						lset:   map[string]string{"a": "1", "b": "3"},
						chunks: samplesForRange(480, 1199, 120),
					},
				},
				{
					{
						lset:   map[string]string{"a": "1", "b": "2"},
						chunks: samplesForRange(960, 1499, 120), // Four chunks and a half.
					},
					{
						lset:   map[string]string{"a": "1", "b": "4"},
						chunks: samplesForRange(960, 1499, 120),
					},
				},
			},
			expSeriesSamples: []seriesSamples{
				{
					lset:   map[string]string{"a": "1", "b": "1"},
					chunks: samplesForRange(0, 659, 120),
				},
				{
					lset:   map[string]string{"a": "1", "b": "2"},
					chunks: samplesForRange(0, 1499, 120),
				},
				{
					lset:   map[string]string{"a": "1", "b": "3"},
					chunks: samplesForRange(480, 1199, 120),
				},
				{
					lset:   map[string]string{"a": "1", "b": "4"},
					chunks: samplesForRange(960, 1499, 120),
				},
			},
		},
		{
			title: "Populate from three partially overlapping blocks with chunks that are expected to merge into single big chunks.",
			inputSeriesSamples: [][]seriesSamples{
				{
					{
						lset:   map[string]string{"a": "1", "b": "2"},
						chunks: [][]sample{{{t: 0}, {t: 6902464}}, {{t: 6961968}, {t: 7080976}}},
					},
				},
				{
					{
						lset:   map[string]string{"a": "1", "b": "2"},
						chunks: [][]sample{{{t: 3600000}, {t: 13953696}}, {{t: 14042952}, {t: 14221464}}},
					},
				},
				{
					{
						lset:   map[string]string{"a": "1", "b": "2"},
						chunks: [][]sample{{{t: 10800000}, {t: 14251232}}, {{t: 14280984}, {t: 14340488}}},
					},
				},
			},
			expSeriesSamples: []seriesSamples{
				{
					lset:   map[string]string{"a": "1", "b": "2"},
					chunks: [][]sample{{{t: 0}, {t: 3600000}, {t: 6902464}, {t: 6961968}, {t: 7080976}, {t: 10800000}, {t: 13953696}, {t: 14042952}, {t: 14221464}, {t: 14251232}}, {{t: 14280984}, {t: 14340488}}},
				},
			},
		},
	} {
		t.Run(tc.title, func(t *testing.T) {
			blocks := make([]BlockReader, 0, len(tc.inputSeriesSamples))
			for _, b := range tc.inputSeriesSamples {
				ir, cr, mint, maxt := createIdxChkReaders(t, b)
				blocks = append(blocks, &mockBReader{ir: ir, cr: cr, mint: mint, maxt: maxt})
			}

			c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{0}, nil, nil)
			require.NoError(t, err)

			meta := &BlockMeta{
				MinTime: tc.compactMinTime,
				MaxTime: tc.compactMaxTime,
			}
			if meta.MaxTime == 0 {
				meta.MaxTime = math.MaxInt64
			}

			iw := &mockIndexWriter{}
			err = c.populateBlock(blocks, meta, iw, nopChunkWriter{})
			if tc.expErr != nil {
				require.Error(t, err)
				require.Equal(t, tc.expErr.Error(), err.Error())
				return
			}
			require.NoError(t, err)

			// Check if response is expected and chunk is valid.
			var raw []seriesSamples
			for _, s := range iw.seriesChunks {
				ss := seriesSamples{lset: s.l.Map()}
				var iter chunkenc.Iterator
				for _, chk := range s.chunks {
					var (
						samples       = make([]sample, 0, chk.Chunk.NumSamples())
						iter          = chk.Chunk.Iterator(iter)
						firstTs int64 = math.MaxInt64
						s       sample
					)
					for iter.Next() {
						s.t, s.v = iter.At()
						if firstTs == math.MaxInt64 {
							firstTs = s.t
						}
						samples = append(samples, s)
					}

					// Check if chunk has correct min, max times.
					require.Equal(t, firstTs, chk.MinTime, "chunk Meta %v does not match the first encoded sample timestamp: %v", chk, firstTs)
					require.Equal(t, s.t, chk.MaxTime, "chunk Meta %v does not match the last encoded sample timestamp %v", chk, s.t)

					require.NoError(t, iter.Err())
					ss.chunks = append(ss.chunks, samples)
				}
				raw = append(raw, ss)
			}
			require.Equal(t, tc.expSeriesSamples, raw)

			// Check if stats are calculated properly.
			s := BlockStats{NumSeries: uint64(len(tc.expSeriesSamples))}
			for _, series := range tc.expSeriesSamples {
				s.NumChunks += uint64(len(series.chunks))
				for _, chk := range series.chunks {
					s.NumSamples += uint64(len(chk))
				}
			}
			require.Equal(t, s, meta.Stats)
		})
	}
}

func BenchmarkCompaction(b *testing.B) {
	cases := []struct {
		ranges         [][2]int64
		compactionType string
	}{
		{
			ranges:         [][2]int64{{0, 100}, {200, 300}, {400, 500}, {600, 700}},
			compactionType: "normal",
		},
		{
			ranges:         [][2]int64{{0, 1000}, {2000, 3000}, {4000, 5000}, {6000, 7000}},
			compactionType: "normal",
		},
		{
			ranges:         [][2]int64{{0, 2000}, {3000, 5000}, {6000, 8000}, {9000, 11000}},
			compactionType: "normal",
		},
		{
			ranges:         [][2]int64{{0, 5000}, {6000, 11000}, {12000, 17000}, {18000, 23000}},
			compactionType: "normal",
		},
		// 40% overlap between adjacent blocks.
		{
			ranges:         [][2]int64{{0, 100}, {60, 160}, {120, 220}, {180, 280}},
			compactionType: "vertical",
		},
		{
			ranges:         [][2]int64{{0, 1000}, {600, 1600}, {1200, 2200}, {1800, 2800}},
			compactionType: "vertical",
		},
		{
			ranges:         [][2]int64{{0, 2000}, {1200, 3200}, {2400, 4400}, {3600, 5600}},
			compactionType: "vertical",
		},
		{
			ranges:         [][2]int64{{0, 5000}, {3000, 8000}, {6000, 11000}, {9000, 14000}},
			compactionType: "vertical",
		},
	}

	nSeries := 10000
	for _, c := range cases {
		nBlocks := len(c.ranges)
		b.Run(fmt.Sprintf("type=%s,blocks=%d,series=%d,samplesPerSeriesPerBlock=%d", c.compactionType, nBlocks, nSeries, c.ranges[0][1]-c.ranges[0][0]+1), func(b *testing.B) {
			dir, err := ioutil.TempDir("", "bench_compaction")
			require.NoError(b, err)
			defer func() {
				require.NoError(b, os.RemoveAll(dir))
			}()
			blockDirs := make([]string, 0, len(c.ranges))
			var blocks []*Block
			for _, r := range c.ranges {
				block, err := OpenBlock(nil, createBlock(b, dir, genSeries(nSeries, 10, r[0], r[1])), nil)
				require.NoError(b, err)
				blocks = append(blocks, block)
				defer func() {
					require.NoError(b, block.Close())
				}()
				blockDirs = append(blockDirs, block.Dir())
			}

			c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil)
			require.NoError(b, err)

			b.ResetTimer()
			b.ReportAllocs()
			for i := 0; i < b.N; i++ {
				_, err = c.Compact(dir, blockDirs, blocks)
				require.NoError(b, err)
			}
		})
	}
}

func BenchmarkCompactionFromHead(b *testing.B) {
	dir, err := ioutil.TempDir("", "bench_compaction_from_head")
	require.NoError(b, err)
	defer func() {
		require.NoError(b, os.RemoveAll(dir))
	}()
	totalSeries := 100000
	for labelNames := 1; labelNames < totalSeries; labelNames *= 10 {
		labelValues := totalSeries / labelNames
		b.Run(fmt.Sprintf("labelnames=%d,labelvalues=%d", labelNames, labelValues), func(b *testing.B) {
			chunkDir, err := ioutil.TempDir("", "chunk_dir")
			require.NoError(b, err)
			defer func() {
				require.NoError(b, os.RemoveAll(chunkDir))
			}()
			opts := DefaultHeadOptions()
			opts.ChunkRange = 1000
			opts.ChunkDirRoot = chunkDir
			h, err := NewHead(nil, nil, nil, opts, nil)
			require.NoError(b, err)
			for ln := 0; ln < labelNames; ln++ {
				app := h.Appender(context.Background())
				for lv := 0; lv < labelValues; lv++ {
					app.Append(0, labels.FromStrings(fmt.Sprintf("%d", ln), fmt.Sprintf("%d%s%d", lv, postingsBenchSuffix, ln)), 0, 0)
				}
				require.NoError(b, app.Commit())
			}

			b.ResetTimer()
			b.ReportAllocs()
			for i := 0; i < b.N; i++ {
				createBlockFromHead(b, filepath.Join(dir, fmt.Sprintf("%d-%d", i, labelNames)), h)
			}
			h.Close()
		})
	}
}

// TestDisableAutoCompactions checks that we can
// disable and enable the auto compaction.
// This is needed for unit tests that rely on
// checking state before and after a compaction.
func TestDisableAutoCompactions(t *testing.T) {
	db := openTestDB(t, nil, nil)
	defer func() {
		require.NoError(t, db.Close())
	}()

	blockRange := db.compactor.(*LeveledCompactor).ranges[0]
	label := labels.FromStrings("foo", "bar")

	// Trigger a compaction to check that it was skipped and
	// no new blocks were created when compaction is disabled.
	db.DisableCompactions()
	app := db.Appender(context.Background())
	for i := int64(0); i < 3; i++ {
		_, err := app.Append(0, label, i*blockRange, 0)
		require.NoError(t, err)
		_, err = app.Append(0, label, i*blockRange+1000, 0)
		require.NoError(t, err)
	}
	require.NoError(t, app.Commit())

	select {
	case db.compactc <- struct{}{}:
	default:
	}

	for x := 0; x < 10; x++ {
		if prom_testutil.ToFloat64(db.metrics.compactionsSkipped) > 0.0 {
			break
		}
		time.Sleep(10 * time.Millisecond)
	}

	require.Greater(t, prom_testutil.ToFloat64(db.metrics.compactionsSkipped), 0.0, "No compaction was skipped after the set timeout.")
	require.Equal(t, 0, len(db.blocks))

	// Enable the compaction, trigger it and check that the block is persisted.
	db.EnableCompactions()
	select {
	case db.compactc <- struct{}{}:
	default:
	}
	for x := 0; x < 100; x++ {
		if len(db.Blocks()) > 0 {
			break
		}
		time.Sleep(100 * time.Millisecond)
	}
	require.Greater(t, len(db.Blocks()), 0, "No block was persisted after the set timeout.")
}

// TestCancelCompactions ensures that when the db is closed,
// any running compaction is cancelled to unblock closing the db.
func TestCancelCompactions(t *testing.T) {
	tmpdir, err := ioutil.TempDir("", "testCancelCompaction")
	require.NoError(t, err)
	defer func() {
		require.NoError(t, os.RemoveAll(tmpdir))
	}()

	// Create some blocks to fall within the compaction range.
	createBlock(t, tmpdir, genSeries(1, 10000, 0, 1000))
	createBlock(t, tmpdir, genSeries(1, 10000, 1000, 2000))
	createBlock(t, tmpdir, genSeries(1, 1, 2000, 2001)) // The most recent block is ignored, so it can be a small one.

	// Copy the db so we have an exact copy to compare compaction times.
	tmpdirCopy := tmpdir + "Copy"
	err = fileutil.CopyDirs(tmpdir, tmpdirCopy)
	require.NoError(t, err)
	defer func() {
		require.NoError(t, os.RemoveAll(tmpdirCopy))
	}()

	// Measure the compaction time without interrupting it.
	var timeCompactionUninterrupted time.Duration
	{
		db, err := open(tmpdir, log.NewNopLogger(), nil, DefaultOptions(), []int64{1, 2000}, nil)
		require.NoError(t, err)
		require.Equal(t, 3, len(db.Blocks()), "initial block count mismatch")
		require.Equal(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "initial compaction counter mismatch")
		db.compactc <- struct{}{} // Trigger a compaction.
		var start time.Time
		for prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.populatingBlocks) <= 0 {
			time.Sleep(3 * time.Millisecond)
		}
		start = time.Now()

		for prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran) != 1 {
			time.Sleep(3 * time.Millisecond)
		}
		timeCompactionUninterrupted = time.Since(start)

		require.NoError(t, db.Close())
	}
	// Measure the compaction time when closing the db in the middle of compaction.
	{
		db, err := open(tmpdirCopy, log.NewNopLogger(), nil, DefaultOptions(), []int64{1, 2000}, nil)
		require.NoError(t, err)
		require.Equal(t, 3, len(db.Blocks()), "initial block count mismatch")
		require.Equal(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "initial compaction counter mismatch")
		db.compactc <- struct{}{} // Trigger a compaction.
		dbClosed := make(chan struct{})

		for prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.populatingBlocks) <= 0 {
			time.Sleep(3 * time.Millisecond)
		}
		go func() {
			require.NoError(t, db.Close())
			close(dbClosed)
		}()

		start := time.Now()
		<-dbClosed
		actT := time.Since(start)
		expT := time.Duration(timeCompactionUninterrupted / 2) // Closing the db in the middle of compaction should take less than half of the uninterrupted compaction time.
		require.True(t, actT < expT, "closing the db took more than expected. exp: <%v, act: %v", expT, actT)
	}
}

// TestDeleteCompactionBlockAfterFailedReload ensures that a failed reloadBlocks immediately after a compaction
// deletes the resulting block to avoid creating blocks with the same time range.
func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) {
	tests := map[string]func(*DB) int{
		"Test Head Compaction": func(db *DB) int {
			rangeToTriggerCompaction := db.compactor.(*LeveledCompactor).ranges[0]/2*3 - 1
			defaultLabel := labels.FromStrings("foo", "bar")

			// Add some data to the head that is enough to trigger a compaction.
			app := db.Appender(context.Background())
			_, err := app.Append(0, defaultLabel, 1, 0)
			require.NoError(t, err)
			_, err = app.Append(0, defaultLabel, 2, 0)
			require.NoError(t, err)
			_, err = app.Append(0, defaultLabel, 3+rangeToTriggerCompaction, 0)
			require.NoError(t, err)
			require.NoError(t, app.Commit())

			return 0
		},
		"Test Block Compaction": func(db *DB) int {
			blocks := []*BlockMeta{
				{MinTime: 0, MaxTime: 100},
				{MinTime: 100, MaxTime: 150},
				{MinTime: 150, MaxTime: 200},
			}
			for _, m := range blocks {
				createBlock(t, db.Dir(), genSeries(1, 1, m.MinTime, m.MaxTime))
			}
			require.NoError(t, db.reload())
			require.Equal(t, len(blocks), len(db.Blocks()), "unexpected block count after a reloadBlocks")

			return len(blocks)
		},
	}

	for title, bootStrap := range tests {
		t.Run(title, func(t *testing.T) {
			db := openTestDB(t, nil, []int64{1, 100})
			defer func() {
				require.NoError(t, db.Close())
			}()
			db.DisableCompactions()

			expBlocks := bootStrap(db)

			// Create a block that will trigger the reloadBlocks to fail.
			blockPath := createBlock(t, db.Dir(), genSeries(1, 1, 200, 300))
			lastBlockIndex := path.Join(blockPath, indexFilename)
			actBlocks, err := blockDirs(db.Dir())
			require.NoError(t, err)
			require.Equal(t, expBlocks, len(actBlocks)-1)    // -1 to exclude the corrupted block.
			require.NoError(t, os.RemoveAll(lastBlockIndex)) // Corrupt the block by removing the index file.

			require.Equal(t, 0.0, prom_testutil.ToFloat64(db.metrics.reloadsFailed), "initial 'failed db reloadBlocks' count metrics mismatch")
			require.Equal(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "initial `compactions` count metric mismatch")
			require.Equal(t, 0.0, prom_testutil.ToFloat64(db.metrics.compactionsFailed), "initial `compactions failed` count metric mismatch")

			// Do the compaction and check the metrics.
			// Compaction should succeed, but the reloadBlocks should fail and
			// the new block created from the compaction should be deleted.
			require.Error(t, db.Compact())
			require.Equal(t, 1.0, prom_testutil.ToFloat64(db.metrics.reloadsFailed), "'failed db reloadBlocks' count metrics mismatch")
			require.Equal(t, 1.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "`compaction` count metric mismatch")
			require.Equal(t, 1.0, prom_testutil.ToFloat64(db.metrics.compactionsFailed), "`compactions failed` count metric mismatch")

			actBlocks, err = blockDirs(db.Dir())
			require.NoError(t, err)
			require.Equal(t, expBlocks, len(actBlocks)-1, "block count should be the same as before the compaction") // -1 to exclude the corrupted block.
		})
	}
}