1// Copyright (c) The Thanos Authors.
2// Licensed under the Apache License 2.0.
3
4package indexheader
5
6import (
7	"context"
8	"fmt"
9	"io/ioutil"
10	"math"
11	"os"
12	"path/filepath"
13	"strconv"
14	"testing"
15
16	"github.com/go-kit/kit/log"
17	"github.com/oklog/ulid"
18	"github.com/pkg/errors"
19	"github.com/prometheus/prometheus/pkg/labels"
20	"github.com/prometheus/prometheus/tsdb/encoding"
21	"github.com/prometheus/prometheus/tsdb/fileutil"
22	"github.com/prometheus/prometheus/tsdb/index"
23
24	"github.com/thanos-io/thanos/pkg/block"
25	"github.com/thanos-io/thanos/pkg/block/metadata"
26	"github.com/thanos-io/thanos/pkg/objstore"
27	"github.com/thanos-io/thanos/pkg/objstore/filesystem"
28	"github.com/thanos-io/thanos/pkg/testutil"
29	"github.com/thanos-io/thanos/pkg/testutil/e2eutil"
30)
31
32func TestReaders(t *testing.T) {
33	ctx := context.Background()
34
35	tmpDir, err := ioutil.TempDir("", "test-indexheader")
36	testutil.Ok(t, err)
37	defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }()
38
39	bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt"))
40	testutil.Ok(t, err)
41	defer func() { testutil.Ok(t, bkt.Close()) }()
42
43	// Create block index version 2.
44	id1, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{
45		{{Name: "a", Value: "1"}},
46		{{Name: "a", Value: "2"}},
47		{{Name: "a", Value: "3"}},
48		{{Name: "a", Value: "4"}},
49		{{Name: "a", Value: "5"}},
50		{{Name: "a", Value: "6"}},
51		{{Name: "a", Value: "7"}},
52		{{Name: "a", Value: "8"}},
53		{{Name: "a", Value: "9"}},
54		// Missing 10 on purpose.
55		{{Name: "a", Value: "11"}},
56		{{Name: "a", Value: "12"}},
57		{{Name: "a", Value: "13"}},
58		{{Name: "a", Value: "1"}, {Name: "longer-string", Value: "1"}},
59		{{Name: "a", Value: "1"}, {Name: "longer-string", Value: "2"}},
60	}, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124, metadata.NoneFunc)
61	testutil.Ok(t, err)
62
63	testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, id1.String()), metadata.NoneFunc))
64
65	// Copy block index version 1 for backward compatibility.
66	/* The block here was produced at the commit
67	    706602daed1487f7849990678b4ece4599745905 used in 2.0.0 with:
68	   db, _ := Open("v1db", nil, nil, nil)
69	   app := db.Appender()
70	   app.Add(labels.FromStrings("foo", "bar"), 1, 2)
71	   app.Add(labels.FromStrings("foo", "baz"), 3, 4)
72	   app.Add(labels.FromStrings("foo", "meh"), 1000*3600*4, 4) // Not in the block.
73	   // Make sure we've enough values for the lack of sorting of postings offsets to show up.
74	   for i := 0; i < 100; i++ {
75	     app.Add(labels.FromStrings("bar", strconv.FormatInt(int64(i), 10)), 0, 0)
76	   }
77	   app.Commit()
78	   db.compact()
79	   db.Close()
80	*/
81
82	m, err := metadata.ReadFromDir("./testdata/index_format_v1")
83	testutil.Ok(t, err)
84	e2eutil.Copy(t, "./testdata/index_format_v1", filepath.Join(tmpDir, m.ULID.String()))
85
86	_, err = metadata.InjectThanos(log.NewNopLogger(), filepath.Join(tmpDir, m.ULID.String()), metadata.Thanos{
87		Labels:     labels.Labels{{Name: "ext1", Value: "1"}}.Map(),
88		Downsample: metadata.ThanosDownsample{Resolution: 0},
89		Source:     metadata.TestSource,
90	}, &m.BlockMeta)
91	testutil.Ok(t, err)
92	testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, m.ULID.String()), metadata.NoneFunc))
93
94	for _, id := range []ulid.ULID{id1, m.ULID} {
95		t.Run(id.String(), func(t *testing.T) {
96			indexFile, err := fileutil.OpenMmapFile(filepath.Join(tmpDir, id.String(), block.IndexFilename))
97			testutil.Ok(t, err)
98			defer func() { _ = indexFile.Close() }()
99
100			b := realByteSlice(indexFile.Bytes())
101
102			t.Run("binary reader", func(t *testing.T) {
103				fn := filepath.Join(tmpDir, id.String(), block.IndexHeaderFilename)
104				testutil.Ok(t, WriteBinary(ctx, bkt, id, fn))
105
106				br, err := NewBinaryReader(ctx, log.NewNopLogger(), nil, tmpDir, id, 3)
107				testutil.Ok(t, err)
108
109				defer func() { testutil.Ok(t, br.Close()) }()
110
111				if id == id1 {
112					testutil.Equals(t, 1, br.version)
113					testutil.Equals(t, 2, br.indexVersion)
114					testutil.Equals(t, &BinaryTOC{Symbols: headerLen, PostingsOffsetTable: 69}, br.toc)
115					testutil.Equals(t, int64(710), br.indexLastPostingEnd)
116					testutil.Equals(t, 8, br.symbols.Size())
117					testutil.Equals(t, 0, len(br.postingsV1))
118					testutil.Equals(t, 2, len(br.nameSymbols))
119					testutil.Equals(t, map[string]*postingValueOffsets{
120						"": {
121							offsets:       []postingOffset{{value: "", tableOff: 4}},
122							lastValOffset: 440,
123						},
124						"a": {
125							offsets: []postingOffset{
126								{value: "1", tableOff: 9},
127								{value: "13", tableOff: 32},
128								{value: "4", tableOff: 54},
129								{value: "7", tableOff: 75},
130								{value: "9", tableOff: 89},
131							},
132							lastValOffset: 640,
133						},
134						"longer-string": {
135							offsets: []postingOffset{
136								{value: "1", tableOff: 96},
137								{value: "2", tableOff: 115},
138							},
139							lastValOffset: 706,
140						},
141					}, br.postings)
142
143					vals, err := br.LabelValues("not-existing")
144					testutil.Ok(t, err)
145					testutil.Equals(t, []string(nil), vals)
146
147					// Regression tests for https://github.com/thanos-io/thanos/issues/2213.
148					// Most of not existing value was working despite bug, except in certain unlucky cases
149					// it was causing "invalid size" errors.
150					_, err = br.PostingsOffset("not-existing", "1")
151					testutil.Equals(t, NotFoundRangeErr, err)
152					_, err = br.PostingsOffset("a", "0")
153					testutil.Equals(t, NotFoundRangeErr, err)
154					// Unlucky case, because the bug was causing unnecessary read & decode requiring more bytes than
155					// available. For rest cases read was noop wrong, but at least not failing.
156					_, err = br.PostingsOffset("a", "10")
157					testutil.Equals(t, NotFoundRangeErr, err)
158					_, err = br.PostingsOffset("a", "121")
159					testutil.Equals(t, NotFoundRangeErr, err)
160					_, err = br.PostingsOffset("a", "131")
161					testutil.Equals(t, NotFoundRangeErr, err)
162					_, err = br.PostingsOffset("a", "91")
163					testutil.Equals(t, NotFoundRangeErr, err)
164					_, err = br.PostingsOffset("longer-string", "0")
165					testutil.Equals(t, NotFoundRangeErr, err)
166					_, err = br.PostingsOffset("longer-string", "11")
167					testutil.Equals(t, NotFoundRangeErr, err)
168					_, err = br.PostingsOffset("longer-string", "21")
169					testutil.Equals(t, NotFoundRangeErr, err)
170				}
171
172				compareIndexToHeader(t, b, br)
173			})
174
175			t.Run("lazy binary reader", func(t *testing.T) {
176				fn := filepath.Join(tmpDir, id.String(), block.IndexHeaderFilename)
177				testutil.Ok(t, WriteBinary(ctx, bkt, id, fn))
178
179				br, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), nil, tmpDir, id, 3, NewLazyBinaryReaderMetrics(nil), nil)
180				testutil.Ok(t, err)
181
182				defer func() { testutil.Ok(t, br.Close()) }()
183
184				compareIndexToHeader(t, b, br)
185			})
186		})
187	}
188
189}
190
191func compareIndexToHeader(t *testing.T, indexByteSlice index.ByteSlice, headerReader Reader) {
192	indexReader, err := index.NewReader(indexByteSlice)
193	testutil.Ok(t, err)
194	defer func() { _ = indexReader.Close() }()
195
196	actVersion, err := headerReader.IndexVersion()
197	testutil.Ok(t, err)
198	testutil.Equals(t, indexReader.Version(), actVersion)
199
200	if indexReader.Version() == index.FormatV2 {
201		// For v2 symbols ref sequential integers 0, 1, 2 etc.
202		iter := indexReader.Symbols()
203		i := 0
204		for iter.Next() {
205			r, err := headerReader.LookupSymbol(uint32(i))
206			testutil.Ok(t, err)
207			testutil.Equals(t, iter.At(), r)
208
209			i++
210		}
211		testutil.Ok(t, iter.Err())
212		_, err := headerReader.LookupSymbol(uint32(i))
213		testutil.NotOk(t, err)
214
215	} else {
216		// For v1 symbols refs are actual offsets in the index.
217		symbols, err := getSymbolTable(indexByteSlice)
218		testutil.Ok(t, err)
219
220		for refs, sym := range symbols {
221			r, err := headerReader.LookupSymbol(refs)
222			testutil.Ok(t, err)
223			testutil.Equals(t, sym, r)
224		}
225		_, err = headerReader.LookupSymbol(200000)
226		testutil.NotOk(t, err)
227	}
228
229	expLabelNames, err := indexReader.LabelNames()
230	testutil.Ok(t, err)
231	actualLabelNames, err := headerReader.LabelNames()
232	testutil.Ok(t, err)
233	testutil.Equals(t, expLabelNames, actualLabelNames)
234
235	expRanges, err := indexReader.PostingsRanges()
236	testutil.Ok(t, err)
237
238	minStart := int64(math.MaxInt64)
239	maxEnd := int64(math.MinInt64)
240	for il, lname := range expLabelNames {
241		expectedLabelVals, err := indexReader.SortedLabelValues(lname)
242		testutil.Ok(t, err)
243
244		vals, err := headerReader.LabelValues(lname)
245		testutil.Ok(t, err)
246		testutil.Equals(t, expectedLabelVals, vals)
247
248		for iv, v := range vals {
249			if minStart > expRanges[labels.Label{Name: lname, Value: v}].Start {
250				minStart = expRanges[labels.Label{Name: lname, Value: v}].Start
251			}
252			if maxEnd < expRanges[labels.Label{Name: lname, Value: v}].End {
253				maxEnd = expRanges[labels.Label{Name: lname, Value: v}].End
254			}
255
256			ptr, err := headerReader.PostingsOffset(lname, v)
257			testutil.Ok(t, err)
258
259			// For index-cache those values are exact.
260			//
261			// For binary they are exact except last item posting offset. It's good enough if the value is larger than exact posting ending.
262			if indexReader.Version() == index.FormatV2 {
263				if iv == len(vals)-1 && il == len(expLabelNames)-1 {
264					testutil.Equals(t, expRanges[labels.Label{Name: lname, Value: v}].Start, ptr.Start)
265					testutil.Assert(t, expRanges[labels.Label{Name: lname, Value: v}].End <= ptr.End, "got offset %v earlier than actual posting end %v ", ptr.End, expRanges[labels.Label{Name: lname, Value: v}].End)
266					continue
267				}
268			} else {
269				// For index formatV1 the last one does not mean literally last value, as postings were not sorted.
270				// Account for that. We know it's 40 label value.
271				if v == "40" {
272					testutil.Equals(t, expRanges[labels.Label{Name: lname, Value: v}].Start, ptr.Start)
273					testutil.Assert(t, expRanges[labels.Label{Name: lname, Value: v}].End <= ptr.End, "got offset %v earlier than actual posting end %v ", ptr.End, expRanges[labels.Label{Name: lname, Value: v}].End)
274					continue
275				}
276			}
277			testutil.Equals(t, expRanges[labels.Label{Name: lname, Value: v}], ptr)
278		}
279	}
280
281	ptr, err := headerReader.PostingsOffset(index.AllPostingsKey())
282	testutil.Ok(t, err)
283	testutil.Equals(t, expRanges[labels.Label{Name: "", Value: ""}].Start, ptr.Start)
284	testutil.Equals(t, expRanges[labels.Label{Name: "", Value: ""}].End, ptr.End)
285}
286
287func prepareIndexV2Block(t testing.TB, tmpDir string, bkt objstore.Bucket) *metadata.Meta {
288	/* Copy index 6MB block index version 2. It was generated via thanosbench. Meta.json:
289		{
290		"ulid": "01DRBP4RNVZ94135ZA6B10EMRR",
291		"minTime": 1570766415000,
292		"maxTime": 1570939215001,
293		"stats": {
294			"numSamples": 115210000,
295			"numSeries": 10000,
296			"numChunks": 990000
297		},
298		"compaction": {
299			"level": 1,
300			"sources": [
301				"01DRBP4RNVZ94135ZA6B10EMRR"
302			]
303		},
304		"version": 1,
305		"thanos": {
306			"labels": {
307				"cluster": "one",
308				"dataset": "continuous"
309			},
310			"downsample": {
311				"resolution": 0
312			},
313			"source": "blockgen"
314		}
315	}
316	*/
317
318	m, err := metadata.ReadFromDir("./testdata/index_format_v2")
319	testutil.Ok(t, err)
320	e2eutil.Copy(t, "./testdata/index_format_v2", filepath.Join(tmpDir, m.ULID.String()))
321
322	_, err = metadata.InjectThanos(log.NewNopLogger(), filepath.Join(tmpDir, m.ULID.String()), metadata.Thanos{
323		Labels:     labels.Labels{{Name: "ext1", Value: "1"}}.Map(),
324		Downsample: metadata.ThanosDownsample{Resolution: 0},
325		Source:     metadata.TestSource,
326	}, &m.BlockMeta)
327	testutil.Ok(t, err)
328	testutil.Ok(t, block.Upload(context.Background(), log.NewNopLogger(), bkt, filepath.Join(tmpDir, m.ULID.String()), metadata.NoneFunc))
329
330	return m
331}
332
333func BenchmarkBinaryWrite(t *testing.B) {
334	ctx := context.Background()
335
336	tmpDir, err := ioutil.TempDir("", "bench-indexheader")
337	testutil.Ok(t, err)
338	defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }()
339
340	bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt"))
341	testutil.Ok(t, err)
342	defer func() { testutil.Ok(t, bkt.Close()) }()
343
344	m := prepareIndexV2Block(t, tmpDir, bkt)
345	fn := filepath.Join(tmpDir, m.ULID.String(), block.IndexHeaderFilename)
346
347	t.ResetTimer()
348	for i := 0; i < t.N; i++ {
349		testutil.Ok(t, WriteBinary(ctx, bkt, m.ULID, fn))
350	}
351}
352
353func BenchmarkBinaryReader(t *testing.B) {
354	ctx := context.Background()
355	tmpDir, err := ioutil.TempDir("", "bench-indexheader")
356	testutil.Ok(t, err)
357	defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }()
358
359	bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt"))
360	testutil.Ok(t, err)
361
362	m := prepareIndexV2Block(t, tmpDir, bkt)
363	fn := filepath.Join(tmpDir, m.ULID.String(), block.IndexHeaderFilename)
364	testutil.Ok(t, WriteBinary(ctx, bkt, m.ULID, fn))
365
366	t.ResetTimer()
367	for i := 0; i < t.N; i++ {
368		br, err := newFileBinaryReader(fn, 32)
369		testutil.Ok(t, err)
370		testutil.Ok(t, br.Close())
371	}
372}
373
374func BenchmarkBinaryReader_LookupSymbol(b *testing.B) {
375	for _, numSeries := range []int{valueSymbolsCacheSize, valueSymbolsCacheSize * 10} {
376		b.Run(fmt.Sprintf("num series = %d", numSeries), func(b *testing.B) {
377			benchmarkBinaryReaderLookupSymbol(b, numSeries)
378		})
379	}
380}
381
382func benchmarkBinaryReaderLookupSymbol(b *testing.B, numSeries int) {
383	const postingOffsetsInMemSampling = 32
384
385	ctx := context.Background()
386	logger := log.NewNopLogger()
387
388	tmpDir, err := ioutil.TempDir("", "benchmark-lookupsymbol")
389	testutil.Ok(b, err)
390	defer func() { testutil.Ok(b, os.RemoveAll(tmpDir)) }()
391
392	bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt"))
393	testutil.Ok(b, err)
394	defer func() { testutil.Ok(b, bkt.Close()) }()
395
396	// Generate series labels.
397	seriesLabels := make([]labels.Labels, 0, numSeries)
398	for i := 0; i < numSeries; i++ {
399		seriesLabels = append(seriesLabels, labels.Labels{{Name: "a", Value: strconv.Itoa(i)}})
400	}
401
402	// Create a block.
403	id1, err := e2eutil.CreateBlock(ctx, tmpDir, seriesLabels, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124, metadata.NoneFunc)
404	testutil.Ok(b, err)
405	testutil.Ok(b, block.Upload(ctx, logger, bkt, filepath.Join(tmpDir, id1.String()), metadata.NoneFunc))
406
407	// Create an index reader.
408	reader, err := NewBinaryReader(ctx, logger, bkt, tmpDir, id1, postingOffsetsInMemSampling)
409	testutil.Ok(b, err)
410
411	// Get the offset of each label value symbol.
412	symbolsOffsets := make([]uint32, numSeries)
413	for i := 0; i < numSeries; i++ {
414		o, err := reader.symbols.ReverseLookup(strconv.Itoa(i))
415		testutil.Ok(b, err)
416
417		symbolsOffsets[i] = o
418	}
419
420	b.ResetTimer()
421
422	for n := 0; n < b.N; n++ {
423		for i := 0; i < len(symbolsOffsets); i++ {
424			if _, err := reader.LookupSymbol(symbolsOffsets[i]); err != nil {
425				b.Fail()
426			}
427		}
428	}
429}
430
431func getSymbolTable(b index.ByteSlice) (map[uint32]string, error) {
432	version := int(b.Range(4, 5)[0])
433
434	if version != 1 && version != 2 {
435		return nil, errors.Errorf("unknown index file version %d", version)
436	}
437
438	toc, err := index.NewTOCFromByteSlice(b)
439	if err != nil {
440		return nil, errors.Wrap(err, "read TOC")
441	}
442
443	symbolsV2, symbolsV1, err := readSymbols(b, version, int(toc.Symbols))
444	if err != nil {
445		return nil, errors.Wrap(err, "read symbols")
446	}
447
448	symbolsTable := make(map[uint32]string, len(symbolsV1)+len(symbolsV2))
449	for o, s := range symbolsV1 {
450		symbolsTable[o] = s
451	}
452	for o, s := range symbolsV2 {
453		symbolsTable[uint32(o)] = s
454	}
455	return symbolsTable, nil
456}
457
458// readSymbols reads the symbol table fully into memory and allocates proper strings for them.
459// Strings backed by the mmap'd memory would cause memory faults if applications keep using them
460// after the reader is closed.
461func readSymbols(bs index.ByteSlice, version int, off int) ([]string, map[uint32]string, error) {
462	if off == 0 {
463		return nil, nil, nil
464	}
465	d := encoding.NewDecbufAt(bs, off, castagnoliTable)
466
467	var (
468		origLen     = d.Len()
469		cnt         = d.Be32int()
470		basePos     = uint32(off) + 4
471		nextPos     = basePos + uint32(origLen-d.Len())
472		symbolSlice []string
473		symbols     = map[uint32]string{}
474	)
475	if version == index.FormatV2 {
476		symbolSlice = make([]string, 0, cnt)
477	}
478
479	for d.Err() == nil && d.Len() > 0 && cnt > 0 {
480		s := d.UvarintStr()
481
482		if version == index.FormatV2 {
483			symbolSlice = append(symbolSlice, s)
484		} else {
485			symbols[nextPos] = s
486			nextPos = basePos + uint32(origLen-d.Len())
487		}
488		cnt--
489	}
490	return symbolSlice, symbols, errors.Wrap(d.Err(), "read symbols")
491}
492