1// Copyright 2017 The Prometheus Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14package tsdb
15
16import (
17	"encoding/binary"
18	"fmt"
19	"hash/crc32"
20	"io/ioutil"
21	"math"
22	"math/rand"
23	"os"
24	"path"
25	"path/filepath"
26	"sort"
27	"strconv"
28	"sync"
29	"testing"
30	"time"
31
32	"github.com/prometheus/prometheus/tsdb/fileutil"
33
34	"github.com/go-kit/kit/log"
35	"github.com/oklog/ulid"
36	"github.com/pkg/errors"
37	"github.com/prometheus/client_golang/prometheus"
38	prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
39	"github.com/prometheus/prometheus/pkg/labels"
40	"github.com/prometheus/prometheus/tsdb/chunks"
41	"github.com/prometheus/prometheus/tsdb/index"
42	"github.com/prometheus/prometheus/tsdb/record"
43	"github.com/prometheus/prometheus/tsdb/tombstones"
44	"github.com/prometheus/prometheus/tsdb/tsdbutil"
45	"github.com/prometheus/prometheus/tsdb/wal"
46	"github.com/prometheus/prometheus/util/testutil"
47)
48
49func openTestDB(t testing.TB, opts *Options) (db *DB, close func()) {
50	tmpdir, err := ioutil.TempDir("", "test")
51	testutil.Ok(t, err)
52
53	db, err = Open(tmpdir, nil, nil, opts)
54	testutil.Ok(t, err)
55
56	// Do not close the test database by default as it will deadlock on test failures.
57	return db, func() {
58		testutil.Ok(t, os.RemoveAll(tmpdir))
59	}
60}
61
62// query runs a matcher query against the querier and fully expands its data.
63func query(t testing.TB, q Querier, matchers ...*labels.Matcher) map[string][]tsdbutil.Sample {
64	ss, err := q.Select(matchers...)
65	defer func() {
66		testutil.Ok(t, q.Close())
67	}()
68	testutil.Ok(t, err)
69
70	result := map[string][]tsdbutil.Sample{}
71
72	for ss.Next() {
73		series := ss.At()
74
75		samples := []tsdbutil.Sample{}
76		it := series.Iterator()
77		for it.Next() {
78			t, v := it.At()
79			samples = append(samples, sample{t: t, v: v})
80		}
81		testutil.Ok(t, it.Err())
82
83		name := series.Labels().String()
84		result[name] = samples
85	}
86	testutil.Ok(t, ss.Err())
87
88	return result
89}
90
91// Ensure that blocks are held in memory in their time order
92// and not in ULID order as they are read from the directory.
93func TestDB_reloadOrder(t *testing.T) {
94	db, delete := openTestDB(t, nil)
95	defer func() {
96		testutil.Ok(t, db.Close())
97		delete()
98	}()
99
100	metas := []BlockMeta{
101		{MinTime: 90, MaxTime: 100},
102		{MinTime: 70, MaxTime: 80},
103		{MinTime: 100, MaxTime: 110},
104	}
105	for _, m := range metas {
106		createBlock(t, db.Dir(), genSeries(1, 1, m.MinTime, m.MaxTime))
107	}
108
109	testutil.Ok(t, db.reload())
110	blocks := db.Blocks()
111	testutil.Equals(t, 3, len(blocks))
112	testutil.Equals(t, metas[1].MinTime, blocks[0].Meta().MinTime)
113	testutil.Equals(t, metas[1].MaxTime, blocks[0].Meta().MaxTime)
114	testutil.Equals(t, metas[0].MinTime, blocks[1].Meta().MinTime)
115	testutil.Equals(t, metas[0].MaxTime, blocks[1].Meta().MaxTime)
116	testutil.Equals(t, metas[2].MinTime, blocks[2].Meta().MinTime)
117	testutil.Equals(t, metas[2].MaxTime, blocks[2].Meta().MaxTime)
118}
119
120func TestDataAvailableOnlyAfterCommit(t *testing.T) {
121	db, delete := openTestDB(t, nil)
122	defer func() {
123		testutil.Ok(t, db.Close())
124		delete()
125	}()
126
127	app := db.Appender()
128
129	_, err := app.Add(labels.FromStrings("foo", "bar"), 0, 0)
130	testutil.Ok(t, err)
131
132	querier, err := db.Querier(0, 1)
133	testutil.Ok(t, err)
134	seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
135	testutil.Equals(t, map[string][]tsdbutil.Sample{}, seriesSet)
136
137	err = app.Commit()
138	testutil.Ok(t, err)
139
140	querier, err = db.Querier(0, 1)
141	testutil.Ok(t, err)
142	defer querier.Close()
143
144	seriesSet = query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
145
146	testutil.Equals(t, map[string][]tsdbutil.Sample{`{foo="bar"}`: {sample{t: 0, v: 0}}}, seriesSet)
147}
148
149func TestDataNotAvailableAfterRollback(t *testing.T) {
150	db, delete := openTestDB(t, nil)
151	defer func() {
152		testutil.Ok(t, db.Close())
153		delete()
154	}()
155
156	app := db.Appender()
157	_, err := app.Add(labels.FromStrings("foo", "bar"), 0, 0)
158	testutil.Ok(t, err)
159
160	err = app.Rollback()
161	testutil.Ok(t, err)
162
163	querier, err := db.Querier(0, 1)
164	testutil.Ok(t, err)
165	defer querier.Close()
166
167	seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
168
169	testutil.Equals(t, map[string][]tsdbutil.Sample{}, seriesSet)
170}
171
172func TestDBAppenderAddRef(t *testing.T) {
173	db, delete := openTestDB(t, nil)
174	defer func() {
175		testutil.Ok(t, db.Close())
176		delete()
177	}()
178
179	app1 := db.Appender()
180
181	ref1, err := app1.Add(labels.FromStrings("a", "b"), 123, 0)
182	testutil.Ok(t, err)
183
184	// Reference should already work before commit.
185	err = app1.AddFast(ref1, 124, 1)
186	testutil.Ok(t, err)
187
188	err = app1.Commit()
189	testutil.Ok(t, err)
190
191	app2 := db.Appender()
192
193	// first ref should already work in next transaction.
194	err = app2.AddFast(ref1, 125, 0)
195	testutil.Ok(t, err)
196
197	ref2, err := app2.Add(labels.FromStrings("a", "b"), 133, 1)
198	testutil.Ok(t, err)
199
200	testutil.Assert(t, ref1 == ref2, "")
201
202	// Reference must be valid to add another sample.
203	err = app2.AddFast(ref2, 143, 2)
204	testutil.Ok(t, err)
205
206	err = app2.AddFast(9999999, 1, 1)
207	testutil.Equals(t, ErrNotFound, errors.Cause(err))
208
209	testutil.Ok(t, app2.Commit())
210
211	q, err := db.Querier(0, 200)
212	testutil.Ok(t, err)
213
214	res := query(t, q, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
215
216	testutil.Equals(t, map[string][]tsdbutil.Sample{
217		labels.FromStrings("a", "b").String(): {
218			sample{t: 123, v: 0},
219			sample{t: 124, v: 1},
220			sample{t: 125, v: 0},
221			sample{t: 133, v: 1},
222			sample{t: 143, v: 2},
223		},
224	}, res)
225}
226
227func TestAppendEmptyLabelsIgnored(t *testing.T) {
228	db, delete := openTestDB(t, nil)
229	defer func() {
230		testutil.Ok(t, db.Close())
231		delete()
232	}()
233
234	app1 := db.Appender()
235
236	ref1, err := app1.Add(labels.FromStrings("a", "b"), 123, 0)
237	testutil.Ok(t, err)
238
239	// Construct labels manually so there is an empty label.
240	ref2, err := app1.Add(labels.Labels{labels.Label{Name: "a", Value: "b"}, labels.Label{Name: "c", Value: ""}}, 124, 0)
241	testutil.Ok(t, err)
242
243	// Should be the same series.
244	testutil.Equals(t, ref1, ref2)
245
246	err = app1.Commit()
247	testutil.Ok(t, err)
248}
249
250func TestDeleteSimple(t *testing.T) {
251	numSamples := int64(10)
252
253	cases := []struct {
254		Intervals tombstones.Intervals
255		remaint   []int64
256	}{
257		{
258			Intervals: tombstones.Intervals{{Mint: 0, Maxt: 3}},
259			remaint:   []int64{4, 5, 6, 7, 8, 9},
260		},
261		{
262			Intervals: tombstones.Intervals{{Mint: 1, Maxt: 3}},
263			remaint:   []int64{0, 4, 5, 6, 7, 8, 9},
264		},
265		{
266			Intervals: tombstones.Intervals{{Mint: 1, Maxt: 3}, {Mint: 4, Maxt: 7}},
267			remaint:   []int64{0, 8, 9},
268		},
269		{
270			Intervals: tombstones.Intervals{{Mint: 1, Maxt: 3}, {Mint: 4, Maxt: 700}},
271			remaint:   []int64{0},
272		},
273		{ // This case is to ensure that labels and symbols are deleted.
274			Intervals: tombstones.Intervals{{Mint: 0, Maxt: 9}},
275			remaint:   []int64{},
276		},
277	}
278
279Outer:
280	for _, c := range cases {
281		db, delete := openTestDB(t, nil)
282		defer func() {
283			testutil.Ok(t, db.Close())
284			delete()
285		}()
286
287		app := db.Appender()
288
289		smpls := make([]float64, numSamples)
290		for i := int64(0); i < numSamples; i++ {
291			smpls[i] = rand.Float64()
292			app.Add(labels.Labels{{Name: "a", Value: "b"}}, i, smpls[i])
293		}
294
295		testutil.Ok(t, app.Commit())
296
297		// TODO(gouthamve): Reset the tombstones somehow.
298		// Delete the ranges.
299		for _, r := range c.Intervals {
300			testutil.Ok(t, db.Delete(r.Mint, r.Maxt, labels.MustNewMatcher(labels.MatchEqual, "a", "b")))
301		}
302
303		// Compare the result.
304		q, err := db.Querier(0, numSamples)
305		testutil.Ok(t, err)
306
307		res, err := q.Select(labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
308		testutil.Ok(t, err)
309
310		expSamples := make([]tsdbutil.Sample, 0, len(c.remaint))
311		for _, ts := range c.remaint {
312			expSamples = append(expSamples, sample{ts, smpls[ts]})
313		}
314
315		expss := newMockSeriesSet([]Series{
316			newSeries(map[string]string{"a": "b"}, expSamples),
317		})
318
319		lns, err := q.LabelNames()
320		testutil.Ok(t, err)
321		lvs, err := q.LabelValues("a")
322		testutil.Ok(t, err)
323		if len(expSamples) == 0 {
324			testutil.Equals(t, 0, len(lns))
325			testutil.Equals(t, 0, len(lvs))
326			testutil.Assert(t, res.Next() == false, "")
327			continue
328		} else {
329			testutil.Equals(t, 1, len(lns))
330			testutil.Equals(t, 1, len(lvs))
331			testutil.Equals(t, "a", lns[0])
332			testutil.Equals(t, "b", lvs[0])
333		}
334
335		for {
336			eok, rok := expss.Next(), res.Next()
337			testutil.Equals(t, eok, rok)
338
339			if !eok {
340				continue Outer
341			}
342			sexp := expss.At()
343			sres := res.At()
344
345			testutil.Equals(t, sexp.Labels(), sres.Labels())
346
347			smplExp, errExp := expandSeriesIterator(sexp.Iterator())
348			smplRes, errRes := expandSeriesIterator(sres.Iterator())
349
350			testutil.Equals(t, errExp, errRes)
351			testutil.Equals(t, smplExp, smplRes)
352		}
353	}
354}
355
356func TestAmendDatapointCausesError(t *testing.T) {
357	db, delete := openTestDB(t, nil)
358	defer func() {
359		testutil.Ok(t, db.Close())
360		delete()
361	}()
362
363	app := db.Appender()
364	_, err := app.Add(labels.Labels{}, 0, 0)
365	testutil.Ok(t, err)
366	testutil.Ok(t, app.Commit())
367
368	app = db.Appender()
369	_, err = app.Add(labels.Labels{}, 0, 1)
370	testutil.Equals(t, ErrAmendSample, err)
371	testutil.Ok(t, app.Rollback())
372}
373
374func TestDuplicateNaNDatapointNoAmendError(t *testing.T) {
375	db, delete := openTestDB(t, nil)
376	defer func() {
377		testutil.Ok(t, db.Close())
378		delete()
379	}()
380
381	app := db.Appender()
382	_, err := app.Add(labels.Labels{}, 0, math.NaN())
383	testutil.Ok(t, err)
384	testutil.Ok(t, app.Commit())
385
386	app = db.Appender()
387	_, err = app.Add(labels.Labels{}, 0, math.NaN())
388	testutil.Ok(t, err)
389}
390
391func TestNonDuplicateNaNDatapointsCausesAmendError(t *testing.T) {
392	db, delete := openTestDB(t, nil)
393	defer func() {
394		testutil.Ok(t, db.Close())
395		delete()
396	}()
397	app := db.Appender()
398	_, err := app.Add(labels.Labels{}, 0, math.Float64frombits(0x7ff0000000000001))
399	testutil.Ok(t, err)
400	testutil.Ok(t, app.Commit())
401
402	app = db.Appender()
403	_, err = app.Add(labels.Labels{}, 0, math.Float64frombits(0x7ff0000000000002))
404	testutil.Equals(t, ErrAmendSample, err)
405}
406
407func TestSkippingInvalidValuesInSameTxn(t *testing.T) {
408	db, delete := openTestDB(t, nil)
409	defer func() {
410		testutil.Ok(t, db.Close())
411		delete()
412	}()
413
414	// Append AmendedValue.
415	app := db.Appender()
416	_, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, 1)
417	testutil.Ok(t, err)
418	_, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, 2)
419	testutil.Ok(t, err)
420	testutil.Ok(t, app.Commit())
421
422	// Make sure the right value is stored.
423	q, err := db.Querier(0, 10)
424	testutil.Ok(t, err)
425
426	ssMap := query(t, q, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
427
428	testutil.Equals(t, map[string][]tsdbutil.Sample{
429		labels.New(labels.Label{Name: "a", Value: "b"}).String(): {sample{0, 1}},
430	}, ssMap)
431
432	// Append Out of Order Value.
433	app = db.Appender()
434	_, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 10, 3)
435	testutil.Ok(t, err)
436	_, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 7, 5)
437	testutil.Ok(t, err)
438	testutil.Ok(t, app.Commit())
439
440	q, err = db.Querier(0, 10)
441	testutil.Ok(t, err)
442
443	ssMap = query(t, q, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
444
445	testutil.Equals(t, map[string][]tsdbutil.Sample{
446		labels.New(labels.Label{Name: "a", Value: "b"}).String(): {sample{0, 1}, sample{10, 3}},
447	}, ssMap)
448}
449
450func TestDB_Snapshot(t *testing.T) {
451	db, delete := openTestDB(t, nil)
452	defer delete()
453
454	// append data
455	app := db.Appender()
456	mint := int64(1414141414000)
457	for i := 0; i < 1000; i++ {
458		_, err := app.Add(labels.FromStrings("foo", "bar"), mint+int64(i), 1.0)
459		testutil.Ok(t, err)
460	}
461	testutil.Ok(t, app.Commit())
462	testutil.Ok(t, app.Rollback())
463
464	// create snapshot
465	snap, err := ioutil.TempDir("", "snap")
466	testutil.Ok(t, err)
467
468	defer func() {
469		testutil.Ok(t, os.RemoveAll(snap))
470	}()
471	testutil.Ok(t, db.Snapshot(snap, true))
472	testutil.Ok(t, db.Close())
473
474	// reopen DB from snapshot
475	db, err = Open(snap, nil, nil, nil)
476	testutil.Ok(t, err)
477	defer func() { testutil.Ok(t, db.Close()) }()
478
479	querier, err := db.Querier(mint, mint+1000)
480	testutil.Ok(t, err)
481	defer func() { testutil.Ok(t, querier.Close()) }()
482
483	// sum values
484	seriesSet, err := querier.Select(labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
485	testutil.Ok(t, err)
486
487	sum := 0.0
488	for seriesSet.Next() {
489		series := seriesSet.At().Iterator()
490		for series.Next() {
491			_, v := series.At()
492			sum += v
493		}
494		testutil.Ok(t, series.Err())
495	}
496	testutil.Ok(t, seriesSet.Err())
497	testutil.Equals(t, 1000.0, sum)
498}
499
500// TestDB_Snapshot_ChunksOutsideOfCompactedRange ensures that a snapshot removes chunks samples
501// that are outside the set block time range.
502// See https://github.com/prometheus/prometheus/issues/5105
503func TestDB_Snapshot_ChunksOutsideOfCompactedRange(t *testing.T) {
504	db, delete := openTestDB(t, nil)
505	defer delete()
506
507	app := db.Appender()
508	mint := int64(1414141414000)
509	for i := 0; i < 1000; i++ {
510		_, err := app.Add(labels.FromStrings("foo", "bar"), mint+int64(i), 1.0)
511		testutil.Ok(t, err)
512	}
513	testutil.Ok(t, app.Commit())
514	testutil.Ok(t, app.Rollback())
515
516	snap, err := ioutil.TempDir("", "snap")
517	testutil.Ok(t, err)
518
519	// Hackingly introduce "race", by having lower max time then maxTime in last chunk.
520	db.head.maxTime = db.head.maxTime - 10
521
522	defer func() {
523		testutil.Ok(t, os.RemoveAll(snap))
524	}()
525	testutil.Ok(t, db.Snapshot(snap, true))
526	testutil.Ok(t, db.Close())
527
528	// Reopen DB from snapshot.
529	db, err = Open(snap, nil, nil, nil)
530	testutil.Ok(t, err)
531	defer func() { testutil.Ok(t, db.Close()) }()
532
533	querier, err := db.Querier(mint, mint+1000)
534	testutil.Ok(t, err)
535	defer func() { testutil.Ok(t, querier.Close()) }()
536
537	// Sum values.
538	seriesSet, err := querier.Select(labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
539	testutil.Ok(t, err)
540
541	sum := 0.0
542	for seriesSet.Next() {
543		series := seriesSet.At().Iterator()
544		for series.Next() {
545			_, v := series.At()
546			sum += v
547		}
548		testutil.Ok(t, series.Err())
549	}
550	testutil.Ok(t, seriesSet.Err())
551
552	// Since we snapshotted with MaxTime - 10, so expect 10 less samples.
553	testutil.Equals(t, 1000.0-10, sum)
554}
555
556func TestDB_SnapshotWithDelete(t *testing.T) {
557	numSamples := int64(10)
558
559	db, delete := openTestDB(t, nil)
560	defer delete()
561
562	app := db.Appender()
563
564	smpls := make([]float64, numSamples)
565	for i := int64(0); i < numSamples; i++ {
566		smpls[i] = rand.Float64()
567		app.Add(labels.Labels{{Name: "a", Value: "b"}}, i, smpls[i])
568	}
569
570	testutil.Ok(t, app.Commit())
571	cases := []struct {
572		intervals tombstones.Intervals
573		remaint   []int64
574	}{
575		{
576			intervals: tombstones.Intervals{{Mint: 1, Maxt: 3}, {Mint: 4, Maxt: 7}},
577			remaint:   []int64{0, 8, 9},
578		},
579	}
580
581Outer:
582	for _, c := range cases {
583		// TODO(gouthamve): Reset the tombstones somehow.
584		// Delete the ranges.
585		for _, r := range c.intervals {
586			testutil.Ok(t, db.Delete(r.Mint, r.Maxt, labels.MustNewMatcher(labels.MatchEqual, "a", "b")))
587		}
588
589		// create snapshot
590		snap, err := ioutil.TempDir("", "snap")
591		testutil.Ok(t, err)
592
593		defer func() {
594			testutil.Ok(t, os.RemoveAll(snap))
595		}()
596		testutil.Ok(t, db.Snapshot(snap, true))
597		testutil.Ok(t, db.Close())
598
599		// reopen DB from snapshot
600		db, err = Open(snap, nil, nil, nil)
601		testutil.Ok(t, err)
602		defer func() { testutil.Ok(t, db.Close()) }()
603
604		// Compare the result.
605		q, err := db.Querier(0, numSamples)
606		testutil.Ok(t, err)
607		defer func() { testutil.Ok(t, q.Close()) }()
608
609		res, err := q.Select(labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
610		testutil.Ok(t, err)
611
612		expSamples := make([]tsdbutil.Sample, 0, len(c.remaint))
613		for _, ts := range c.remaint {
614			expSamples = append(expSamples, sample{ts, smpls[ts]})
615		}
616
617		expss := newMockSeriesSet([]Series{
618			newSeries(map[string]string{"a": "b"}, expSamples),
619		})
620
621		if len(expSamples) == 0 {
622			testutil.Assert(t, res.Next() == false, "")
623			continue
624		}
625
626		for {
627			eok, rok := expss.Next(), res.Next()
628			testutil.Equals(t, eok, rok)
629
630			if !eok {
631				continue Outer
632			}
633			sexp := expss.At()
634			sres := res.At()
635
636			testutil.Equals(t, sexp.Labels(), sres.Labels())
637
638			smplExp, errExp := expandSeriesIterator(sexp.Iterator())
639			smplRes, errRes := expandSeriesIterator(sres.Iterator())
640
641			testutil.Equals(t, errExp, errRes)
642			testutil.Equals(t, smplExp, smplRes)
643		}
644	}
645}
646
647func TestDB_e2e(t *testing.T) {
648	const (
649		numDatapoints = 1000
650		numRanges     = 1000
651		timeInterval  = int64(3)
652	)
653	// Create 8 series with 1000 data-points of different ranges and run queries.
654	lbls := []labels.Labels{
655		{
656			{Name: "a", Value: "b"},
657			{Name: "instance", Value: "localhost:9090"},
658			{Name: "job", Value: "prometheus"},
659		},
660		{
661			{Name: "a", Value: "b"},
662			{Name: "instance", Value: "127.0.0.1:9090"},
663			{Name: "job", Value: "prometheus"},
664		},
665		{
666			{Name: "a", Value: "b"},
667			{Name: "instance", Value: "127.0.0.1:9090"},
668			{Name: "job", Value: "prom-k8s"},
669		},
670		{
671			{Name: "a", Value: "b"},
672			{Name: "instance", Value: "localhost:9090"},
673			{Name: "job", Value: "prom-k8s"},
674		},
675		{
676			{Name: "a", Value: "c"},
677			{Name: "instance", Value: "localhost:9090"},
678			{Name: "job", Value: "prometheus"},
679		},
680		{
681			{Name: "a", Value: "c"},
682			{Name: "instance", Value: "127.0.0.1:9090"},
683			{Name: "job", Value: "prometheus"},
684		},
685		{
686			{Name: "a", Value: "c"},
687			{Name: "instance", Value: "127.0.0.1:9090"},
688			{Name: "job", Value: "prom-k8s"},
689		},
690		{
691			{Name: "a", Value: "c"},
692			{Name: "instance", Value: "localhost:9090"},
693			{Name: "job", Value: "prom-k8s"},
694		},
695	}
696
697	seriesMap := map[string][]tsdbutil.Sample{}
698	for _, l := range lbls {
699		seriesMap[labels.New(l...).String()] = []tsdbutil.Sample{}
700	}
701
702	db, delete := openTestDB(t, nil)
703	defer func() {
704		testutil.Ok(t, db.Close())
705		delete()
706	}()
707
708	app := db.Appender()
709
710	for _, l := range lbls {
711		lset := labels.New(l...)
712		series := []tsdbutil.Sample{}
713
714		ts := rand.Int63n(300)
715		for i := 0; i < numDatapoints; i++ {
716			v := rand.Float64()
717
718			series = append(series, sample{ts, v})
719
720			_, err := app.Add(lset, ts, v)
721			testutil.Ok(t, err)
722
723			ts += rand.Int63n(timeInterval) + 1
724		}
725
726		seriesMap[lset.String()] = series
727	}
728
729	testutil.Ok(t, app.Commit())
730
731	// Query each selector on 1000 random time-ranges.
732	queries := []struct {
733		ms []*labels.Matcher
734	}{
735		{
736			ms: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "b")},
737		},
738		{
739			ms: []*labels.Matcher{
740				labels.MustNewMatcher(labels.MatchEqual, "a", "b"),
741				labels.MustNewMatcher(labels.MatchEqual, "job", "prom-k8s"),
742			},
743		},
744		{
745			ms: []*labels.Matcher{
746				labels.MustNewMatcher(labels.MatchEqual, "a", "c"),
747				labels.MustNewMatcher(labels.MatchEqual, "instance", "localhost:9090"),
748				labels.MustNewMatcher(labels.MatchEqual, "job", "prometheus"),
749			},
750		},
751		// TODO: Add Regexp Matchers.
752	}
753
754	for _, qry := range queries {
755		matched := labels.Slice{}
756		for _, ls := range lbls {
757			s := labels.Selector(qry.ms)
758			if s.Matches(ls) {
759				matched = append(matched, ls)
760			}
761		}
762
763		sort.Sort(matched)
764
765		for i := 0; i < numRanges; i++ {
766			mint := rand.Int63n(300)
767			maxt := mint + rand.Int63n(timeInterval*int64(numDatapoints))
768
769			expected := map[string][]tsdbutil.Sample{}
770
771			// Build the mockSeriesSet.
772			for _, m := range matched {
773				smpls := boundedSamples(seriesMap[m.String()], mint, maxt)
774				if len(smpls) > 0 {
775					expected[m.String()] = smpls
776				}
777			}
778
779			q, err := db.Querier(mint, maxt)
780			testutil.Ok(t, err)
781
782			ss, err := q.Select(qry.ms...)
783			testutil.Ok(t, err)
784
785			result := map[string][]tsdbutil.Sample{}
786
787			for ss.Next() {
788				x := ss.At()
789
790				smpls, err := expandSeriesIterator(x.Iterator())
791				testutil.Ok(t, err)
792
793				if len(smpls) > 0 {
794					result[x.Labels().String()] = smpls
795				}
796			}
797
798			testutil.Ok(t, ss.Err())
799			testutil.Equals(t, expected, result)
800
801			q.Close()
802		}
803	}
804}
805
806func TestWALFlushedOnDBClose(t *testing.T) {
807	db, delete := openTestDB(t, nil)
808	defer delete()
809
810	dirDb := db.Dir()
811
812	lbls := labels.Labels{labels.Label{Name: "labelname", Value: "labelvalue"}}
813
814	app := db.Appender()
815	_, err := app.Add(lbls, 0, 1)
816	testutil.Ok(t, err)
817	testutil.Ok(t, app.Commit())
818
819	testutil.Ok(t, db.Close())
820
821	db, err = Open(dirDb, nil, nil, nil)
822	testutil.Ok(t, err)
823	defer func() { testutil.Ok(t, db.Close()) }()
824
825	q, err := db.Querier(0, 1)
826	testutil.Ok(t, err)
827
828	values, err := q.LabelValues("labelname")
829	testutil.Ok(t, err)
830	testutil.Equals(t, []string{"labelvalue"}, values)
831}
832
833func TestWALSegmentSizeOptions(t *testing.T) {
834	tests := map[int]func(dbdir string, segmentSize int){
835		// Default Wal Size.
836		0: func(dbDir string, segmentSize int) {
837			files, err := ioutil.ReadDir(filepath.Join(dbDir, "wal"))
838			testutil.Ok(t, err)
839			for _, f := range files[:len(files)-1] {
840				testutil.Equals(t, int64(DefaultOptions.WALSegmentSize), f.Size(), "WAL file size doesn't match WALSegmentSize option, filename: %v", f.Name())
841			}
842			lastFile := files[len(files)-1]
843			testutil.Assert(t, int64(DefaultOptions.WALSegmentSize) > lastFile.Size(), "last WAL file size is not smaller than the WALSegmentSize option, filename: %v", lastFile.Name())
844		},
845		// Custom Wal Size.
846		2 * 32 * 1024: func(dbDir string, segmentSize int) {
847			files, err := ioutil.ReadDir(filepath.Join(dbDir, "wal"))
848			testutil.Assert(t, len(files) > 1, "current WALSegmentSize should result in more than a single WAL file.")
849			testutil.Ok(t, err)
850			for _, f := range files[:len(files)-1] {
851				testutil.Equals(t, int64(segmentSize), f.Size(), "WAL file size doesn't match WALSegmentSize option, filename: %v", f.Name())
852			}
853			lastFile := files[len(files)-1]
854			testutil.Assert(t, int64(segmentSize) > lastFile.Size(), "last WAL file size is not smaller than the WALSegmentSize option, filename: %v", lastFile.Name())
855		},
856		// Wal disabled.
857		-1: func(dbDir string, segmentSize int) {
858			if _, err := os.Stat(filepath.Join(dbDir, "wal")); !os.IsNotExist(err) {
859				t.Fatal("wal directory is present when the wal is disabled")
860			}
861		},
862	}
863	for segmentSize, testFunc := range tests {
864		t.Run(fmt.Sprintf("WALSegmentSize %d test", segmentSize), func(t *testing.T) {
865			options := *DefaultOptions
866			options.WALSegmentSize = segmentSize
867			db, delete := openTestDB(t, &options)
868			defer delete()
869			app := db.Appender()
870			for i := int64(0); i < 155; i++ {
871				_, err := app.Add(labels.Labels{labels.Label{Name: "wal", Value: "size"}}, i, rand.Float64())
872				testutil.Ok(t, err)
873				testutil.Ok(t, app.Commit())
874			}
875
876			dbDir := db.Dir()
877			db.Close()
878			testFunc(dbDir, options.WALSegmentSize)
879		})
880	}
881}
882
883func TestTombstoneClean(t *testing.T) {
884	numSamples := int64(10)
885
886	db, delete := openTestDB(t, nil)
887	defer delete()
888
889	app := db.Appender()
890
891	smpls := make([]float64, numSamples)
892	for i := int64(0); i < numSamples; i++ {
893		smpls[i] = rand.Float64()
894		app.Add(labels.Labels{{Name: "a", Value: "b"}}, i, smpls[i])
895	}
896
897	testutil.Ok(t, app.Commit())
898	cases := []struct {
899		intervals tombstones.Intervals
900		remaint   []int64
901	}{
902		{
903			intervals: tombstones.Intervals{{Mint: 1, Maxt: 3}, {Mint: 4, Maxt: 7}},
904			remaint:   []int64{0, 8, 9},
905		},
906	}
907
908	for _, c := range cases {
909		// Delete the ranges.
910
911		// create snapshot
912		snap, err := ioutil.TempDir("", "snap")
913		testutil.Ok(t, err)
914
915		defer func() {
916			testutil.Ok(t, os.RemoveAll(snap))
917		}()
918		testutil.Ok(t, db.Snapshot(snap, true))
919		testutil.Ok(t, db.Close())
920
921		// reopen DB from snapshot
922		db, err = Open(snap, nil, nil, nil)
923		testutil.Ok(t, err)
924		defer db.Close()
925
926		for _, r := range c.intervals {
927			testutil.Ok(t, db.Delete(r.Mint, r.Maxt, labels.MustNewMatcher(labels.MatchEqual, "a", "b")))
928		}
929
930		// All of the setup for THIS line.
931		testutil.Ok(t, db.CleanTombstones())
932
933		// Compare the result.
934		q, err := db.Querier(0, numSamples)
935		testutil.Ok(t, err)
936		defer q.Close()
937
938		res, err := q.Select(labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
939		testutil.Ok(t, err)
940
941		expSamples := make([]tsdbutil.Sample, 0, len(c.remaint))
942		for _, ts := range c.remaint {
943			expSamples = append(expSamples, sample{ts, smpls[ts]})
944		}
945
946		expss := newMockSeriesSet([]Series{
947			newSeries(map[string]string{"a": "b"}, expSamples),
948		})
949
950		if len(expSamples) == 0 {
951			testutil.Assert(t, res.Next() == false, "")
952			continue
953		}
954
955		for {
956			eok, rok := expss.Next(), res.Next()
957			testutil.Equals(t, eok, rok)
958
959			if !eok {
960				break
961			}
962			sexp := expss.At()
963			sres := res.At()
964
965			testutil.Equals(t, sexp.Labels(), sres.Labels())
966
967			smplExp, errExp := expandSeriesIterator(sexp.Iterator())
968			smplRes, errRes := expandSeriesIterator(sres.Iterator())
969
970			testutil.Equals(t, errExp, errRes)
971			testutil.Equals(t, smplExp, smplRes)
972		}
973
974		for _, b := range db.Blocks() {
975			testutil.Equals(t, tombstones.NewMemTombstones(), b.tombstones)
976		}
977	}
978}
979
980// TestTombstoneCleanFail tests that a failing TombstoneClean doesn't leave any blocks behind.
981// When TombstoneClean errors the original block that should be rebuilt doesn't get deleted so
982// if TombstoneClean leaves any blocks behind these will overlap.
983func TestTombstoneCleanFail(t *testing.T) {
984
985	db, delete := openTestDB(t, nil)
986	defer func() {
987		testutil.Ok(t, db.Close())
988		delete()
989	}()
990
991	var expectedBlockDirs []string
992
993	// Create some empty blocks pending for compaction.
994	// totalBlocks should be >=2 so we have enough blocks to trigger compaction failure.
995	totalBlocks := 2
996	for i := 0; i < totalBlocks; i++ {
997		blockDir := createBlock(t, db.Dir(), genSeries(1, 1, 0, 1))
998		block, err := OpenBlock(nil, blockDir, nil)
999		testutil.Ok(t, err)
1000		// Add some some fake tombstones to trigger the compaction.
1001		tomb := tombstones.NewMemTombstones()
1002		tomb.AddInterval(0, tombstones.Interval{Mint: 0, Maxt: 1})
1003		block.tombstones = tomb
1004
1005		db.blocks = append(db.blocks, block)
1006		expectedBlockDirs = append(expectedBlockDirs, blockDir)
1007	}
1008
1009	// Initialize the mockCompactorFailing with a room for a single compaction iteration.
1010	// mockCompactorFailing will fail on the second iteration so we can check if the cleanup works as expected.
1011	db.compactor = &mockCompactorFailing{
1012		t:      t,
1013		blocks: db.blocks,
1014		max:    totalBlocks + 1,
1015	}
1016
1017	// The compactor should trigger a failure here.
1018	testutil.NotOk(t, db.CleanTombstones())
1019
1020	// Now check that the CleanTombstones didn't leave any blocks behind after a failure.
1021	actualBlockDirs, err := blockDirs(db.dir)
1022	testutil.Ok(t, err)
1023	testutil.Equals(t, expectedBlockDirs, actualBlockDirs)
1024}
1025
1026// mockCompactorFailing creates a new empty block on every write and fails when reached the max allowed total.
1027type mockCompactorFailing struct {
1028	t      *testing.T
1029	blocks []*Block
1030	max    int
1031}
1032
1033func (*mockCompactorFailing) Plan(dir string) ([]string, error) {
1034	return nil, nil
1035}
1036func (c *mockCompactorFailing) Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error) {
1037	if len(c.blocks) >= c.max {
1038		return ulid.ULID{}, fmt.Errorf("the compactor already did the maximum allowed blocks so it is time to fail")
1039	}
1040
1041	block, err := OpenBlock(nil, createBlock(c.t, dest, genSeries(1, 1, 0, 1)), nil)
1042	testutil.Ok(c.t, err)
1043	testutil.Ok(c.t, block.Close()) // Close block as we won't be using anywhere.
1044	c.blocks = append(c.blocks, block)
1045
1046	// Now check that all expected blocks are actually persisted on disk.
1047	// This way we make sure that the we have some blocks that are supposed to be removed.
1048	var expectedBlocks []string
1049	for _, b := range c.blocks {
1050		expectedBlocks = append(expectedBlocks, filepath.Join(dest, b.Meta().ULID.String()))
1051	}
1052	actualBlockDirs, err := blockDirs(dest)
1053	testutil.Ok(c.t, err)
1054
1055	testutil.Equals(c.t, expectedBlocks, actualBlockDirs)
1056
1057	return block.Meta().ULID, nil
1058}
1059
1060func (*mockCompactorFailing) Compact(dest string, dirs []string, open []*Block) (ulid.ULID, error) {
1061	return ulid.ULID{}, nil
1062
1063}
1064
1065func TestTimeRetention(t *testing.T) {
1066	db, delete := openTestDB(t, &Options{
1067		BlockRanges: []int64{1000},
1068	})
1069	defer func() {
1070		testutil.Ok(t, db.Close())
1071		delete()
1072	}()
1073
1074	blocks := []*BlockMeta{
1075		{MinTime: 500, MaxTime: 900}, // Oldest block
1076		{MinTime: 1000, MaxTime: 1500},
1077		{MinTime: 1500, MaxTime: 2000}, // Newest Block
1078	}
1079
1080	for _, m := range blocks {
1081		createBlock(t, db.Dir(), genSeries(10, 10, m.MinTime, m.MaxTime))
1082	}
1083
1084	testutil.Ok(t, db.reload())                       // Reload the db to register the new blocks.
1085	testutil.Equals(t, len(blocks), len(db.Blocks())) // Ensure all blocks are registered.
1086
1087	db.opts.RetentionDuration = uint64(blocks[2].MaxTime - blocks[1].MinTime)
1088	testutil.Ok(t, db.reload())
1089
1090	expBlocks := blocks[1:]
1091	actBlocks := db.Blocks()
1092
1093	testutil.Equals(t, 1, int(prom_testutil.ToFloat64(db.metrics.timeRetentionCount)), "metric retention count mismatch")
1094	testutil.Equals(t, len(expBlocks), len(actBlocks))
1095	testutil.Equals(t, expBlocks[0].MaxTime, actBlocks[0].meta.MaxTime)
1096	testutil.Equals(t, expBlocks[len(expBlocks)-1].MaxTime, actBlocks[len(actBlocks)-1].meta.MaxTime)
1097}
1098
1099func TestSizeRetention(t *testing.T) {
1100	db, delete := openTestDB(t, &Options{
1101		BlockRanges: []int64{100},
1102	})
1103	defer func() {
1104		testutil.Ok(t, db.Close())
1105		delete()
1106	}()
1107
1108	blocks := []*BlockMeta{
1109		{MinTime: 100, MaxTime: 200}, // Oldest block
1110		{MinTime: 200, MaxTime: 300},
1111		{MinTime: 300, MaxTime: 400},
1112		{MinTime: 400, MaxTime: 500},
1113		{MinTime: 500, MaxTime: 600}, // Newest Block
1114	}
1115
1116	for _, m := range blocks {
1117		createBlock(t, db.Dir(), genSeries(100, 10, m.MinTime, m.MaxTime))
1118	}
1119
1120	headBlocks := []*BlockMeta{
1121		{MinTime: 700, MaxTime: 800},
1122	}
1123
1124	// Add some data to the WAL.
1125	headApp := db.Head().Appender()
1126	for _, m := range headBlocks {
1127		series := genSeries(100, 10, m.MinTime, m.MaxTime)
1128		for _, s := range series {
1129			it := s.Iterator()
1130			for it.Next() {
1131				tim, v := it.At()
1132				_, err := headApp.Add(s.Labels(), tim, v)
1133				testutil.Ok(t, err)
1134			}
1135			testutil.Ok(t, it.Err())
1136		}
1137	}
1138	testutil.Ok(t, headApp.Commit())
1139
1140	// Test that registered size matches the actual disk size.
1141	testutil.Ok(t, db.reload())                                         // Reload the db to register the new db size.
1142	testutil.Equals(t, len(blocks), len(db.Blocks()))                   // Ensure all blocks are registered.
1143	blockSize := int64(prom_testutil.ToFloat64(db.metrics.blocksBytes)) // Use the actual internal metrics.
1144	walSize, err := db.Head().wal.Size()
1145	testutil.Ok(t, err)
1146	// Expected size should take into account block size + WAL size
1147	expSize := blockSize + walSize
1148	actSize, err := fileutil.DirSize(db.Dir())
1149	testutil.Ok(t, err)
1150	testutil.Equals(t, expSize, actSize, "registered size doesn't match actual disk size")
1151
1152	// Create a WAL checkpoint, and compare sizes.
1153	first, last, err := db.Head().wal.Segments()
1154	testutil.Ok(t, err)
1155	_, err = wal.Checkpoint(db.Head().wal, first, last-1, func(x uint64) bool { return false }, 0)
1156	testutil.Ok(t, err)
1157	blockSize = int64(prom_testutil.ToFloat64(db.metrics.blocksBytes)) // Use the actual internal metrics.
1158	walSize, err = db.Head().wal.Size()
1159	testutil.Ok(t, err)
1160	expSize = blockSize + walSize
1161	actSize, err = fileutil.DirSize(db.Dir())
1162	testutil.Ok(t, err)
1163	testutil.Equals(t, expSize, actSize, "registered size doesn't match actual disk size")
1164
1165	// Decrease the max bytes limit so that a delete is triggered.
1166	// Check total size, total count and check that the oldest block was deleted.
1167	firstBlockSize := db.Blocks()[0].Size()
1168	sizeLimit := actSize - firstBlockSize
1169	db.opts.MaxBytes = sizeLimit // Set the new db size limit one block smaller that the actual size.
1170	testutil.Ok(t, db.reload())  // Reload the db to register the new db size.
1171
1172	expBlocks := blocks[1:]
1173	actBlocks := db.Blocks()
1174	blockSize = int64(prom_testutil.ToFloat64(db.metrics.blocksBytes))
1175	walSize, err = db.Head().wal.Size()
1176	testutil.Ok(t, err)
1177	// Expected size should take into account block size + WAL size
1178	expSize = blockSize + walSize
1179	actRetentionCount := int(prom_testutil.ToFloat64(db.metrics.sizeRetentionCount))
1180	actSize, err = fileutil.DirSize(db.Dir())
1181	testutil.Ok(t, err)
1182
1183	testutil.Equals(t, 1, actRetentionCount, "metric retention count mismatch")
1184	testutil.Equals(t, actSize, expSize, "metric db size doesn't match actual disk size")
1185	testutil.Assert(t, expSize <= sizeLimit, "actual size (%v) is expected to be less than or equal to limit (%v)", expSize, sizeLimit)
1186	testutil.Equals(t, len(blocks)-1, len(actBlocks), "new block count should be decreased from:%v to:%v", len(blocks), len(blocks)-1)
1187	testutil.Equals(t, expBlocks[0].MaxTime, actBlocks[0].meta.MaxTime, "maxT mismatch of the first block")
1188	testutil.Equals(t, expBlocks[len(expBlocks)-1].MaxTime, actBlocks[len(actBlocks)-1].meta.MaxTime, "maxT mismatch of the last block")
1189}
1190
1191func TestSizeRetentionMetric(t *testing.T) {
1192	cases := []struct {
1193		maxBytes    int64
1194		expMaxBytes int64
1195	}{
1196		{maxBytes: 1000, expMaxBytes: 1000},
1197		{maxBytes: 0, expMaxBytes: 0},
1198		{maxBytes: -1000, expMaxBytes: 0},
1199	}
1200
1201	for _, c := range cases {
1202		db, delete := openTestDB(t, &Options{
1203			BlockRanges: []int64{100},
1204			MaxBytes:    c.maxBytes,
1205		})
1206
1207		actMaxBytes := int64(prom_testutil.ToFloat64(db.metrics.maxBytes))
1208		testutil.Equals(t, actMaxBytes, c.expMaxBytes, "metric retention limit bytes mismatch")
1209
1210		testutil.Ok(t, db.Close())
1211		delete()
1212	}
1213}
1214
1215func TestNotMatcherSelectsLabelsUnsetSeries(t *testing.T) {
1216	db, delete := openTestDB(t, nil)
1217	defer func() {
1218		testutil.Ok(t, db.Close())
1219		delete()
1220	}()
1221
1222	labelpairs := []labels.Labels{
1223		labels.FromStrings("a", "abcd", "b", "abcde"),
1224		labels.FromStrings("labelname", "labelvalue"),
1225	}
1226
1227	app := db.Appender()
1228	for _, lbls := range labelpairs {
1229		_, err := app.Add(lbls, 0, 1)
1230		testutil.Ok(t, err)
1231	}
1232	testutil.Ok(t, app.Commit())
1233
1234	cases := []struct {
1235		selector labels.Selector
1236		series   []labels.Labels
1237	}{{
1238		selector: labels.Selector{
1239			labels.MustNewMatcher(labels.MatchNotEqual, "lname", "lvalue"),
1240		},
1241		series: labelpairs,
1242	}, {
1243		selector: labels.Selector{
1244			labels.MustNewMatcher(labels.MatchEqual, "a", "abcd"),
1245			labels.MustNewMatcher(labels.MatchNotEqual, "b", "abcde"),
1246		},
1247		series: []labels.Labels{},
1248	}, {
1249		selector: labels.Selector{
1250			labels.MustNewMatcher(labels.MatchEqual, "a", "abcd"),
1251			labels.MustNewMatcher(labels.MatchNotEqual, "b", "abc"),
1252		},
1253		series: []labels.Labels{labelpairs[0]},
1254	}, {
1255		selector: labels.Selector{
1256			labels.MustNewMatcher(labels.MatchNotRegexp, "a", "abd.*"),
1257		},
1258		series: labelpairs,
1259	}, {
1260		selector: labels.Selector{
1261			labels.MustNewMatcher(labels.MatchNotRegexp, "a", "abc.*"),
1262		},
1263		series: labelpairs[1:],
1264	}, {
1265		selector: labels.Selector{
1266			labels.MustNewMatcher(labels.MatchNotRegexp, "c", "abd.*"),
1267		},
1268		series: labelpairs,
1269	}, {
1270		selector: labels.Selector{
1271			labels.MustNewMatcher(labels.MatchNotRegexp, "labelname", "labelvalue"),
1272		},
1273		series: labelpairs[:1],
1274	}}
1275
1276	q, err := db.Querier(0, 10)
1277	testutil.Ok(t, err)
1278	defer func() { testutil.Ok(t, q.Close()) }()
1279
1280	for _, c := range cases {
1281		ss, err := q.Select(c.selector...)
1282		testutil.Ok(t, err)
1283
1284		lres, err := expandSeriesSet(ss)
1285		testutil.Ok(t, err)
1286
1287		testutil.Equals(t, c.series, lres)
1288	}
1289}
1290
1291func expandSeriesSet(ss SeriesSet) ([]labels.Labels, error) {
1292	result := []labels.Labels{}
1293	for ss.Next() {
1294		result = append(result, ss.At().Labels())
1295	}
1296
1297	return result, ss.Err()
1298}
1299
1300func TestOverlappingBlocksDetectsAllOverlaps(t *testing.T) {
1301	// Create 10 blocks that does not overlap (0-10, 10-20, ..., 100-110) but in reverse order to ensure our algorithm
1302	// will handle that.
1303	var metas = make([]BlockMeta, 11)
1304	for i := 10; i >= 0; i-- {
1305		metas[i] = BlockMeta{MinTime: int64(i * 10), MaxTime: int64((i + 1) * 10)}
1306	}
1307
1308	testutil.Assert(t, len(OverlappingBlocks(metas)) == 0, "we found unexpected overlaps")
1309
1310	// Add overlapping blocks. We've to establish order again since we aren't interested
1311	// in trivial overlaps caused by unorderedness.
1312	add := func(ms ...BlockMeta) []BlockMeta {
1313		repl := append(append([]BlockMeta{}, metas...), ms...)
1314		sort.Slice(repl, func(i, j int) bool {
1315			return repl[i].MinTime < repl[j].MinTime
1316		})
1317		return repl
1318	}
1319
1320	// o1 overlaps with 10-20.
1321	o1 := BlockMeta{MinTime: 15, MaxTime: 17}
1322	testutil.Equals(t, Overlaps{
1323		{Min: 15, Max: 17}: {metas[1], o1},
1324	}, OverlappingBlocks(add(o1)))
1325
1326	// o2 overlaps with 20-30 and 30-40.
1327	o2 := BlockMeta{MinTime: 21, MaxTime: 31}
1328	testutil.Equals(t, Overlaps{
1329		{Min: 21, Max: 30}: {metas[2], o2},
1330		{Min: 30, Max: 31}: {o2, metas[3]},
1331	}, OverlappingBlocks(add(o2)))
1332
1333	// o3a and o3b overlaps with 30-40 and each other.
1334	o3a := BlockMeta{MinTime: 33, MaxTime: 39}
1335	o3b := BlockMeta{MinTime: 34, MaxTime: 36}
1336	testutil.Equals(t, Overlaps{
1337		{Min: 34, Max: 36}: {metas[3], o3a, o3b},
1338	}, OverlappingBlocks(add(o3a, o3b)))
1339
1340	// o4 is 1:1 overlap with 50-60.
1341	o4 := BlockMeta{MinTime: 50, MaxTime: 60}
1342	testutil.Equals(t, Overlaps{
1343		{Min: 50, Max: 60}: {metas[5], o4},
1344	}, OverlappingBlocks(add(o4)))
1345
1346	// o5 overlaps with 60-70, 70-80 and 80-90.
1347	o5 := BlockMeta{MinTime: 61, MaxTime: 85}
1348	testutil.Equals(t, Overlaps{
1349		{Min: 61, Max: 70}: {metas[6], o5},
1350		{Min: 70, Max: 80}: {o5, metas[7]},
1351		{Min: 80, Max: 85}: {o5, metas[8]},
1352	}, OverlappingBlocks(add(o5)))
1353
1354	// o6a overlaps with 90-100, 100-110 and o6b, o6b overlaps with 90-100 and o6a.
1355	o6a := BlockMeta{MinTime: 92, MaxTime: 105}
1356	o6b := BlockMeta{MinTime: 94, MaxTime: 99}
1357	testutil.Equals(t, Overlaps{
1358		{Min: 94, Max: 99}:   {metas[9], o6a, o6b},
1359		{Min: 100, Max: 105}: {o6a, metas[10]},
1360	}, OverlappingBlocks(add(o6a, o6b)))
1361
1362	// All together.
1363	testutil.Equals(t, Overlaps{
1364		{Min: 15, Max: 17}: {metas[1], o1},
1365		{Min: 21, Max: 30}: {metas[2], o2}, {Min: 30, Max: 31}: {o2, metas[3]},
1366		{Min: 34, Max: 36}: {metas[3], o3a, o3b},
1367		{Min: 50, Max: 60}: {metas[5], o4},
1368		{Min: 61, Max: 70}: {metas[6], o5}, {Min: 70, Max: 80}: {o5, metas[7]}, {Min: 80, Max: 85}: {o5, metas[8]},
1369		{Min: 94, Max: 99}: {metas[9], o6a, o6b}, {Min: 100, Max: 105}: {o6a, metas[10]},
1370	}, OverlappingBlocks(add(o1, o2, o3a, o3b, o4, o5, o6a, o6b)))
1371
1372	// Additional case.
1373	var nc1 []BlockMeta
1374	nc1 = append(nc1, BlockMeta{MinTime: 1, MaxTime: 5})
1375	nc1 = append(nc1, BlockMeta{MinTime: 2, MaxTime: 3})
1376	nc1 = append(nc1, BlockMeta{MinTime: 2, MaxTime: 3})
1377	nc1 = append(nc1, BlockMeta{MinTime: 2, MaxTime: 3})
1378	nc1 = append(nc1, BlockMeta{MinTime: 2, MaxTime: 3})
1379	nc1 = append(nc1, BlockMeta{MinTime: 2, MaxTime: 6})
1380	nc1 = append(nc1, BlockMeta{MinTime: 3, MaxTime: 5})
1381	nc1 = append(nc1, BlockMeta{MinTime: 5, MaxTime: 7})
1382	nc1 = append(nc1, BlockMeta{MinTime: 7, MaxTime: 10})
1383	nc1 = append(nc1, BlockMeta{MinTime: 8, MaxTime: 9})
1384	testutil.Equals(t, Overlaps{
1385		{Min: 2, Max: 3}: {nc1[0], nc1[1], nc1[2], nc1[3], nc1[4], nc1[5]}, // 1-5, 2-3, 2-3, 2-3, 2-3, 2,6
1386		{Min: 3, Max: 5}: {nc1[0], nc1[5], nc1[6]},                         // 1-5, 2-6, 3-5
1387		{Min: 5, Max: 6}: {nc1[5], nc1[7]},                                 // 2-6, 5-7
1388		{Min: 8, Max: 9}: {nc1[8], nc1[9]},                                 // 7-10, 8-9
1389	}, OverlappingBlocks(nc1))
1390}
1391
1392// Regression test for https://github.com/prometheus/prometheus/tsdb/issues/347
1393func TestChunkAtBlockBoundary(t *testing.T) {
1394	db, delete := openTestDB(t, nil)
1395	defer func() {
1396		testutil.Ok(t, db.Close())
1397		delete()
1398	}()
1399
1400	app := db.Appender()
1401
1402	blockRange := DefaultOptions.BlockRanges[0]
1403	label := labels.FromStrings("foo", "bar")
1404
1405	for i := int64(0); i < 3; i++ {
1406		_, err := app.Add(label, i*blockRange, 0)
1407		testutil.Ok(t, err)
1408		_, err = app.Add(label, i*blockRange+1000, 0)
1409		testutil.Ok(t, err)
1410	}
1411
1412	err := app.Commit()
1413	testutil.Ok(t, err)
1414
1415	err = db.compact()
1416	testutil.Ok(t, err)
1417
1418	for _, block := range db.Blocks() {
1419		r, err := block.Index()
1420		testutil.Ok(t, err)
1421		defer r.Close()
1422
1423		meta := block.Meta()
1424
1425		k, v := index.AllPostingsKey()
1426		p, err := r.Postings(k, v)
1427		testutil.Ok(t, err)
1428
1429		var (
1430			lset labels.Labels
1431			chks []chunks.Meta
1432		)
1433
1434		chunkCount := 0
1435
1436		for p.Next() {
1437			err = r.Series(p.At(), &lset, &chks)
1438			testutil.Ok(t, err)
1439			for _, c := range chks {
1440				testutil.Assert(t, meta.MinTime <= c.MinTime && c.MaxTime <= meta.MaxTime,
1441					"chunk spans beyond block boundaries: [block.MinTime=%d, block.MaxTime=%d]; [chunk.MinTime=%d, chunk.MaxTime=%d]",
1442					meta.MinTime, meta.MaxTime, c.MinTime, c.MaxTime)
1443				chunkCount++
1444			}
1445		}
1446		testutil.Assert(t, chunkCount == 1, "expected 1 chunk in block %s, got %d", meta.ULID, chunkCount)
1447	}
1448}
1449
1450func TestQuerierWithBoundaryChunks(t *testing.T) {
1451	db, delete := openTestDB(t, nil)
1452	defer func() {
1453		testutil.Ok(t, db.Close())
1454		delete()
1455	}()
1456
1457	app := db.Appender()
1458
1459	blockRange := DefaultOptions.BlockRanges[0]
1460	label := labels.FromStrings("foo", "bar")
1461
1462	for i := int64(0); i < 5; i++ {
1463		_, err := app.Add(label, i*blockRange, 0)
1464		testutil.Ok(t, err)
1465	}
1466
1467	err := app.Commit()
1468	testutil.Ok(t, err)
1469
1470	err = db.compact()
1471	testutil.Ok(t, err)
1472
1473	testutil.Assert(t, len(db.blocks) >= 3, "invalid test, less than three blocks in DB")
1474
1475	q, err := db.Querier(blockRange, 2*blockRange)
1476	testutil.Ok(t, err)
1477	defer q.Close()
1478
1479	// The requested interval covers 2 blocks, so the querier should contain 2 blocks.
1480	count := len(q.(*querier).blocks)
1481	testutil.Assert(t, count == 2, "expected 2 blocks in querier, got %d", count)
1482}
1483
1484// TestInitializeHeadTimestamp ensures that the h.minTime is set properly.
1485// 	- no blocks no WAL: set to the time of the first  appended sample
1486// 	- no blocks with WAL: set to the smallest sample from the WAL
1487//	- with blocks no WAL: set to the last block maxT
1488// 	- with blocks with WAL: same as above
1489func TestInitializeHeadTimestamp(t *testing.T) {
1490	t.Run("clean", func(t *testing.T) {
1491		dir, err := ioutil.TempDir("", "test_head_init")
1492		testutil.Ok(t, err)
1493		defer func() {
1494			testutil.Ok(t, os.RemoveAll(dir))
1495		}()
1496
1497		db, err := Open(dir, nil, nil, nil)
1498		testutil.Ok(t, err)
1499		defer db.Close()
1500
1501		// Should be set to init values if no WAL or blocks exist so far.
1502		testutil.Equals(t, int64(math.MaxInt64), db.head.MinTime())
1503		testutil.Equals(t, int64(math.MinInt64), db.head.MaxTime())
1504
1505		// First added sample initializes the writable range.
1506		app := db.Appender()
1507		_, err = app.Add(labels.FromStrings("a", "b"), 1000, 1)
1508		testutil.Ok(t, err)
1509
1510		testutil.Equals(t, int64(1000), db.head.MinTime())
1511		testutil.Equals(t, int64(1000), db.head.MaxTime())
1512	})
1513	t.Run("wal-only", func(t *testing.T) {
1514		dir, err := ioutil.TempDir("", "test_head_init")
1515		testutil.Ok(t, err)
1516		defer func() {
1517			testutil.Ok(t, os.RemoveAll(dir))
1518		}()
1519
1520		testutil.Ok(t, os.MkdirAll(path.Join(dir, "wal"), 0777))
1521		w, err := wal.New(nil, nil, path.Join(dir, "wal"), false)
1522		testutil.Ok(t, err)
1523
1524		var enc record.Encoder
1525		err = w.Log(
1526			enc.Series([]record.RefSeries{
1527				{Ref: 123, Labels: labels.FromStrings("a", "1")},
1528				{Ref: 124, Labels: labels.FromStrings("a", "2")},
1529			}, nil),
1530			enc.Samples([]record.RefSample{
1531				{Ref: 123, T: 5000, V: 1},
1532				{Ref: 124, T: 15000, V: 1},
1533			}, nil),
1534		)
1535		testutil.Ok(t, err)
1536		testutil.Ok(t, w.Close())
1537
1538		db, err := Open(dir, nil, nil, nil)
1539		testutil.Ok(t, err)
1540		defer db.Close()
1541
1542		testutil.Equals(t, int64(5000), db.head.MinTime())
1543		testutil.Equals(t, int64(15000), db.head.MaxTime())
1544	})
1545	t.Run("existing-block", func(t *testing.T) {
1546		dir, err := ioutil.TempDir("", "test_head_init")
1547		testutil.Ok(t, err)
1548		defer func() {
1549			testutil.Ok(t, os.RemoveAll(dir))
1550		}()
1551
1552		createBlock(t, dir, genSeries(1, 1, 1000, 2000))
1553
1554		db, err := Open(dir, nil, nil, nil)
1555		testutil.Ok(t, err)
1556		defer db.Close()
1557
1558		testutil.Equals(t, int64(2000), db.head.MinTime())
1559		testutil.Equals(t, int64(2000), db.head.MaxTime())
1560	})
1561	t.Run("existing-block-and-wal", func(t *testing.T) {
1562		dir, err := ioutil.TempDir("", "test_head_init")
1563		testutil.Ok(t, err)
1564		defer func() {
1565			testutil.Ok(t, os.RemoveAll(dir))
1566		}()
1567
1568		createBlock(t, dir, genSeries(1, 1, 1000, 6000))
1569
1570		testutil.Ok(t, os.MkdirAll(path.Join(dir, "wal"), 0777))
1571		w, err := wal.New(nil, nil, path.Join(dir, "wal"), false)
1572		testutil.Ok(t, err)
1573
1574		var enc record.Encoder
1575		err = w.Log(
1576			enc.Series([]record.RefSeries{
1577				{Ref: 123, Labels: labels.FromStrings("a", "1")},
1578				{Ref: 124, Labels: labels.FromStrings("a", "2")},
1579			}, nil),
1580			enc.Samples([]record.RefSample{
1581				{Ref: 123, T: 5000, V: 1},
1582				{Ref: 124, T: 15000, V: 1},
1583			}, nil),
1584		)
1585		testutil.Ok(t, err)
1586		testutil.Ok(t, w.Close())
1587
1588		r := prometheus.NewRegistry()
1589
1590		db, err := Open(dir, nil, r, nil)
1591		testutil.Ok(t, err)
1592		defer db.Close()
1593
1594		testutil.Equals(t, int64(6000), db.head.MinTime())
1595		testutil.Equals(t, int64(15000), db.head.MaxTime())
1596		// Check that old series has been GCed.
1597		testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.series))
1598	})
1599}
1600
1601func TestNoEmptyBlocks(t *testing.T) {
1602	db, delete := openTestDB(t, &Options{
1603		BlockRanges: []int64{100},
1604	})
1605	defer func() {
1606		testutil.Ok(t, db.Close())
1607		delete()
1608	}()
1609	db.DisableCompactions()
1610
1611	rangeToTriggerCompaction := db.opts.BlockRanges[0]/2*3 - 1
1612	defaultLabel := labels.FromStrings("foo", "bar")
1613	defaultMatcher := labels.MustNewMatcher(labels.MatchRegexp, "", ".*")
1614
1615	t.Run("Test no blocks after compact with empty head.", func(t *testing.T) {
1616		testutil.Ok(t, db.compact())
1617		actBlocks, err := blockDirs(db.Dir())
1618		testutil.Ok(t, err)
1619		testutil.Equals(t, len(db.Blocks()), len(actBlocks))
1620		testutil.Equals(t, 0, len(actBlocks))
1621		testutil.Equals(t, 0, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "no compaction should be triggered here")
1622	})
1623
1624	t.Run("Test no blocks after deleting all samples from head.", func(t *testing.T) {
1625		app := db.Appender()
1626		_, err := app.Add(defaultLabel, 1, 0)
1627		testutil.Ok(t, err)
1628		_, err = app.Add(defaultLabel, 2, 0)
1629		testutil.Ok(t, err)
1630		_, err = app.Add(defaultLabel, 3+rangeToTriggerCompaction, 0)
1631		testutil.Ok(t, err)
1632		testutil.Ok(t, app.Commit())
1633		testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, defaultMatcher))
1634		testutil.Ok(t, db.compact())
1635		testutil.Equals(t, 1, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here")
1636
1637		actBlocks, err := blockDirs(db.Dir())
1638		testutil.Ok(t, err)
1639		testutil.Equals(t, len(db.Blocks()), len(actBlocks))
1640		testutil.Equals(t, 0, len(actBlocks))
1641
1642		app = db.Appender()
1643		_, err = app.Add(defaultLabel, 1, 0)
1644		testutil.Assert(t, err == ErrOutOfBounds, "the head should be truncated so no samples in the past should be allowed")
1645
1646		// Adding new blocks.
1647		currentTime := db.Head().MaxTime()
1648		_, err = app.Add(defaultLabel, currentTime, 0)
1649		testutil.Ok(t, err)
1650		_, err = app.Add(defaultLabel, currentTime+1, 0)
1651		testutil.Ok(t, err)
1652		_, err = app.Add(defaultLabel, currentTime+rangeToTriggerCompaction, 0)
1653		testutil.Ok(t, err)
1654		testutil.Ok(t, app.Commit())
1655
1656		testutil.Ok(t, db.compact())
1657		testutil.Equals(t, 2, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here")
1658		actBlocks, err = blockDirs(db.Dir())
1659		testutil.Ok(t, err)
1660		testutil.Equals(t, len(db.Blocks()), len(actBlocks))
1661		testutil.Assert(t, len(actBlocks) == 1, "No blocks created when compacting with >0 samples")
1662	})
1663
1664	t.Run(`When no new block is created from head, and there are some blocks on disk
1665	compaction should not run into infinite loop (was seen during development).`, func(t *testing.T) {
1666		oldBlocks := db.Blocks()
1667		app := db.Appender()
1668		currentTime := db.Head().MaxTime()
1669		_, err := app.Add(defaultLabel, currentTime, 0)
1670		testutil.Ok(t, err)
1671		_, err = app.Add(defaultLabel, currentTime+1, 0)
1672		testutil.Ok(t, err)
1673		_, err = app.Add(defaultLabel, currentTime+rangeToTriggerCompaction, 0)
1674		testutil.Ok(t, err)
1675		testutil.Ok(t, app.Commit())
1676		testutil.Ok(t, db.head.Delete(math.MinInt64, math.MaxInt64, defaultMatcher))
1677		testutil.Ok(t, db.compact())
1678		testutil.Equals(t, 3, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here")
1679		testutil.Equals(t, oldBlocks, db.Blocks())
1680	})
1681
1682	t.Run("Test no blocks remaining after deleting all samples from disk.", func(t *testing.T) {
1683		currentTime := db.Head().MaxTime()
1684		blocks := []*BlockMeta{
1685			{MinTime: currentTime, MaxTime: currentTime + db.opts.BlockRanges[0]},
1686			{MinTime: currentTime + 100, MaxTime: currentTime + 100 + db.opts.BlockRanges[0]},
1687		}
1688		for _, m := range blocks {
1689			createBlock(t, db.Dir(), genSeries(2, 2, m.MinTime, m.MaxTime))
1690		}
1691
1692		oldBlocks := db.Blocks()
1693		testutil.Ok(t, db.reload())                                      // Reload the db to register the new blocks.
1694		testutil.Equals(t, len(blocks)+len(oldBlocks), len(db.Blocks())) // Ensure all blocks are registered.
1695		testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, defaultMatcher))
1696		testutil.Ok(t, db.compact())
1697		testutil.Equals(t, 5, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here once for each block that have tombstones")
1698
1699		actBlocks, err := blockDirs(db.Dir())
1700		testutil.Ok(t, err)
1701		testutil.Equals(t, len(db.Blocks()), len(actBlocks))
1702		testutil.Equals(t, 1, len(actBlocks), "All samples are deleted. Only the most recent block should remain after compaction.")
1703	})
1704}
1705
1706func TestDB_LabelNames(t *testing.T) {
1707	tests := []struct {
1708		// Add 'sampleLabels1' -> Test Head -> Compact -> Test Disk ->
1709		// -> Add 'sampleLabels2' -> Test Head+Disk
1710
1711		sampleLabels1 [][2]string // For checking head and disk separately.
1712		// To test Head+Disk, sampleLabels2 should have
1713		// at least 1 unique label name which is not in sampleLabels1.
1714		sampleLabels2 [][2]string // // For checking head and disk together.
1715		exp1          []string    // after adding sampleLabels1.
1716		exp2          []string    // after adding sampleLabels1 and sampleLabels2.
1717	}{
1718		{
1719			sampleLabels1: [][2]string{
1720				{"name1", "1"},
1721				{"name3", "3"},
1722				{"name2", "2"},
1723			},
1724			sampleLabels2: [][2]string{
1725				{"name4", "4"},
1726				{"name1", "1"},
1727			},
1728			exp1: []string{"name1", "name2", "name3"},
1729			exp2: []string{"name1", "name2", "name3", "name4"},
1730		},
1731		{
1732			sampleLabels1: [][2]string{
1733				{"name2", "2"},
1734				{"name1", "1"},
1735				{"name2", "2"},
1736			},
1737			sampleLabels2: [][2]string{
1738				{"name6", "6"},
1739				{"name0", "0"},
1740			},
1741			exp1: []string{"name1", "name2"},
1742			exp2: []string{"name0", "name1", "name2", "name6"},
1743		},
1744	}
1745
1746	blockRange := DefaultOptions.BlockRanges[0]
1747	// Appends samples into the database.
1748	appendSamples := func(db *DB, mint, maxt int64, sampleLabels [][2]string) {
1749		t.Helper()
1750		app := db.Appender()
1751		for i := mint; i <= maxt; i++ {
1752			for _, tuple := range sampleLabels {
1753				label := labels.FromStrings(tuple[0], tuple[1])
1754				_, err := app.Add(label, i*blockRange, 0)
1755				testutil.Ok(t, err)
1756			}
1757		}
1758		err := app.Commit()
1759		testutil.Ok(t, err)
1760	}
1761	for _, tst := range tests {
1762		db, delete := openTestDB(t, nil)
1763		defer func() {
1764			testutil.Ok(t, db.Close())
1765			delete()
1766		}()
1767
1768		appendSamples(db, 0, 4, tst.sampleLabels1)
1769
1770		// Testing head.
1771		headIndexr, err := db.head.Index()
1772		testutil.Ok(t, err)
1773		labelNames, err := headIndexr.LabelNames()
1774		testutil.Ok(t, err)
1775		testutil.Equals(t, tst.exp1, labelNames)
1776		testutil.Ok(t, headIndexr.Close())
1777
1778		// Testing disk.
1779		err = db.compact()
1780		testutil.Ok(t, err)
1781		// All blocks have same label names, hence check them individually.
1782		// No need to aggregate and check.
1783		for _, b := range db.Blocks() {
1784			blockIndexr, err := b.Index()
1785			testutil.Ok(t, err)
1786			labelNames, err = blockIndexr.LabelNames()
1787			testutil.Ok(t, err)
1788			testutil.Equals(t, tst.exp1, labelNames)
1789			testutil.Ok(t, blockIndexr.Close())
1790		}
1791
1792		// Adding more samples to head with new label names
1793		// so that we can test (head+disk).LabelNames() (the union).
1794		appendSamples(db, 5, 9, tst.sampleLabels2)
1795
1796		// Testing DB (union).
1797		q, err := db.Querier(math.MinInt64, math.MaxInt64)
1798		testutil.Ok(t, err)
1799		labelNames, err = q.LabelNames()
1800		testutil.Ok(t, err)
1801		testutil.Ok(t, q.Close())
1802		testutil.Equals(t, tst.exp2, labelNames)
1803	}
1804}
1805
1806func TestCorrectNumTombstones(t *testing.T) {
1807	db, delete := openTestDB(t, nil)
1808	defer func() {
1809		testutil.Ok(t, db.Close())
1810		delete()
1811	}()
1812
1813	blockRange := DefaultOptions.BlockRanges[0]
1814	defaultLabel := labels.FromStrings("foo", "bar")
1815	defaultMatcher := labels.MustNewMatcher(labels.MatchEqual, defaultLabel[0].Name, defaultLabel[0].Value)
1816
1817	app := db.Appender()
1818	for i := int64(0); i < 3; i++ {
1819		for j := int64(0); j < 15; j++ {
1820			_, err := app.Add(defaultLabel, i*blockRange+j, 0)
1821			testutil.Ok(t, err)
1822		}
1823	}
1824	testutil.Ok(t, app.Commit())
1825
1826	err := db.compact()
1827	testutil.Ok(t, err)
1828	testutil.Equals(t, 1, len(db.blocks))
1829
1830	testutil.Ok(t, db.Delete(0, 1, defaultMatcher))
1831	testutil.Equals(t, uint64(1), db.blocks[0].meta.Stats.NumTombstones)
1832
1833	// {0, 1} and {2, 3} are merged to form 1 tombstone.
1834	testutil.Ok(t, db.Delete(2, 3, defaultMatcher))
1835	testutil.Equals(t, uint64(1), db.blocks[0].meta.Stats.NumTombstones)
1836
1837	testutil.Ok(t, db.Delete(5, 6, defaultMatcher))
1838	testutil.Equals(t, uint64(2), db.blocks[0].meta.Stats.NumTombstones)
1839
1840	testutil.Ok(t, db.Delete(9, 11, defaultMatcher))
1841	testutil.Equals(t, uint64(3), db.blocks[0].meta.Stats.NumTombstones)
1842}
1843
1844func TestVerticalCompaction(t *testing.T) {
1845	cases := []struct {
1846		blockSeries          [][]Series
1847		expSeries            map[string][]tsdbutil.Sample
1848		expBlockNum          int
1849		expOverlappingBlocks int
1850	}{
1851		// Case 0
1852		// |--------------|
1853		//        |----------------|
1854		{
1855			blockSeries: [][]Series{
1856				{
1857					newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
1858						sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
1859						sample{5, 0}, sample{7, 0}, sample{8, 0}, sample{9, 0},
1860					}),
1861				},
1862				{
1863					newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
1864						sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{7, 99},
1865						sample{8, 99}, sample{9, 99}, sample{10, 99}, sample{11, 99},
1866						sample{12, 99}, sample{13, 99}, sample{14, 99},
1867					}),
1868				},
1869			},
1870			expSeries: map[string][]tsdbutil.Sample{`{a="b"}`: {
1871				sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{3, 99},
1872				sample{4, 0}, sample{5, 99}, sample{6, 99}, sample{7, 99},
1873				sample{8, 99}, sample{9, 99}, sample{10, 99}, sample{11, 99},
1874				sample{12, 99}, sample{13, 99}, sample{14, 99},
1875			}},
1876			expBlockNum:          1,
1877			expOverlappingBlocks: 1,
1878		},
1879		// Case 1
1880		// |-------------------------------|
1881		//        |----------------|
1882		{
1883			blockSeries: [][]Series{
1884				{
1885					newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
1886						sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
1887						sample{5, 0}, sample{7, 0}, sample{8, 0}, sample{9, 0},
1888						sample{11, 0}, sample{13, 0}, sample{17, 0},
1889					}),
1890				},
1891				{
1892					newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
1893						sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{7, 99},
1894						sample{8, 99}, sample{9, 99}, sample{10, 99},
1895					}),
1896				},
1897			},
1898			expSeries: map[string][]tsdbutil.Sample{`{a="b"}`: {
1899				sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{3, 99},
1900				sample{4, 0}, sample{5, 99}, sample{6, 99}, sample{7, 99},
1901				sample{8, 99}, sample{9, 99}, sample{10, 99}, sample{11, 0},
1902				sample{13, 0}, sample{17, 0},
1903			}},
1904			expBlockNum:          1,
1905			expOverlappingBlocks: 1,
1906		},
1907		// Case 2
1908		// |-------------------------------|
1909		//        |------------|
1910		//                           |--------------------|
1911		{
1912			blockSeries: [][]Series{
1913				{
1914					newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
1915						sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
1916						sample{5, 0}, sample{7, 0}, sample{8, 0}, sample{9, 0},
1917						sample{11, 0}, sample{13, 0}, sample{17, 0},
1918					}),
1919				},
1920				{
1921					newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
1922						sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{7, 99},
1923						sample{8, 99}, sample{9, 99},
1924					}),
1925				},
1926				{
1927					newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
1928						sample{14, 59}, sample{15, 59}, sample{17, 59}, sample{20, 59},
1929						sample{21, 59}, sample{22, 59},
1930					}),
1931				},
1932			},
1933			expSeries: map[string][]tsdbutil.Sample{`{a="b"}`: {
1934				sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{3, 99},
1935				sample{4, 0}, sample{5, 99}, sample{6, 99}, sample{7, 99},
1936				sample{8, 99}, sample{9, 99}, sample{11, 0}, sample{13, 0},
1937				sample{14, 59}, sample{15, 59}, sample{17, 59}, sample{20, 59},
1938				sample{21, 59}, sample{22, 59},
1939			}},
1940			expBlockNum:          1,
1941			expOverlappingBlocks: 1,
1942		},
1943		// Case 3
1944		// |-------------------|
1945		//                           |--------------------|
1946		//               |----------------|
1947		{
1948			blockSeries: [][]Series{
1949				{
1950					newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
1951						sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
1952						sample{5, 0}, sample{8, 0}, sample{9, 0},
1953					}),
1954				},
1955				{
1956					newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
1957						sample{14, 59}, sample{15, 59}, sample{17, 59}, sample{20, 59},
1958						sample{21, 59}, sample{22, 59},
1959					}),
1960				},
1961				{
1962					newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
1963						sample{5, 99}, sample{6, 99}, sample{7, 99}, sample{8, 99},
1964						sample{9, 99}, sample{10, 99}, sample{13, 99}, sample{15, 99},
1965						sample{16, 99}, sample{17, 99},
1966					}),
1967				},
1968			},
1969			expSeries: map[string][]tsdbutil.Sample{`{a="b"}`: {
1970				sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
1971				sample{5, 99}, sample{6, 99}, sample{7, 99}, sample{8, 99},
1972				sample{9, 99}, sample{10, 99}, sample{13, 99}, sample{14, 59},
1973				sample{15, 59}, sample{16, 99}, sample{17, 59}, sample{20, 59},
1974				sample{21, 59}, sample{22, 59},
1975			}},
1976			expBlockNum:          1,
1977			expOverlappingBlocks: 1,
1978		},
1979		// Case 4
1980		// |-------------------------------------|
1981		//            |------------|
1982		//      |-------------------------|
1983		{
1984			blockSeries: [][]Series{
1985				{
1986					newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
1987						sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
1988						sample{5, 0}, sample{8, 0}, sample{9, 0}, sample{10, 0},
1989						sample{13, 0}, sample{15, 0}, sample{16, 0}, sample{17, 0},
1990						sample{20, 0}, sample{22, 0},
1991					}),
1992				},
1993				{
1994					newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
1995						sample{7, 59}, sample{8, 59}, sample{9, 59}, sample{10, 59},
1996						sample{11, 59},
1997					}),
1998				},
1999				{
2000					newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
2001						sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{8, 99},
2002						sample{9, 99}, sample{10, 99}, sample{13, 99}, sample{15, 99},
2003						sample{16, 99}, sample{17, 99},
2004					}),
2005				},
2006			},
2007			expSeries: map[string][]tsdbutil.Sample{`{a="b"}`: {
2008				sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{3, 99},
2009				sample{4, 0}, sample{5, 99}, sample{6, 99}, sample{7, 59},
2010				sample{8, 59}, sample{9, 59}, sample{10, 59}, sample{11, 59},
2011				sample{13, 99}, sample{15, 99}, sample{16, 99}, sample{17, 99},
2012				sample{20, 0}, sample{22, 0},
2013			}},
2014			expBlockNum:          1,
2015			expOverlappingBlocks: 1,
2016		},
2017		// Case 5: series are merged properly when there are multiple series.
2018		// |-------------------------------------|
2019		//            |------------|
2020		//      |-------------------------|
2021		{
2022			blockSeries: [][]Series{
2023				{
2024					newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
2025						sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
2026						sample{5, 0}, sample{8, 0}, sample{9, 0}, sample{10, 0},
2027						sample{13, 0}, sample{15, 0}, sample{16, 0}, sample{17, 0},
2028						sample{20, 0}, sample{22, 0},
2029					}),
2030					newSeries(map[string]string{"b": "c"}, []tsdbutil.Sample{
2031						sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
2032						sample{5, 0}, sample{8, 0}, sample{9, 0}, sample{10, 0},
2033						sample{13, 0}, sample{15, 0}, sample{16, 0}, sample{17, 0},
2034						sample{20, 0}, sample{22, 0},
2035					}),
2036					newSeries(map[string]string{"c": "d"}, []tsdbutil.Sample{
2037						sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
2038						sample{5, 0}, sample{8, 0}, sample{9, 0}, sample{10, 0},
2039						sample{13, 0}, sample{15, 0}, sample{16, 0}, sample{17, 0},
2040						sample{20, 0}, sample{22, 0},
2041					}),
2042				},
2043				{
2044					newSeries(map[string]string{"__name__": "a"}, []tsdbutil.Sample{
2045						sample{7, 59}, sample{8, 59}, sample{9, 59}, sample{10, 59},
2046						sample{11, 59},
2047					}),
2048					newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
2049						sample{7, 59}, sample{8, 59}, sample{9, 59}, sample{10, 59},
2050						sample{11, 59},
2051					}),
2052					newSeries(map[string]string{"aa": "bb"}, []tsdbutil.Sample{
2053						sample{7, 59}, sample{8, 59}, sample{9, 59}, sample{10, 59},
2054						sample{11, 59},
2055					}),
2056					newSeries(map[string]string{"c": "d"}, []tsdbutil.Sample{
2057						sample{7, 59}, sample{8, 59}, sample{9, 59}, sample{10, 59},
2058						sample{11, 59},
2059					}),
2060				},
2061				{
2062					newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
2063						sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{8, 99},
2064						sample{9, 99}, sample{10, 99}, sample{13, 99}, sample{15, 99},
2065						sample{16, 99}, sample{17, 99},
2066					}),
2067					newSeries(map[string]string{"aa": "bb"}, []tsdbutil.Sample{
2068						sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{8, 99},
2069						sample{9, 99}, sample{10, 99}, sample{13, 99}, sample{15, 99},
2070						sample{16, 99}, sample{17, 99},
2071					}),
2072					newSeries(map[string]string{"c": "d"}, []tsdbutil.Sample{
2073						sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{8, 99},
2074						sample{9, 99}, sample{10, 99}, sample{13, 99}, sample{15, 99},
2075						sample{16, 99}, sample{17, 99},
2076					}),
2077				},
2078			},
2079			expSeries: map[string][]tsdbutil.Sample{
2080				`{__name__="a"}`: {
2081					sample{7, 59}, sample{8, 59}, sample{9, 59}, sample{10, 59},
2082					sample{11, 59},
2083				},
2084				`{a="b"}`: {
2085					sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{3, 99},
2086					sample{4, 0}, sample{5, 99}, sample{6, 99}, sample{7, 59},
2087					sample{8, 59}, sample{9, 59}, sample{10, 59}, sample{11, 59},
2088					sample{13, 99}, sample{15, 99}, sample{16, 99}, sample{17, 99},
2089					sample{20, 0}, sample{22, 0},
2090				},
2091				`{aa="bb"}`: {
2092					sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{7, 59},
2093					sample{8, 59}, sample{9, 59}, sample{10, 59}, sample{11, 59},
2094					sample{13, 99}, sample{15, 99}, sample{16, 99}, sample{17, 99},
2095				},
2096				`{b="c"}`: {
2097					sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
2098					sample{5, 0}, sample{8, 0}, sample{9, 0}, sample{10, 0},
2099					sample{13, 0}, sample{15, 0}, sample{16, 0}, sample{17, 0},
2100					sample{20, 0}, sample{22, 0},
2101				},
2102				`{c="d"}`: {
2103					sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{3, 99},
2104					sample{4, 0}, sample{5, 99}, sample{6, 99}, sample{7, 59},
2105					sample{8, 59}, sample{9, 59}, sample{10, 59}, sample{11, 59},
2106					sample{13, 99}, sample{15, 99}, sample{16, 99}, sample{17, 99},
2107					sample{20, 0}, sample{22, 0},
2108				},
2109			},
2110			expBlockNum:          1,
2111			expOverlappingBlocks: 1,
2112		},
2113		// Case 6
2114		// |--------------|
2115		//        |----------------|
2116		//                                         |--------------|
2117		//                                                  |----------------|
2118		{
2119			blockSeries: [][]Series{
2120				{
2121					newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
2122						sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
2123						sample{5, 0}, sample{7, 0}, sample{8, 0}, sample{9, 0},
2124					}),
2125				},
2126				{
2127					newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
2128						sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{7, 99},
2129						sample{8, 99}, sample{9, 99}, sample{10, 99}, sample{11, 99},
2130						sample{12, 99}, sample{13, 99}, sample{14, 99},
2131					}),
2132				},
2133				{
2134					newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
2135						sample{20, 0}, sample{21, 0}, sample{22, 0}, sample{24, 0},
2136						sample{25, 0}, sample{27, 0}, sample{28, 0}, sample{29, 0},
2137					}),
2138				},
2139				{
2140					newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
2141						sample{23, 99}, sample{25, 99}, sample{26, 99}, sample{27, 99},
2142						sample{28, 99}, sample{29, 99}, sample{30, 99}, sample{31, 99},
2143					}),
2144				},
2145			},
2146			expSeries: map[string][]tsdbutil.Sample{`{a="b"}`: {
2147				sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{3, 99},
2148				sample{4, 0}, sample{5, 99}, sample{6, 99}, sample{7, 99},
2149				sample{8, 99}, sample{9, 99}, sample{10, 99}, sample{11, 99},
2150				sample{12, 99}, sample{13, 99}, sample{14, 99},
2151				sample{20, 0}, sample{21, 0}, sample{22, 0}, sample{23, 99},
2152				sample{24, 0}, sample{25, 99}, sample{26, 99}, sample{27, 99},
2153				sample{28, 99}, sample{29, 99}, sample{30, 99}, sample{31, 99},
2154			}},
2155			expBlockNum:          2,
2156			expOverlappingBlocks: 2,
2157		},
2158	}
2159
2160	defaultMatcher := labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*")
2161	for _, c := range cases {
2162		if ok := t.Run("", func(t *testing.T) {
2163
2164			tmpdir, err := ioutil.TempDir("", "data")
2165			testutil.Ok(t, err)
2166			defer func() {
2167				testutil.Ok(t, os.RemoveAll(tmpdir))
2168			}()
2169
2170			for _, series := range c.blockSeries {
2171				createBlock(t, tmpdir, series)
2172			}
2173			opts := *DefaultOptions
2174			opts.AllowOverlappingBlocks = true
2175			db, err := Open(tmpdir, nil, nil, &opts)
2176			testutil.Ok(t, err)
2177			defer func() {
2178				testutil.Ok(t, db.Close())
2179			}()
2180			db.DisableCompactions()
2181			testutil.Assert(t, len(db.blocks) == len(c.blockSeries), "Wrong number of blocks [before compact].")
2182
2183			// Vertical Query Merging test.
2184			querier, err := db.Querier(0, 100)
2185			testutil.Ok(t, err)
2186			actSeries := query(t, querier, defaultMatcher)
2187			testutil.Equals(t, c.expSeries, actSeries)
2188
2189			// Vertical compaction.
2190			lc := db.compactor.(*LeveledCompactor)
2191			testutil.Equals(t, 0, int(prom_testutil.ToFloat64(lc.metrics.overlappingBlocks)), "overlapping blocks count should be still 0 here")
2192			err = db.compact()
2193			testutil.Ok(t, err)
2194			testutil.Equals(t, c.expBlockNum, len(db.Blocks()), "Wrong number of blocks [after compact]")
2195
2196			testutil.Equals(t, c.expOverlappingBlocks, int(prom_testutil.ToFloat64(lc.metrics.overlappingBlocks)), "overlapping blocks count mismatch")
2197
2198			// Query test after merging the overlapping blocks.
2199			querier, err = db.Querier(0, 100)
2200			testutil.Ok(t, err)
2201			actSeries = query(t, querier, defaultMatcher)
2202			testutil.Equals(t, c.expSeries, actSeries)
2203		}); !ok {
2204			return
2205		}
2206	}
2207}
2208
2209// TestBlockRanges checks the following use cases:
2210//  - No samples can be added with timestamps lower than the last block maxt.
2211//  - The compactor doesn't create overlapping blocks
2212// even when the last blocks is not within the default boundaries.
2213//	- Lower boundary is based on the smallest sample in the head and
2214// upper boundary is rounded to the configured block range.
2215//
2216// This ensures that a snapshot that includes the head and creates a block with a custom time range
2217// will not overlap with the first block created by the next compaction.
2218func TestBlockRanges(t *testing.T) {
2219	logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
2220
2221	dir, err := ioutil.TempDir("", "test_storage")
2222	if err != nil {
2223		t.Fatalf("Opening test dir failed: %s", err)
2224	}
2225
2226	rangeToTriggercompaction := DefaultOptions.BlockRanges[0]/2*3 + 1
2227
2228	// Test that the compactor doesn't create overlapping blocks
2229	// when a non standard block already exists.
2230	firstBlockMaxT := int64(3)
2231	createBlock(t, dir, genSeries(1, 1, 0, firstBlockMaxT))
2232	db, err := Open(dir, logger, nil, DefaultOptions)
2233	if err != nil {
2234		t.Fatalf("Opening test storage failed: %s", err)
2235	}
2236	defer func() {
2237		os.RemoveAll(dir)
2238	}()
2239	app := db.Appender()
2240	lbl := labels.Labels{{Name: "a", Value: "b"}}
2241	_, err = app.Add(lbl, firstBlockMaxT-1, rand.Float64())
2242	if err == nil {
2243		t.Fatalf("appending a sample with a timestamp covered by a previous block shouldn't be possible")
2244	}
2245	_, err = app.Add(lbl, firstBlockMaxT+1, rand.Float64())
2246	testutil.Ok(t, err)
2247	_, err = app.Add(lbl, firstBlockMaxT+2, rand.Float64())
2248	testutil.Ok(t, err)
2249	secondBlockMaxt := firstBlockMaxT + rangeToTriggercompaction
2250	_, err = app.Add(lbl, secondBlockMaxt, rand.Float64()) // Add samples to trigger a new compaction
2251
2252	testutil.Ok(t, err)
2253	testutil.Ok(t, app.Commit())
2254	for x := 0; x < 100; x++ {
2255		if len(db.Blocks()) == 2 {
2256			break
2257		}
2258		time.Sleep(100 * time.Millisecond)
2259	}
2260	testutil.Equals(t, 2, len(db.Blocks()), "no new block created after the set timeout")
2261
2262	if db.Blocks()[0].Meta().MaxTime > db.Blocks()[1].Meta().MinTime {
2263		t.Fatalf("new block overlaps  old:%v,new:%v", db.Blocks()[0].Meta(), db.Blocks()[1].Meta())
2264	}
2265
2266	// Test that wal records are skipped when an existing block covers the same time ranges
2267	// and compaction doesn't create an overlapping block.
2268	db.DisableCompactions()
2269	_, err = app.Add(lbl, secondBlockMaxt+1, rand.Float64())
2270	testutil.Ok(t, err)
2271	_, err = app.Add(lbl, secondBlockMaxt+2, rand.Float64())
2272	testutil.Ok(t, err)
2273	_, err = app.Add(lbl, secondBlockMaxt+3, rand.Float64())
2274	testutil.Ok(t, err)
2275	_, err = app.Add(lbl, secondBlockMaxt+4, rand.Float64())
2276	testutil.Ok(t, err)
2277	testutil.Ok(t, app.Commit())
2278	testutil.Ok(t, db.Close())
2279
2280	thirdBlockMaxt := secondBlockMaxt + 2
2281	createBlock(t, dir, genSeries(1, 1, secondBlockMaxt+1, thirdBlockMaxt))
2282
2283	db, err = Open(dir, logger, nil, DefaultOptions)
2284	if err != nil {
2285		t.Fatalf("Opening test storage failed: %s", err)
2286	}
2287	defer db.Close()
2288	testutil.Equals(t, 3, len(db.Blocks()), "db doesn't include expected number of blocks")
2289	testutil.Equals(t, db.Blocks()[2].Meta().MaxTime, thirdBlockMaxt, "unexpected maxt of the last block")
2290
2291	app = db.Appender()
2292	_, err = app.Add(lbl, thirdBlockMaxt+rangeToTriggercompaction, rand.Float64()) // Trigger a compaction
2293	testutil.Ok(t, err)
2294	testutil.Ok(t, app.Commit())
2295	for x := 0; x < 100; x++ {
2296		if len(db.Blocks()) == 4 {
2297			break
2298		}
2299		time.Sleep(100 * time.Millisecond)
2300	}
2301
2302	testutil.Equals(t, 4, len(db.Blocks()), "no new block created after the set timeout")
2303
2304	if db.Blocks()[2].Meta().MaxTime > db.Blocks()[3].Meta().MinTime {
2305		t.Fatalf("new block overlaps  old:%v,new:%v", db.Blocks()[2].Meta(), db.Blocks()[3].Meta())
2306	}
2307}
2308
2309// TestDBReadOnly ensures that opening a DB in readonly mode doesn't modify any files on the disk.
2310// It also checks that the API calls return equivalent results as a normal db.Open() mode.
2311func TestDBReadOnly(t *testing.T) {
2312	var (
2313		dbDir          string
2314		logger         = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
2315		expBlocks      []*Block
2316		expSeries      map[string][]tsdbutil.Sample
2317		expSeriesCount int
2318		expDBHash      []byte
2319		matchAll       = labels.MustNewMatcher(labels.MatchEqual, "", "")
2320		err            error
2321	)
2322
2323	// Bootstrap the db.
2324	{
2325		dbDir, err = ioutil.TempDir("", "test")
2326		testutil.Ok(t, err)
2327
2328		defer func() {
2329			testutil.Ok(t, os.RemoveAll(dbDir))
2330		}()
2331
2332		dbBlocks := []*BlockMeta{
2333			{MinTime: 10, MaxTime: 11},
2334			{MinTime: 11, MaxTime: 12},
2335			{MinTime: 12, MaxTime: 13},
2336		}
2337
2338		for _, m := range dbBlocks {
2339			createBlock(t, dbDir, genSeries(1, 1, m.MinTime, m.MaxTime))
2340		}
2341		expSeriesCount++
2342	}
2343
2344	// Open a normal db to use for a comparison.
2345	{
2346		dbWritable, err := Open(dbDir, logger, nil, nil)
2347		testutil.Ok(t, err)
2348		dbWritable.DisableCompactions()
2349
2350		dbSizeBeforeAppend, err := fileutil.DirSize(dbWritable.Dir())
2351		testutil.Ok(t, err)
2352		app := dbWritable.Appender()
2353		_, err = app.Add(labels.FromStrings("foo", "bar"), dbWritable.Head().MaxTime()+1, 0)
2354		testutil.Ok(t, err)
2355		testutil.Ok(t, app.Commit())
2356		expSeriesCount++
2357
2358		expBlocks = dbWritable.Blocks()
2359		expDbSize, err := fileutil.DirSize(dbWritable.Dir())
2360		testutil.Ok(t, err)
2361		testutil.Assert(t, expDbSize > dbSizeBeforeAppend, "db size didn't increase after an append")
2362
2363		q, err := dbWritable.Querier(math.MinInt64, math.MaxInt64)
2364		testutil.Ok(t, err)
2365		expSeries = query(t, q, matchAll)
2366
2367		testutil.Ok(t, dbWritable.Close()) // Close here to allow getting the dir hash for windows.
2368		expDBHash = testutil.DirHash(t, dbWritable.Dir())
2369	}
2370
2371	// Open a read only db and ensure that the API returns the same result as the normal DB.
2372	{
2373		dbReadOnly, err := OpenDBReadOnly(dbDir, logger)
2374		testutil.Ok(t, err)
2375		defer func() {
2376			testutil.Ok(t, dbReadOnly.Close())
2377		}()
2378		blocks, err := dbReadOnly.Blocks()
2379		testutil.Ok(t, err)
2380		testutil.Equals(t, len(expBlocks), len(blocks))
2381
2382		for i, expBlock := range expBlocks {
2383			testutil.Equals(t, expBlock.Meta(), blocks[i].Meta(), "block meta mismatch")
2384		}
2385
2386		q, err := dbReadOnly.Querier(math.MinInt64, math.MaxInt64)
2387		testutil.Ok(t, err)
2388		readOnlySeries := query(t, q, matchAll)
2389		readOnlyDBHash := testutil.DirHash(t, dbDir)
2390
2391		testutil.Equals(t, expSeriesCount, len(readOnlySeries), "total series mismatch")
2392		testutil.Equals(t, expSeries, readOnlySeries, "series mismatch")
2393		testutil.Equals(t, expDBHash, readOnlyDBHash, "after all read operations the db hash should remain the same")
2394	}
2395}
2396
2397// TestDBReadOnlyClosing ensures that after closing the db
2398// all api methods return an ErrClosed.
2399func TestDBReadOnlyClosing(t *testing.T) {
2400	dbDir, err := ioutil.TempDir("", "test")
2401	testutil.Ok(t, err)
2402
2403	defer func() {
2404		testutil.Ok(t, os.RemoveAll(dbDir))
2405	}()
2406	db, err := OpenDBReadOnly(dbDir, log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)))
2407	testutil.Ok(t, err)
2408	testutil.Ok(t, db.Close())
2409	testutil.Equals(t, db.Close(), ErrClosed)
2410	_, err = db.Blocks()
2411	testutil.Equals(t, err, ErrClosed)
2412	_, err = db.Querier(0, 1)
2413	testutil.Equals(t, err, ErrClosed)
2414}
2415
2416func TestDBReadOnly_FlushWAL(t *testing.T) {
2417	var (
2418		dbDir  string
2419		logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
2420		err    error
2421		maxt   int
2422	)
2423
2424	// Bootstrap the db.
2425	{
2426		dbDir, err = ioutil.TempDir("", "test")
2427		testutil.Ok(t, err)
2428
2429		defer func() {
2430			testutil.Ok(t, os.RemoveAll(dbDir))
2431		}()
2432
2433		// Append data to the WAL.
2434		db, err := Open(dbDir, logger, nil, nil)
2435		testutil.Ok(t, err)
2436		db.DisableCompactions()
2437		app := db.Appender()
2438		maxt = 1000
2439		for i := 0; i < maxt; i++ {
2440			_, err := app.Add(labels.FromStrings(defaultLabelName, "flush"), int64(i), 1.0)
2441			testutil.Ok(t, err)
2442		}
2443		testutil.Ok(t, app.Commit())
2444		defer func() { testutil.Ok(t, db.Close()) }()
2445	}
2446
2447	// Flush WAL.
2448	db, err := OpenDBReadOnly(dbDir, logger)
2449	testutil.Ok(t, err)
2450
2451	flush, err := ioutil.TempDir("", "flush")
2452	testutil.Ok(t, err)
2453
2454	defer func() {
2455		testutil.Ok(t, os.RemoveAll(flush))
2456	}()
2457	testutil.Ok(t, db.FlushWAL(flush))
2458	testutil.Ok(t, db.Close())
2459
2460	// Reopen the DB from the flushed WAL block.
2461	db, err = OpenDBReadOnly(flush, logger)
2462	testutil.Ok(t, err)
2463	defer func() { testutil.Ok(t, db.Close()) }()
2464	blocks, err := db.Blocks()
2465	testutil.Ok(t, err)
2466	testutil.Equals(t, len(blocks), 1)
2467
2468	querier, err := db.Querier(0, int64(maxt)-1)
2469	testutil.Ok(t, err)
2470	defer func() { testutil.Ok(t, querier.Close()) }()
2471
2472	// Sum the values.
2473	seriesSet, err := querier.Select(labels.MustNewMatcher(labels.MatchEqual, defaultLabelName, "flush"))
2474	testutil.Ok(t, err)
2475
2476	sum := 0.0
2477	for seriesSet.Next() {
2478		series := seriesSet.At().Iterator()
2479		for series.Next() {
2480			_, v := series.At()
2481			sum += v
2482		}
2483		testutil.Ok(t, series.Err())
2484	}
2485	testutil.Ok(t, seriesSet.Err())
2486	testutil.Equals(t, 1000.0, sum)
2487}
2488
2489// TestChunkWriter_ReadAfterWrite ensures that chunk segment are cut at the set segment size and
2490// that the resulted segments includes the expected chunks data.
2491func TestChunkWriter_ReadAfterWrite(t *testing.T) {
2492	chk1 := tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 1}})
2493	chk2 := tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 2}})
2494	chk3 := tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 3}})
2495	chk4 := tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 4}})
2496	chk5 := tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 5}})
2497	chunkSize := len(chk1.Chunk.Bytes()) + chunks.MaxChunkLengthFieldSize + chunks.ChunkEncodingSize + crc32.Size
2498
2499	tests := []struct {
2500		chks [][]chunks.Meta
2501		segmentSize,
2502		expSegmentsCount int
2503		expSegmentSizes []int
2504	}{
2505		// 0:Last chunk ends at the segment boundary so
2506		// all chunks should fit in a single segment.
2507		{
2508			chks: [][]chunks.Meta{
2509				[]chunks.Meta{
2510					chk1,
2511					chk2,
2512					chk3,
2513				},
2514			},
2515			segmentSize:      3 * chunkSize,
2516			expSegmentSizes:  []int{3 * chunkSize},
2517			expSegmentsCount: 1,
2518		},
2519		// 1:Two chunks can fit in a single segment so the last one should result in a new segment.
2520		{
2521			chks: [][]chunks.Meta{
2522				[]chunks.Meta{
2523					chk1,
2524					chk2,
2525					chk3,
2526					chk4,
2527					chk5,
2528				},
2529			},
2530			segmentSize:      2 * chunkSize,
2531			expSegmentSizes:  []int{2 * chunkSize, 2 * chunkSize, chunkSize},
2532			expSegmentsCount: 3,
2533		},
2534		// 2:When the segment size is smaller than the size of 2 chunks
2535		// the last segment should still create a new segment.
2536		{
2537			chks: [][]chunks.Meta{
2538				[]chunks.Meta{
2539					chk1,
2540					chk2,
2541					chk3,
2542				},
2543			},
2544			segmentSize:      2*chunkSize - 1,
2545			expSegmentSizes:  []int{chunkSize, chunkSize, chunkSize},
2546			expSegmentsCount: 3,
2547		},
2548		// 3:When the segment is smaller than a single chunk
2549		// it should still be written by ignoring the max segment size.
2550		{
2551			chks: [][]chunks.Meta{
2552				[]chunks.Meta{
2553					chk1,
2554				},
2555			},
2556			segmentSize:      chunkSize - 1,
2557			expSegmentSizes:  []int{chunkSize},
2558			expSegmentsCount: 1,
2559		},
2560		// 4:All chunks are bigger than the max segment size, but
2561		// these should still be written even when this will result in bigger segment than the set size.
2562		// Each segment will hold a single chunk.
2563		{
2564			chks: [][]chunks.Meta{
2565				[]chunks.Meta{
2566					chk1,
2567					chk2,
2568					chk3,
2569				},
2570			},
2571			segmentSize:      1,
2572			expSegmentSizes:  []int{chunkSize, chunkSize, chunkSize},
2573			expSegmentsCount: 3,
2574		},
2575		// 5:Adding multiple batches of chunks.
2576		{
2577			chks: [][]chunks.Meta{
2578				[]chunks.Meta{
2579					chk1,
2580					chk2,
2581					chk3,
2582				},
2583				[]chunks.Meta{
2584					chk4,
2585					chk5,
2586				},
2587			},
2588			segmentSize:      3 * chunkSize,
2589			expSegmentSizes:  []int{3 * chunkSize, 2 * chunkSize},
2590			expSegmentsCount: 2,
2591		},
2592		// 6:Adding multiple batches of chunks.
2593		{
2594			chks: [][]chunks.Meta{
2595				[]chunks.Meta{
2596					chk1,
2597				},
2598				[]chunks.Meta{
2599					chk2,
2600					chk3,
2601				},
2602				[]chunks.Meta{
2603					chk4,
2604				},
2605			},
2606			segmentSize:      2 * chunkSize,
2607			expSegmentSizes:  []int{2 * chunkSize, 2 * chunkSize},
2608			expSegmentsCount: 2,
2609		},
2610	}
2611
2612	for i, test := range tests {
2613		t.Run(strconv.Itoa(i), func(t *testing.T) {
2614
2615			tempDir, err := ioutil.TempDir("", "test_chunk_writer")
2616			testutil.Ok(t, err)
2617			defer func() { testutil.Ok(t, os.RemoveAll(tempDir)) }()
2618
2619			chunkw, err := chunks.NewWriterWithSegSize(tempDir, chunks.SegmentHeaderSize+int64(test.segmentSize))
2620			testutil.Ok(t, err)
2621
2622			for _, chks := range test.chks {
2623				testutil.Ok(t, chunkw.WriteChunks(chks...))
2624			}
2625			testutil.Ok(t, chunkw.Close())
2626
2627			files, err := ioutil.ReadDir(tempDir)
2628			testutil.Ok(t, err)
2629			testutil.Equals(t, test.expSegmentsCount, len(files), "expected segments count mismatch")
2630
2631			// Verify that all data is written to the segments.
2632			sizeExp := 0
2633			sizeAct := 0
2634
2635			for _, chks := range test.chks {
2636				for _, chk := range chks {
2637					l := make([]byte, binary.MaxVarintLen32)
2638					sizeExp += binary.PutUvarint(l, uint64(len(chk.Chunk.Bytes()))) // The length field.
2639					sizeExp += chunks.ChunkEncodingSize
2640					sizeExp += len(chk.Chunk.Bytes()) // The data itself.
2641					sizeExp += crc32.Size             // The 4 bytes of crc32
2642				}
2643			}
2644			sizeExp += test.expSegmentsCount * chunks.SegmentHeaderSize // The segment header bytes.
2645
2646			for i, f := range files {
2647				size := int(f.Size())
2648				// Verify that the segment is the same or smaller than the expected size.
2649				testutil.Assert(t, chunks.SegmentHeaderSize+test.expSegmentSizes[i] >= size, "Segment:%v should NOT be bigger than:%v actual:%v", i, chunks.SegmentHeaderSize+test.expSegmentSizes[i], size)
2650
2651				sizeAct += size
2652			}
2653			testutil.Equals(t, sizeExp, sizeAct)
2654
2655			// Check the content of the chunks.
2656			r, err := chunks.NewDirReader(tempDir, nil)
2657			testutil.Ok(t, err)
2658			defer func() { testutil.Ok(t, r.Close()) }()
2659
2660			for _, chks := range test.chks {
2661				for _, chkExp := range chks {
2662					chkAct, err := r.Chunk(chkExp.Ref)
2663					testutil.Ok(t, err)
2664					testutil.Equals(t, chkExp.Chunk.Bytes(), chkAct.Bytes())
2665				}
2666			}
2667		})
2668	}
2669}
2670
2671// TestChunkReader_ConcurrentReads checks that the chunk result can be read concurrently.
2672// Regression test for https://github.com/prometheus/prometheus/pull/6514.
2673func TestChunkReader_ConcurrentReads(t *testing.T) {
2674	chks := []chunks.Meta{
2675		tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 1}}),
2676		tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 2}}),
2677		tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 3}}),
2678		tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 4}}),
2679		tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 5}}),
2680	}
2681
2682	tempDir, err := ioutil.TempDir("", "test_chunk_writer")
2683	testutil.Ok(t, err)
2684	defer func() { testutil.Ok(t, os.RemoveAll(tempDir)) }()
2685
2686	chunkw, err := chunks.NewWriter(tempDir)
2687	testutil.Ok(t, err)
2688
2689	testutil.Ok(t, chunkw.WriteChunks(chks...))
2690	testutil.Ok(t, chunkw.Close())
2691
2692	r, err := chunks.NewDirReader(tempDir, nil)
2693	testutil.Ok(t, err)
2694
2695	var wg sync.WaitGroup
2696	for _, chk := range chks {
2697		for i := 0; i < 100; i++ {
2698			wg.Add(1)
2699			go func(chunk chunks.Meta) {
2700				defer wg.Done()
2701
2702				chkAct, err := r.Chunk(chunk.Ref)
2703				testutil.Ok(t, err)
2704				testutil.Equals(t, chunk.Chunk.Bytes(), chkAct.Bytes())
2705			}(chk)
2706		}
2707		wg.Wait()
2708	}
2709	testutil.Ok(t, r.Close())
2710}
2711