1// Copyright (c) The Thanos Authors.
2// Licensed under the Apache License 2.0.
3
4package downsample
5
6import (
7	"io/ioutil"
8	"math"
9	"os"
10	"path/filepath"
11	"sort"
12	"testing"
13	"time"
14
15	"github.com/fortytw2/leaktest"
16	"github.com/go-kit/kit/log"
17	"github.com/pkg/errors"
18	"github.com/prometheus/prometheus/pkg/labels"
19	"github.com/prometheus/prometheus/pkg/value"
20	"github.com/prometheus/prometheus/tsdb"
21	"github.com/prometheus/prometheus/tsdb/chunkenc"
22	"github.com/prometheus/prometheus/tsdb/chunks"
23	"github.com/prometheus/prometheus/tsdb/index"
24	"github.com/prometheus/prometheus/tsdb/tombstones"
25	"github.com/thanos-io/thanos/pkg/block"
26	"github.com/thanos-io/thanos/pkg/block/metadata"
27	"github.com/thanos-io/thanos/pkg/testutil"
28)
29
30func TestDownsampleCounterBoundaryReset(t *testing.T) {
31
32	toAggrChunks := func(t *testing.T, cm []chunks.Meta) (res []*AggrChunk) {
33		for i := range cm {
34			achk, ok := cm[i].Chunk.(*AggrChunk)
35			testutil.Assert(t, ok, "expected *AggrChunk")
36			res = append(res, achk)
37		}
38		return
39	}
40
41	counterSamples := func(t *testing.T, achks []*AggrChunk) (res []sample) {
42		for _, achk := range achks {
43			chk, err := achk.Get(AggrCounter)
44			testutil.Ok(t, err)
45
46			iter := chk.Iterator(nil)
47			for iter.Next() {
48				t, v := iter.At()
49				res = append(res, sample{t, v})
50			}
51		}
52		return
53	}
54
55	counterIterate := func(t *testing.T, achks []*AggrChunk) (res []sample) {
56		var iters []chunkenc.Iterator
57		for _, achk := range achks {
58			chk, err := achk.Get(AggrCounter)
59			testutil.Ok(t, err)
60			iters = append(iters, chk.Iterator(nil))
61		}
62
63		citer := NewCounterSeriesIterator(iters...)
64		for citer.Next() {
65			t, v := citer.At()
66			res = append(res, sample{t: t, v: v})
67		}
68		return
69	}
70
71	type test struct {
72		raw                   []sample
73		rawAggrResolution     int64
74		expectedRawAggrChunks int
75		rawCounterSamples     []sample
76		rawCounterIterate     []sample
77		aggrAggrResolution    int64
78		aggrChunks            int
79		aggrCounterSamples    []sample
80		aggrCounterIterate    []sample
81	}
82
83	tests := []test{
84		{
85			// In this test case, counter resets occur at the
86			// boundaries between the t=49,t=99 and t=99,t=149
87			// windows, and the values in the t=49, t=99, and
88			// t=149 windows are high enough that the resets
89			// will only be accounted for if the first raw value
90			// of a chunk is maintained during aggregation.
91			// See #1568 for more details.
92			raw: []sample{
93				{t: 10, v: 1}, {t: 20, v: 3}, {t: 30, v: 5},
94				{t: 50, v: 1}, {t: 60, v: 8}, {t: 70, v: 10},
95				{t: 120, v: 1}, {t: 130, v: 18}, {t: 140, v: 20},
96				{t: 160, v: 21}, {t: 170, v: 38}, {t: 180, v: 40},
97			},
98			rawAggrResolution:     50,
99			expectedRawAggrChunks: 4,
100			rawCounterSamples: []sample{
101				{t: 10, v: 1}, {t: 30, v: 5}, {t: 30, v: 5},
102				{t: 50, v: 1}, {t: 70, v: 10}, {t: 70, v: 10},
103				{t: 120, v: 1}, {t: 140, v: 20}, {t: 140, v: 20},
104				{t: 160, v: 21}, {t: 180, v: 40}, {t: 180, v: 40},
105			},
106			rawCounterIterate: []sample{
107				{t: 10, v: 1}, {t: 30, v: 5},
108				{t: 50, v: 6}, {t: 70, v: 15},
109				{t: 120, v: 16}, {t: 140, v: 35},
110				{t: 160, v: 36}, {t: 180, v: 55},
111			},
112			aggrAggrResolution: 2 * 50,
113			aggrChunks:         2,
114			aggrCounterSamples: []sample{
115				{t: 10, v: 1}, {t: 70, v: 15}, {t: 70, v: 10},
116				{t: 120, v: 1}, {t: 180, v: 40}, {t: 180, v: 40},
117			},
118			aggrCounterIterate: []sample{
119				{t: 10, v: 1}, {t: 70, v: 15},
120				{t: 120, v: 16}, {t: 180, v: 55},
121			},
122		},
123	}
124
125	doTest := func(t *testing.T, test *test) {
126		// Asking for more chunks than raw samples ensures that downsampleRawLoop
127		// will create chunks with samples from a single window.
128		cm := downsampleRawLoop(test.raw, test.rawAggrResolution, len(test.raw)+1)
129		testutil.Equals(t, test.expectedRawAggrChunks, len(cm))
130
131		rawAggrChunks := toAggrChunks(t, cm)
132		testutil.Equals(t, test.rawCounterSamples, counterSamples(t, rawAggrChunks))
133		testutil.Equals(t, test.rawCounterIterate, counterIterate(t, rawAggrChunks))
134
135		var buf []sample
136		acm, err := downsampleAggrLoop(rawAggrChunks, &buf, test.aggrAggrResolution, test.aggrChunks)
137		testutil.Ok(t, err)
138		testutil.Equals(t, test.aggrChunks, len(acm))
139
140		aggrAggrChunks := toAggrChunks(t, acm)
141		testutil.Equals(t, test.aggrCounterSamples, counterSamples(t, aggrAggrChunks))
142		testutil.Equals(t, test.aggrCounterIterate, counterIterate(t, aggrAggrChunks))
143	}
144
145	doTest(t, &tests[0])
146}
147
148func TestExpandChunkIterator(t *testing.T) {
149	// Validate that expanding the chunk iterator filters out-of-order samples
150	// and staleness markers.
151	// Same timestamps are okay since we use them for counter markers.
152	var res []sample
153	testutil.Ok(t,
154		expandChunkIterator(
155			newSampleIterator([]sample{
156				{100, 1}, {200, 2}, {200, 3}, {201, 4}, {200, 5},
157				{300, 6}, {400, math.Float64frombits(value.StaleNaN)}, {500, 5},
158			}), &res,
159		),
160	)
161
162	testutil.Equals(t, []sample{{100, 1}, {200, 2}, {200, 3}, {201, 4}, {300, 6}, {500, 5}}, res)
163}
164
165func TestDownsampleRaw(t *testing.T) {
166	defer leaktest.CheckTimeout(t, 10*time.Second)()
167
168	staleMarker := math.Float64frombits(value.StaleNaN)
169
170	input := []*downsampleTestSet{
171		{
172			lset: labels.FromStrings("__name__", "a"),
173			inRaw: []sample{
174				{20, 1}, {40, 2}, {60, 3}, {80, 1}, {100, 2}, {101, staleMarker}, {120, 5}, {180, 10}, {250, 1},
175			},
176			output: map[AggrType][]sample{
177				AggrCount:   {{99, 4}, {199, 3}, {250, 1}},
178				AggrSum:     {{99, 7}, {199, 17}, {250, 1}},
179				AggrMin:     {{99, 1}, {199, 2}, {250, 1}},
180				AggrMax:     {{99, 3}, {199, 10}, {250, 1}},
181				AggrCounter: {{20, 1}, {99, 4}, {199, 13}, {250, 14}, {250, 1}},
182			},
183		},
184	}
185	testDownsample(t, input, &metadata.Meta{BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 250}}, 100)
186}
187
188func TestDownsampleAggr(t *testing.T) {
189	defer leaktest.CheckTimeout(t, 10*time.Second)()
190
191	input := []*downsampleTestSet{
192		{
193			lset: labels.FromStrings("__name__", "a"),
194			inAggr: map[AggrType][]sample{
195				AggrCount: {
196					{199, 5}, {299, 1}, {399, 10}, {400, 3}, {499, 10}, {699, 0}, {999, 100},
197				},
198				AggrSum: {
199					{199, 5}, {299, 1}, {399, 10}, {400, 3}, {499, 10}, {699, 0}, {999, 100},
200				},
201				AggrMin: {
202					{199, 5}, {299, 1}, {399, 10}, {400, -3}, {499, 10}, {699, 0}, {999, 100},
203				},
204				AggrMax: {
205					{199, 5}, {299, 1}, {399, 10}, {400, -3}, {499, 10}, {699, 0}, {999, 100},
206				},
207				AggrCounter: {
208					{99, 100}, {299, 150}, {499, 210}, {499, 10}, // Chunk 1.
209					{599, 20}, {799, 50}, {999, 120}, {999, 50}, // Chunk 2, no reset.
210					{1099, 40}, {1199, 80}, {1299, 110}, // Chunk 3, reset.
211				},
212			},
213			output: map[AggrType][]sample{
214				AggrCount:   {{499, 29}, {999, 100}},
215				AggrSum:     {{499, 29}, {999, 100}},
216				AggrMin:     {{499, -3}, {999, 0}},
217				AggrMax:     {{499, 10}, {999, 100}},
218				AggrCounter: {{99, 100}, {499, 210}, {999, 320}, {1299, 430}, {1299, 110}},
219			},
220		},
221	}
222	var meta metadata.Meta
223	meta.Thanos.Downsample.Resolution = 10
224	meta.BlockMeta = tsdb.BlockMeta{MinTime: 99, MaxTime: 1300}
225
226	testDownsample(t, input, &meta, 500)
227}
228
229func encodeTestAggrSeries(v map[AggrType][]sample) chunks.Meta {
230	b := newAggrChunkBuilder()
231
232	for at, d := range v {
233		for _, s := range d {
234			b.apps[at].Append(s.t, s.v)
235		}
236	}
237
238	return b.encode()
239}
240
241type downsampleTestSet struct {
242	lset   labels.Labels
243	inRaw  []sample
244	inAggr map[AggrType][]sample
245	output map[AggrType][]sample
246}
247
248// testDownsample inserts the input into a block and invokes the downsampler with the given resolution.
249// The chunk ranges within the input block are aligned at 500 time units.
250func testDownsample(t *testing.T, data []*downsampleTestSet, meta *metadata.Meta, resolution int64) {
251	t.Helper()
252
253	dir, err := ioutil.TempDir("", "downsample-raw")
254	testutil.Ok(t, err)
255	defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()
256
257	// Ideally we would use tsdb.HeadBlock here for less dependency on our own code. However,
258	// it cannot accept the counter signal sample with the same timestamp as the previous sample.
259	mb := newMemBlock()
260
261	for _, d := range data {
262		if len(d.inRaw) > 0 && len(d.inAggr) > 0 {
263			t.Fatalf("test must not have raw and aggregate input data at once")
264		}
265		ser := &series{lset: d.lset}
266
267		if len(d.inRaw) > 0 {
268			chk := chunkenc.NewXORChunk()
269			app, _ := chk.Appender()
270
271			for _, s := range d.inRaw {
272				app.Append(s.t, s.v)
273			}
274			ser.chunks = append(ser.chunks, chunks.Meta{
275				MinTime: d.inRaw[0].t,
276				MaxTime: d.inRaw[len(d.inRaw)-1].t,
277				Chunk:   chk,
278			})
279		} else {
280			ser.chunks = append(ser.chunks, encodeTestAggrSeries(d.inAggr))
281		}
282		mb.addSeries(ser)
283	}
284
285	id, err := Downsample(log.NewNopLogger(), meta, mb, dir, resolution)
286	testutil.Ok(t, err)
287
288	_, err = metadata.Read(filepath.Join(dir, id.String()))
289	testutil.Ok(t, err)
290
291	exp := map[uint64]map[AggrType][]sample{}
292	got := map[uint64]map[AggrType][]sample{}
293
294	for _, d := range data {
295		exp[d.lset.Hash()] = d.output
296	}
297	indexr, err := index.NewFileReader(filepath.Join(dir, id.String(), block.IndexFilename))
298	testutil.Ok(t, err)
299	defer func() { testutil.Ok(t, indexr.Close()) }()
300
301	chunkr, err := chunks.NewDirReader(filepath.Join(dir, id.String(), block.ChunksDirname), NewPool())
302	testutil.Ok(t, err)
303	defer func() { testutil.Ok(t, chunkr.Close()) }()
304
305	pall, err := indexr.Postings(index.AllPostingsKey())
306	testutil.Ok(t, err)
307
308	for pall.Next() {
309		id := pall.At()
310
311		var lset labels.Labels
312		var chks []chunks.Meta
313		testutil.Ok(t, indexr.Series(id, &lset, &chks))
314
315		m := map[AggrType][]sample{}
316		got[lset.Hash()] = m
317
318		for _, c := range chks {
319			chk, err := chunkr.Chunk(c.Ref)
320			testutil.Ok(t, err)
321
322			for _, at := range []AggrType{AggrCount, AggrSum, AggrMin, AggrMax, AggrCounter} {
323				c, err := chk.(*AggrChunk).Get(at)
324				if err == ErrAggrNotExist {
325					continue
326				}
327				testutil.Ok(t, err)
328
329				buf := m[at]
330				testutil.Ok(t, expandChunkIterator(c.Iterator(nil), &buf))
331				m[at] = buf
332			}
333		}
334	}
335
336	testutil.Equals(t, len(exp), len(got))
337
338	for h, ser := range exp {
339		for _, at := range []AggrType{AggrCount, AggrSum, AggrMin, AggrMax, AggrCounter} {
340			t.Logf("series %d, type %s", h, at)
341			testutil.Equals(t, ser[at], got[h][at])
342		}
343	}
344}
345
346func TestAverageChunkIterator(t *testing.T) {
347	sum := []sample{{100, 30}, {200, 40}, {300, 5}, {400, -10}}
348	cnt := []sample{{100, 1}, {200, 5}, {300, 2}, {400, 10}}
349	exp := []sample{{100, 30}, {200, 8}, {300, 2.5}, {400, -1}}
350
351	x := NewAverageChunkIterator(newSampleIterator(cnt), newSampleIterator(sum))
352
353	var res []sample
354	for x.Next() {
355		t, v := x.At()
356		res = append(res, sample{t, v})
357	}
358	testutil.Ok(t, x.Err())
359	testutil.Equals(t, exp, res)
360}
361
362func TestCounterSeriesIterator(t *testing.T) {
363	defer leaktest.CheckTimeout(t, 10*time.Second)()
364
365	staleMarker := math.Float64frombits(value.StaleNaN)
366
367	chunks := [][]sample{
368		{{100, 10}, {200, 20}, {300, 10}, {400, 20}, {400, 5}},
369		{{500, 10}, {600, 20}, {700, 30}, {800, 40}, {800, 10}}, // No actual reset.
370		{{900, 5}, {1000, 10}, {1100, 15}},                      // Actual reset.
371		{{1200, 20}, {1250, staleMarker}, {1300, 40}},           // No special last sample, no reset.
372		{{1400, 30}, {1500, 30}, {1600, 50}},                    // No special last sample, reset.
373	}
374	exp := []sample{
375		{100, 10}, {200, 20}, {300, 30}, {400, 40}, {500, 45},
376		{600, 55}, {700, 65}, {800, 75}, {900, 80}, {1000, 85},
377		{1100, 90}, {1200, 95}, {1300, 115}, {1400, 145}, {1500, 145}, {1600, 165},
378	}
379
380	var its []chunkenc.Iterator
381	for _, c := range chunks {
382		its = append(its, newSampleIterator(c))
383	}
384
385	x := NewCounterSeriesIterator(its...)
386
387	var res []sample
388	for x.Next() {
389		t, v := x.At()
390		res = append(res, sample{t, v})
391	}
392	testutil.Ok(t, x.Err())
393	testutil.Equals(t, exp, res)
394}
395
396func TestCounterSeriesIteratorSeek(t *testing.T) {
397	chunks := [][]sample{
398		{{100, 10}, {200, 20}, {300, 10}, {400, 20}, {400, 5}},
399	}
400
401	exp := []sample{
402		{200, 20}, {300, 30}, {400, 40},
403	}
404
405	var its []chunkenc.Iterator
406	for _, c := range chunks {
407		its = append(its, newSampleIterator(c))
408	}
409
410	var res []sample
411	x := NewCounterSeriesIterator(its...)
412
413	ok := x.Seek(150)
414	testutil.Assert(t, ok, "Seek should return true")
415	testutil.Ok(t, x.Err())
416	for {
417		ts, v := x.At()
418		res = append(res, sample{ts, v})
419
420		ok = x.Next()
421		if !ok {
422			break
423		}
424	}
425	testutil.Equals(t, exp, res)
426}
427
428func TestCounterSeriesIteratorSeekExtendTs(t *testing.T) {
429	chunks := [][]sample{
430		{{100, 10}, {200, 20}, {300, 10}, {400, 20}, {400, 5}},
431	}
432
433	var its []chunkenc.Iterator
434	for _, c := range chunks {
435		its = append(its, newSampleIterator(c))
436	}
437
438	x := NewCounterSeriesIterator(its...)
439
440	ok := x.Seek(500)
441	testutil.Assert(t, !ok, "Seek should return false")
442}
443
444func TestCounterSeriesIteratorSeekAfterNext(t *testing.T) {
445	chunks := [][]sample{
446		{{100, 10}},
447	}
448	exp := []sample{
449		{100, 10},
450	}
451
452	var its []chunkenc.Iterator
453	for _, c := range chunks {
454		its = append(its, newSampleIterator(c))
455	}
456
457	var res []sample
458	x := NewCounterSeriesIterator(its...)
459
460	x.Next()
461
462	ok := x.Seek(50)
463	testutil.Assert(t, ok, "Seek should return true")
464	testutil.Ok(t, x.Err())
465	for {
466		ts, v := x.At()
467		res = append(res, sample{ts, v})
468
469		ok = x.Next()
470		if !ok {
471			break
472		}
473	}
474	testutil.Equals(t, exp, res)
475}
476
477type sampleIterator struct {
478	l []sample
479	i int
480}
481
482func newSampleIterator(l []sample) *sampleIterator {
483	return &sampleIterator{l: l, i: -1}
484}
485
486func (it *sampleIterator) Err() error {
487	return nil
488}
489
490func (it *sampleIterator) Next() bool {
491	if it.i >= len(it.l)-1 {
492		return false
493	}
494	it.i++
495	return true
496}
497
498func (it *sampleIterator) Seek(int64) bool {
499	panic("unexpected")
500}
501
502func (it *sampleIterator) At() (t int64, v float64) {
503	return it.l[it.i].t, it.l[it.i].v
504}
505
506// memBlock is an in-memory block that implements a subset of the tsdb.BlockReader interface
507// to allow tsdb.StreamedBlockWriter to persist the data as a block.
508type memBlock struct {
509	// Dummies to implement unused methods.
510	tsdb.IndexReader
511
512	symbols  map[string]struct{}
513	postings []uint64
514	series   []*series
515	chunks   []chunkenc.Chunk
516
517	numberOfChunks uint64
518
519	minTime, maxTime int64
520}
521
522type series struct {
523	lset   labels.Labels
524	chunks []chunks.Meta
525}
526
527func newMemBlock() *memBlock {
528	return &memBlock{symbols: map[string]struct{}{}, minTime: -1, maxTime: -1}
529}
530
531func (b *memBlock) addSeries(s *series) {
532	sid := uint64(len(b.series))
533	b.postings = append(b.postings, sid)
534	b.series = append(b.series, s)
535
536	for _, l := range s.lset {
537		b.symbols[l.Name] = struct{}{}
538		b.symbols[l.Value] = struct{}{}
539	}
540
541	for i, cm := range s.chunks {
542		if b.minTime == -1 || cm.MinTime < b.minTime {
543			b.minTime = cm.MinTime
544		}
545		if b.maxTime == -1 || cm.MaxTime < b.maxTime {
546			b.maxTime = cm.MaxTime
547		}
548		s.chunks[i].Ref = b.numberOfChunks
549		b.chunks = append(b.chunks, cm.Chunk)
550		b.numberOfChunks++
551	}
552}
553
554func (b *memBlock) MinTime() int64 {
555	if b.minTime == -1 {
556		return 0
557	}
558
559	return b.minTime
560}
561
562func (b *memBlock) MaxTime() int64 {
563	if b.maxTime == -1 {
564		return 0
565	}
566
567	return b.maxTime
568}
569
570func (b *memBlock) Meta() tsdb.BlockMeta {
571	return tsdb.BlockMeta{}
572}
573
574func (b *memBlock) Postings(name string, val ...string) (index.Postings, error) {
575	allName, allVal := index.AllPostingsKey()
576
577	if name != allName || val[0] != allVal {
578		return nil, errors.New("unexpected call to Postings() that is not AllVall")
579	}
580	sort.Slice(b.postings, func(i, j int) bool {
581		return labels.Compare(b.series[b.postings[i]].lset, b.series[b.postings[j]].lset) < 0
582	})
583	return index.NewListPostings(b.postings), nil
584}
585
586func (b *memBlock) Series(id uint64, lset *labels.Labels, chks *[]chunks.Meta) error {
587	if id >= uint64(len(b.series)) {
588		return errors.Wrapf(tsdb.ErrNotFound, "series with ID %d does not exist", id)
589	}
590	s := b.series[id]
591
592	*lset = append((*lset)[:0], s.lset...)
593	*chks = append((*chks)[:0], s.chunks...)
594
595	return nil
596}
597
598func (b *memBlock) Chunk(id uint64) (chunkenc.Chunk, error) {
599	if id >= b.numberOfChunks {
600		return nil, errors.Wrapf(tsdb.ErrNotFound, "chunk with ID %d does not exist", id)
601	}
602
603	return b.chunks[id], nil
604}
605
606func (b *memBlock) Symbols() index.StringIter {
607	res := make([]string, 0, len(b.symbols))
608	for s := range b.symbols {
609		res = append(res, s)
610	}
611	sort.Strings(res)
612	return index.NewStringListIter(res)
613}
614
615func (b *memBlock) SortedPostings(p index.Postings) index.Postings {
616	return p
617}
618
619func (b *memBlock) Index() (tsdb.IndexReader, error) {
620	return b, nil
621}
622
623func (b *memBlock) Chunks() (tsdb.ChunkReader, error) {
624	return b, nil
625}
626
627func (b *memBlock) Tombstones() (tombstones.Reader, error) {
628	return emptyTombstoneReader{}, nil
629}
630
631func (b *memBlock) Close() error {
632	return nil
633}
634
635type emptyTombstoneReader struct{}
636
637func (emptyTombstoneReader) Get(ref uint64) (tombstones.Intervals, error)        { return nil, nil }
638func (emptyTombstoneReader) Iter(func(uint64, tombstones.Intervals) error) error { return nil }
639func (emptyTombstoneReader) Total() uint64                                       { return 0 }
640func (emptyTombstoneReader) Close() error                                        { return nil }
641