// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package downsample

import (
	"io/ioutil"
	"math"
	"os"
	"path/filepath"
	"sort"
	"testing"
	"time"

	"github.com/fortytw2/leaktest"
	"github.com/go-kit/kit/log"
	"github.com/pkg/errors"
	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/pkg/value"
	"github.com/prometheus/prometheus/tsdb"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
	"github.com/prometheus/prometheus/tsdb/chunks"
	"github.com/prometheus/prometheus/tsdb/index"
	"github.com/prometheus/prometheus/tsdb/tombstones"
	"github.com/thanos-io/thanos/pkg/block"
	"github.com/thanos-io/thanos/pkg/block/metadata"
	"github.com/thanos-io/thanos/pkg/testutil"
)

func TestDownsampleCounterBoundaryReset(t *testing.T) {

	toAggrChunks := func(t *testing.T, cm []chunks.Meta) (res []*AggrChunk) {
		for i := range cm {
			achk, ok := cm[i].Chunk.(*AggrChunk)
			testutil.Assert(t, ok, "expected *AggrChunk")
			res = append(res, achk)
		}
		return
	}

	counterSamples := func(t *testing.T, achks []*AggrChunk) (res []sample) {
		for _, achk := range achks {
			chk, err := achk.Get(AggrCounter)
			testutil.Ok(t, err)

			iter := chk.Iterator(nil)
			for iter.Next() {
				t, v := iter.At()
				res = append(res, sample{t, v})
			}
		}
		return
	}

	counterIterate := func(t *testing.T, achks []*AggrChunk) (res []sample) {
		var iters []chunkenc.Iterator
		for _, achk := range achks {
			chk, err := achk.Get(AggrCounter)
			testutil.Ok(t, err)
			iters = append(iters, chk.Iterator(nil))
		}

		citer := NewCounterSeriesIterator(iters...)
		for citer.Next() {
			t, v := citer.At()
			res = append(res, sample{t: t, v: v})
		}
		return
	}

	type test struct {
		raw                   []sample
		rawAggrResolution     int64
		expectedRawAggrChunks int
		rawCounterSamples     []sample
		rawCounterIterate     []sample
		aggrAggrResolution    int64
		aggrChunks            int
		aggrCounterSamples    []sample
		aggrCounterIterate    []sample
	}

	tests := []test{
		{
			// In this test case, counter resets occur at the
			// boundaries between the t=49,t=99 and t=99,t=149
			// windows, and the values in the t=49, t=99, and
			// t=149 windows are high enough that the resets
			// will only be accounted for if the first raw value
			// of a chunk is maintained during aggregation.
			// See #1568 for more details.
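			// Concretely: the last raw value before the first boundary
			// is 5 (t=30) and the first raw value after it is 1 (t=50),
			// so iterating the aggregated counter must fold the pre-reset
			// total into the next chunk, yielding v=5+1=6 at t=50 in
			// rawCounterIterate below.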
			raw: []sample{
				{t: 10, v: 1}, {t: 20, v: 3}, {t: 30, v: 5},
				{t: 50, v: 1}, {t: 60, v: 8}, {t: 70, v: 10},
				{t: 120, v: 1}, {t: 130, v: 18}, {t: 140, v: 20},
				{t: 160, v: 21}, {t: 170, v: 38}, {t: 180, v: 40},
			},
			rawAggrResolution:     50,
			expectedRawAggrChunks: 4,
			rawCounterSamples: []sample{
				{t: 10, v: 1}, {t: 30, v: 5}, {t: 30, v: 5},
				{t: 50, v: 1}, {t: 70, v: 10}, {t: 70, v: 10},
				{t: 120, v: 1}, {t: 140, v: 20}, {t: 140, v: 20},
				{t: 160, v: 21}, {t: 180, v: 40}, {t: 180, v: 40},
			},
			rawCounterIterate: []sample{
				{t: 10, v: 1}, {t: 30, v: 5},
				{t: 50, v: 6}, {t: 70, v: 15},
				{t: 120, v: 16}, {t: 140, v: 35},
				{t: 160, v: 36}, {t: 180, v: 55},
			},
			aggrAggrResolution: 2 * 50,
			aggrChunks:         2,
			aggrCounterSamples: []sample{
				{t: 10, v: 1}, {t: 70, v: 15}, {t: 70, v: 10},
				{t: 120, v: 1}, {t: 180, v: 40}, {t: 180, v: 40},
			},
			aggrCounterIterate: []sample{
				{t: 10, v: 1}, {t: 70, v: 15},
				{t: 120, v: 16}, {t: 180, v: 55},
			},
		},
	}

	doTest := func(t *testing.T, test *test) {
		// Asking for more chunks than raw samples ensures that downsampleRawLoop
		// will create chunks with samples from a single window.
		cm := downsampleRawLoop(test.raw, test.rawAggrResolution, len(test.raw)+1)
		testutil.Equals(t, test.expectedRawAggrChunks, len(cm))

		rawAggrChunks := toAggrChunks(t, cm)
		testutil.Equals(t, test.rawCounterSamples, counterSamples(t, rawAggrChunks))
		testutil.Equals(t, test.rawCounterIterate, counterIterate(t, rawAggrChunks))

		var buf []sample
		acm, err := downsampleAggrLoop(rawAggrChunks, &buf, test.aggrAggrResolution, test.aggrChunks)
		testutil.Ok(t, err)
		testutil.Equals(t, test.aggrChunks, len(acm))

		aggrAggrChunks := toAggrChunks(t, acm)
		testutil.Equals(t, test.aggrCounterSamples, counterSamples(t, aggrAggrChunks))
		testutil.Equals(t, test.aggrCounterIterate, counterIterate(t, aggrAggrChunks))
	}

	// Run every case in the table, not only the first one.
	for i := range tests {
		doTest(t, &tests[i])
	}
}

func TestExpandChunkIterator(t *testing.T) {
	// Validate that expanding the chunk iterator filters out-of-order samples
	// and staleness markers.
	// Same timestamps are okay since we use them for counter markers.
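	// In the input below, {200, 5} arrives after t=201 and is therefore out
	// of order, and {400, StaleNaN} is a staleness marker; both are absent
	// from the expected output.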
	var res []sample
	testutil.Ok(t,
		expandChunkIterator(
			newSampleIterator([]sample{
				{100, 1}, {200, 2}, {200, 3}, {201, 4}, {200, 5},
				{300, 6}, {400, math.Float64frombits(value.StaleNaN)}, {500, 5},
			}), &res,
		),
	)

	testutil.Equals(t, []sample{{100, 1}, {200, 2}, {200, 3}, {201, 4}, {300, 6}, {500, 5}}, res)
}

func TestDownsampleRaw(t *testing.T) {
	defer leaktest.CheckTimeout(t, 10*time.Second)()

	staleMarker := math.Float64frombits(value.StaleNaN)

	input := []*downsampleTestSet{
		{
			lset: labels.FromStrings("__name__", "a"),
			inRaw: []sample{
				{20, 1}, {40, 2}, {60, 3}, {80, 1}, {100, 2}, {101, staleMarker}, {120, 5}, {180, 10}, {250, 1},
			},
			output: map[AggrType][]sample{
				AggrCount:   {{99, 4}, {199, 3}, {250, 1}},
				AggrSum:     {{99, 7}, {199, 17}, {250, 1}},
				AggrMin:     {{99, 1}, {199, 2}, {250, 1}},
				AggrMax:     {{99, 3}, {199, 10}, {250, 1}},
				AggrCounter: {{20, 1}, {99, 4}, {199, 13}, {250, 14}, {250, 1}},
			},
		},
	}
	testDownsample(t, input, &metadata.Meta{BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 250}}, 100)
}

func TestDownsampleAggr(t *testing.T) {
	defer leaktest.CheckTimeout(t, 10*time.Second)()

	input := []*downsampleTestSet{
		{
			lset: labels.FromStrings("__name__", "a"),
			inAggr: map[AggrType][]sample{
				AggrCount: {
					{199, 5}, {299, 1}, {399, 10}, {400, 3}, {499, 10}, {699, 0}, {999, 100},
				},
				AggrSum: {
					{199, 5}, {299, 1}, {399, 10}, {400, 3}, {499, 10}, {699, 0}, {999, 100},
				},
				AggrMin: {
					{199, 5}, {299, 1}, {399, 10}, {400, -3}, {499, 10}, {699, 0}, {999, 100},
				},
				AggrMax: {
					{199, 5}, {299, 1}, {399, 10}, {400, -3}, {499, 10}, {699, 0}, {999, 100},
				},
				AggrCounter: {
					{99, 100}, {299, 150}, {499, 210}, {499, 10}, // Chunk 1.
					{599, 20}, {799, 50}, {999, 120}, {999, 50}, // Chunk 2, no reset.
					{1099, 40}, {1199, 80}, {1299, 110}, // Chunk 3, reset.
				},
			},
			output: map[AggrType][]sample{
				AggrCount:   {{499, 29}, {999, 100}},
				AggrSum:     {{499, 29}, {999, 100}},
				AggrMin:     {{499, -3}, {999, 0}},
				AggrMax:     {{499, 10}, {999, 100}},
				AggrCounter: {{99, 100}, {499, 210}, {999, 320}, {1299, 430}, {1299, 110}},
			},
		},
	}
	var meta metadata.Meta
	meta.Thanos.Downsample.Resolution = 10
	meta.BlockMeta = tsdb.BlockMeta{MinTime: 99, MaxTime: 1300}

	testDownsample(t, input, &meta, 500)
}

func encodeTestAggrSeries(v map[AggrType][]sample) chunks.Meta {
	b := newAggrChunkBuilder()

	for at, d := range v {
		for _, s := range d {
			b.apps[at].Append(s.t, s.v)
		}
	}

	return b.encode()
}

type downsampleTestSet struct {
	lset   labels.Labels
	inRaw  []sample
	inAggr map[AggrType][]sample
	output map[AggrType][]sample
}

// testDownsample inserts the input into a block and invokes the downsampler with the given resolution.
// The chunk ranges within the input block are aligned at 500 time units.
func testDownsample(t *testing.T, data []*downsampleTestSet, meta *metadata.Meta, resolution int64) {
	t.Helper()

	dir, err := ioutil.TempDir("", "downsample-raw")
	testutil.Ok(t, err)
	defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

	// Ideally we would use tsdb.HeadBlock here for less dependency on our own code.
	// However, it cannot accept the counter signal sample, which shares its
	// timestamp with the previous sample.
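	// That counter signal is the window's last raw value, re-appended at the
	// same timestamp as the final counter sample of the chunk (e.g. the
	// trailing {250, 1} and {499, 10} samples above), which lets resets
	// across chunk boundaries be detected during iteration.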
	mb := newMemBlock()

	for _, d := range data {
		if len(d.inRaw) > 0 && len(d.inAggr) > 0 {
			t.Fatalf("test must not have raw and aggregate input data at once")
		}
		ser := &series{lset: d.lset}

		if len(d.inRaw) > 0 {
			chk := chunkenc.NewXORChunk()
			app, _ := chk.Appender()

			for _, s := range d.inRaw {
				app.Append(s.t, s.v)
			}
			ser.chunks = append(ser.chunks, chunks.Meta{
				MinTime: d.inRaw[0].t,
				MaxTime: d.inRaw[len(d.inRaw)-1].t,
				Chunk:   chk,
			})
		} else {
			ser.chunks = append(ser.chunks, encodeTestAggrSeries(d.inAggr))
		}
		mb.addSeries(ser)
	}

	id, err := Downsample(log.NewNopLogger(), meta, mb, dir, resolution)
	testutil.Ok(t, err)

	_, err = metadata.Read(filepath.Join(dir, id.String()))
	testutil.Ok(t, err)

	exp := map[uint64]map[AggrType][]sample{}
	got := map[uint64]map[AggrType][]sample{}

	for _, d := range data {
		exp[d.lset.Hash()] = d.output
	}

	indexr, err := index.NewFileReader(filepath.Join(dir, id.String(), block.IndexFilename))
	testutil.Ok(t, err)
	defer func() { testutil.Ok(t, indexr.Close()) }()

	chunkr, err := chunks.NewDirReader(filepath.Join(dir, id.String(), block.ChunksDirname), NewPool())
	testutil.Ok(t, err)
	defer func() { testutil.Ok(t, chunkr.Close()) }()

	pall, err := indexr.Postings(index.AllPostingsKey())
	testutil.Ok(t, err)

	for pall.Next() {
		id := pall.At()

		var lset labels.Labels
		var chks []chunks.Meta
		testutil.Ok(t, indexr.Series(id, &lset, &chks))

		m := map[AggrType][]sample{}
		got[lset.Hash()] = m

		for _, c := range chks {
			chk, err := chunkr.Chunk(c.Ref)
			testutil.Ok(t, err)

			for _, at := range []AggrType{AggrCount, AggrSum, AggrMin, AggrMax, AggrCounter} {
				c, err := chk.(*AggrChunk).Get(at)
				if err == ErrAggrNotExist {
					continue
				}
				testutil.Ok(t, err)

				buf := m[at]
				testutil.Ok(t, expandChunkIterator(c.Iterator(nil), &buf))
				m[at] = buf
			}
		}
	}

	testutil.Equals(t, len(exp), len(got))

	for h, ser := range exp {
		for _, at := range []AggrType{AggrCount, AggrSum, AggrMin, AggrMax, AggrCounter} {
			t.Logf("series %d, type %s", h, at)
			testutil.Equals(t, ser[at], got[h][at])
		}
	}
}

func TestAverageChunkIterator(t *testing.T) {
	sum := []sample{{100, 30}, {200, 40}, {300, 5}, {400, -10}}
	cnt := []sample{{100, 1}, {200, 5}, {300, 2}, {400, 10}}
	exp := []sample{{100, 30}, {200, 8}, {300, 2.5}, {400, -1}}

	x := NewAverageChunkIterator(newSampleIterator(cnt), newSampleIterator(sum))

	var res []sample
	for x.Next() {
		t, v := x.At()
		res = append(res, sample{t, v})
	}
	testutil.Ok(t, x.Err())
	testutil.Equals(t, exp, res)
}

func TestCounterSeriesIterator(t *testing.T) {
	defer leaktest.CheckTimeout(t, 10*time.Second)()

	staleMarker := math.Float64frombits(value.StaleNaN)

	chunks := [][]sample{
		{{100, 10}, {200, 20}, {300, 10}, {400, 20}, {400, 5}},
		{{500, 10}, {600, 20}, {700, 30}, {800, 40}, {800, 10}}, // No actual reset.
		{{900, 5}, {1000, 10}, {1100, 15}},                      // Actual reset.
		{{1200, 20}, {1250, staleMarker}, {1300, 40}},           // No special last sample, no reset.
		{{1400, 30}, {1500, 30}, {1600, 50}},                    // No special last sample, reset.
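		// Without the duplicated-timestamp marker (the last two chunks), a
		// reset can only be inferred from the values themselves: 30 at t=1400
		// is below the preceding 40, so the accumulated total (115) becomes
		// the new offset and t=1400 yields 30+115=145 in exp below.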
	}
	exp := []sample{
		{100, 10}, {200, 20}, {300, 30}, {400, 40}, {500, 45},
		{600, 55}, {700, 65}, {800, 75}, {900, 80}, {1000, 85},
		{1100, 90}, {1200, 95}, {1300, 115}, {1400, 145}, {1500, 145}, {1600, 165},
	}

	var its []chunkenc.Iterator
	for _, c := range chunks {
		its = append(its, newSampleIterator(c))
	}

	x := NewCounterSeriesIterator(its...)

	var res []sample
	for x.Next() {
		t, v := x.At()
		res = append(res, sample{t, v})
	}
	testutil.Ok(t, x.Err())
	testutil.Equals(t, exp, res)
}

func TestCounterSeriesIteratorSeek(t *testing.T) {
	chunks := [][]sample{
		{{100, 10}, {200, 20}, {300, 10}, {400, 20}, {400, 5}},
	}

	exp := []sample{
		{200, 20}, {300, 30}, {400, 40},
	}

	var its []chunkenc.Iterator
	for _, c := range chunks {
		its = append(its, newSampleIterator(c))
	}

	var res []sample
	x := NewCounterSeriesIterator(its...)

	ok := x.Seek(150)
	testutil.Assert(t, ok, "Seek should return true")
	testutil.Ok(t, x.Err())
	for {
		ts, v := x.At()
		res = append(res, sample{ts, v})

		ok = x.Next()
		if !ok {
			break
		}
	}
	testutil.Equals(t, exp, res)
}

func TestCounterSeriesIteratorSeekExtendTs(t *testing.T) {
	chunks := [][]sample{
		{{100, 10}, {200, 20}, {300, 10}, {400, 20}, {400, 5}},
	}

	var its []chunkenc.Iterator
	for _, c := range chunks {
		its = append(its, newSampleIterator(c))
	}

	x := NewCounterSeriesIterator(its...)

	ok := x.Seek(500)
	testutil.Assert(t, !ok, "Seek should return false")
}

func TestCounterSeriesIteratorSeekAfterNext(t *testing.T) {
	chunks := [][]sample{
		{{100, 10}},
	}
	exp := []sample{
		{100, 10},
	}

	var its []chunkenc.Iterator
	for _, c := range chunks {
		its = append(its, newSampleIterator(c))
	}

	var res []sample
	x := NewCounterSeriesIterator(its...)

	x.Next()

	ok := x.Seek(50)
	testutil.Assert(t, ok, "Seek should return true")
	testutil.Ok(t, x.Err())
	for {
		ts, v := x.At()
		res = append(res, sample{ts, v})

		ok = x.Next()
		if !ok {
			break
		}
	}
	testutil.Equals(t, exp, res)
}

type sampleIterator struct {
	l []sample
	i int
}

func newSampleIterator(l []sample) *sampleIterator {
	return &sampleIterator{l: l, i: -1}
}

func (it *sampleIterator) Err() error {
	return nil
}

func (it *sampleIterator) Next() bool {
	if it.i >= len(it.l)-1 {
		return false
	}
	it.i++
	return true
}

func (it *sampleIterator) Seek(int64) bool {
	panic("unexpected")
}

func (it *sampleIterator) At() (t int64, v float64) {
	return it.l[it.i].t, it.l[it.i].v
}

// memBlock is an in-memory block that implements a subset of the tsdb.BlockReader interface
// to allow tsdb.StreamedBlockWriter to persist the data as a block.
type memBlock struct {
	// Dummies to implement unused methods.
	tsdb.IndexReader

	symbols  map[string]struct{}
	postings []uint64
	series   []*series
	chunks   []chunkenc.Chunk

	numberOfChunks uint64

	minTime, maxTime int64
}

type series struct {
	lset   labels.Labels
	chunks []chunks.Meta
}

func newMemBlock() *memBlock {
	return &memBlock{symbols: map[string]struct{}{}, minTime: -1, maxTime: -1}
}

func (b *memBlock) addSeries(s *series) {
	sid := uint64(len(b.series))
	b.postings = append(b.postings, sid)
	b.series = append(b.series, s)

	for _, l := range s.lset {
		b.symbols[l.Name] = struct{}{}
		b.symbols[l.Value] = struct{}{}
	}

	for i, cm := range s.chunks {
		if b.minTime == -1 || cm.MinTime < b.minTime {
			b.minTime = cm.MinTime
		}
		if b.maxTime == -1 || cm.MaxTime > b.maxTime {
			b.maxTime = cm.MaxTime
		}
		s.chunks[i].Ref = b.numberOfChunks
		b.chunks = append(b.chunks, cm.Chunk)
		b.numberOfChunks++
	}
}

func (b *memBlock) MinTime() int64 {
	if b.minTime == -1 {
		return 0
	}

	return b.minTime
}

func (b *memBlock) MaxTime() int64 {
	if b.maxTime == -1 {
		return 0
	}

	return b.maxTime
}

func (b *memBlock) Meta() tsdb.BlockMeta {
	return tsdb.BlockMeta{}
}

func (b *memBlock) Postings(name string, val ...string) (index.Postings, error) {
	allName, allVal := index.AllPostingsKey()

	if name != allName || val[0] != allVal {
		return nil, errors.New("unexpected call to Postings() that is not for all postings")
	}
	sort.Slice(b.postings, func(i, j int) bool {
		return labels.Compare(b.series[b.postings[i]].lset, b.series[b.postings[j]].lset) < 0
	})
	return index.NewListPostings(b.postings), nil
}

func (b *memBlock) Series(id uint64, lset *labels.Labels, chks *[]chunks.Meta) error {
	if id >= uint64(len(b.series)) {
		return errors.Wrapf(tsdb.ErrNotFound, "series with ID %d does not exist", id)
	}
	s := b.series[id]

	*lset = append((*lset)[:0], s.lset...)
	*chks = append((*chks)[:0], s.chunks...)

	return nil
}

func (b *memBlock) Chunk(id uint64) (chunkenc.Chunk, error) {
	if id >= b.numberOfChunks {
		return nil, errors.Wrapf(tsdb.ErrNotFound, "chunk with ID %d does not exist", id)
	}

	return b.chunks[id], nil
}

func (b *memBlock) Symbols() index.StringIter {
	res := make([]string, 0, len(b.symbols))
	for s := range b.symbols {
		res = append(res, s)
	}
	sort.Strings(res)
	return index.NewStringListIter(res)
}

func (b *memBlock) SortedPostings(p index.Postings) index.Postings {
	return p
}

func (b *memBlock) Index() (tsdb.IndexReader, error) {
	return b, nil
}

func (b *memBlock) Chunks() (tsdb.ChunkReader, error) {
	return b, nil
}

func (b *memBlock) Tombstones() (tombstones.Reader, error) {
	return emptyTombstoneReader{}, nil
}

func (b *memBlock) Close() error {
	return nil
}

type emptyTombstoneReader struct{}

func (emptyTombstoneReader) Get(ref uint64) (tombstones.Intervals, error) { return nil, nil }

func (emptyTombstoneReader) Iter(func(uint64, tombstones.Intervals) error) error { return nil }

func (emptyTombstoneReader) Total() uint64 { return 0 }

func (emptyTombstoneReader) Close() error { return nil }