1// Copyright 2017 The Prometheus Authors 2// Licensed under the Apache License, Version 2.0 (the "License"); 3// you may not use this file except in compliance with the License. 4// You may obtain a copy of the License at 5// 6// http://www.apache.org/licenses/LICENSE-2.0 7// 8// Unless required by applicable law or agreed to in writing, software 9// distributed under the License is distributed on an "AS IS" BASIS, 10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11// See the License for the specific language governing permissions and 12// limitations under the License. 13 14package tsdb 15 16import ( 17 "context" 18 "fmt" 19 "io/ioutil" 20 "math" 21 "os" 22 "path" 23 "path/filepath" 24 "testing" 25 "time" 26 27 "github.com/go-kit/log" 28 "github.com/pkg/errors" 29 prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" 30 "github.com/stretchr/testify/require" 31 32 "github.com/prometheus/prometheus/pkg/labels" 33 "github.com/prometheus/prometheus/tsdb/chunkenc" 34 "github.com/prometheus/prometheus/tsdb/chunks" 35 "github.com/prometheus/prometheus/tsdb/fileutil" 36 "github.com/prometheus/prometheus/tsdb/tombstones" 37) 38 39func TestSplitByRange(t *testing.T) { 40 cases := []struct { 41 trange int64 42 ranges [][2]int64 43 output [][][2]int64 44 }{ 45 { 46 trange: 60, 47 ranges: [][2]int64{{0, 10}}, 48 output: [][][2]int64{ 49 {{0, 10}}, 50 }, 51 }, 52 { 53 trange: 60, 54 ranges: [][2]int64{{0, 60}}, 55 output: [][][2]int64{ 56 {{0, 60}}, 57 }, 58 }, 59 { 60 trange: 60, 61 ranges: [][2]int64{{0, 10}, {9, 15}, {30, 60}}, 62 output: [][][2]int64{ 63 {{0, 10}, {9, 15}, {30, 60}}, 64 }, 65 }, 66 { 67 trange: 60, 68 ranges: [][2]int64{{70, 90}, {125, 130}, {130, 180}, {1000, 1001}}, 69 output: [][][2]int64{ 70 {{70, 90}}, 71 {{125, 130}, {130, 180}}, 72 {{1000, 1001}}, 73 }, 74 }, 75 // Mis-aligned or too-large blocks are ignored. 76 { 77 trange: 60, 78 ranges: [][2]int64{{50, 70}, {70, 80}}, 79 output: [][][2]int64{ 80 {{70, 80}}, 81 }, 82 }, 83 { 84 trange: 72, 85 ranges: [][2]int64{{0, 144}, {144, 216}, {216, 288}}, 86 output: [][][2]int64{ 87 {{144, 216}}, 88 {{216, 288}}, 89 }, 90 }, 91 // Various awkward edge cases easy to hit with negative numbers. 92 { 93 trange: 60, 94 ranges: [][2]int64{{-10, -5}}, 95 output: [][][2]int64{ 96 {{-10, -5}}, 97 }, 98 }, 99 { 100 trange: 60, 101 ranges: [][2]int64{{-60, -50}, {-10, -5}}, 102 output: [][][2]int64{ 103 {{-60, -50}, {-10, -5}}, 104 }, 105 }, 106 { 107 trange: 60, 108 ranges: [][2]int64{{-60, -50}, {-10, -5}, {0, 15}}, 109 output: [][][2]int64{ 110 {{-60, -50}, {-10, -5}}, 111 {{0, 15}}, 112 }, 113 }, 114 } 115 116 for _, c := range cases { 117 // Transform input range tuples into dirMetas. 118 blocks := make([]dirMeta, 0, len(c.ranges)) 119 for _, r := range c.ranges { 120 blocks = append(blocks, dirMeta{ 121 meta: &BlockMeta{ 122 MinTime: r[0], 123 MaxTime: r[1], 124 }, 125 }) 126 } 127 128 // Transform output range tuples into dirMetas. 129 exp := make([][]dirMeta, len(c.output)) 130 for i, group := range c.output { 131 for _, r := range group { 132 exp[i] = append(exp[i], dirMeta{ 133 meta: &BlockMeta{MinTime: r[0], MaxTime: r[1]}, 134 }) 135 } 136 } 137 138 require.Equal(t, exp, splitByRange(blocks, c.trange)) 139 } 140} 141 142// See https://github.com/prometheus/prometheus/issues/3064 143func TestNoPanicFor0Tombstones(t *testing.T) { 144 metas := []dirMeta{ 145 { 146 dir: "1", 147 meta: &BlockMeta{ 148 MinTime: 0, 149 MaxTime: 100, 150 }, 151 }, 152 { 153 dir: "2", 154 meta: &BlockMeta{ 155 MinTime: 101, 156 MaxTime: 200, 157 }, 158 }, 159 } 160 161 c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{50}, nil, nil) 162 require.NoError(t, err) 163 164 c.plan(metas) 165} 166 167func TestLeveledCompactor_plan(t *testing.T) { 168 // This mimics our default ExponentialBlockRanges with min block size equals to 20. 169 compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{ 170 20, 171 60, 172 180, 173 540, 174 1620, 175 }, nil, nil) 176 require.NoError(t, err) 177 178 cases := map[string]struct { 179 metas []dirMeta 180 expected []string 181 }{ 182 "Outside Range": { 183 metas: []dirMeta{ 184 metaRange("1", 0, 20, nil), 185 }, 186 expected: nil, 187 }, 188 "We should wait for four blocks of size 20 to appear before compacting.": { 189 metas: []dirMeta{ 190 metaRange("1", 0, 20, nil), 191 metaRange("2", 20, 40, nil), 192 }, 193 expected: nil, 194 }, 195 `We should wait for a next block of size 20 to appear before compacting 196 the existing ones. We have three, but we ignore the fresh one from WAl`: { 197 metas: []dirMeta{ 198 metaRange("1", 0, 20, nil), 199 metaRange("2", 20, 40, nil), 200 metaRange("3", 40, 60, nil), 201 }, 202 expected: nil, 203 }, 204 "Block to fill the entire parent range appeared – should be compacted": { 205 metas: []dirMeta{ 206 metaRange("1", 0, 20, nil), 207 metaRange("2", 20, 40, nil), 208 metaRange("3", 40, 60, nil), 209 metaRange("4", 60, 80, nil), 210 }, 211 expected: []string{"1", "2", "3"}, 212 }, 213 `Block for the next parent range appeared with gap with size 20. Nothing will happen in the first one 214 anymore but we ignore fresh one still, so no compaction`: { 215 metas: []dirMeta{ 216 metaRange("1", 0, 20, nil), 217 metaRange("2", 20, 40, nil), 218 metaRange("3", 60, 80, nil), 219 }, 220 expected: nil, 221 }, 222 `Block for the next parent range appeared, and we have a gap with size 20 between second and third block. 223 We will not get this missed gap anymore and we should compact just these two.`: { 224 metas: []dirMeta{ 225 metaRange("1", 0, 20, nil), 226 metaRange("2", 20, 40, nil), 227 metaRange("3", 60, 80, nil), 228 metaRange("4", 80, 100, nil), 229 }, 230 expected: []string{"1", "2"}, 231 }, 232 "We have 20, 20, 20, 60, 60 range blocks. '5' is marked as fresh one": { 233 metas: []dirMeta{ 234 metaRange("1", 0, 20, nil), 235 metaRange("2", 20, 40, nil), 236 metaRange("3", 40, 60, nil), 237 metaRange("4", 60, 120, nil), 238 metaRange("5", 120, 180, nil), 239 }, 240 expected: []string{"1", "2", "3"}, 241 }, 242 "We have 20, 60, 20, 60, 240 range blocks. We can compact 20 + 60 + 60": { 243 metas: []dirMeta{ 244 metaRange("2", 20, 40, nil), 245 metaRange("4", 60, 120, nil), 246 metaRange("5", 960, 980, nil), // Fresh one. 247 metaRange("6", 120, 180, nil), 248 metaRange("7", 720, 960, nil), 249 }, 250 expected: []string{"2", "4", "6"}, 251 }, 252 "Do not select large blocks that have many tombstones when there is no fresh block": { 253 metas: []dirMeta{ 254 metaRange("1", 0, 540, &BlockStats{ 255 NumSeries: 10, 256 NumTombstones: 3, 257 }), 258 }, 259 expected: nil, 260 }, 261 "Select large blocks that have many tombstones when fresh appears": { 262 metas: []dirMeta{ 263 metaRange("1", 0, 540, &BlockStats{ 264 NumSeries: 10, 265 NumTombstones: 3, 266 }), 267 metaRange("2", 540, 560, nil), 268 }, 269 expected: []string{"1"}, 270 }, 271 "For small blocks, do not compact tombstones, even when fresh appears.": { 272 metas: []dirMeta{ 273 metaRange("1", 0, 60, &BlockStats{ 274 NumSeries: 10, 275 NumTombstones: 3, 276 }), 277 metaRange("2", 60, 80, nil), 278 }, 279 expected: nil, 280 }, 281 `Regression test: we were stuck in a compact loop where we always recompacted 282 the same block when tombstones and series counts were zero`: { 283 metas: []dirMeta{ 284 metaRange("1", 0, 540, &BlockStats{ 285 NumSeries: 0, 286 NumTombstones: 0, 287 }), 288 metaRange("2", 540, 560, nil), 289 }, 290 expected: nil, 291 }, 292 `Regression test: we were wrongly assuming that new block is fresh from WAL when its ULID is newest. 293 We need to actually look on max time instead. 294 295 With previous, wrong approach "8" block was ignored, so we were wrongly compacting 5 and 7 and introducing 296 block overlaps`: { 297 metas: []dirMeta{ 298 metaRange("5", 0, 360, nil), 299 metaRange("6", 540, 560, nil), // Fresh one. 300 metaRange("7", 360, 420, nil), 301 metaRange("8", 420, 540, nil), 302 }, 303 expected: []string{"7", "8"}, 304 }, 305 // |--------------| 306 // |----------------| 307 // |--------------| 308 "Overlapping blocks 1": { 309 metas: []dirMeta{ 310 metaRange("1", 0, 20, nil), 311 metaRange("2", 19, 40, nil), 312 metaRange("3", 40, 60, nil), 313 }, 314 expected: []string{"1", "2"}, 315 }, 316 // |--------------| 317 // |--------------| 318 // |--------------| 319 "Overlapping blocks 2": { 320 metas: []dirMeta{ 321 metaRange("1", 0, 20, nil), 322 metaRange("2", 20, 40, nil), 323 metaRange("3", 30, 50, nil), 324 }, 325 expected: []string{"2", "3"}, 326 }, 327 // |--------------| 328 // |---------------------| 329 // |--------------| 330 "Overlapping blocks 3": { 331 metas: []dirMeta{ 332 metaRange("1", 0, 20, nil), 333 metaRange("2", 10, 40, nil), 334 metaRange("3", 30, 50, nil), 335 }, 336 expected: []string{"1", "2", "3"}, 337 }, 338 // |--------------| 339 // |--------------------------------| 340 // |--------------| 341 // |--------------| 342 "Overlapping blocks 4": { 343 metas: []dirMeta{ 344 metaRange("5", 0, 360, nil), 345 metaRange("6", 340, 560, nil), 346 metaRange("7", 360, 420, nil), 347 metaRange("8", 420, 540, nil), 348 }, 349 expected: []string{"5", "6", "7", "8"}, 350 }, 351 // |--------------| 352 // |--------------| 353 // |--------------| 354 // |--------------| 355 "Overlapping blocks 5": { 356 metas: []dirMeta{ 357 metaRange("1", 0, 10, nil), 358 metaRange("2", 9, 20, nil), 359 metaRange("3", 30, 40, nil), 360 metaRange("4", 39, 50, nil), 361 }, 362 expected: []string{"1", "2"}, 363 }, 364 } 365 366 for title, c := range cases { 367 if !t.Run(title, func(t *testing.T) { 368 res, err := compactor.plan(c.metas) 369 require.NoError(t, err) 370 require.Equal(t, c.expected, res) 371 }) { 372 return 373 } 374 } 375} 376 377func TestRangeWithFailedCompactionWontGetSelected(t *testing.T) { 378 compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{ 379 20, 380 60, 381 240, 382 720, 383 2160, 384 }, nil, nil) 385 require.NoError(t, err) 386 387 cases := []struct { 388 metas []dirMeta 389 }{ 390 { 391 metas: []dirMeta{ 392 metaRange("1", 0, 20, nil), 393 metaRange("2", 20, 40, nil), 394 metaRange("3", 40, 60, nil), 395 metaRange("4", 60, 80, nil), 396 }, 397 }, 398 { 399 metas: []dirMeta{ 400 metaRange("1", 0, 20, nil), 401 metaRange("2", 20, 40, nil), 402 metaRange("3", 60, 80, nil), 403 metaRange("4", 80, 100, nil), 404 }, 405 }, 406 { 407 metas: []dirMeta{ 408 metaRange("1", 0, 20, nil), 409 metaRange("2", 20, 40, nil), 410 metaRange("3", 40, 60, nil), 411 metaRange("4", 60, 120, nil), 412 metaRange("5", 120, 180, nil), 413 metaRange("6", 180, 200, nil), 414 }, 415 }, 416 } 417 418 for _, c := range cases { 419 c.metas[1].meta.Compaction.Failed = true 420 res, err := compactor.plan(c.metas) 421 require.NoError(t, err) 422 423 require.Equal(t, []string(nil), res) 424 } 425} 426 427func TestCompactionFailWillCleanUpTempDir(t *testing.T) { 428 compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{ 429 20, 430 60, 431 240, 432 720, 433 2160, 434 }, nil, nil) 435 require.NoError(t, err) 436 437 tmpdir, err := ioutil.TempDir("", "test") 438 require.NoError(t, err) 439 defer func() { 440 require.NoError(t, os.RemoveAll(tmpdir)) 441 }() 442 443 require.Error(t, compactor.write(tmpdir, &BlockMeta{}, erringBReader{})) 444 _, err = os.Stat(filepath.Join(tmpdir, BlockMeta{}.ULID.String()) + tmpForCreationBlockDirSuffix) 445 require.True(t, os.IsNotExist(err), "directory is not cleaned up") 446} 447 448func metaRange(name string, mint, maxt int64, stats *BlockStats) dirMeta { 449 meta := &BlockMeta{MinTime: mint, MaxTime: maxt} 450 if stats != nil { 451 meta.Stats = *stats 452 } 453 return dirMeta{ 454 dir: name, 455 meta: meta, 456 } 457} 458 459type erringBReader struct{} 460 461func (erringBReader) Index() (IndexReader, error) { return nil, errors.New("index") } 462func (erringBReader) Chunks() (ChunkReader, error) { return nil, errors.New("chunks") } 463func (erringBReader) Tombstones() (tombstones.Reader, error) { return nil, errors.New("tombstones") } 464func (erringBReader) Meta() BlockMeta { return BlockMeta{} } 465func (erringBReader) Size() int64 { return 0 } 466 467type nopChunkWriter struct{} 468 469func (nopChunkWriter) WriteChunks(chunks ...chunks.Meta) error { return nil } 470func (nopChunkWriter) Close() error { return nil } 471 472func samplesForRange(minTime, maxTime int64, maxSamplesPerChunk int) (ret [][]sample) { 473 var curr []sample 474 for i := minTime; i <= maxTime; i++ { 475 curr = append(curr, sample{t: i}) 476 if len(curr) >= maxSamplesPerChunk { 477 ret = append(ret, curr) 478 curr = []sample{} 479 } 480 } 481 if len(curr) > 0 { 482 ret = append(ret, curr) 483 } 484 return ret 485} 486 487func TestCompaction_populateBlock(t *testing.T) { 488 for _, tc := range []struct { 489 title string 490 inputSeriesSamples [][]seriesSamples 491 compactMinTime int64 492 compactMaxTime int64 // When not defined the test runner sets a default of math.MaxInt64. 493 expSeriesSamples []seriesSamples 494 expErr error 495 }{ 496 { 497 title: "Populate block from empty input should return error.", 498 inputSeriesSamples: [][]seriesSamples{}, 499 expErr: errors.New("cannot populate block from no readers"), 500 }, 501 { 502 // Populate from single block without chunks. We expect these kind of series being ignored. 503 inputSeriesSamples: [][]seriesSamples{ 504 {{lset: map[string]string{"a": "b"}}}, 505 }, 506 }, 507 { 508 title: "Populate from single block. We expect the same samples at the output.", 509 inputSeriesSamples: [][]seriesSamples{ 510 { 511 { 512 lset: map[string]string{"a": "b"}, 513 chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}}, 514 }, 515 }, 516 }, 517 expSeriesSamples: []seriesSamples{ 518 { 519 lset: map[string]string{"a": "b"}, 520 chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}}, 521 }, 522 }, 523 }, 524 { 525 title: "Populate from two blocks.", 526 inputSeriesSamples: [][]seriesSamples{ 527 { 528 { 529 lset: map[string]string{"a": "b"}, 530 chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}}, 531 }, 532 { 533 lset: map[string]string{"a": "c"}, 534 chunks: [][]sample{{{t: 1}, {t: 9}}, {{t: 10}, {t: 19}}}, 535 }, 536 { 537 // no-chunk series should be dropped. 538 lset: map[string]string{"a": "empty"}, 539 }, 540 }, 541 { 542 { 543 lset: map[string]string{"a": "b"}, 544 chunks: [][]sample{{{t: 21}, {t: 30}}}, 545 }, 546 { 547 lset: map[string]string{"a": "c"}, 548 chunks: [][]sample{{{t: 40}, {t: 45}}}, 549 }, 550 }, 551 }, 552 expSeriesSamples: []seriesSamples{ 553 { 554 lset: map[string]string{"a": "b"}, 555 chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}, {{t: 21}, {t: 30}}}, 556 }, 557 { 558 lset: map[string]string{"a": "c"}, 559 chunks: [][]sample{{{t: 1}, {t: 9}}, {{t: 10}, {t: 19}}, {{t: 40}, {t: 45}}}, 560 }, 561 }, 562 }, 563 { 564 title: "Populate from two blocks; chunks with negative time.", 565 inputSeriesSamples: [][]seriesSamples{ 566 { 567 { 568 lset: map[string]string{"a": "b"}, 569 chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}}, 570 }, 571 { 572 lset: map[string]string{"a": "c"}, 573 chunks: [][]sample{{{t: -11}, {t: -9}}, {{t: 10}, {t: 19}}}, 574 }, 575 { 576 // no-chunk series should be dropped. 577 lset: map[string]string{"a": "empty"}, 578 }, 579 }, 580 { 581 { 582 lset: map[string]string{"a": "b"}, 583 chunks: [][]sample{{{t: 21}, {t: 30}}}, 584 }, 585 { 586 lset: map[string]string{"a": "c"}, 587 chunks: [][]sample{{{t: 40}, {t: 45}}}, 588 }, 589 }, 590 }, 591 compactMinTime: -11, 592 expSeriesSamples: []seriesSamples{ 593 { 594 lset: map[string]string{"a": "b"}, 595 chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}, {{t: 21}, {t: 30}}}, 596 }, 597 { 598 lset: map[string]string{"a": "c"}, 599 chunks: [][]sample{{{t: -11}, {t: -9}}, {{t: 10}, {t: 19}}, {{t: 40}, {t: 45}}}, 600 }, 601 }, 602 }, 603 { 604 title: "Populate from two blocks showing that order is maintained.", 605 inputSeriesSamples: [][]seriesSamples{ 606 { 607 { 608 lset: map[string]string{"a": "b"}, 609 chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}}, 610 }, 611 { 612 lset: map[string]string{"a": "c"}, 613 chunks: [][]sample{{{t: 1}, {t: 9}}, {{t: 10}, {t: 19}}}, 614 }, 615 }, 616 { 617 { 618 lset: map[string]string{"a": "b"}, 619 chunks: [][]sample{{{t: 21}, {t: 30}}}, 620 }, 621 { 622 lset: map[string]string{"a": "c"}, 623 chunks: [][]sample{{{t: 40}, {t: 45}}}, 624 }, 625 }, 626 }, 627 expSeriesSamples: []seriesSamples{ 628 { 629 lset: map[string]string{"a": "b"}, 630 chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}, {{t: 21}, {t: 30}}}, 631 }, 632 { 633 lset: map[string]string{"a": "c"}, 634 chunks: [][]sample{{{t: 1}, {t: 9}}, {{t: 10}, {t: 19}}, {{t: 40}, {t: 45}}}, 635 }, 636 }, 637 }, 638 { 639 title: "Populate from two blocks showing that order of series is sorted.", 640 inputSeriesSamples: [][]seriesSamples{ 641 { 642 { 643 lset: map[string]string{"a": "4"}, 644 chunks: [][]sample{{{t: 5}, {t: 7}}}, 645 }, 646 { 647 lset: map[string]string{"a": "3"}, 648 chunks: [][]sample{{{t: 5}, {t: 6}}}, 649 }, 650 { 651 lset: map[string]string{"a": "same"}, 652 chunks: [][]sample{{{t: 1}, {t: 4}}}, 653 }, 654 }, 655 { 656 { 657 lset: map[string]string{"a": "2"}, 658 chunks: [][]sample{{{t: 1}, {t: 3}}}, 659 }, 660 { 661 lset: map[string]string{"a": "1"}, 662 chunks: [][]sample{{{t: 1}, {t: 2}}}, 663 }, 664 { 665 lset: map[string]string{"a": "same"}, 666 chunks: [][]sample{{{t: 5}, {t: 8}}}, 667 }, 668 }, 669 }, 670 expSeriesSamples: []seriesSamples{ 671 { 672 lset: map[string]string{"a": "1"}, 673 chunks: [][]sample{{{t: 1}, {t: 2}}}, 674 }, 675 { 676 lset: map[string]string{"a": "2"}, 677 chunks: [][]sample{{{t: 1}, {t: 3}}}, 678 }, 679 { 680 lset: map[string]string{"a": "3"}, 681 chunks: [][]sample{{{t: 5}, {t: 6}}}, 682 }, 683 { 684 lset: map[string]string{"a": "4"}, 685 chunks: [][]sample{{{t: 5}, {t: 7}}}, 686 }, 687 { 688 lset: map[string]string{"a": "same"}, 689 chunks: [][]sample{{{t: 1}, {t: 4}}, {{t: 5}, {t: 8}}}, 690 }, 691 }, 692 }, 693 { 694 title: "Populate from two blocks 1:1 duplicated chunks; with negative timestamps.", 695 inputSeriesSamples: [][]seriesSamples{ 696 { 697 { 698 lset: map[string]string{"a": "1"}, 699 chunks: [][]sample{{{t: 1}, {t: 2}}, {{t: 3}, {t: 4}}}, 700 }, 701 { 702 lset: map[string]string{"a": "2"}, 703 chunks: [][]sample{{{t: -3}, {t: -2}}, {{t: 1}, {t: 3}, {t: 4}}, {{t: 5}, {t: 6}}}, 704 }, 705 }, 706 { 707 { 708 lset: map[string]string{"a": "1"}, 709 chunks: [][]sample{{{t: 3}, {t: 4}}}, 710 }, 711 { 712 lset: map[string]string{"a": "2"}, 713 chunks: [][]sample{{{t: 1}, {t: 3}, {t: 4}}, {{t: 7}, {t: 8}}}, 714 }, 715 }, 716 }, 717 compactMinTime: -3, 718 expSeriesSamples: []seriesSamples{ 719 { 720 lset: map[string]string{"a": "1"}, 721 chunks: [][]sample{{{t: 1}, {t: 2}}, {{t: 3}, {t: 4}}}, 722 }, 723 { 724 lset: map[string]string{"a": "2"}, 725 chunks: [][]sample{{{t: -3}, {t: -2}}, {{t: 1}, {t: 3}, {t: 4}}, {{t: 5}, {t: 6}}, {{t: 7}, {t: 8}}}, 726 }, 727 }, 728 }, 729 { 730 // This should not happened because head block is making sure the chunks are not crossing block boundaries. 731 // We used to return error, but now chunk is trimmed. 732 title: "Populate from single block containing chunk outside of compact meta time range.", 733 inputSeriesSamples: [][]seriesSamples{ 734 { 735 { 736 lset: map[string]string{"a": "b"}, 737 chunks: [][]sample{{{t: 1}, {t: 2}}, {{t: 10}, {t: 30}}}, 738 }, 739 }, 740 }, 741 compactMinTime: 0, 742 compactMaxTime: 20, 743 expSeriesSamples: []seriesSamples{ 744 { 745 lset: map[string]string{"a": "b"}, 746 chunks: [][]sample{{{t: 1}, {t: 2}}, {{t: 10}}}, 747 }, 748 }, 749 }, 750 { 751 // Introduced by https://github.com/prometheus/tsdb/issues/347. We used to return error, but now chunk is trimmed. 752 title: "Populate from single block containing extra chunk", 753 inputSeriesSamples: [][]seriesSamples{ 754 { 755 { 756 lset: map[string]string{"a": "issue347"}, 757 chunks: [][]sample{{{t: 1}, {t: 2}}, {{t: 10}, {t: 20}}}, 758 }, 759 }, 760 }, 761 compactMinTime: 0, 762 compactMaxTime: 10, 763 expSeriesSamples: []seriesSamples{ 764 { 765 lset: map[string]string{"a": "issue347"}, 766 chunks: [][]sample{{{t: 1}, {t: 2}}}, 767 }, 768 }, 769 }, 770 { 771 // Deduplication expected. 772 // Introduced by pull/370 and pull/539. 773 title: "Populate from two blocks containing duplicated chunk.", 774 inputSeriesSamples: [][]seriesSamples{ 775 { 776 { 777 lset: map[string]string{"a": "b"}, 778 chunks: [][]sample{{{t: 1}, {t: 2}}, {{t: 10}, {t: 20}}}, 779 }, 780 }, 781 { 782 { 783 lset: map[string]string{"a": "b"}, 784 chunks: [][]sample{{{t: 10}, {t: 20}}}, 785 }, 786 }, 787 }, 788 expSeriesSamples: []seriesSamples{ 789 { 790 lset: map[string]string{"a": "b"}, 791 chunks: [][]sample{{{t: 1}, {t: 2}}, {{t: 10}, {t: 20}}}, 792 }, 793 }, 794 }, 795 { 796 // Introduced by https://github.com/prometheus/tsdb/pull/539. 797 title: "Populate from three overlapping blocks.", 798 inputSeriesSamples: [][]seriesSamples{ 799 { 800 { 801 lset: map[string]string{"a": "overlap-all"}, 802 chunks: [][]sample{{{t: 19}, {t: 30}}}, 803 }, 804 { 805 lset: map[string]string{"a": "overlap-beginning"}, 806 chunks: [][]sample{{{t: 0}, {t: 5}}}, 807 }, 808 { 809 lset: map[string]string{"a": "overlap-ending"}, 810 chunks: [][]sample{{{t: 21}, {t: 30}}}, 811 }, 812 }, 813 { 814 { 815 lset: map[string]string{"a": "overlap-all"}, 816 chunks: [][]sample{{{t: 0}, {t: 10}, {t: 11}, {t: 20}}}, 817 }, 818 { 819 lset: map[string]string{"a": "overlap-beginning"}, 820 chunks: [][]sample{{{t: 0}, {t: 10}, {t: 12}, {t: 20}}}, 821 }, 822 { 823 lset: map[string]string{"a": "overlap-ending"}, 824 chunks: [][]sample{{{t: 0}, {t: 10}, {t: 13}, {t: 20}}}, 825 }, 826 }, 827 { 828 { 829 lset: map[string]string{"a": "overlap-all"}, 830 chunks: [][]sample{{{t: 27}, {t: 35}}}, 831 }, 832 { 833 lset: map[string]string{"a": "overlap-ending"}, 834 chunks: [][]sample{{{t: 27}, {t: 35}}}, 835 }, 836 }, 837 }, 838 expSeriesSamples: []seriesSamples{ 839 { 840 lset: map[string]string{"a": "overlap-all"}, 841 chunks: [][]sample{{{t: 0}, {t: 10}, {t: 11}, {t: 19}, {t: 20}, {t: 27}, {t: 30}, {t: 35}}}, 842 }, 843 { 844 lset: map[string]string{"a": "overlap-beginning"}, 845 chunks: [][]sample{{{t: 0}, {t: 5}, {t: 10}, {t: 12}, {t: 20}}}, 846 }, 847 { 848 lset: map[string]string{"a": "overlap-ending"}, 849 chunks: [][]sample{{{t: 0}, {t: 10}, {t: 13}, {t: 20}}, {{t: 21}, {t: 27}, {t: 30}, {t: 35}}}, 850 }, 851 }, 852 }, 853 { 854 title: "Populate from three partially overlapping blocks with few full chunks.", 855 inputSeriesSamples: [][]seriesSamples{ 856 { 857 { 858 lset: map[string]string{"a": "1", "b": "1"}, 859 chunks: samplesForRange(0, 659, 120), // 5 chunks and half. 860 }, 861 { 862 lset: map[string]string{"a": "1", "b": "2"}, 863 chunks: samplesForRange(0, 659, 120), 864 }, 865 }, 866 { 867 { 868 lset: map[string]string{"a": "1", "b": "2"}, 869 chunks: samplesForRange(480, 1199, 120), // two chunks overlapping with previous, two non overlapping and two overlapping with next block. 870 }, 871 { 872 lset: map[string]string{"a": "1", "b": "3"}, 873 chunks: samplesForRange(480, 1199, 120), 874 }, 875 }, 876 { 877 { 878 lset: map[string]string{"a": "1", "b": "2"}, 879 chunks: samplesForRange(960, 1499, 120), // 5 chunks and half. 880 }, 881 { 882 lset: map[string]string{"a": "1", "b": "4"}, 883 chunks: samplesForRange(960, 1499, 120), 884 }, 885 }, 886 }, 887 expSeriesSamples: []seriesSamples{ 888 { 889 lset: map[string]string{"a": "1", "b": "1"}, 890 chunks: samplesForRange(0, 659, 120), 891 }, 892 { 893 lset: map[string]string{"a": "1", "b": "2"}, 894 chunks: samplesForRange(0, 1499, 120), 895 }, 896 { 897 lset: map[string]string{"a": "1", "b": "3"}, 898 chunks: samplesForRange(480, 1199, 120), 899 }, 900 { 901 lset: map[string]string{"a": "1", "b": "4"}, 902 chunks: samplesForRange(960, 1499, 120), 903 }, 904 }, 905 }, 906 { 907 title: "Populate from three partially overlapping blocks with chunks that are expected to merge into single big chunks.", 908 inputSeriesSamples: [][]seriesSamples{ 909 { 910 { 911 lset: map[string]string{"a": "1", "b": "2"}, 912 chunks: [][]sample{{{t: 0}, {t: 6902464}}, {{t: 6961968}, {t: 7080976}}}, 913 }, 914 }, 915 { 916 { 917 lset: map[string]string{"a": "1", "b": "2"}, 918 chunks: [][]sample{{{t: 3600000}, {t: 13953696}}, {{t: 14042952}, {t: 14221464}}}, 919 }, 920 }, 921 { 922 { 923 lset: map[string]string{"a": "1", "b": "2"}, 924 chunks: [][]sample{{{t: 10800000}, {t: 14251232}}, {{t: 14280984}, {t: 14340488}}}, 925 }, 926 }, 927 }, 928 expSeriesSamples: []seriesSamples{ 929 { 930 lset: map[string]string{"a": "1", "b": "2"}, 931 chunks: [][]sample{{{t: 0}, {t: 3600000}, {t: 6902464}, {t: 6961968}, {t: 7080976}, {t: 10800000}, {t: 13953696}, {t: 14042952}, {t: 14221464}, {t: 14251232}}, {{t: 14280984}, {t: 14340488}}}, 932 }, 933 }, 934 }, 935 } { 936 t.Run(tc.title, func(t *testing.T) { 937 blocks := make([]BlockReader, 0, len(tc.inputSeriesSamples)) 938 for _, b := range tc.inputSeriesSamples { 939 ir, cr, mint, maxt := createIdxChkReaders(t, b) 940 blocks = append(blocks, &mockBReader{ir: ir, cr: cr, mint: mint, maxt: maxt}) 941 } 942 943 c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{0}, nil, nil) 944 require.NoError(t, err) 945 946 meta := &BlockMeta{ 947 MinTime: tc.compactMinTime, 948 MaxTime: tc.compactMaxTime, 949 } 950 if meta.MaxTime == 0 { 951 meta.MaxTime = math.MaxInt64 952 } 953 954 iw := &mockIndexWriter{} 955 err = c.populateBlock(blocks, meta, iw, nopChunkWriter{}) 956 if tc.expErr != nil { 957 require.Error(t, err) 958 require.Equal(t, tc.expErr.Error(), err.Error()) 959 return 960 } 961 require.NoError(t, err) 962 963 // Check if response is expected and chunk is valid. 964 var raw []seriesSamples 965 for _, s := range iw.seriesChunks { 966 ss := seriesSamples{lset: s.l.Map()} 967 var iter chunkenc.Iterator 968 for _, chk := range s.chunks { 969 var ( 970 samples = make([]sample, 0, chk.Chunk.NumSamples()) 971 iter = chk.Chunk.Iterator(iter) 972 firstTs int64 = math.MaxInt64 973 s sample 974 ) 975 for iter.Next() { 976 s.t, s.v = iter.At() 977 if firstTs == math.MaxInt64 { 978 firstTs = s.t 979 } 980 samples = append(samples, s) 981 } 982 983 // Check if chunk has correct min, max times. 984 require.Equal(t, firstTs, chk.MinTime, "chunk Meta %v does not match the first encoded sample timestamp: %v", chk, firstTs) 985 require.Equal(t, s.t, chk.MaxTime, "chunk Meta %v does not match the last encoded sample timestamp %v", chk, s.t) 986 987 require.NoError(t, iter.Err()) 988 ss.chunks = append(ss.chunks, samples) 989 } 990 raw = append(raw, ss) 991 } 992 require.Equal(t, tc.expSeriesSamples, raw) 993 994 // Check if stats are calculated properly. 995 s := BlockStats{NumSeries: uint64(len(tc.expSeriesSamples))} 996 for _, series := range tc.expSeriesSamples { 997 s.NumChunks += uint64(len(series.chunks)) 998 for _, chk := range series.chunks { 999 s.NumSamples += uint64(len(chk)) 1000 } 1001 } 1002 require.Equal(t, s, meta.Stats) 1003 }) 1004 } 1005} 1006 1007func BenchmarkCompaction(b *testing.B) { 1008 cases := []struct { 1009 ranges [][2]int64 1010 compactionType string 1011 }{ 1012 { 1013 ranges: [][2]int64{{0, 100}, {200, 300}, {400, 500}, {600, 700}}, 1014 compactionType: "normal", 1015 }, 1016 { 1017 ranges: [][2]int64{{0, 1000}, {2000, 3000}, {4000, 5000}, {6000, 7000}}, 1018 compactionType: "normal", 1019 }, 1020 { 1021 ranges: [][2]int64{{0, 2000}, {3000, 5000}, {6000, 8000}, {9000, 11000}}, 1022 compactionType: "normal", 1023 }, 1024 { 1025 ranges: [][2]int64{{0, 5000}, {6000, 11000}, {12000, 17000}, {18000, 23000}}, 1026 compactionType: "normal", 1027 }, 1028 // 40% overlaps. 1029 { 1030 ranges: [][2]int64{{0, 100}, {60, 160}, {120, 220}, {180, 280}}, 1031 compactionType: "vertical", 1032 }, 1033 { 1034 ranges: [][2]int64{{0, 1000}, {600, 1600}, {1200, 2200}, {1800, 2800}}, 1035 compactionType: "vertical", 1036 }, 1037 { 1038 ranges: [][2]int64{{0, 2000}, {1200, 3200}, {2400, 4400}, {3600, 5600}}, 1039 compactionType: "vertical", 1040 }, 1041 { 1042 ranges: [][2]int64{{0, 5000}, {3000, 8000}, {6000, 11000}, {9000, 14000}}, 1043 compactionType: "vertical", 1044 }, 1045 } 1046 1047 nSeries := 10000 1048 for _, c := range cases { 1049 nBlocks := len(c.ranges) 1050 b.Run(fmt.Sprintf("type=%s,blocks=%d,series=%d,samplesPerSeriesPerBlock=%d", c.compactionType, nBlocks, nSeries, c.ranges[0][1]-c.ranges[0][0]+1), func(b *testing.B) { 1051 dir, err := ioutil.TempDir("", "bench_compaction") 1052 require.NoError(b, err) 1053 defer func() { 1054 require.NoError(b, os.RemoveAll(dir)) 1055 }() 1056 blockDirs := make([]string, 0, len(c.ranges)) 1057 var blocks []*Block 1058 for _, r := range c.ranges { 1059 block, err := OpenBlock(nil, createBlock(b, dir, genSeries(nSeries, 10, r[0], r[1])), nil) 1060 require.NoError(b, err) 1061 blocks = append(blocks, block) 1062 defer func() { 1063 require.NoError(b, block.Close()) 1064 }() 1065 blockDirs = append(blockDirs, block.Dir()) 1066 } 1067 1068 c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil) 1069 require.NoError(b, err) 1070 1071 b.ResetTimer() 1072 b.ReportAllocs() 1073 for i := 0; i < b.N; i++ { 1074 _, err = c.Compact(dir, blockDirs, blocks) 1075 require.NoError(b, err) 1076 } 1077 }) 1078 } 1079} 1080 1081func BenchmarkCompactionFromHead(b *testing.B) { 1082 dir, err := ioutil.TempDir("", "bench_compaction_from_head") 1083 require.NoError(b, err) 1084 defer func() { 1085 require.NoError(b, os.RemoveAll(dir)) 1086 }() 1087 totalSeries := 100000 1088 for labelNames := 1; labelNames < totalSeries; labelNames *= 10 { 1089 labelValues := totalSeries / labelNames 1090 b.Run(fmt.Sprintf("labelnames=%d,labelvalues=%d", labelNames, labelValues), func(b *testing.B) { 1091 chunkDir, err := ioutil.TempDir("", "chunk_dir") 1092 require.NoError(b, err) 1093 defer func() { 1094 require.NoError(b, os.RemoveAll(chunkDir)) 1095 }() 1096 opts := DefaultHeadOptions() 1097 opts.ChunkRange = 1000 1098 opts.ChunkDirRoot = chunkDir 1099 h, err := NewHead(nil, nil, nil, opts, nil) 1100 require.NoError(b, err) 1101 for ln := 0; ln < labelNames; ln++ { 1102 app := h.Appender(context.Background()) 1103 for lv := 0; lv < labelValues; lv++ { 1104 app.Append(0, labels.FromStrings(fmt.Sprintf("%d", ln), fmt.Sprintf("%d%s%d", lv, postingsBenchSuffix, ln)), 0, 0) 1105 } 1106 require.NoError(b, app.Commit()) 1107 } 1108 1109 b.ResetTimer() 1110 b.ReportAllocs() 1111 for i := 0; i < b.N; i++ { 1112 createBlockFromHead(b, filepath.Join(dir, fmt.Sprintf("%d-%d", i, labelNames)), h) 1113 } 1114 h.Close() 1115 }) 1116 } 1117} 1118 1119// TestDisableAutoCompactions checks that we can 1120// disable and enable the auto compaction. 1121// This is needed for unit tests that rely on 1122// checking state before and after a compaction. 1123func TestDisableAutoCompactions(t *testing.T) { 1124 db := openTestDB(t, nil, nil) 1125 defer func() { 1126 require.NoError(t, db.Close()) 1127 }() 1128 1129 blockRange := db.compactor.(*LeveledCompactor).ranges[0] 1130 label := labels.FromStrings("foo", "bar") 1131 1132 // Trigger a compaction to check that it was skipped and 1133 // no new blocks were created when compaction is disabled. 1134 db.DisableCompactions() 1135 app := db.Appender(context.Background()) 1136 for i := int64(0); i < 3; i++ { 1137 _, err := app.Append(0, label, i*blockRange, 0) 1138 require.NoError(t, err) 1139 _, err = app.Append(0, label, i*blockRange+1000, 0) 1140 require.NoError(t, err) 1141 } 1142 require.NoError(t, app.Commit()) 1143 1144 select { 1145 case db.compactc <- struct{}{}: 1146 default: 1147 } 1148 1149 for x := 0; x < 10; x++ { 1150 if prom_testutil.ToFloat64(db.metrics.compactionsSkipped) > 0.0 { 1151 break 1152 } 1153 time.Sleep(10 * time.Millisecond) 1154 } 1155 1156 require.Greater(t, prom_testutil.ToFloat64(db.metrics.compactionsSkipped), 0.0, "No compaction was skipped after the set timeout.") 1157 require.Equal(t, 0, len(db.blocks)) 1158 1159 // Enable the compaction, trigger it and check that the block is persisted. 1160 db.EnableCompactions() 1161 select { 1162 case db.compactc <- struct{}{}: 1163 default: 1164 } 1165 for x := 0; x < 100; x++ { 1166 if len(db.Blocks()) > 0 { 1167 break 1168 } 1169 time.Sleep(100 * time.Millisecond) 1170 } 1171 require.Greater(t, len(db.Blocks()), 0, "No block was persisted after the set timeout.") 1172} 1173 1174// TestCancelCompactions ensures that when the db is closed 1175// any running compaction is cancelled to unblock closing the db. 1176func TestCancelCompactions(t *testing.T) { 1177 tmpdir, err := ioutil.TempDir("", "testCancelCompaction") 1178 require.NoError(t, err) 1179 defer func() { 1180 require.NoError(t, os.RemoveAll(tmpdir)) 1181 }() 1182 1183 // Create some blocks to fall within the compaction range. 1184 createBlock(t, tmpdir, genSeries(1, 10000, 0, 1000)) 1185 createBlock(t, tmpdir, genSeries(1, 10000, 1000, 2000)) 1186 createBlock(t, tmpdir, genSeries(1, 1, 2000, 2001)) // The most recent block is ignored so can be e small one. 1187 1188 // Copy the db so we have an exact copy to compare compaction times. 1189 tmpdirCopy := tmpdir + "Copy" 1190 err = fileutil.CopyDirs(tmpdir, tmpdirCopy) 1191 require.NoError(t, err) 1192 defer func() { 1193 require.NoError(t, os.RemoveAll(tmpdirCopy)) 1194 }() 1195 1196 // Measure the compaction time without interrupting it. 1197 var timeCompactionUninterrupted time.Duration 1198 { 1199 db, err := open(tmpdir, log.NewNopLogger(), nil, DefaultOptions(), []int64{1, 2000}, nil) 1200 require.NoError(t, err) 1201 require.Equal(t, 3, len(db.Blocks()), "initial block count mismatch") 1202 require.Equal(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "initial compaction counter mismatch") 1203 db.compactc <- struct{}{} // Trigger a compaction. 1204 var start time.Time 1205 for prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.populatingBlocks) <= 0 { 1206 time.Sleep(3 * time.Millisecond) 1207 } 1208 start = time.Now() 1209 1210 for prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran) != 1 { 1211 time.Sleep(3 * time.Millisecond) 1212 } 1213 timeCompactionUninterrupted = time.Since(start) 1214 1215 require.NoError(t, db.Close()) 1216 } 1217 // Measure the compaction time when closing the db in the middle of compaction. 1218 { 1219 db, err := open(tmpdirCopy, log.NewNopLogger(), nil, DefaultOptions(), []int64{1, 2000}, nil) 1220 require.NoError(t, err) 1221 require.Equal(t, 3, len(db.Blocks()), "initial block count mismatch") 1222 require.Equal(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "initial compaction counter mismatch") 1223 db.compactc <- struct{}{} // Trigger a compaction. 1224 dbClosed := make(chan struct{}) 1225 1226 for prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.populatingBlocks) <= 0 { 1227 time.Sleep(3 * time.Millisecond) 1228 } 1229 go func() { 1230 require.NoError(t, db.Close()) 1231 close(dbClosed) 1232 }() 1233 1234 start := time.Now() 1235 <-dbClosed 1236 actT := time.Since(start) 1237 expT := time.Duration(timeCompactionUninterrupted / 2) // Closing the db in the middle of compaction should less than half the time. 1238 require.True(t, actT < expT, "closing the db took more than expected. exp: <%v, act: %v", expT, actT) 1239 } 1240} 1241 1242// TestDeleteCompactionBlockAfterFailedReload ensures that a failed reloadBlocks immediately after a compaction 1243// deletes the resulting block to avoid creatings blocks with the same time range. 1244func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) { 1245 tests := map[string]func(*DB) int{ 1246 "Test Head Compaction": func(db *DB) int { 1247 rangeToTriggerCompaction := db.compactor.(*LeveledCompactor).ranges[0]/2*3 - 1 1248 defaultLabel := labels.FromStrings("foo", "bar") 1249 1250 // Add some data to the head that is enough to trigger a compaction. 1251 app := db.Appender(context.Background()) 1252 _, err := app.Append(0, defaultLabel, 1, 0) 1253 require.NoError(t, err) 1254 _, err = app.Append(0, defaultLabel, 2, 0) 1255 require.NoError(t, err) 1256 _, err = app.Append(0, defaultLabel, 3+rangeToTriggerCompaction, 0) 1257 require.NoError(t, err) 1258 require.NoError(t, app.Commit()) 1259 1260 return 0 1261 }, 1262 "Test Block Compaction": func(db *DB) int { 1263 blocks := []*BlockMeta{ 1264 {MinTime: 0, MaxTime: 100}, 1265 {MinTime: 100, MaxTime: 150}, 1266 {MinTime: 150, MaxTime: 200}, 1267 } 1268 for _, m := range blocks { 1269 createBlock(t, db.Dir(), genSeries(1, 1, m.MinTime, m.MaxTime)) 1270 } 1271 require.NoError(t, db.reload()) 1272 require.Equal(t, len(blocks), len(db.Blocks()), "unexpected block count after a reloadBlocks") 1273 1274 return len(blocks) 1275 }, 1276 } 1277 1278 for title, bootStrap := range tests { 1279 t.Run(title, func(t *testing.T) { 1280 db := openTestDB(t, nil, []int64{1, 100}) 1281 defer func() { 1282 require.NoError(t, db.Close()) 1283 }() 1284 db.DisableCompactions() 1285 1286 expBlocks := bootStrap(db) 1287 1288 // Create a block that will trigger the reloadBlocks to fail. 1289 blockPath := createBlock(t, db.Dir(), genSeries(1, 1, 200, 300)) 1290 lastBlockIndex := path.Join(blockPath, indexFilename) 1291 actBlocks, err := blockDirs(db.Dir()) 1292 require.NoError(t, err) 1293 require.Equal(t, expBlocks, len(actBlocks)-1) // -1 to exclude the corrupted block. 1294 require.NoError(t, os.RemoveAll(lastBlockIndex)) // Corrupt the block by removing the index file. 1295 1296 require.Equal(t, 0.0, prom_testutil.ToFloat64(db.metrics.reloadsFailed), "initial 'failed db reloadBlocks' count metrics mismatch") 1297 require.Equal(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "initial `compactions` count metric mismatch") 1298 require.Equal(t, 0.0, prom_testutil.ToFloat64(db.metrics.compactionsFailed), "initial `compactions failed` count metric mismatch") 1299 1300 // Do the compaction and check the metrics. 1301 // Compaction should succeed, but the reloadBlocks should fail and 1302 // the new block created from the compaction should be deleted. 1303 require.Error(t, db.Compact()) 1304 require.Equal(t, 1.0, prom_testutil.ToFloat64(db.metrics.reloadsFailed), "'failed db reloadBlocks' count metrics mismatch") 1305 require.Equal(t, 1.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "`compaction` count metric mismatch") 1306 require.Equal(t, 1.0, prom_testutil.ToFloat64(db.metrics.compactionsFailed), "`compactions failed` count metric mismatch") 1307 1308 actBlocks, err = blockDirs(db.Dir()) 1309 require.NoError(t, err) 1310 require.Equal(t, expBlocks, len(actBlocks)-1, "block count should be the same as before the compaction") // -1 to exclude the corrupted block. 1311 }) 1312 } 1313} 1314