1// Copyright 2017 The Prometheus Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14package tsdb
15
16import (
17	"context"
18	"fmt"
19	"io/ioutil"
20	"math"
21	"math/rand"
22	"os"
23	"path/filepath"
24	"sort"
25	"strconv"
26	"sync"
27	"testing"
28
29	"github.com/pkg/errors"
30	prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
31	"github.com/stretchr/testify/require"
32
33	"github.com/prometheus/prometheus/pkg/exemplar"
34	"github.com/prometheus/prometheus/pkg/labels"
35	"github.com/prometheus/prometheus/storage"
36	"github.com/prometheus/prometheus/tsdb/chunkenc"
37	"github.com/prometheus/prometheus/tsdb/chunks"
38	"github.com/prometheus/prometheus/tsdb/index"
39	"github.com/prometheus/prometheus/tsdb/record"
40	"github.com/prometheus/prometheus/tsdb/tombstones"
41	"github.com/prometheus/prometheus/tsdb/tsdbutil"
42	"github.com/prometheus/prometheus/tsdb/wal"
43)
44
45func newTestHead(t testing.TB, chunkRange int64, compressWAL bool) (*Head, *wal.WAL) {
46	dir, err := ioutil.TempDir("", "test")
47	require.NoError(t, err)
48	wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compressWAL)
49	require.NoError(t, err)
50
51	opts := DefaultHeadOptions()
52	opts.ChunkRange = chunkRange
53	opts.ChunkDirRoot = dir
54	opts.NumExemplars = 10
55	h, err := NewHead(nil, nil, wlog, opts, nil)
56	require.NoError(t, err)
57
58	require.NoError(t, h.chunkDiskMapper.IterateAllChunks(func(_, _ uint64, _, _ int64, _ uint16) error { return nil }))
59
60	t.Cleanup(func() {
61		require.NoError(t, os.RemoveAll(dir))
62	})
63	return h, wlog
64}
65
66func BenchmarkCreateSeries(b *testing.B) {
67	series := genSeries(b.N, 10, 0, 0)
68	h, _ := newTestHead(b, 10000, false)
69	defer func() {
70		require.NoError(b, h.Close())
71	}()
72
73	b.ReportAllocs()
74	b.ResetTimer()
75
76	for _, s := range series {
77		h.getOrCreate(s.Labels().Hash(), s.Labels())
78	}
79}
80
81func populateTestWAL(t testing.TB, w *wal.WAL, recs []interface{}) {
82	var enc record.Encoder
83	for _, r := range recs {
84		switch v := r.(type) {
85		case []record.RefSeries:
86			require.NoError(t, w.Log(enc.Series(v, nil)))
87		case []record.RefSample:
88			require.NoError(t, w.Log(enc.Samples(v, nil)))
89		case []tombstones.Stone:
90			require.NoError(t, w.Log(enc.Tombstones(v, nil)))
91		case []record.RefExemplar:
92			require.NoError(t, w.Log(enc.Exemplars(v, nil)))
93		}
94	}
95}
96
97func readTestWAL(t testing.TB, dir string) (recs []interface{}) {
98	sr, err := wal.NewSegmentsReader(dir)
99	require.NoError(t, err)
100	defer sr.Close()
101
102	var dec record.Decoder
103	r := wal.NewReader(sr)
104
105	for r.Next() {
106		rec := r.Record()
107
108		switch dec.Type(rec) {
109		case record.Series:
110			series, err := dec.Series(rec, nil)
111			require.NoError(t, err)
112			recs = append(recs, series)
113		case record.Samples:
114			samples, err := dec.Samples(rec, nil)
115			require.NoError(t, err)
116			recs = append(recs, samples)
117		case record.Tombstones:
118			tstones, err := dec.Tombstones(rec, nil)
119			require.NoError(t, err)
120			recs = append(recs, tstones)
121		default:
122			t.Fatalf("unknown record type")
123		}
124	}
125	require.NoError(t, r.Err())
126	return recs
127}
128
129func BenchmarkLoadWAL(b *testing.B) {
130	cases := []struct {
131		// Total series is (batches*seriesPerBatch).
132		batches          int
133		seriesPerBatch   int
134		samplesPerSeries int
135	}{
136		{ // Less series and more samples. 2 hour WAL with 1 second scrape interval.
137			batches:          10,
138			seriesPerBatch:   100,
139			samplesPerSeries: 7200,
140		},
141		{ // More series and less samples.
142			batches:          10,
143			seriesPerBatch:   10000,
144			samplesPerSeries: 50,
145		},
146		{ // In between.
147			batches:          10,
148			seriesPerBatch:   1000,
149			samplesPerSeries: 480,
150		},
151	}
152
153	labelsPerSeries := 5
154	// Rough estimates of most common % of samples that have an exemplar for each scrape.
155	exemplarsPercentages := []float64{0, 0.5, 1, 5}
156	lastExemplarsPerSeries := -1
157	for _, c := range cases {
158		for _, p := range exemplarsPercentages {
159			exemplarsPerSeries := int(math.RoundToEven(float64(c.samplesPerSeries) * p / 100))
160			// For tests with low samplesPerSeries we could end up testing with 0 exemplarsPerSeries
161			// multiple times without this check.
162			if exemplarsPerSeries == lastExemplarsPerSeries {
163				continue
164			}
165			lastExemplarsPerSeries = exemplarsPerSeries
166			// fmt.Println("exemplars per series: ", exemplarsPerSeries)
167			b.Run(fmt.Sprintf("batches=%d,seriesPerBatch=%d,samplesPerSeries=%d,exemplarsPerSeries=%d", c.batches, c.seriesPerBatch, c.samplesPerSeries, exemplarsPerSeries),
168				func(b *testing.B) {
169					dir, err := ioutil.TempDir("", "test_load_wal")
170					require.NoError(b, err)
171					defer func() {
172						require.NoError(b, os.RemoveAll(dir))
173					}()
174
175					w, err := wal.New(nil, nil, dir, false)
176					require.NoError(b, err)
177
178					// Write series.
179					refSeries := make([]record.RefSeries, 0, c.seriesPerBatch)
180					for k := 0; k < c.batches; k++ {
181						refSeries = refSeries[:0]
182						for i := k * c.seriesPerBatch; i < (k+1)*c.seriesPerBatch; i++ {
183							lbls := make(map[string]string, labelsPerSeries)
184							lbls[defaultLabelName] = strconv.Itoa(i)
185							for j := 1; len(lbls) < labelsPerSeries; j++ {
186								lbls[defaultLabelName+strconv.Itoa(j)] = defaultLabelValue + strconv.Itoa(j)
187							}
188							refSeries = append(refSeries, record.RefSeries{Ref: uint64(i) * 100, Labels: labels.FromMap(lbls)})
189						}
190						populateTestWAL(b, w, []interface{}{refSeries})
191					}
192
193					// Write samples.
194					refSamples := make([]record.RefSample, 0, c.seriesPerBatch)
195					for i := 0; i < c.samplesPerSeries; i++ {
196						for j := 0; j < c.batches; j++ {
197							refSamples = refSamples[:0]
198							for k := j * c.seriesPerBatch; k < (j+1)*c.seriesPerBatch; k++ {
199								refSamples = append(refSamples, record.RefSample{
200									Ref: uint64(k) * 100,
201									T:   int64(i) * 10,
202									V:   float64(i) * 100,
203								})
204							}
205							populateTestWAL(b, w, []interface{}{refSamples})
206						}
207					}
208
209					// Write samples.
210					refExemplars := make([]record.RefExemplar, 0, c.seriesPerBatch)
211					for i := 0; i < exemplarsPerSeries; i++ {
212						for j := 0; j < c.batches; j++ {
213							refExemplars = refExemplars[:0]
214							for k := j * c.seriesPerBatch; k < (j+1)*c.seriesPerBatch; k++ {
215								refExemplars = append(refExemplars, record.RefExemplar{
216									Ref:    uint64(k) * 100,
217									T:      int64(i) * 10,
218									V:      float64(i) * 100,
219									Labels: labels.FromStrings("traceID", fmt.Sprintf("trace-%d", i)),
220								})
221							}
222							populateTestWAL(b, w, []interface{}{refExemplars})
223						}
224					}
225
226					b.ResetTimer()
227
228					// Load the WAL.
229					for i := 0; i < b.N; i++ {
230						opts := DefaultHeadOptions()
231						opts.ChunkRange = 1000
232						opts.ChunkDirRoot = w.Dir()
233						h, err := NewHead(nil, nil, w, opts, nil)
234						require.NoError(b, err)
235						h.Init(0)
236					}
237				})
238		}
239	}
240}
241
242func TestHead_ReadWAL(t *testing.T) {
243	for _, compress := range []bool{false, true} {
244		t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
245			entries := []interface{}{
246				[]record.RefSeries{
247					{Ref: 10, Labels: labels.FromStrings("a", "1")},
248					{Ref: 11, Labels: labels.FromStrings("a", "2")},
249					{Ref: 100, Labels: labels.FromStrings("a", "3")},
250				},
251				[]record.RefSample{
252					{Ref: 0, T: 99, V: 1},
253					{Ref: 10, T: 100, V: 2},
254					{Ref: 100, T: 100, V: 3},
255				},
256				[]record.RefSeries{
257					{Ref: 50, Labels: labels.FromStrings("a", "4")},
258					// This series has two refs pointing to it.
259					{Ref: 101, Labels: labels.FromStrings("a", "3")},
260				},
261				[]record.RefSample{
262					{Ref: 10, T: 101, V: 5},
263					{Ref: 50, T: 101, V: 6},
264					{Ref: 101, T: 101, V: 7},
265				},
266				[]tombstones.Stone{
267					{Ref: 0, Intervals: []tombstones.Interval{{Mint: 99, Maxt: 101}}},
268				},
269				[]record.RefExemplar{
270					{Ref: 10, T: 100, V: 1, Labels: labels.FromStrings("traceID", "asdf")},
271				},
272			}
273
274			head, w := newTestHead(t, 1000, compress)
275			defer func() {
276				require.NoError(t, head.Close())
277			}()
278
279			populateTestWAL(t, w, entries)
280
281			require.NoError(t, head.Init(math.MinInt64))
282			require.Equal(t, uint64(101), head.lastSeriesID.Load())
283
284			s10 := head.series.getByID(10)
285			s11 := head.series.getByID(11)
286			s50 := head.series.getByID(50)
287			s100 := head.series.getByID(100)
288
289			require.Equal(t, labels.FromStrings("a", "1"), s10.lset)
290			require.Equal(t, (*memSeries)(nil), s11) // Series without samples should be garbage collected at head.Init().
291			require.Equal(t, labels.FromStrings("a", "4"), s50.lset)
292			require.Equal(t, labels.FromStrings("a", "3"), s100.lset)
293
294			expandChunk := func(c chunkenc.Iterator) (x []sample) {
295				for c.Next() {
296					t, v := c.At()
297					x = append(x, sample{t: t, v: v})
298				}
299				require.NoError(t, c.Err())
300				return x
301			}
302			require.Equal(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0, nil, head.chunkDiskMapper, nil)))
303			require.Equal(t, []sample{{101, 6}}, expandChunk(s50.iterator(0, nil, head.chunkDiskMapper, nil)))
304			require.Equal(t, []sample{{100, 3}, {101, 7}}, expandChunk(s100.iterator(0, nil, head.chunkDiskMapper, nil)))
305
306			q, err := head.ExemplarQuerier(context.Background())
307			require.NoError(t, err)
308			e, err := q.Select(0, 1000, []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "1")})
309			require.NoError(t, err)
310			require.Equal(t, e[0].Exemplars[0], exemplar.Exemplar{Ts: 100, Value: 1, Labels: labels.FromStrings("traceID", "asdf")})
311		})
312	}
313}
314
315func TestHead_WALMultiRef(t *testing.T) {
316	head, w := newTestHead(t, 1000, false)
317
318	require.NoError(t, head.Init(0))
319
320	app := head.Appender(context.Background())
321	ref1, err := app.Append(0, labels.FromStrings("foo", "bar"), 100, 1)
322	require.NoError(t, err)
323	require.NoError(t, app.Commit())
324	require.Equal(t, 1.0, prom_testutil.ToFloat64(head.metrics.chunksCreated))
325
326	// Add another sample outside chunk range to mmap a chunk.
327	app = head.Appender(context.Background())
328	_, err = app.Append(0, labels.FromStrings("foo", "bar"), 1500, 2)
329	require.NoError(t, err)
330	require.NoError(t, app.Commit())
331	require.Equal(t, 2.0, prom_testutil.ToFloat64(head.metrics.chunksCreated))
332
333	require.NoError(t, head.Truncate(1600))
334
335	app = head.Appender(context.Background())
336	ref2, err := app.Append(0, labels.FromStrings("foo", "bar"), 1700, 3)
337	require.NoError(t, err)
338	require.NoError(t, app.Commit())
339	require.Equal(t, 3.0, prom_testutil.ToFloat64(head.metrics.chunksCreated))
340
341	// Add another sample outside chunk range to mmap a chunk.
342	app = head.Appender(context.Background())
343	_, err = app.Append(0, labels.FromStrings("foo", "bar"), 2000, 4)
344	require.NoError(t, err)
345	require.NoError(t, app.Commit())
346	require.Equal(t, 4.0, prom_testutil.ToFloat64(head.metrics.chunksCreated))
347
348	require.NotEqual(t, ref1, ref2, "Refs are the same")
349	require.NoError(t, head.Close())
350
351	w, err = wal.New(nil, nil, w.Dir(), false)
352	require.NoError(t, err)
353
354	opts := DefaultHeadOptions()
355	opts.ChunkRange = 1000
356	opts.ChunkDirRoot = w.Dir()
357	head, err = NewHead(nil, nil, w, opts, nil)
358	require.NoError(t, err)
359	require.NoError(t, head.Init(0))
360	defer func() {
361		require.NoError(t, head.Close())
362	}()
363
364	q, err := NewBlockQuerier(head, 0, 2100)
365	require.NoError(t, err)
366	series := query(t, q, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
367	require.Equal(t, map[string][]tsdbutil.Sample{`{foo="bar"}`: {
368		sample{100, 1},
369		sample{1500, 2},
370		sample{1700, 3},
371		sample{2000, 4},
372	}}, series)
373}
374
375func TestHead_UnknownWALRecord(t *testing.T) {
376	head, w := newTestHead(t, 1000, false)
377	w.Log([]byte{255, 42})
378	require.NoError(t, head.Init(0))
379	require.NoError(t, head.Close())
380}
381
382func TestHead_Truncate(t *testing.T) {
383	h, _ := newTestHead(t, 1000, false)
384	defer func() {
385		require.NoError(t, h.Close())
386	}()
387
388	h.initTime(0)
389
390	s1, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1", "b", "1"))
391	s2, _, _ := h.getOrCreate(2, labels.FromStrings("a", "2", "b", "1"))
392	s3, _, _ := h.getOrCreate(3, labels.FromStrings("a", "1", "b", "2"))
393	s4, _, _ := h.getOrCreate(4, labels.FromStrings("a", "2", "b", "2", "c", "1"))
394
395	s1.mmappedChunks = []*mmappedChunk{
396		{minTime: 0, maxTime: 999},
397		{minTime: 1000, maxTime: 1999},
398		{minTime: 2000, maxTime: 2999},
399	}
400	s2.mmappedChunks = []*mmappedChunk{
401		{minTime: 1000, maxTime: 1999},
402		{minTime: 2000, maxTime: 2999},
403		{minTime: 3000, maxTime: 3999},
404	}
405	s3.mmappedChunks = []*mmappedChunk{
406		{minTime: 0, maxTime: 999},
407		{minTime: 1000, maxTime: 1999},
408	}
409	s4.mmappedChunks = []*mmappedChunk{}
410
411	// Truncation need not be aligned.
412	require.NoError(t, h.Truncate(1))
413
414	require.NoError(t, h.Truncate(2000))
415
416	require.Equal(t, []*mmappedChunk{
417		{minTime: 2000, maxTime: 2999},
418	}, h.series.getByID(s1.ref).mmappedChunks)
419
420	require.Equal(t, []*mmappedChunk{
421		{minTime: 2000, maxTime: 2999},
422		{minTime: 3000, maxTime: 3999},
423	}, h.series.getByID(s2.ref).mmappedChunks)
424
425	require.Nil(t, h.series.getByID(s3.ref))
426	require.Nil(t, h.series.getByID(s4.ref))
427
428	postingsA1, _ := index.ExpandPostings(h.postings.Get("a", "1"))
429	postingsA2, _ := index.ExpandPostings(h.postings.Get("a", "2"))
430	postingsB1, _ := index.ExpandPostings(h.postings.Get("b", "1"))
431	postingsB2, _ := index.ExpandPostings(h.postings.Get("b", "2"))
432	postingsC1, _ := index.ExpandPostings(h.postings.Get("c", "1"))
433	postingsAll, _ := index.ExpandPostings(h.postings.Get("", ""))
434
435	require.Equal(t, []uint64{s1.ref}, postingsA1)
436	require.Equal(t, []uint64{s2.ref}, postingsA2)
437	require.Equal(t, []uint64{s1.ref, s2.ref}, postingsB1)
438	require.Equal(t, []uint64{s1.ref, s2.ref}, postingsAll)
439	require.Nil(t, postingsB2)
440	require.Nil(t, postingsC1)
441
442	require.Equal(t, map[string]struct{}{
443		"":  {}, // from 'all' postings list
444		"a": {},
445		"b": {},
446		"1": {},
447		"2": {},
448	}, h.symbols)
449
450	values := map[string]map[string]struct{}{}
451	for _, name := range h.postings.LabelNames() {
452		ss, ok := values[name]
453		if !ok {
454			ss = map[string]struct{}{}
455			values[name] = ss
456		}
457		for _, value := range h.postings.LabelValues(name) {
458			ss[value] = struct{}{}
459		}
460	}
461	require.Equal(t, map[string]map[string]struct{}{
462		"a": {"1": struct{}{}, "2": struct{}{}},
463		"b": {"1": struct{}{}},
464	}, values)
465}
466
467// Validate various behaviors brought on by firstChunkID accounting for
468// garbage collected chunks.
469func TestMemSeries_truncateChunks(t *testing.T) {
470	dir, err := ioutil.TempDir("", "truncate_chunks")
471	require.NoError(t, err)
472	defer func() {
473		require.NoError(t, os.RemoveAll(dir))
474	}()
475	// This is usually taken from the Head, but passing manually here.
476	chunkDiskMapper, err := chunks.NewChunkDiskMapper(dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize)
477	require.NoError(t, err)
478	defer func() {
479		require.NoError(t, chunkDiskMapper.Close())
480	}()
481
482	memChunkPool := sync.Pool{
483		New: func() interface{} {
484			return &memChunk{}
485		},
486	}
487
488	s := newMemSeries(labels.FromStrings("a", "b"), 1, 2000, &memChunkPool)
489
490	for i := 0; i < 4000; i += 5 {
491		ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper)
492		require.True(t, ok, "sample append failed")
493	}
494
495	// Check that truncate removes half of the chunks and afterwards
496	// that the ID of the last chunk still gives us the same chunk afterwards.
497	countBefore := len(s.mmappedChunks) + 1 // +1 for the head chunk.
498	lastID := s.chunkID(countBefore - 1)
499	lastChunk, _, err := s.chunk(lastID, chunkDiskMapper)
500	require.NoError(t, err)
501	require.NotNil(t, lastChunk)
502
503	chk, _, err := s.chunk(0, chunkDiskMapper)
504	require.NotNil(t, chk)
505	require.NoError(t, err)
506
507	s.truncateChunksBefore(2000)
508
509	require.Equal(t, int64(2000), s.mmappedChunks[0].minTime)
510	_, _, err = s.chunk(0, chunkDiskMapper)
511	require.Equal(t, storage.ErrNotFound, err, "first chunks not gone")
512	require.Equal(t, countBefore/2, len(s.mmappedChunks)+1) // +1 for the head chunk.
513	chk, _, err = s.chunk(lastID, chunkDiskMapper)
514	require.NoError(t, err)
515	require.Equal(t, lastChunk, chk)
516
517	// Validate that the series' sample buffer is applied correctly to the last chunk
518	// after truncation.
519	it1 := s.iterator(s.chunkID(len(s.mmappedChunks)), nil, chunkDiskMapper, nil)
520	_, ok := it1.(*memSafeIterator)
521	require.True(t, ok)
522
523	it2 := s.iterator(s.chunkID(len(s.mmappedChunks)-1), nil, chunkDiskMapper, nil)
524	_, ok = it2.(*memSafeIterator)
525	require.False(t, ok, "non-last chunk incorrectly wrapped with sample buffer")
526}
527
528func TestHeadDeleteSeriesWithoutSamples(t *testing.T) {
529	for _, compress := range []bool{false, true} {
530		t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
531			entries := []interface{}{
532				[]record.RefSeries{
533					{Ref: 10, Labels: labels.FromStrings("a", "1")},
534				},
535				[]record.RefSample{},
536				[]record.RefSeries{
537					{Ref: 50, Labels: labels.FromStrings("a", "2")},
538				},
539				[]record.RefSample{
540					{Ref: 50, T: 80, V: 1},
541					{Ref: 50, T: 90, V: 1},
542				},
543			}
544			head, w := newTestHead(t, 1000, compress)
545			defer func() {
546				require.NoError(t, head.Close())
547			}()
548
549			populateTestWAL(t, w, entries)
550
551			require.NoError(t, head.Init(math.MinInt64))
552
553			require.NoError(t, head.Delete(0, 100, labels.MustNewMatcher(labels.MatchEqual, "a", "1")))
554		})
555	}
556}
557
558func TestHeadDeleteSimple(t *testing.T) {
559	buildSmpls := func(s []int64) []sample {
560		ss := make([]sample, 0, len(s))
561		for _, t := range s {
562			ss = append(ss, sample{t: t, v: float64(t)})
563		}
564		return ss
565	}
566	smplsAll := buildSmpls([]int64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
567	lblDefault := labels.Label{Name: "a", Value: "b"}
568
569	cases := []struct {
570		dranges    tombstones.Intervals
571		addSamples []sample // Samples to add after delete.
572		smplsExp   []sample
573	}{
574		{
575			dranges:  tombstones.Intervals{{Mint: 0, Maxt: 3}},
576			smplsExp: buildSmpls([]int64{4, 5, 6, 7, 8, 9}),
577		},
578		{
579			dranges:  tombstones.Intervals{{Mint: 1, Maxt: 3}},
580			smplsExp: buildSmpls([]int64{0, 4, 5, 6, 7, 8, 9}),
581		},
582		{
583			dranges:  tombstones.Intervals{{Mint: 1, Maxt: 3}, {Mint: 4, Maxt: 7}},
584			smplsExp: buildSmpls([]int64{0, 8, 9}),
585		},
586		{
587			dranges:  tombstones.Intervals{{Mint: 1, Maxt: 3}, {Mint: 4, Maxt: 700}},
588			smplsExp: buildSmpls([]int64{0}),
589		},
590		{ // This case is to ensure that labels and symbols are deleted.
591			dranges:  tombstones.Intervals{{Mint: 0, Maxt: 9}},
592			smplsExp: buildSmpls([]int64{}),
593		},
594		{
595			dranges:    tombstones.Intervals{{Mint: 1, Maxt: 3}},
596			addSamples: buildSmpls([]int64{11, 13, 15}),
597			smplsExp:   buildSmpls([]int64{0, 4, 5, 6, 7, 8, 9, 11, 13, 15}),
598		},
599		{
600			// After delete, the appended samples in the deleted range should be visible
601			// as the tombstones are clamped to head min/max time.
602			dranges:    tombstones.Intervals{{Mint: 7, Maxt: 20}},
603			addSamples: buildSmpls([]int64{11, 13, 15}),
604			smplsExp:   buildSmpls([]int64{0, 1, 2, 3, 4, 5, 6, 11, 13, 15}),
605		},
606	}
607
608	for _, compress := range []bool{false, true} {
609		t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
610			for _, c := range cases {
611				head, w := newTestHead(t, 1000, compress)
612
613				app := head.Appender(context.Background())
614				for _, smpl := range smplsAll {
615					_, err := app.Append(0, labels.Labels{lblDefault}, smpl.t, smpl.v)
616					require.NoError(t, err)
617
618				}
619				require.NoError(t, app.Commit())
620
621				// Delete the ranges.
622				for _, r := range c.dranges {
623					require.NoError(t, head.Delete(r.Mint, r.Maxt, labels.MustNewMatcher(labels.MatchEqual, lblDefault.Name, lblDefault.Value)))
624				}
625
626				// Add more samples.
627				app = head.Appender(context.Background())
628				for _, smpl := range c.addSamples {
629					_, err := app.Append(0, labels.Labels{lblDefault}, smpl.t, smpl.v)
630					require.NoError(t, err)
631
632				}
633				require.NoError(t, app.Commit())
634
635				// Compare the samples for both heads - before and after the reloadBlocks.
636				reloadedW, err := wal.New(nil, nil, w.Dir(), compress) // Use a new wal to ensure deleted samples are gone even after a reloadBlocks.
637				require.NoError(t, err)
638				opts := DefaultHeadOptions()
639				opts.ChunkRange = 1000
640				opts.ChunkDirRoot = reloadedW.Dir()
641				reloadedHead, err := NewHead(nil, nil, reloadedW, opts, nil)
642				require.NoError(t, err)
643				require.NoError(t, reloadedHead.Init(0))
644
645				// Compare the query results for both heads - before and after the reloadBlocks.
646			Outer:
647				for _, h := range []*Head{head, reloadedHead} {
648					q, err := NewBlockQuerier(h, h.MinTime(), h.MaxTime())
649					require.NoError(t, err)
650					actSeriesSet := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, lblDefault.Name, lblDefault.Value))
651					require.NoError(t, q.Close())
652					expSeriesSet := newMockSeriesSet([]storage.Series{
653						storage.NewListSeries(labels.Labels{lblDefault}, func() []tsdbutil.Sample {
654							ss := make([]tsdbutil.Sample, 0, len(c.smplsExp))
655							for _, s := range c.smplsExp {
656								ss = append(ss, s)
657							}
658							return ss
659						}(),
660						),
661					})
662
663					for {
664						eok, rok := expSeriesSet.Next(), actSeriesSet.Next()
665						require.Equal(t, eok, rok)
666
667						if !eok {
668							require.NoError(t, h.Close())
669							require.NoError(t, actSeriesSet.Err())
670							require.Equal(t, 0, len(actSeriesSet.Warnings()))
671							continue Outer
672						}
673						expSeries := expSeriesSet.At()
674						actSeries := actSeriesSet.At()
675
676						require.Equal(t, expSeries.Labels(), actSeries.Labels())
677
678						smplExp, errExp := storage.ExpandSamples(expSeries.Iterator(), nil)
679						smplRes, errRes := storage.ExpandSamples(actSeries.Iterator(), nil)
680
681						require.Equal(t, errExp, errRes)
682						require.Equal(t, smplExp, smplRes)
683					}
684				}
685			}
686		})
687	}
688}
689
690func TestDeleteUntilCurMax(t *testing.T) {
691	hb, _ := newTestHead(t, 1000000, false)
692	defer func() {
693		require.NoError(t, hb.Close())
694	}()
695
696	numSamples := int64(10)
697	app := hb.Appender(context.Background())
698	smpls := make([]float64, numSamples)
699	for i := int64(0); i < numSamples; i++ {
700		smpls[i] = rand.Float64()
701		_, err := app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, i, smpls[i])
702		require.NoError(t, err)
703	}
704	require.NoError(t, app.Commit())
705	require.NoError(t, hb.Delete(0, 10000, labels.MustNewMatcher(labels.MatchEqual, "a", "b")))
706
707	// Test the series returns no samples. The series is cleared only after compaction.
708	q, err := NewBlockQuerier(hb, 0, 100000)
709	require.NoError(t, err)
710	res := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
711	require.True(t, res.Next(), "series is not present")
712	s := res.At()
713	it := s.Iterator()
714	require.False(t, it.Next(), "expected no samples")
715	for res.Next() {
716	}
717	require.NoError(t, res.Err())
718	require.Equal(t, 0, len(res.Warnings()))
719
720	// Add again and test for presence.
721	app = hb.Appender(context.Background())
722	_, err = app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, 11, 1)
723	require.NoError(t, err)
724	require.NoError(t, app.Commit())
725	q, err = NewBlockQuerier(hb, 0, 100000)
726	require.NoError(t, err)
727	res = q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
728	require.True(t, res.Next(), "series don't exist")
729	exps := res.At()
730	it = exps.Iterator()
731	resSamples, err := storage.ExpandSamples(it, newSample)
732	require.NoError(t, err)
733	require.Equal(t, []tsdbutil.Sample{sample{11, 1}}, resSamples)
734	for res.Next() {
735	}
736	require.NoError(t, res.Err())
737	require.Equal(t, 0, len(res.Warnings()))
738}
739
740func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) {
741	numSamples := 10000
742
743	// Enough samples to cause a checkpoint.
744	hb, w := newTestHead(t, int64(numSamples)*10, false)
745
746	for i := 0; i < numSamples; i++ {
747		app := hb.Appender(context.Background())
748		_, err := app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, int64(i), 0)
749		require.NoError(t, err)
750		require.NoError(t, app.Commit())
751	}
752	require.NoError(t, hb.Delete(0, int64(numSamples), labels.MustNewMatcher(labels.MatchEqual, "a", "b")))
753	require.NoError(t, hb.Truncate(1))
754	require.NoError(t, hb.Close())
755
756	// Confirm there's been a checkpoint.
757	cdir, _, err := wal.LastCheckpoint(w.Dir())
758	require.NoError(t, err)
759	// Read in checkpoint and WAL.
760	recs := readTestWAL(t, cdir)
761	recs = append(recs, readTestWAL(t, w.Dir())...)
762
763	var series, samples, stones int
764	for _, rec := range recs {
765		switch rec.(type) {
766		case []record.RefSeries:
767			series++
768		case []record.RefSample:
769			samples++
770		case []tombstones.Stone:
771			stones++
772		default:
773			t.Fatalf("unknown record type")
774		}
775	}
776	require.Equal(t, 1, series)
777	require.Equal(t, 9999, samples)
778	require.Equal(t, 1, stones)
779
780}
781
782func TestDelete_e2e(t *testing.T) {
783	numDatapoints := 1000
784	numRanges := 1000
785	timeInterval := int64(2)
786	// Create 8 series with 1000 data-points of different ranges, delete and run queries.
787	lbls := [][]labels.Label{
788		{
789			{Name: "a", Value: "b"},
790			{Name: "instance", Value: "localhost:9090"},
791			{Name: "job", Value: "prometheus"},
792		},
793		{
794			{Name: "a", Value: "b"},
795			{Name: "instance", Value: "127.0.0.1:9090"},
796			{Name: "job", Value: "prometheus"},
797		},
798		{
799			{Name: "a", Value: "b"},
800			{Name: "instance", Value: "127.0.0.1:9090"},
801			{Name: "job", Value: "prom-k8s"},
802		},
803		{
804			{Name: "a", Value: "b"},
805			{Name: "instance", Value: "localhost:9090"},
806			{Name: "job", Value: "prom-k8s"},
807		},
808		{
809			{Name: "a", Value: "c"},
810			{Name: "instance", Value: "localhost:9090"},
811			{Name: "job", Value: "prometheus"},
812		},
813		{
814			{Name: "a", Value: "c"},
815			{Name: "instance", Value: "127.0.0.1:9090"},
816			{Name: "job", Value: "prometheus"},
817		},
818		{
819			{Name: "a", Value: "c"},
820			{Name: "instance", Value: "127.0.0.1:9090"},
821			{Name: "job", Value: "prom-k8s"},
822		},
823		{
824			{Name: "a", Value: "c"},
825			{Name: "instance", Value: "localhost:9090"},
826			{Name: "job", Value: "prom-k8s"},
827		},
828	}
829	seriesMap := map[string][]tsdbutil.Sample{}
830	for _, l := range lbls {
831		seriesMap[labels.New(l...).String()] = []tsdbutil.Sample{}
832	}
833
834	hb, _ := newTestHead(t, 100000, false)
835	defer func() {
836		require.NoError(t, hb.Close())
837	}()
838
839	app := hb.Appender(context.Background())
840	for _, l := range lbls {
841		ls := labels.New(l...)
842		series := []tsdbutil.Sample{}
843		ts := rand.Int63n(300)
844		for i := 0; i < numDatapoints; i++ {
845			v := rand.Float64()
846			_, err := app.Append(0, ls, ts, v)
847			require.NoError(t, err)
848			series = append(series, sample{ts, v})
849			ts += rand.Int63n(timeInterval) + 1
850		}
851		seriesMap[labels.New(l...).String()] = series
852	}
853	require.NoError(t, app.Commit())
854	// Delete a time-range from each-selector.
855	dels := []struct {
856		ms     []*labels.Matcher
857		drange tombstones.Intervals
858	}{
859		{
860			ms:     []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "b")},
861			drange: tombstones.Intervals{{Mint: 300, Maxt: 500}, {Mint: 600, Maxt: 670}},
862		},
863		{
864			ms: []*labels.Matcher{
865				labels.MustNewMatcher(labels.MatchEqual, "a", "b"),
866				labels.MustNewMatcher(labels.MatchEqual, "job", "prom-k8s"),
867			},
868			drange: tombstones.Intervals{{Mint: 300, Maxt: 500}, {Mint: 100, Maxt: 670}},
869		},
870		{
871			ms: []*labels.Matcher{
872				labels.MustNewMatcher(labels.MatchEqual, "a", "c"),
873				labels.MustNewMatcher(labels.MatchEqual, "instance", "localhost:9090"),
874				labels.MustNewMatcher(labels.MatchEqual, "job", "prometheus"),
875			},
876			drange: tombstones.Intervals{{Mint: 300, Maxt: 400}, {Mint: 100, Maxt: 6700}},
877		},
878		// TODO: Add Regexp Matchers.
879	}
880	for _, del := range dels {
881		for _, r := range del.drange {
882			require.NoError(t, hb.Delete(r.Mint, r.Maxt, del.ms...))
883		}
884		matched := labels.Slice{}
885		for _, ls := range lbls {
886			s := labels.Selector(del.ms)
887			if s.Matches(ls) {
888				matched = append(matched, ls)
889			}
890		}
891		sort.Sort(matched)
892		for i := 0; i < numRanges; i++ {
893			q, err := NewBlockQuerier(hb, 0, 100000)
894			require.NoError(t, err)
895			defer q.Close()
896			ss := q.Select(true, nil, del.ms...)
897			// Build the mockSeriesSet.
898			matchedSeries := make([]storage.Series, 0, len(matched))
899			for _, m := range matched {
900				smpls := seriesMap[m.String()]
901				smpls = deletedSamples(smpls, del.drange)
902				// Only append those series for which samples exist as mockSeriesSet
903				// doesn't skip series with no samples.
904				// TODO: But sometimes SeriesSet returns an empty chunkenc.Iterator
905				if len(smpls) > 0 {
906					matchedSeries = append(matchedSeries, storage.NewListSeries(m, smpls))
907				}
908			}
909			expSs := newMockSeriesSet(matchedSeries)
910			// Compare both SeriesSets.
911			for {
912				eok, rok := expSs.Next(), ss.Next()
913				// Skip a series if iterator is empty.
914				if rok {
915					for !ss.At().Iterator().Next() {
916						rok = ss.Next()
917						if !rok {
918							break
919						}
920					}
921				}
922				require.Equal(t, eok, rok)
923				if !eok {
924					break
925				}
926				sexp := expSs.At()
927				sres := ss.At()
928				require.Equal(t, sexp.Labels(), sres.Labels())
929				smplExp, errExp := storage.ExpandSamples(sexp.Iterator(), nil)
930				smplRes, errRes := storage.ExpandSamples(sres.Iterator(), nil)
931				require.Equal(t, errExp, errRes)
932				require.Equal(t, smplExp, smplRes)
933			}
934			require.NoError(t, ss.Err())
935			require.Equal(t, 0, len(ss.Warnings()))
936		}
937	}
938}
939
940func boundedSamples(full []tsdbutil.Sample, mint, maxt int64) []tsdbutil.Sample {
941	for len(full) > 0 {
942		if full[0].T() >= mint {
943			break
944		}
945		full = full[1:]
946	}
947	for i, s := range full {
948		// labels.Labelinate on the first sample larger than maxt.
949		if s.T() > maxt {
950			return full[:i]
951		}
952	}
953	// maxt is after highest sample.
954	return full
955}
956
957func deletedSamples(full []tsdbutil.Sample, dranges tombstones.Intervals) []tsdbutil.Sample {
958	ds := make([]tsdbutil.Sample, 0, len(full))
959Outer:
960	for _, s := range full {
961		for _, r := range dranges {
962			if r.InBounds(s.T()) {
963				continue Outer
964			}
965		}
966		ds = append(ds, s)
967	}
968
969	return ds
970}
971
972func TestComputeChunkEndTime(t *testing.T) {
973	cases := []struct {
974		start, cur, max int64
975		res             int64
976	}{
977		{
978			start: 0,
979			cur:   250,
980			max:   1000,
981			res:   1000,
982		},
983		{
984			start: 100,
985			cur:   200,
986			max:   1000,
987			res:   550,
988		},
989		// Case where we fit floored 0 chunks. Must catch division by 0
990		// and default to maximum time.
991		{
992			start: 0,
993			cur:   500,
994			max:   1000,
995			res:   1000,
996		},
997		// Catch division by zero for cur == start. Strictly not a possible case.
998		{
999			start: 100,
1000			cur:   100,
1001			max:   1000,
1002			res:   104,
1003		},
1004	}
1005
1006	for _, c := range cases {
1007		got := computeChunkEndTime(c.start, c.cur, c.max)
1008		if got != c.res {
1009			t.Errorf("expected %d for (start: %d, cur: %d, max: %d), got %d", c.res, c.start, c.cur, c.max, got)
1010		}
1011	}
1012}
1013
1014func TestMemSeries_append(t *testing.T) {
1015	dir, err := ioutil.TempDir("", "append")
1016	require.NoError(t, err)
1017	defer func() {
1018		require.NoError(t, os.RemoveAll(dir))
1019	}()
1020	// This is usually taken from the Head, but passing manually here.
1021	chunkDiskMapper, err := chunks.NewChunkDiskMapper(dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize)
1022	require.NoError(t, err)
1023	defer func() {
1024		require.NoError(t, chunkDiskMapper.Close())
1025	}()
1026
1027	s := newMemSeries(labels.Labels{}, 1, 500, nil)
1028
1029	// Add first two samples at the very end of a chunk range and the next two
1030	// on and after it.
1031	// New chunk must correctly be cut at 1000.
1032	ok, chunkCreated := s.append(998, 1, 0, chunkDiskMapper)
1033	require.True(t, ok, "append failed")
1034	require.True(t, chunkCreated, "first sample created chunk")
1035
1036	ok, chunkCreated = s.append(999, 2, 0, chunkDiskMapper)
1037	require.True(t, ok, "append failed")
1038	require.False(t, chunkCreated, "second sample should use same chunk")
1039
1040	ok, chunkCreated = s.append(1000, 3, 0, chunkDiskMapper)
1041	require.True(t, ok, "append failed")
1042	require.True(t, chunkCreated, "expected new chunk on boundary")
1043
1044	ok, chunkCreated = s.append(1001, 4, 0, chunkDiskMapper)
1045	require.True(t, ok, "append failed")
1046	require.False(t, chunkCreated, "second sample should use same chunk")
1047
1048	require.Equal(t, 1, len(s.mmappedChunks), "there should be only 1 mmapped chunk")
1049	require.Equal(t, int64(998), s.mmappedChunks[0].minTime, "wrong chunk range")
1050	require.Equal(t, int64(999), s.mmappedChunks[0].maxTime, "wrong chunk range")
1051	require.Equal(t, int64(1000), s.headChunk.minTime, "wrong chunk range")
1052	require.Equal(t, int64(1001), s.headChunk.maxTime, "wrong chunk range")
1053
1054	// Fill the range [1000,2000) with many samples. Intermediate chunks should be cut
1055	// at approximately 120 samples per chunk.
1056	for i := 1; i < 1000; i++ {
1057		ok, _ := s.append(1001+int64(i), float64(i), 0, chunkDiskMapper)
1058		require.True(t, ok, "append failed")
1059	}
1060
1061	require.Greater(t, len(s.mmappedChunks)+1, 7, "expected intermediate chunks")
1062
1063	// All chunks but the first and last should now be moderately full.
1064	for i, c := range s.mmappedChunks[1:] {
1065		chk, err := chunkDiskMapper.Chunk(c.ref)
1066		require.NoError(t, err)
1067		require.Greater(t, chk.NumSamples(), 100, "unexpected small chunk %d of length %d", i, chk.NumSamples())
1068	}
1069}
1070
1071func TestGCChunkAccess(t *testing.T) {
1072	// Put a chunk, select it. GC it and then access it.
1073	h, _ := newTestHead(t, 1000, false)
1074	defer func() {
1075		require.NoError(t, h.Close())
1076	}()
1077
1078	h.initTime(0)
1079
1080	s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"))
1081
1082	// Appending 2 samples for the first chunk.
1083	ok, chunkCreated := s.append(0, 0, 0, h.chunkDiskMapper)
1084	require.True(t, ok, "series append failed")
1085	require.True(t, chunkCreated, "chunks was not created")
1086	ok, chunkCreated = s.append(999, 999, 0, h.chunkDiskMapper)
1087	require.True(t, ok, "series append failed")
1088	require.False(t, chunkCreated, "chunks was created")
1089
1090	// A new chunks should be created here as it's beyond the chunk range.
1091	ok, chunkCreated = s.append(1000, 1000, 0, h.chunkDiskMapper)
1092	require.True(t, ok, "series append failed")
1093	require.True(t, chunkCreated, "chunks was not created")
1094	ok, chunkCreated = s.append(1999, 1999, 0, h.chunkDiskMapper)
1095	require.True(t, ok, "series append failed")
1096	require.False(t, chunkCreated, "chunks was created")
1097
1098	idx := h.indexRange(0, 1500)
1099	var (
1100		lset   labels.Labels
1101		chunks []chunks.Meta
1102	)
1103	require.NoError(t, idx.Series(1, &lset, &chunks))
1104
1105	require.Equal(t, labels.Labels{{
1106		Name: "a", Value: "1",
1107	}}, lset)
1108	require.Equal(t, 2, len(chunks))
1109
1110	cr, err := h.chunksRange(0, 1500, nil)
1111	require.NoError(t, err)
1112	_, err = cr.Chunk(chunks[0].Ref)
1113	require.NoError(t, err)
1114	_, err = cr.Chunk(chunks[1].Ref)
1115	require.NoError(t, err)
1116
1117	require.NoError(t, h.Truncate(1500)) // Remove a chunk.
1118
1119	_, err = cr.Chunk(chunks[0].Ref)
1120	require.Equal(t, storage.ErrNotFound, err)
1121	_, err = cr.Chunk(chunks[1].Ref)
1122	require.NoError(t, err)
1123}
1124
1125func TestGCSeriesAccess(t *testing.T) {
1126	// Put a series, select it. GC it and then access it.
1127	h, _ := newTestHead(t, 1000, false)
1128	defer func() {
1129		require.NoError(t, h.Close())
1130	}()
1131
1132	h.initTime(0)
1133
1134	s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"))
1135
1136	// Appending 2 samples for the first chunk.
1137	ok, chunkCreated := s.append(0, 0, 0, h.chunkDiskMapper)
1138	require.True(t, ok, "series append failed")
1139	require.True(t, chunkCreated, "chunks was not created")
1140	ok, chunkCreated = s.append(999, 999, 0, h.chunkDiskMapper)
1141	require.True(t, ok, "series append failed")
1142	require.False(t, chunkCreated, "chunks was created")
1143
1144	// A new chunks should be created here as it's beyond the chunk range.
1145	ok, chunkCreated = s.append(1000, 1000, 0, h.chunkDiskMapper)
1146	require.True(t, ok, "series append failed")
1147	require.True(t, chunkCreated, "chunks was not created")
1148	ok, chunkCreated = s.append(1999, 1999, 0, h.chunkDiskMapper)
1149	require.True(t, ok, "series append failed")
1150	require.False(t, chunkCreated, "chunks was created")
1151
1152	idx := h.indexRange(0, 2000)
1153	var (
1154		lset   labels.Labels
1155		chunks []chunks.Meta
1156	)
1157	require.NoError(t, idx.Series(1, &lset, &chunks))
1158
1159	require.Equal(t, labels.Labels{{
1160		Name: "a", Value: "1",
1161	}}, lset)
1162	require.Equal(t, 2, len(chunks))
1163
1164	cr, err := h.chunksRange(0, 2000, nil)
1165	require.NoError(t, err)
1166	_, err = cr.Chunk(chunks[0].Ref)
1167	require.NoError(t, err)
1168	_, err = cr.Chunk(chunks[1].Ref)
1169	require.NoError(t, err)
1170
1171	require.NoError(t, h.Truncate(2000)) // Remove the series.
1172
1173	require.Equal(t, (*memSeries)(nil), h.series.getByID(1))
1174
1175	_, err = cr.Chunk(chunks[0].Ref)
1176	require.Equal(t, storage.ErrNotFound, err)
1177	_, err = cr.Chunk(chunks[1].Ref)
1178	require.Equal(t, storage.ErrNotFound, err)
1179}
1180
1181func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) {
1182	h, _ := newTestHead(t, 1000, false)
1183	defer func() {
1184		require.NoError(t, h.Close())
1185	}()
1186
1187	h.initTime(0)
1188
1189	app := h.appender()
1190	lset := labels.FromStrings("a", "1")
1191	_, err := app.Append(0, lset, 2100, 1)
1192	require.NoError(t, err)
1193
1194	require.NoError(t, h.Truncate(2000))
1195	require.NotNil(t, h.series.getByHash(lset.Hash(), lset), "series should not have been garbage collected")
1196
1197	require.NoError(t, app.Commit())
1198
1199	q, err := NewBlockQuerier(h, 1500, 2500)
1200	require.NoError(t, err)
1201	defer q.Close()
1202
1203	ss := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "1"))
1204	require.Equal(t, true, ss.Next())
1205	for ss.Next() {
1206	}
1207	require.NoError(t, ss.Err())
1208	require.Equal(t, 0, len(ss.Warnings()))
1209}
1210
1211func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) {
1212	h, _ := newTestHead(t, 1000, false)
1213	defer func() {
1214		require.NoError(t, h.Close())
1215	}()
1216
1217	h.initTime(0)
1218
1219	app := h.appender()
1220	lset := labels.FromStrings("a", "1")
1221	_, err := app.Append(0, lset, 2100, 1)
1222	require.NoError(t, err)
1223
1224	require.NoError(t, h.Truncate(2000))
1225	require.NotNil(t, h.series.getByHash(lset.Hash(), lset), "series should not have been garbage collected")
1226
1227	require.NoError(t, app.Rollback())
1228
1229	q, err := NewBlockQuerier(h, 1500, 2500)
1230	require.NoError(t, err)
1231	defer q.Close()
1232
1233	ss := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "1"))
1234	require.Equal(t, false, ss.Next())
1235	require.Equal(t, 0, len(ss.Warnings()))
1236
1237	// Truncate again, this time the series should be deleted
1238	require.NoError(t, h.Truncate(2050))
1239	require.Equal(t, (*memSeries)(nil), h.series.getByHash(lset.Hash(), lset))
1240}
1241
1242func TestHead_LogRollback(t *testing.T) {
1243	for _, compress := range []bool{false, true} {
1244		t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
1245			h, w := newTestHead(t, 1000, compress)
1246			defer func() {
1247				require.NoError(t, h.Close())
1248			}()
1249
1250			app := h.Appender(context.Background())
1251			_, err := app.Append(0, labels.FromStrings("a", "b"), 1, 2)
1252			require.NoError(t, err)
1253
1254			require.NoError(t, app.Rollback())
1255			recs := readTestWAL(t, w.Dir())
1256
1257			require.Equal(t, 1, len(recs))
1258
1259			series, ok := recs[0].([]record.RefSeries)
1260			require.True(t, ok, "expected series record but got %+v", recs[0])
1261			require.Equal(t, []record.RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}, series)
1262		})
1263	}
1264}
1265
1266// TestWalRepair_DecodingError ensures that a repair is run for an error
1267// when decoding a record.
1268func TestWalRepair_DecodingError(t *testing.T) {
1269	var enc record.Encoder
1270	for name, test := range map[string]struct {
1271		corrFunc  func(rec []byte) []byte // Func that applies the corruption to a record.
1272		rec       []byte
1273		totalRecs int
1274		expRecs   int
1275	}{
1276		"decode_series": {
1277			func(rec []byte) []byte {
1278				return rec[:3]
1279			},
1280			enc.Series([]record.RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}, []byte{}),
1281			9,
1282			5,
1283		},
1284		"decode_samples": {
1285			func(rec []byte) []byte {
1286				return rec[:3]
1287			},
1288			enc.Samples([]record.RefSample{{Ref: 0, T: 99, V: 1}}, []byte{}),
1289			9,
1290			5,
1291		},
1292		"decode_tombstone": {
1293			func(rec []byte) []byte {
1294				return rec[:3]
1295			},
1296			enc.Tombstones([]tombstones.Stone{{Ref: 1, Intervals: tombstones.Intervals{}}}, []byte{}),
1297			9,
1298			5,
1299		},
1300	} {
1301		for _, compress := range []bool{false, true} {
1302			t.Run(fmt.Sprintf("%s,compress=%t", name, compress), func(t *testing.T) {
1303				dir, err := ioutil.TempDir("", "wal_repair")
1304				require.NoError(t, err)
1305				defer func() {
1306					require.NoError(t, os.RemoveAll(dir))
1307				}()
1308
1309				// Fill the wal and corrupt it.
1310				{
1311					w, err := wal.New(nil, nil, filepath.Join(dir, "wal"), compress)
1312					require.NoError(t, err)
1313
1314					for i := 1; i <= test.totalRecs; i++ {
1315						// At this point insert a corrupted record.
1316						if i-1 == test.expRecs {
1317							require.NoError(t, w.Log(test.corrFunc(test.rec)))
1318							continue
1319						}
1320						require.NoError(t, w.Log(test.rec))
1321					}
1322
1323					opts := DefaultHeadOptions()
1324					opts.ChunkRange = 1
1325					opts.ChunkDirRoot = w.Dir()
1326					h, err := NewHead(nil, nil, w, opts, nil)
1327					require.NoError(t, err)
1328					require.Equal(t, 0.0, prom_testutil.ToFloat64(h.metrics.walCorruptionsTotal))
1329					initErr := h.Init(math.MinInt64)
1330
1331					err = errors.Cause(initErr) // So that we can pick up errors even if wrapped.
1332					_, corrErr := err.(*wal.CorruptionErr)
1333					require.True(t, corrErr, "reading the wal didn't return corruption error")
1334					require.NoError(t, w.Close())
1335				}
1336
1337				// Open the db to trigger a repair.
1338				{
1339					db, err := Open(dir, nil, nil, DefaultOptions(), nil)
1340					require.NoError(t, err)
1341					defer func() {
1342						require.NoError(t, db.Close())
1343					}()
1344					require.Equal(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.walCorruptionsTotal))
1345				}
1346
1347				// Read the wal content after the repair.
1348				{
1349					sr, err := wal.NewSegmentsReader(filepath.Join(dir, "wal"))
1350					require.NoError(t, err)
1351					defer sr.Close()
1352					r := wal.NewReader(sr)
1353
1354					var actRec int
1355					for r.Next() {
1356						actRec++
1357					}
1358					require.NoError(t, r.Err())
1359					require.Equal(t, test.expRecs, actRec, "Wrong number of intact records")
1360				}
1361			})
1362		}
1363	}
1364}
1365
1366func TestHeadReadWriterRepair(t *testing.T) {
1367	dir, err := ioutil.TempDir("", "head_read_writer_repair")
1368	require.NoError(t, err)
1369	defer func() {
1370		require.NoError(t, os.RemoveAll(dir))
1371	}()
1372
1373	const chunkRange = 1000
1374
1375	walDir := filepath.Join(dir, "wal")
1376	// Fill the chunk segments and corrupt it.
1377	{
1378		w, err := wal.New(nil, nil, walDir, false)
1379		require.NoError(t, err)
1380
1381		opts := DefaultHeadOptions()
1382		opts.ChunkRange = chunkRange
1383		opts.ChunkDirRoot = dir
1384		h, err := NewHead(nil, nil, w, opts, nil)
1385		require.NoError(t, err)
1386		require.Equal(t, 0.0, prom_testutil.ToFloat64(h.metrics.mmapChunkCorruptionTotal))
1387		require.NoError(t, h.Init(math.MinInt64))
1388
1389		s, created, _ := h.getOrCreate(1, labels.FromStrings("a", "1"))
1390		require.True(t, created, "series was not created")
1391
1392		for i := 0; i < 7; i++ {
1393			ok, chunkCreated := s.append(int64(i*chunkRange), float64(i*chunkRange), 0, h.chunkDiskMapper)
1394			require.True(t, ok, "series append failed")
1395			require.True(t, chunkCreated, "chunk was not created")
1396			ok, chunkCreated = s.append(int64(i*chunkRange)+chunkRange-1, float64(i*chunkRange), 0, h.chunkDiskMapper)
1397			require.True(t, ok, "series append failed")
1398			require.False(t, chunkCreated, "chunk was created")
1399			require.NoError(t, h.chunkDiskMapper.CutNewFile())
1400		}
1401		require.NoError(t, h.Close())
1402
1403		// Verify that there are 7 segment files.
1404		files, err := ioutil.ReadDir(mmappedChunksDir(dir))
1405		require.NoError(t, err)
1406		require.Equal(t, 7, len(files))
1407
1408		// Corrupt the 4th file by writing a random byte to series ref.
1409		f, err := os.OpenFile(filepath.Join(mmappedChunksDir(dir), files[3].Name()), os.O_WRONLY, 0666)
1410		require.NoError(t, err)
1411		n, err := f.WriteAt([]byte{67, 88}, chunks.HeadChunkFileHeaderSize+2)
1412		require.NoError(t, err)
1413		require.Equal(t, 2, n)
1414		require.NoError(t, f.Close())
1415	}
1416
1417	// Open the db to trigger a repair.
1418	{
1419		db, err := Open(dir, nil, nil, DefaultOptions(), nil)
1420		require.NoError(t, err)
1421		defer func() {
1422			require.NoError(t, db.Close())
1423		}()
1424		require.Equal(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.mmapChunkCorruptionTotal))
1425	}
1426
1427	// Verify that there are 3 segment files after the repair.
1428	// The segments from the corrupt segment should be removed.
1429	{
1430		files, err := ioutil.ReadDir(mmappedChunksDir(dir))
1431		require.NoError(t, err)
1432		require.Equal(t, 3, len(files))
1433	}
1434}
1435
1436func TestNewWalSegmentOnTruncate(t *testing.T) {
1437	h, wlog := newTestHead(t, 1000, false)
1438	defer func() {
1439		require.NoError(t, h.Close())
1440	}()
1441	add := func(ts int64) {
1442		app := h.Appender(context.Background())
1443		_, err := app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, ts, 0)
1444		require.NoError(t, err)
1445		require.NoError(t, app.Commit())
1446	}
1447
1448	add(0)
1449	_, last, err := wal.Segments(wlog.Dir())
1450	require.NoError(t, err)
1451	require.Equal(t, 0, last)
1452
1453	add(1)
1454	require.NoError(t, h.Truncate(1))
1455	_, last, err = wal.Segments(wlog.Dir())
1456	require.NoError(t, err)
1457	require.Equal(t, 1, last)
1458
1459	add(2)
1460	require.NoError(t, h.Truncate(2))
1461	_, last, err = wal.Segments(wlog.Dir())
1462	require.NoError(t, err)
1463	require.Equal(t, 2, last)
1464}
1465
1466func TestAddDuplicateLabelName(t *testing.T) {
1467	h, _ := newTestHead(t, 1000, false)
1468	defer func() {
1469		require.NoError(t, h.Close())
1470	}()
1471
1472	add := func(labels labels.Labels, labelName string) {
1473		app := h.Appender(context.Background())
1474		_, err := app.Append(0, labels, 0, 0)
1475		require.Error(t, err)
1476		require.Equal(t, fmt.Sprintf(`label name "%s" is not unique: invalid sample`, labelName), err.Error())
1477	}
1478
1479	add(labels.Labels{{Name: "a", Value: "c"}, {Name: "a", Value: "b"}}, "a")
1480	add(labels.Labels{{Name: "a", Value: "c"}, {Name: "a", Value: "c"}}, "a")
1481	add(labels.Labels{{Name: "__name__", Value: "up"}, {Name: "job", Value: "prometheus"}, {Name: "le", Value: "500"}, {Name: "le", Value: "400"}, {Name: "unit", Value: "s"}}, "le")
1482}
1483
1484func TestMemSeriesIsolation(t *testing.T) {
1485	// Put a series, select it. GC it and then access it.
1486	lastValue := func(h *Head, maxAppendID uint64) int {
1487		idx, err := h.Index()
1488
1489		require.NoError(t, err)
1490
1491		iso := h.iso.State()
1492		iso.maxAppendID = maxAppendID
1493
1494		chunks, err := h.chunksRange(math.MinInt64, math.MaxInt64, iso)
1495		require.NoError(t, err)
1496		// Hm.. here direct block chunk querier might be required?
1497		querier := blockQuerier{
1498			blockBaseQuerier: &blockBaseQuerier{
1499				index:      idx,
1500				chunks:     chunks,
1501				tombstones: tombstones.NewMemTombstones(),
1502
1503				mint: 0,
1504				maxt: 10000,
1505			},
1506		}
1507
1508		require.NoError(t, err)
1509		defer querier.Close()
1510
1511		ss := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
1512		_, seriesSet, ws, err := expandSeriesSet(ss)
1513		require.NoError(t, err)
1514		require.Equal(t, 0, len(ws))
1515
1516		for _, series := range seriesSet {
1517			return int(series[len(series)-1].v)
1518		}
1519		return -1
1520	}
1521
1522	addSamples := func(h *Head) int {
1523		i := 1
1524		for ; i <= 1000; i++ {
1525			var app storage.Appender
1526			// To initialize bounds.
1527			if h.MinTime() == math.MaxInt64 {
1528				app = &initAppender{head: h}
1529			} else {
1530				a := h.appender()
1531				a.cleanupAppendIDsBelow = 0
1532				app = a
1533			}
1534
1535			_, err := app.Append(0, labels.FromStrings("foo", "bar"), int64(i), float64(i))
1536			require.NoError(t, err)
1537			require.NoError(t, app.Commit())
1538		}
1539		return i
1540	}
1541
1542	testIsolation := func(h *Head, i int) {
1543	}
1544
1545	// Test isolation without restart of Head.
1546	hb, _ := newTestHead(t, 1000, false)
1547	i := addSamples(hb)
1548	testIsolation(hb, i)
1549
1550	// Test simple cases in different chunks when no appendID cleanup has been performed.
1551	require.Equal(t, 10, lastValue(hb, 10))
1552	require.Equal(t, 130, lastValue(hb, 130))
1553	require.Equal(t, 160, lastValue(hb, 160))
1554	require.Equal(t, 240, lastValue(hb, 240))
1555	require.Equal(t, 500, lastValue(hb, 500))
1556	require.Equal(t, 750, lastValue(hb, 750))
1557	require.Equal(t, 995, lastValue(hb, 995))
1558	require.Equal(t, 999, lastValue(hb, 999))
1559
1560	// Cleanup appendIDs below 500.
1561	app := hb.appender()
1562	app.cleanupAppendIDsBelow = 500
1563	_, err := app.Append(0, labels.FromStrings("foo", "bar"), int64(i), float64(i))
1564	require.NoError(t, err)
1565	require.NoError(t, app.Commit())
1566	i++
1567
1568	// We should not get queries with a maxAppendID below 500 after the cleanup,
1569	// but they only take the remaining appendIDs into account.
1570	require.Equal(t, 499, lastValue(hb, 10))
1571	require.Equal(t, 499, lastValue(hb, 130))
1572	require.Equal(t, 499, lastValue(hb, 160))
1573	require.Equal(t, 499, lastValue(hb, 240))
1574	require.Equal(t, 500, lastValue(hb, 500))
1575	require.Equal(t, 995, lastValue(hb, 995))
1576	require.Equal(t, 999, lastValue(hb, 999))
1577
1578	// Cleanup appendIDs below 1000, which means the sample buffer is
1579	// the only thing with appendIDs.
1580	app = hb.appender()
1581	app.cleanupAppendIDsBelow = 1000
1582	_, err = app.Append(0, labels.FromStrings("foo", "bar"), int64(i), float64(i))
1583	require.NoError(t, err)
1584	require.NoError(t, app.Commit())
1585	require.Equal(t, 999, lastValue(hb, 998))
1586	require.Equal(t, 999, lastValue(hb, 999))
1587	require.Equal(t, 1000, lastValue(hb, 1000))
1588	require.Equal(t, 1001, lastValue(hb, 1001))
1589	require.Equal(t, 1002, lastValue(hb, 1002))
1590	require.Equal(t, 1002, lastValue(hb, 1003))
1591
1592	i++
1593	// Cleanup appendIDs below 1001, but with a rollback.
1594	app = hb.appender()
1595	app.cleanupAppendIDsBelow = 1001
1596	_, err = app.Append(0, labels.FromStrings("foo", "bar"), int64(i), float64(i))
1597	require.NoError(t, err)
1598	require.NoError(t, app.Rollback())
1599	require.Equal(t, 1000, lastValue(hb, 999))
1600	require.Equal(t, 1000, lastValue(hb, 1000))
1601	require.Equal(t, 1001, lastValue(hb, 1001))
1602	require.Equal(t, 1002, lastValue(hb, 1002))
1603	require.Equal(t, 1002, lastValue(hb, 1003))
1604
1605	require.NoError(t, hb.Close())
1606
1607	// Test isolation with restart of Head. This is to verify the num samples of chunks after m-map chunk replay.
1608	hb, w := newTestHead(t, 1000, false)
1609	i = addSamples(hb)
1610	require.NoError(t, hb.Close())
1611
1612	wlog, err := wal.NewSize(nil, nil, w.Dir(), 32768, false)
1613	require.NoError(t, err)
1614	opts := DefaultHeadOptions()
1615	opts.ChunkRange = 1000
1616	opts.ChunkDirRoot = wlog.Dir()
1617	hb, err = NewHead(nil, nil, wlog, opts, nil)
1618	defer func() { require.NoError(t, hb.Close()) }()
1619	require.NoError(t, err)
1620	require.NoError(t, hb.Init(0))
1621
1622	// No appends after restarting. Hence all should return the last value.
1623	require.Equal(t, 1000, lastValue(hb, 10))
1624	require.Equal(t, 1000, lastValue(hb, 130))
1625	require.Equal(t, 1000, lastValue(hb, 160))
1626	require.Equal(t, 1000, lastValue(hb, 240))
1627	require.Equal(t, 1000, lastValue(hb, 500))
1628
1629	// Cleanup appendIDs below 1000, which means the sample buffer is
1630	// the only thing with appendIDs.
1631	app = hb.appender()
1632	_, err = app.Append(0, labels.FromStrings("foo", "bar"), int64(i), float64(i))
1633	i++
1634	require.NoError(t, err)
1635	require.NoError(t, app.Commit())
1636	require.Equal(t, 1001, lastValue(hb, 998))
1637	require.Equal(t, 1001, lastValue(hb, 999))
1638	require.Equal(t, 1001, lastValue(hb, 1000))
1639	require.Equal(t, 1001, lastValue(hb, 1001))
1640	require.Equal(t, 1001, lastValue(hb, 1002))
1641	require.Equal(t, 1001, lastValue(hb, 1003))
1642
1643	// Cleanup appendIDs below 1002, but with a rollback.
1644	app = hb.appender()
1645	_, err = app.Append(0, labels.FromStrings("foo", "bar"), int64(i), float64(i))
1646	require.NoError(t, err)
1647	require.NoError(t, app.Rollback())
1648	require.Equal(t, 1001, lastValue(hb, 999))
1649	require.Equal(t, 1001, lastValue(hb, 1000))
1650	require.Equal(t, 1001, lastValue(hb, 1001))
1651	require.Equal(t, 1001, lastValue(hb, 1002))
1652	require.Equal(t, 1001, lastValue(hb, 1003))
1653}
1654
1655func TestIsolationRollback(t *testing.T) {
1656	// Rollback after a failed append and test if the low watermark has progressed anyway.
1657	hb, _ := newTestHead(t, 1000, false)
1658	defer func() {
1659		require.NoError(t, hb.Close())
1660	}()
1661
1662	app := hb.Appender(context.Background())
1663	_, err := app.Append(0, labels.FromStrings("foo", "bar"), 0, 0)
1664	require.NoError(t, err)
1665	require.NoError(t, app.Commit())
1666	require.Equal(t, uint64(1), hb.iso.lowWatermark())
1667
1668	app = hb.Appender(context.Background())
1669	_, err = app.Append(0, labels.FromStrings("foo", "bar"), 1, 1)
1670	require.NoError(t, err)
1671	_, err = app.Append(0, labels.FromStrings("foo", "bar", "foo", "baz"), 2, 2)
1672	require.Error(t, err)
1673	require.NoError(t, app.Rollback())
1674	require.Equal(t, uint64(2), hb.iso.lowWatermark())
1675
1676	app = hb.Appender(context.Background())
1677	_, err = app.Append(0, labels.FromStrings("foo", "bar"), 3, 3)
1678	require.NoError(t, err)
1679	require.NoError(t, app.Commit())
1680	require.Equal(t, uint64(3), hb.iso.lowWatermark(), "Low watermark should proceed to 3 even if append #2 was rolled back.")
1681}
1682
1683func TestIsolationLowWatermarkMonotonous(t *testing.T) {
1684	hb, _ := newTestHead(t, 1000, false)
1685	defer func() {
1686		require.NoError(t, hb.Close())
1687	}()
1688
1689	app1 := hb.Appender(context.Background())
1690	_, err := app1.Append(0, labels.FromStrings("foo", "bar"), 0, 0)
1691	require.NoError(t, err)
1692	require.NoError(t, app1.Commit())
1693	require.Equal(t, uint64(1), hb.iso.lowWatermark(), "Low watermark should by 1 after 1st append.")
1694
1695	app1 = hb.Appender(context.Background())
1696	_, err = app1.Append(0, labels.FromStrings("foo", "bar"), 1, 1)
1697	require.NoError(t, err)
1698	require.Equal(t, uint64(2), hb.iso.lowWatermark(), "Low watermark should be two, even if append is not committed yet.")
1699
1700	app2 := hb.Appender(context.Background())
1701	_, err = app2.Append(0, labels.FromStrings("foo", "baz"), 1, 1)
1702	require.NoError(t, err)
1703	require.NoError(t, app2.Commit())
1704	require.Equal(t, uint64(2), hb.iso.lowWatermark(), "Low watermark should stay two because app1 is not committed yet.")
1705
1706	is := hb.iso.State()
1707	require.Equal(t, uint64(2), hb.iso.lowWatermark(), "After simulated read (iso state retrieved), low watermark should stay at 2.")
1708
1709	require.NoError(t, app1.Commit())
1710	require.Equal(t, uint64(2), hb.iso.lowWatermark(), "Even after app1 is committed, low watermark should stay at 2 because read is still ongoing.")
1711
1712	is.Close()
1713	require.Equal(t, uint64(3), hb.iso.lowWatermark(), "After read has finished (iso state closed), low watermark should jump to three.")
1714}
1715
1716func TestIsolationAppendIDZeroIsNoop(t *testing.T) {
1717	h, _ := newTestHead(t, 1000, false)
1718	defer func() {
1719		require.NoError(t, h.Close())
1720	}()
1721
1722	h.initTime(0)
1723
1724	s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"))
1725
1726	ok, _ := s.append(0, 0, 0, h.chunkDiskMapper)
1727	require.True(t, ok, "Series append failed.")
1728	require.Equal(t, 0, s.txs.txIDCount, "Series should not have an appendID after append with appendID=0.")
1729}
1730
1731func TestHeadSeriesChunkRace(t *testing.T) {
1732	for i := 0; i < 1000; i++ {
1733		testHeadSeriesChunkRace(t)
1734	}
1735}
1736
1737func TestIsolationWithoutAdd(t *testing.T) {
1738	hb, _ := newTestHead(t, 1000, false)
1739	defer func() {
1740		require.NoError(t, hb.Close())
1741	}()
1742
1743	app := hb.Appender(context.Background())
1744	require.NoError(t, app.Commit())
1745
1746	app = hb.Appender(context.Background())
1747	_, err := app.Append(0, labels.FromStrings("foo", "baz"), 1, 1)
1748	require.NoError(t, err)
1749	require.NoError(t, app.Commit())
1750
1751	require.Equal(t, hb.iso.lastAppendID(), hb.iso.lowWatermark(), "High watermark should be equal to the low watermark")
1752}
1753
1754func TestOutOfOrderSamplesMetric(t *testing.T) {
1755	dir, err := ioutil.TempDir("", "test")
1756	require.NoError(t, err)
1757	defer func() {
1758		require.NoError(t, os.RemoveAll(dir))
1759	}()
1760
1761	db, err := Open(dir, nil, nil, DefaultOptions(), nil)
1762	require.NoError(t, err)
1763	defer func() {
1764		require.NoError(t, db.Close())
1765	}()
1766	db.DisableCompactions()
1767
1768	ctx := context.Background()
1769	app := db.Appender(ctx)
1770	for i := 1; i <= 5; i++ {
1771		_, err = app.Append(0, labels.FromStrings("a", "b"), int64(i), 99)
1772		require.NoError(t, err)
1773	}
1774	require.NoError(t, app.Commit())
1775
1776	// Test out of order metric.
1777	require.Equal(t, 0.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples))
1778	app = db.Appender(ctx)
1779	_, err = app.Append(0, labels.FromStrings("a", "b"), 2, 99)
1780	require.Equal(t, storage.ErrOutOfOrderSample, err)
1781	require.Equal(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples))
1782
1783	_, err = app.Append(0, labels.FromStrings("a", "b"), 3, 99)
1784	require.Equal(t, storage.ErrOutOfOrderSample, err)
1785	require.Equal(t, 2.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples))
1786
1787	_, err = app.Append(0, labels.FromStrings("a", "b"), 4, 99)
1788	require.Equal(t, storage.ErrOutOfOrderSample, err)
1789	require.Equal(t, 3.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples))
1790	require.NoError(t, app.Commit())
1791
1792	// Compact Head to test out of bound metric.
1793	app = db.Appender(ctx)
1794	_, err = app.Append(0, labels.FromStrings("a", "b"), DefaultBlockDuration*2, 99)
1795	require.NoError(t, err)
1796	require.NoError(t, app.Commit())
1797
1798	require.Equal(t, int64(math.MinInt64), db.head.minValidTime.Load())
1799	require.NoError(t, db.Compact())
1800	require.Greater(t, db.head.minValidTime.Load(), int64(0))
1801
1802	app = db.Appender(ctx)
1803	_, err = app.Append(0, labels.FromStrings("a", "b"), db.head.minValidTime.Load()-2, 99)
1804	require.Equal(t, storage.ErrOutOfBounds, err)
1805	require.Equal(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.outOfBoundSamples))
1806
1807	_, err = app.Append(0, labels.FromStrings("a", "b"), db.head.minValidTime.Load()-1, 99)
1808	require.Equal(t, storage.ErrOutOfBounds, err)
1809	require.Equal(t, 2.0, prom_testutil.ToFloat64(db.head.metrics.outOfBoundSamples))
1810	require.NoError(t, app.Commit())
1811
1812	// Some more valid samples for out of order.
1813	app = db.Appender(ctx)
1814	for i := 1; i <= 5; i++ {
1815		_, err = app.Append(0, labels.FromStrings("a", "b"), db.head.minValidTime.Load()+DefaultBlockDuration+int64(i), 99)
1816		require.NoError(t, err)
1817	}
1818	require.NoError(t, app.Commit())
1819
1820	// Test out of order metric.
1821	app = db.Appender(ctx)
1822	_, err = app.Append(0, labels.FromStrings("a", "b"), db.head.minValidTime.Load()+DefaultBlockDuration+2, 99)
1823	require.Equal(t, storage.ErrOutOfOrderSample, err)
1824	require.Equal(t, 4.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples))
1825
1826	_, err = app.Append(0, labels.FromStrings("a", "b"), db.head.minValidTime.Load()+DefaultBlockDuration+3, 99)
1827	require.Equal(t, storage.ErrOutOfOrderSample, err)
1828	require.Equal(t, 5.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples))
1829
1830	_, err = app.Append(0, labels.FromStrings("a", "b"), db.head.minValidTime.Load()+DefaultBlockDuration+4, 99)
1831	require.Equal(t, storage.ErrOutOfOrderSample, err)
1832	require.Equal(t, 6.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples))
1833	require.NoError(t, app.Commit())
1834}
1835
1836func testHeadSeriesChunkRace(t *testing.T) {
1837	h, _ := newTestHead(t, 1000, false)
1838	defer func() {
1839		require.NoError(t, h.Close())
1840	}()
1841	require.NoError(t, h.Init(0))
1842	app := h.Appender(context.Background())
1843
1844	s2, err := app.Append(0, labels.FromStrings("foo2", "bar"), 5, 0)
1845	require.NoError(t, err)
1846	for ts := int64(6); ts < 11; ts++ {
1847		_, err = app.Append(s2, nil, ts, 0)
1848		require.NoError(t, err)
1849	}
1850	require.NoError(t, app.Commit())
1851
1852	var wg sync.WaitGroup
1853	matcher := labels.MustNewMatcher(labels.MatchEqual, "", "")
1854	q, err := NewBlockQuerier(h, 18, 22)
1855	require.NoError(t, err)
1856	defer q.Close()
1857
1858	wg.Add(1)
1859	go func() {
1860		h.updateMinMaxTime(20, 25)
1861		h.gc()
1862		wg.Done()
1863	}()
1864	ss := q.Select(false, nil, matcher)
1865	for ss.Next() {
1866	}
1867	require.NoError(t, ss.Err())
1868	wg.Wait()
1869}
1870
1871func TestHeadLabelNamesValuesWithMinMaxRange(t *testing.T) {
1872	head, _ := newTestHead(t, 1000, false)
1873	defer func() {
1874		require.NoError(t, head.Close())
1875	}()
1876
1877	const (
1878		firstSeriesTimestamp  int64 = 100
1879		secondSeriesTimestamp int64 = 200
1880		lastSeriesTimestamp   int64 = 300
1881	)
1882	var (
1883		seriesTimestamps = []int64{firstSeriesTimestamp,
1884			secondSeriesTimestamp,
1885			lastSeriesTimestamp,
1886		}
1887		expectedLabelNames  = []string{"a", "b", "c"}
1888		expectedLabelValues = []string{"d", "e", "f"}
1889	)
1890
1891	app := head.Appender(context.Background())
1892	for i, name := range expectedLabelNames {
1893		_, err := app.Append(0, labels.Labels{{Name: name, Value: expectedLabelValues[i]}}, seriesTimestamps[i], 0)
1894		require.NoError(t, err)
1895	}
1896	require.NoError(t, app.Commit())
1897	require.Equal(t, head.MinTime(), firstSeriesTimestamp)
1898	require.Equal(t, head.MaxTime(), lastSeriesTimestamp)
1899
1900	var testCases = []struct {
1901		name           string
1902		mint           int64
1903		maxt           int64
1904		expectedNames  []string
1905		expectedValues []string
1906	}{
1907		{"maxt less than head min", head.MaxTime() - 10, head.MinTime() - 10, []string{}, []string{}},
1908		{"mint less than head max", head.MaxTime() + 10, head.MinTime() + 10, []string{}, []string{}},
1909		{"mint and maxt outside head", head.MaxTime() + 10, head.MinTime() - 10, []string{}, []string{}},
1910		{"mint and maxt within head", head.MaxTime() - 10, head.MinTime() + 10, expectedLabelNames, expectedLabelValues},
1911	}
1912
1913	for _, tt := range testCases {
1914		t.Run(tt.name, func(t *testing.T) {
1915			headIdxReader := head.indexRange(tt.mint, tt.maxt)
1916			actualLabelNames, err := headIdxReader.LabelNames()
1917			require.NoError(t, err)
1918			require.Equal(t, tt.expectedNames, actualLabelNames)
1919			if len(tt.expectedValues) > 0 {
1920				for i, name := range expectedLabelNames {
1921					actualLabelValue, err := headIdxReader.SortedLabelValues(name)
1922					require.NoError(t, err)
1923					require.Equal(t, []string{tt.expectedValues[i]}, actualLabelValue)
1924				}
1925			}
1926		})
1927	}
1928}
1929
1930func TestHeadLabelValuesWithMatchers(t *testing.T) {
1931	head, _ := newTestHead(t, 1000, false)
1932	defer func() {
1933		require.NoError(t, head.Close())
1934	}()
1935
1936	app := head.Appender(context.Background())
1937	for i := 0; i < 100; i++ {
1938		_, err := app.Append(0, labels.Labels{
1939			{Name: "unique", Value: fmt.Sprintf("value%d", i)},
1940			{Name: "tens", Value: fmt.Sprintf("value%d", i/10)},
1941		}, 100, 0)
1942		require.NoError(t, err)
1943	}
1944	require.NoError(t, app.Commit())
1945
1946	var testCases = []struct {
1947		name           string
1948		labelName      string
1949		matchers       []*labels.Matcher
1950		expectedValues []string
1951	}{
1952		{
1953			name:           "get tens based on unique id",
1954			labelName:      "tens",
1955			matchers:       []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "unique", "value35")},
1956			expectedValues: []string{"value3"},
1957		}, {
1958			name:           "get unique ids based on a ten",
1959			labelName:      "unique",
1960			matchers:       []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "tens", "value1")},
1961			expectedValues: []string{"value10", "value11", "value12", "value13", "value14", "value15", "value16", "value17", "value18", "value19"},
1962		}, {
1963			name:           "get tens by pattern matching on unique id",
1964			labelName:      "tens",
1965			matchers:       []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "unique", "value[5-7]5")},
1966			expectedValues: []string{"value5", "value6", "value7"},
1967		}, {
1968			name:           "get tens by matching for absence of unique label",
1969			labelName:      "tens",
1970			matchers:       []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotEqual, "unique", "")},
1971			expectedValues: []string{"value0", "value1", "value2", "value3", "value4", "value5", "value6", "value7", "value8", "value9"},
1972		},
1973	}
1974
1975	for _, tt := range testCases {
1976		t.Run(tt.name, func(t *testing.T) {
1977			headIdxReader := head.indexRange(0, 200)
1978
1979			actualValues, err := headIdxReader.SortedLabelValues(tt.labelName, tt.matchers...)
1980			require.NoError(t, err)
1981			require.Equal(t, tt.expectedValues, actualValues)
1982
1983			actualValues, err = headIdxReader.LabelValues(tt.labelName, tt.matchers...)
1984			sort.Strings(actualValues)
1985			require.NoError(t, err)
1986			require.Equal(t, tt.expectedValues, actualValues)
1987		})
1988	}
1989}
1990
1991func TestErrReuseAppender(t *testing.T) {
1992	head, _ := newTestHead(t, 1000, false)
1993	defer func() {
1994		require.NoError(t, head.Close())
1995	}()
1996
1997	app := head.Appender(context.Background())
1998	_, err := app.Append(0, labels.Labels{{Name: "test", Value: "test"}}, 0, 0)
1999	require.NoError(t, err)
2000	require.NoError(t, app.Commit())
2001	require.Error(t, app.Commit())
2002	require.Error(t, app.Rollback())
2003
2004	app = head.Appender(context.Background())
2005	_, err = app.Append(0, labels.Labels{{Name: "test", Value: "test"}}, 1, 0)
2006	require.NoError(t, err)
2007	require.NoError(t, app.Rollback())
2008	require.Error(t, app.Rollback())
2009	require.Error(t, app.Commit())
2010
2011	app = head.Appender(context.Background())
2012	_, err = app.Append(0, labels.Labels{{Name: "test", Value: "test"}}, 2, 0)
2013	require.NoError(t, err)
2014	require.NoError(t, app.Commit())
2015	require.Error(t, app.Rollback())
2016	require.Error(t, app.Commit())
2017
2018	app = head.Appender(context.Background())
2019	_, err = app.Append(0, labels.Labels{{Name: "test", Value: "test"}}, 3, 0)
2020	require.NoError(t, err)
2021	require.NoError(t, app.Rollback())
2022	require.Error(t, app.Commit())
2023	require.Error(t, app.Rollback())
2024}
2025
2026func TestHeadMintAfterTruncation(t *testing.T) {
2027	chunkRange := int64(2000)
2028	head, _ := newTestHead(t, chunkRange, false)
2029
2030	app := head.Appender(context.Background())
2031	_, err := app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, 100, 100)
2032	require.NoError(t, err)
2033	_, err = app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, 4000, 200)
2034	require.NoError(t, err)
2035	_, err = app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, 8000, 300)
2036	require.NoError(t, err)
2037	require.NoError(t, app.Commit())
2038
2039	// Truncating outside the appendable window and actual mint being outside
2040	// appendable window should leave mint at the actual mint.
2041	require.NoError(t, head.Truncate(3500))
2042	require.Equal(t, int64(4000), head.MinTime())
2043	require.Equal(t, int64(4000), head.minValidTime.Load())
2044
2045	// After truncation outside the appendable window if the actual min time
2046	// is in the appendable window then we should leave mint at the start of appendable window.
2047	require.NoError(t, head.Truncate(5000))
2048	require.Equal(t, head.appendableMinValidTime(), head.MinTime())
2049	require.Equal(t, head.appendableMinValidTime(), head.minValidTime.Load())
2050
2051	// If the truncation time is inside the appendable window, then the min time
2052	// should be the truncation time.
2053	require.NoError(t, head.Truncate(7500))
2054	require.Equal(t, int64(7500), head.MinTime())
2055	require.Equal(t, int64(7500), head.minValidTime.Load())
2056
2057	require.NoError(t, head.Close())
2058}
2059
2060func TestHeadExemplars(t *testing.T) {
2061	chunkRange := int64(2000)
2062	head, _ := newTestHead(t, chunkRange, false)
2063	app := head.Appender(context.Background())
2064
2065	l := labels.FromStrings("traceId", "123")
2066	// It is perfectly valid to add Exemplars before the current start time -
2067	// histogram buckets that haven't been update in a while could still be
2068	// exported exemplars from an hour ago.
2069	ref, err := app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, 100, 100)
2070	require.NoError(t, err)
2071	_, err = app.AppendExemplar(ref, l, exemplar.Exemplar{
2072		Labels: l,
2073		HasTs:  true,
2074		Ts:     -1000,
2075		Value:  1,
2076	})
2077	require.NoError(t, err)
2078	require.NoError(t, app.Commit())
2079	require.NoError(t, head.Close())
2080}
2081
2082func BenchmarkHeadLabelValuesWithMatchers(b *testing.B) {
2083	chunkRange := int64(2000)
2084	head, _ := newTestHead(b, chunkRange, false)
2085	b.Cleanup(func() { require.NoError(b, head.Close()) })
2086
2087	app := head.Appender(context.Background())
2088
2089	metricCount := 1000000
2090	for i := 0; i < metricCount; i++ {
2091		_, err := app.Append(0, labels.Labels{
2092			{Name: "unique", Value: fmt.Sprintf("value%d", i)},
2093			{Name: "tens", Value: fmt.Sprintf("value%d", i/(metricCount/10))},
2094			{Name: "ninety", Value: fmt.Sprintf("value%d", i/(metricCount/10)/9)}, // "0" for the first 90%, then "1"
2095		}, 100, 0)
2096		require.NoError(b, err)
2097	}
2098	require.NoError(b, app.Commit())
2099
2100	headIdxReader := head.indexRange(0, 200)
2101	matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "ninety", "value0")}
2102
2103	b.ResetTimer()
2104	b.ReportAllocs()
2105
2106	for benchIdx := 0; benchIdx < b.N; benchIdx++ {
2107		actualValues, err := headIdxReader.LabelValues("tens", matchers...)
2108		require.NoError(b, err)
2109		require.Equal(b, 9, len(actualValues))
2110	}
2111}
2112
2113func TestMemSafeIteratorSeekIntoBuffer(t *testing.T) {
2114	dir, err := ioutil.TempDir("", "iterator_seek")
2115	require.NoError(t, err)
2116	defer func() {
2117		require.NoError(t, os.RemoveAll(dir))
2118	}()
2119	// This is usually taken from the Head, but passing manually here.
2120	chunkDiskMapper, err := chunks.NewChunkDiskMapper(dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize)
2121	require.NoError(t, err)
2122	defer func() {
2123		require.NoError(t, chunkDiskMapper.Close())
2124	}()
2125
2126	s := newMemSeries(labels.Labels{}, 1, 500, nil)
2127
2128	for i := 0; i < 7; i++ {
2129		ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper)
2130		require.True(t, ok, "sample append failed")
2131	}
2132
2133	it := s.iterator(s.chunkID(len(s.mmappedChunks)), nil, chunkDiskMapper, nil)
2134	_, ok := it.(*memSafeIterator)
2135	require.True(t, ok)
2136
2137	// First point.
2138	ok = it.Seek(0)
2139	require.True(t, ok)
2140	ts, val := it.At()
2141	require.Equal(t, int64(0), ts)
2142	require.Equal(t, float64(0), val)
2143
2144	// Advance one point.
2145	ok = it.Next()
2146	require.True(t, ok)
2147	ts, val = it.At()
2148	require.Equal(t, int64(1), ts)
2149	require.Equal(t, float64(1), val)
2150
2151	// Seeking an older timestamp shouldn't cause the iterator to go backwards.
2152	ok = it.Seek(0)
2153	require.True(t, ok)
2154	ts, val = it.At()
2155	require.Equal(t, int64(1), ts)
2156	require.Equal(t, float64(1), val)
2157
2158	// Seek into the buffer.
2159	ok = it.Seek(3)
2160	require.True(t, ok)
2161	ts, val = it.At()
2162	require.Equal(t, int64(3), ts)
2163	require.Equal(t, float64(3), val)
2164
2165	// Iterate through the rest of the buffer.
2166	for i := 4; i < 7; i++ {
2167		ok = it.Next()
2168		require.True(t, ok)
2169		ts, val = it.At()
2170		require.Equal(t, int64(i), ts)
2171		require.Equal(t, float64(i), val)
2172	}
2173
2174	// Run out of elements in the iterator.
2175	ok = it.Next()
2176	require.False(t, ok)
2177	ok = it.Seek(7)
2178	require.False(t, ok)
2179}
2180