// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tsdb

import (
	"encoding/binary"
	"fmt"
	"hash/crc32"
	"io/ioutil"
	"math"
	"math/rand"
	"os"
	"path"
	"path/filepath"
	"sort"
	"strconv"
	"sync"
	"testing"
	"time"

	"github.com/prometheus/prometheus/tsdb/fileutil"

	"github.com/go-kit/kit/log"
	"github.com/oklog/ulid"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
	prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/tsdb/chunks"
	"github.com/prometheus/prometheus/tsdb/index"
	"github.com/prometheus/prometheus/tsdb/record"
	"github.com/prometheus/prometheus/tsdb/tombstones"
	"github.com/prometheus/prometheus/tsdb/tsdbutil"
	"github.com/prometheus/prometheus/tsdb/wal"
	"github.com/prometheus/prometheus/util/testutil"
)

func openTestDB(t testing.TB, opts *Options) (db *DB, close func()) {
	tmpdir, err := ioutil.TempDir("", "test")
	testutil.Ok(t, err)

	db, err = Open(tmpdir, nil, nil, opts)
	testutil.Ok(t, err)

	// Do not close the test database by default as it will deadlock on test failures.
	return db, func() {
		testutil.Ok(t, os.RemoveAll(tmpdir))
	}
}

// query runs a matcher query against the querier and fully expands its data.
func query(t testing.TB, q Querier, matchers ...*labels.Matcher) map[string][]tsdbutil.Sample {
	ss, err := q.Select(matchers...)
	defer func() {
		testutil.Ok(t, q.Close())
	}()
	testutil.Ok(t, err)

	result := map[string][]tsdbutil.Sample{}

	for ss.Next() {
		series := ss.At()

		samples := []tsdbutil.Sample{}
		it := series.Iterator()
		for it.Next() {
			t, v := it.At()
			samples = append(samples, sample{t: t, v: v})
		}
		testutil.Ok(t, it.Err())

		name := series.Labels().String()
		result[name] = samples
	}
	testutil.Ok(t, ss.Err())

	return result
}

// Ensure that blocks are held in memory in their time order
// and not in ULID order as they are read from the directory.
func TestDB_reloadOrder(t *testing.T) {
	db, delete := openTestDB(t, nil)
	defer func() {
		testutil.Ok(t, db.Close())
		delete()
	}()

	metas := []BlockMeta{
		{MinTime: 90, MaxTime: 100},
		{MinTime: 70, MaxTime: 80},
		{MinTime: 100, MaxTime: 110},
	}
	for _, m := range metas {
		createBlock(t, db.Dir(), genSeries(1, 1, m.MinTime, m.MaxTime))
	}

	testutil.Ok(t, db.reload())
	blocks := db.Blocks()
	testutil.Equals(t, 3, len(blocks))
	testutil.Equals(t, metas[1].MinTime, blocks[0].Meta().MinTime)
	testutil.Equals(t, metas[1].MaxTime, blocks[0].Meta().MaxTime)
	testutil.Equals(t, metas[0].MinTime, blocks[1].Meta().MinTime)
	testutil.Equals(t, metas[0].MaxTime, blocks[1].Meta().MaxTime)
	testutil.Equals(t, metas[2].MinTime, blocks[2].Meta().MinTime)
	testutil.Equals(t, metas[2].MaxTime, blocks[2].Meta().MaxTime)
}

func TestDataAvailableOnlyAfterCommit(t *testing.T) {
	db, delete := openTestDB(t, nil)
	defer func() {
		testutil.Ok(t, db.Close())
		delete()
	}()

	app := db.Appender()

	_, err := app.Add(labels.FromStrings("foo", "bar"), 0, 0)
	testutil.Ok(t, err)

	querier, err := db.Querier(0, 1)
	testutil.Ok(t, err)
	seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
	testutil.Equals(t, map[string][]tsdbutil.Sample{}, seriesSet)

	err = app.Commit()
	testutil.Ok(t, err)

	querier, err = db.Querier(0, 1)
	testutil.Ok(t, err)
	defer querier.Close()

	seriesSet = query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))

	testutil.Equals(t, map[string][]tsdbutil.Sample{`{foo="bar"}`: {sample{t: 0, v: 0}}}, seriesSet)
}

func TestDataNotAvailableAfterRollback(t *testing.T) {
	db, delete := openTestDB(t, nil)
	defer func() {
		testutil.Ok(t, db.Close())
		delete()
	}()

	app := db.Appender()
	_, err := app.Add(labels.FromStrings("foo", "bar"), 0, 0)
	testutil.Ok(t, err)

	err = app.Rollback()
	testutil.Ok(t, err)

	querier, err := db.Querier(0, 1)
	testutil.Ok(t, err)
	defer querier.Close()

	seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))

	testutil.Equals(t, map[string][]tsdbutil.Sample{}, seriesSet)
}

func TestDBAppenderAddRef(t *testing.T) {
	db, delete := openTestDB(t, nil)
	defer func() {
		testutil.Ok(t, db.Close())
		delete()
	}()

	app1 := db.Appender()

	ref1, err := app1.Add(labels.FromStrings("a", "b"), 123, 0)
	testutil.Ok(t, err)

	// Reference should already work before commit.
	err = app1.AddFast(ref1, 124, 1)
	testutil.Ok(t, err)

	err = app1.Commit()
	testutil.Ok(t, err)

	app2 := db.Appender()

	// First ref should already work in the next transaction.
	err = app2.AddFast(ref1, 125, 0)
	testutil.Ok(t, err)

	ref2, err := app2.Add(labels.FromStrings("a", "b"), 133, 1)
	testutil.Ok(t, err)

	testutil.Assert(t, ref1 == ref2, "")

	// Reference must be valid to add another sample.
	err = app2.AddFast(ref2, 143, 2)
	testutil.Ok(t, err)

	err = app2.AddFast(9999999, 1, 1)
	testutil.Equals(t, ErrNotFound, errors.Cause(err))

	testutil.Ok(t, app2.Commit())

	q, err := db.Querier(0, 200)
	testutil.Ok(t, err)

	res := query(t, q, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))

	testutil.Equals(t, map[string][]tsdbutil.Sample{
		labels.FromStrings("a", "b").String(): {
			sample{t: 123, v: 0},
			sample{t: 124, v: 1},
			sample{t: 125, v: 0},
			sample{t: 133, v: 1},
			sample{t: 143, v: 2},
		},
	}, res)
}

func TestAppendEmptyLabelsIgnored(t *testing.T) {
	db, delete := openTestDB(t, nil)
	defer func() {
		testutil.Ok(t, db.Close())
		delete()
	}()

	app1 := db.Appender()

	ref1, err := app1.Add(labels.FromStrings("a", "b"), 123, 0)
	testutil.Ok(t, err)

	// Construct labels manually so there is an empty label.
	ref2, err := app1.Add(labels.Labels{labels.Label{Name: "a", Value: "b"}, labels.Label{Name: "c", Value: ""}}, 124, 0)
	testutil.Ok(t, err)

	// Should be the same series.
	testutil.Equals(t, ref1, ref2)

	err = app1.Commit()
	testutil.Ok(t, err)
}

func TestDeleteSimple(t *testing.T) {
	numSamples := int64(10)

	cases := []struct {
		Intervals tombstones.Intervals
		remaint   []int64
	}{
		{
			Intervals: tombstones.Intervals{{Mint: 0, Maxt: 3}},
			remaint:   []int64{4, 5, 6, 7, 8, 9},
		},
		{
			Intervals: tombstones.Intervals{{Mint: 1, Maxt: 3}},
			remaint:   []int64{0, 4, 5, 6, 7, 8, 9},
		},
		{
			Intervals: tombstones.Intervals{{Mint: 1, Maxt: 3}, {Mint: 4, Maxt: 7}},
			remaint:   []int64{0, 8, 9},
		},
		{
			Intervals: tombstones.Intervals{{Mint: 1, Maxt: 3}, {Mint: 4, Maxt: 700}},
			remaint:   []int64{0},
		},
		{ // This case is to ensure that labels and symbols are deleted.
			Intervals: tombstones.Intervals{{Mint: 0, Maxt: 9}},
			remaint:   []int64{},
		},
	}

Outer:
	for _, c := range cases {
		db, delete := openTestDB(t, nil)
		defer func() {
			testutil.Ok(t, db.Close())
			delete()
		}()

		app := db.Appender()

		smpls := make([]float64, numSamples)
		for i := int64(0); i < numSamples; i++ {
			smpls[i] = rand.Float64()
			app.Add(labels.Labels{{Name: "a", Value: "b"}}, i, smpls[i])
		}

		testutil.Ok(t, app.Commit())

		// TODO(gouthamve): Reset the tombstones somehow.
		// Delete the ranges.
		for _, r := range c.Intervals {
			testutil.Ok(t, db.Delete(r.Mint, r.Maxt, labels.MustNewMatcher(labels.MatchEqual, "a", "b")))
		}

		// Compare the result.
		q, err := db.Querier(0, numSamples)
		testutil.Ok(t, err)

		res, err := q.Select(labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
		testutil.Ok(t, err)

		expSamples := make([]tsdbutil.Sample, 0, len(c.remaint))
		for _, ts := range c.remaint {
			expSamples = append(expSamples, sample{ts, smpls[ts]})
		}

		expss := newMockSeriesSet([]Series{
			newSeries(map[string]string{"a": "b"}, expSamples),
		})

		lns, err := q.LabelNames()
		testutil.Ok(t, err)
		lvs, err := q.LabelValues("a")
		testutil.Ok(t, err)
		if len(expSamples) == 0 {
			testutil.Equals(t, 0, len(lns))
			testutil.Equals(t, 0, len(lvs))
			testutil.Assert(t, res.Next() == false, "")
			continue
		} else {
			testutil.Equals(t, 1, len(lns))
			testutil.Equals(t, 1, len(lvs))
			testutil.Equals(t, "a", lns[0])
			testutil.Equals(t, "b", lvs[0])
		}

		for {
			eok, rok := expss.Next(), res.Next()
			testutil.Equals(t, eok, rok)

			if !eok {
				continue Outer
			}
			sexp := expss.At()
			sres := res.At()

			testutil.Equals(t, sexp.Labels(), sres.Labels())

			smplExp, errExp := expandSeriesIterator(sexp.Iterator())
			smplRes, errRes := expandSeriesIterator(sres.Iterator())

			testutil.Equals(t, errExp, errRes)
			testutil.Equals(t, smplExp, smplRes)
		}
	}
}

func TestAmendDatapointCausesError(t *testing.T) {
	db, delete := openTestDB(t, nil)
	defer func() {
		testutil.Ok(t, db.Close())
		delete()
	}()

	app := db.Appender()
	_, err := app.Add(labels.Labels{}, 0, 0)
	testutil.Ok(t, err)
	testutil.Ok(t, app.Commit())

	app = db.Appender()
	_, err = app.Add(labels.Labels{}, 0, 1)
	testutil.Equals(t, ErrAmendSample, err)
	testutil.Ok(t, app.Rollback())
}

func TestDuplicateNaNDatapointNoAmendError(t *testing.T) {
	db, delete := openTestDB(t, nil)
	defer func() {
		testutil.Ok(t, db.Close())
		delete()
	}()

	app := db.Appender()
	_, err := app.Add(labels.Labels{}, 0, math.NaN())
	testutil.Ok(t, err)
	testutil.Ok(t, app.Commit())

	app = db.Appender()
	_, err = app.Add(labels.Labels{}, 0, math.NaN())
	testutil.Ok(t, err)
}

func TestNonDuplicateNaNDatapointsCausesAmendError(t *testing.T) {
	db, delete := openTestDB(t, nil)
	defer func() {
		testutil.Ok(t, db.Close())
		delete()
	}()
	app := db.Appender()
	_, err := app.Add(labels.Labels{}, 0, math.Float64frombits(0x7ff0000000000001))
	testutil.Ok(t, err)
	testutil.Ok(t, app.Commit())

	app = db.Appender()
	_, err = app.Add(labels.Labels{}, 0, math.Float64frombits(0x7ff0000000000002))
	testutil.Equals(t, ErrAmendSample, err)
}

func TestSkippingInvalidValuesInSameTxn(t *testing.T) {
	db, delete := openTestDB(t, nil)
	defer func() {
		testutil.Ok(t, db.Close())
		delete()
	}()

	// Append AmendedValue.
	app := db.Appender()
	_, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, 1)
	testutil.Ok(t, err)
	_, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, 2)
	testutil.Ok(t, err)
	testutil.Ok(t, app.Commit())

	// Make sure the right value is stored.
	q, err := db.Querier(0, 10)
	testutil.Ok(t, err)

	ssMap := query(t, q, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))

	testutil.Equals(t, map[string][]tsdbutil.Sample{
		labels.New(labels.Label{Name: "a", Value: "b"}).String(): {sample{0, 1}},
	}, ssMap)

	// Append out-of-order value.
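	// The sample at t=7 below arrives after t=10 within the same transaction;
	// it is expected to be skipped, so the query at the end must only see the
	// in-order samples.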
	app = db.Appender()
	_, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 10, 3)
	testutil.Ok(t, err)
	_, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 7, 5)
	testutil.Ok(t, err)
	testutil.Ok(t, app.Commit())

	q, err = db.Querier(0, 10)
	testutil.Ok(t, err)

	ssMap = query(t, q, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))

	testutil.Equals(t, map[string][]tsdbutil.Sample{
		labels.New(labels.Label{Name: "a", Value: "b"}).String(): {sample{0, 1}, sample{10, 3}},
	}, ssMap)
}

func TestDB_Snapshot(t *testing.T) {
	db, delete := openTestDB(t, nil)
	defer delete()

	// Append data.
	app := db.Appender()
	mint := int64(1414141414000)
	for i := 0; i < 1000; i++ {
		_, err := app.Add(labels.FromStrings("foo", "bar"), mint+int64(i), 1.0)
		testutil.Ok(t, err)
	}
	testutil.Ok(t, app.Commit())
	testutil.Ok(t, app.Rollback())

	// Create snapshot.
	snap, err := ioutil.TempDir("", "snap")
	testutil.Ok(t, err)

	defer func() {
		testutil.Ok(t, os.RemoveAll(snap))
	}()
	testutil.Ok(t, db.Snapshot(snap, true))
	testutil.Ok(t, db.Close())

	// Reopen DB from snapshot.
	db, err = Open(snap, nil, nil, nil)
	testutil.Ok(t, err)
	defer func() { testutil.Ok(t, db.Close()) }()

	querier, err := db.Querier(mint, mint+1000)
	testutil.Ok(t, err)
	defer func() { testutil.Ok(t, querier.Close()) }()

	// Sum values.
	seriesSet, err := querier.Select(labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
	testutil.Ok(t, err)

	sum := 0.0
	for seriesSet.Next() {
		series := seriesSet.At().Iterator()
		for series.Next() {
			_, v := series.At()
			sum += v
		}
		testutil.Ok(t, series.Err())
	}
	testutil.Ok(t, seriesSet.Err())
	testutil.Equals(t, 1000.0, sum)
}

// TestDB_Snapshot_ChunksOutsideOfCompactedRange ensures that a snapshot removes chunk samples
// that are outside the set block time range.
// See https://github.com/prometheus/prometheus/issues/5105
func TestDB_Snapshot_ChunksOutsideOfCompactedRange(t *testing.T) {
	db, delete := openTestDB(t, nil)
	defer delete()

	app := db.Appender()
	mint := int64(1414141414000)
	for i := 0; i < 1000; i++ {
		_, err := app.Add(labels.FromStrings("foo", "bar"), mint+int64(i), 1.0)
		testutil.Ok(t, err)
	}
	testutil.Ok(t, app.Commit())
	testutil.Ok(t, app.Rollback())

	snap, err := ioutil.TempDir("", "snap")
	testutil.Ok(t, err)

	// Hackily introduce a "race" by setting the head's max time lower than the maxTime of the last chunk.
	db.head.maxTime = db.head.maxTime - 10

	defer func() {
		testutil.Ok(t, os.RemoveAll(snap))
	}()
	testutil.Ok(t, db.Snapshot(snap, true))
	testutil.Ok(t, db.Close())

	// Reopen DB from snapshot.
	db, err = Open(snap, nil, nil, nil)
	testutil.Ok(t, err)
	defer func() { testutil.Ok(t, db.Close()) }()

	querier, err := db.Querier(mint, mint+1000)
	testutil.Ok(t, err)
	defer func() { testutil.Ok(t, querier.Close()) }()

	// Sum values.
	seriesSet, err := querier.Select(labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
	testutil.Ok(t, err)

	sum := 0.0
	for seriesSet.Next() {
		series := seriesSet.At().Iterator()
		for series.Next() {
			_, v := series.At()
			sum += v
		}
		testutil.Ok(t, series.Err())
	}
	testutil.Ok(t, seriesSet.Err())

	// Since we snapshotted with MaxTime-10, we expect 10 fewer samples.
	testutil.Equals(t, 1000.0-10, sum)
}

func TestDB_SnapshotWithDelete(t *testing.T) {
	numSamples := int64(10)

	db, delete := openTestDB(t, nil)
	defer delete()

	app := db.Appender()

	smpls := make([]float64, numSamples)
	for i := int64(0); i < numSamples; i++ {
		smpls[i] = rand.Float64()
		app.Add(labels.Labels{{Name: "a", Value: "b"}}, i, smpls[i])
	}

	testutil.Ok(t, app.Commit())
	cases := []struct {
		intervals tombstones.Intervals
		remaint   []int64
	}{
		{
			intervals: tombstones.Intervals{{Mint: 1, Maxt: 3}, {Mint: 4, Maxt: 7}},
			remaint:   []int64{0, 8, 9},
		},
	}

Outer:
	for _, c := range cases {
		// TODO(gouthamve): Reset the tombstones somehow.
		// Delete the ranges.
		for _, r := range c.intervals {
			testutil.Ok(t, db.Delete(r.Mint, r.Maxt, labels.MustNewMatcher(labels.MatchEqual, "a", "b")))
		}

		// Create snapshot.
		snap, err := ioutil.TempDir("", "snap")
		testutil.Ok(t, err)

		defer func() {
			testutil.Ok(t, os.RemoveAll(snap))
		}()
		testutil.Ok(t, db.Snapshot(snap, true))
		testutil.Ok(t, db.Close())

		// Reopen DB from snapshot.
		db, err = Open(snap, nil, nil, nil)
		testutil.Ok(t, err)
		defer func() { testutil.Ok(t, db.Close()) }()

		// Compare the result.
		q, err := db.Querier(0, numSamples)
		testutil.Ok(t, err)
		defer func() { testutil.Ok(t, q.Close()) }()

		res, err := q.Select(labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
		testutil.Ok(t, err)

		expSamples := make([]tsdbutil.Sample, 0, len(c.remaint))
		for _, ts := range c.remaint {
			expSamples = append(expSamples, sample{ts, smpls[ts]})
		}

		expss := newMockSeriesSet([]Series{
			newSeries(map[string]string{"a": "b"}, expSamples),
		})

		if len(expSamples) == 0 {
			testutil.Assert(t, res.Next() == false, "")
			continue
		}

		for {
			eok, rok := expss.Next(), res.Next()
			testutil.Equals(t, eok, rok)

			if !eok {
				continue Outer
			}
			sexp := expss.At()
			sres := res.At()

			testutil.Equals(t, sexp.Labels(), sres.Labels())

			smplExp, errExp := expandSeriesIterator(sexp.Iterator())
			smplRes, errRes := expandSeriesIterator(sres.Iterator())

			testutil.Equals(t, errExp, errRes)
			testutil.Equals(t, smplExp, smplRes)
		}
	}
}

func TestDB_e2e(t *testing.T) {
	const (
		numDatapoints = 1000
		numRanges     = 1000
		timeInterval  = int64(3)
	)
	// Create 8 series with 1000 data-points of different ranges and run queries.
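	// The series below cover every combination of the label values for "a",
	// "instance" and "job" that the test queries further down select on.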
	lbls := []labels.Labels{
		{
			{Name: "a", Value: "b"},
			{Name: "instance", Value: "localhost:9090"},
			{Name: "job", Value: "prometheus"},
		},
		{
			{Name: "a", Value: "b"},
			{Name: "instance", Value: "127.0.0.1:9090"},
			{Name: "job", Value: "prometheus"},
		},
		{
			{Name: "a", Value: "b"},
			{Name: "instance", Value: "127.0.0.1:9090"},
			{Name: "job", Value: "prom-k8s"},
		},
		{
			{Name: "a", Value: "b"},
			{Name: "instance", Value: "localhost:9090"},
			{Name: "job", Value: "prom-k8s"},
		},
		{
			{Name: "a", Value: "c"},
			{Name: "instance", Value: "localhost:9090"},
			{Name: "job", Value: "prometheus"},
		},
		{
			{Name: "a", Value: "c"},
			{Name: "instance", Value: "127.0.0.1:9090"},
			{Name: "job", Value: "prometheus"},
		},
		{
			{Name: "a", Value: "c"},
			{Name: "instance", Value: "127.0.0.1:9090"},
			{Name: "job", Value: "prom-k8s"},
		},
		{
			{Name: "a", Value: "c"},
			{Name: "instance", Value: "localhost:9090"},
			{Name: "job", Value: "prom-k8s"},
		},
	}

	seriesMap := map[string][]tsdbutil.Sample{}
	for _, l := range lbls {
		seriesMap[labels.New(l...).String()] = []tsdbutil.Sample{}
	}

	db, delete := openTestDB(t, nil)
	defer func() {
		testutil.Ok(t, db.Close())
		delete()
	}()

	app := db.Appender()

	for _, l := range lbls {
		lset := labels.New(l...)
		series := []tsdbutil.Sample{}

		ts := rand.Int63n(300)
		for i := 0; i < numDatapoints; i++ {
			v := rand.Float64()

			series = append(series, sample{ts, v})

			_, err := app.Add(lset, ts, v)
			testutil.Ok(t, err)

			ts += rand.Int63n(timeInterval) + 1
		}

		seriesMap[lset.String()] = series
	}

	testutil.Ok(t, app.Commit())

	// Query each selector on 1000 random time-ranges.
	queries := []struct {
		ms []*labels.Matcher
	}{
		{
			ms: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "b")},
		},
		{
			ms: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchEqual, "a", "b"),
				labels.MustNewMatcher(labels.MatchEqual, "job", "prom-k8s"),
			},
		},
		{
			ms: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchEqual, "a", "c"),
				labels.MustNewMatcher(labels.MatchEqual, "instance", "localhost:9090"),
				labels.MustNewMatcher(labels.MatchEqual, "job", "prometheus"),
			},
		},
		// TODO: Add Regexp Matchers.
	}

	for _, qry := range queries {
		matched := labels.Slice{}
		for _, ls := range lbls {
			s := labels.Selector(qry.ms)
			if s.Matches(ls) {
				matched = append(matched, ls)
			}
		}

		sort.Sort(matched)

		for i := 0; i < numRanges; i++ {
			mint := rand.Int63n(300)
			maxt := mint + rand.Int63n(timeInterval*int64(numDatapoints))

			expected := map[string][]tsdbutil.Sample{}

			// Build the mockSeriesSet.
			for _, m := range matched {
				smpls := boundedSamples(seriesMap[m.String()], mint, maxt)
				if len(smpls) > 0 {
					expected[m.String()] = smpls
				}
			}

			q, err := db.Querier(mint, maxt)
			testutil.Ok(t, err)

			ss, err := q.Select(qry.ms...)
			testutil.Ok(t, err)

			result := map[string][]tsdbutil.Sample{}

			for ss.Next() {
				x := ss.At()

				smpls, err := expandSeriesIterator(x.Iterator())
				testutil.Ok(t, err)

				if len(smpls) > 0 {
					result[x.Labels().String()] = smpls
				}
			}

			testutil.Ok(t, ss.Err())
			testutil.Equals(t, expected, result)

			q.Close()
		}
	}
}

func TestWALFlushedOnDBClose(t *testing.T) {
	db, delete := openTestDB(t, nil)
	defer delete()

	dirDb := db.Dir()

	lbls := labels.Labels{labels.Label{Name: "labelname", Value: "labelvalue"}}

	app := db.Appender()
	_, err := app.Add(lbls, 0, 1)
	testutil.Ok(t, err)
	testutil.Ok(t, app.Commit())

	testutil.Ok(t, db.Close())

	db, err = Open(dirDb, nil, nil, nil)
	testutil.Ok(t, err)
	defer func() { testutil.Ok(t, db.Close()) }()

	q, err := db.Querier(0, 1)
	testutil.Ok(t, err)

	values, err := q.LabelValues("labelname")
	testutil.Ok(t, err)
	testutil.Equals(t, []string{"labelvalue"}, values)
}

func TestWALSegmentSizeOptions(t *testing.T) {
	tests := map[int]func(dbdir string, segmentSize int){
		// Default WAL size.
		0: func(dbDir string, segmentSize int) {
			files, err := ioutil.ReadDir(filepath.Join(dbDir, "wal"))
			testutil.Ok(t, err)
			for _, f := range files[:len(files)-1] {
				testutil.Equals(t, int64(DefaultOptions.WALSegmentSize), f.Size(), "WAL file size doesn't match WALSegmentSize option, filename: %v", f.Name())
			}
			lastFile := files[len(files)-1]
			testutil.Assert(t, int64(DefaultOptions.WALSegmentSize) > lastFile.Size(), "last WAL file size is not smaller than the WALSegmentSize option, filename: %v", lastFile.Name())
		},
		// Custom WAL size.
		2 * 32 * 1024: func(dbDir string, segmentSize int) {
			files, err := ioutil.ReadDir(filepath.Join(dbDir, "wal"))
			testutil.Ok(t, err)
			testutil.Assert(t, len(files) > 1, "current WALSegmentSize should result in more than a single WAL file.")
			for _, f := range files[:len(files)-1] {
				testutil.Equals(t, int64(segmentSize), f.Size(), "WAL file size doesn't match WALSegmentSize option, filename: %v", f.Name())
			}
			lastFile := files[len(files)-1]
			testutil.Assert(t, int64(segmentSize) > lastFile.Size(), "last WAL file size is not smaller than the WALSegmentSize option, filename: %v", lastFile.Name())
		},
		// WAL disabled.
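		// A negative segment size disables the WAL entirely, so no wal
		// directory must exist after the test run.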
		-1: func(dbDir string, segmentSize int) {
			if _, err := os.Stat(filepath.Join(dbDir, "wal")); !os.IsNotExist(err) {
				t.Fatal("wal directory is present when the wal is disabled")
			}
		},
	}
	for segmentSize, testFunc := range tests {
		t.Run(fmt.Sprintf("WALSegmentSize %d test", segmentSize), func(t *testing.T) {
			options := *DefaultOptions
			options.WALSegmentSize = segmentSize
			db, delete := openTestDB(t, &options)
			defer delete()
			app := db.Appender()
			for i := int64(0); i < 155; i++ {
				_, err := app.Add(labels.Labels{labels.Label{Name: "wal", Value: "size"}}, i, rand.Float64())
				testutil.Ok(t, err)
				testutil.Ok(t, app.Commit())
			}

			dbDir := db.Dir()
			db.Close()
			testFunc(dbDir, options.WALSegmentSize)
		})
	}
}

func TestTombstoneClean(t *testing.T) {
	numSamples := int64(10)

	db, delete := openTestDB(t, nil)
	defer delete()

	app := db.Appender()

	smpls := make([]float64, numSamples)
	for i := int64(0); i < numSamples; i++ {
		smpls[i] = rand.Float64()
		app.Add(labels.Labels{{Name: "a", Value: "b"}}, i, smpls[i])
	}

	testutil.Ok(t, app.Commit())
	cases := []struct {
		intervals tombstones.Intervals
		remaint   []int64
	}{
		{
			intervals: tombstones.Intervals{{Mint: 1, Maxt: 3}, {Mint: 4, Maxt: 7}},
			remaint:   []int64{0, 8, 9},
		},
	}

	for _, c := range cases {
		// Create snapshot.
		snap, err := ioutil.TempDir("", "snap")
		testutil.Ok(t, err)

		defer func() {
			testutil.Ok(t, os.RemoveAll(snap))
		}()
		testutil.Ok(t, db.Snapshot(snap, true))
		testutil.Ok(t, db.Close())

		// Reopen DB from snapshot.
		db, err = Open(snap, nil, nil, nil)
		testutil.Ok(t, err)
		defer db.Close()

		// Delete the ranges.
		for _, r := range c.intervals {
			testutil.Ok(t, db.Delete(r.Mint, r.Maxt, labels.MustNewMatcher(labels.MatchEqual, "a", "b")))
		}

		// All of the setup was for THIS line.
		testutil.Ok(t, db.CleanTombstones())

		// Compare the result.
		q, err := db.Querier(0, numSamples)
		testutil.Ok(t, err)
		defer q.Close()

		res, err := q.Select(labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
		testutil.Ok(t, err)

		expSamples := make([]tsdbutil.Sample, 0, len(c.remaint))
		for _, ts := range c.remaint {
			expSamples = append(expSamples, sample{ts, smpls[ts]})
		}

		expss := newMockSeriesSet([]Series{
			newSeries(map[string]string{"a": "b"}, expSamples),
		})

		if len(expSamples) == 0 {
			testutil.Assert(t, res.Next() == false, "")
			continue
		}

		for {
			eok, rok := expss.Next(), res.Next()
			testutil.Equals(t, eok, rok)

			if !eok {
				break
			}
			sexp := expss.At()
			sres := res.At()

			testutil.Equals(t, sexp.Labels(), sres.Labels())

			smplExp, errExp := expandSeriesIterator(sexp.Iterator())
			smplRes, errRes := expandSeriesIterator(sres.Iterator())

			testutil.Equals(t, errExp, errRes)
			testutil.Equals(t, smplExp, smplRes)
		}

		for _, b := range db.Blocks() {
			testutil.Equals(t, tombstones.NewMemTombstones(), b.tombstones)
		}
	}
}

// TestTombstoneCleanFail tests that a failing TombstoneClean doesn't leave any blocks behind.
// When TombstoneClean errors, the original block that should be rebuilt doesn't get deleted,
// so if TombstoneClean leaves any blocks behind, these will overlap.
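// The failure is simulated with the mockCompactorFailing below, which errs once
// it has written its maximum allowed number of blocks.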
func TestTombstoneCleanFail(t *testing.T) {
	db, delete := openTestDB(t, nil)
	defer func() {
		testutil.Ok(t, db.Close())
		delete()
	}()

	var expectedBlockDirs []string

	// Create some empty blocks pending for compaction.
	// totalBlocks should be >=2 so we have enough blocks to trigger compaction failure.
	totalBlocks := 2
	for i := 0; i < totalBlocks; i++ {
		blockDir := createBlock(t, db.Dir(), genSeries(1, 1, 0, 1))
		block, err := OpenBlock(nil, blockDir, nil)
		testutil.Ok(t, err)
		// Add some fake tombstones to trigger the compaction.
		tomb := tombstones.NewMemTombstones()
		tomb.AddInterval(0, tombstones.Interval{Mint: 0, Maxt: 1})
		block.tombstones = tomb

		db.blocks = append(db.blocks, block)
		expectedBlockDirs = append(expectedBlockDirs, blockDir)
	}

	// Initialize the mockCompactorFailing with room for a single compaction iteration.
	// mockCompactorFailing will fail on the second iteration so we can check if the cleanup works as expected.
	db.compactor = &mockCompactorFailing{
		t:      t,
		blocks: db.blocks,
		max:    totalBlocks + 1,
	}

	// The compactor should trigger a failure here.
	testutil.NotOk(t, db.CleanTombstones())

	// Now check that the CleanTombstones didn't leave any blocks behind after a failure.
	actualBlockDirs, err := blockDirs(db.dir)
	testutil.Ok(t, err)
	testutil.Equals(t, expectedBlockDirs, actualBlockDirs)
}

// mockCompactorFailing creates a new empty block on every write and fails when it has reached the max allowed total.
type mockCompactorFailing struct {
	t      *testing.T
	blocks []*Block
	max    int
}

func (*mockCompactorFailing) Plan(dir string) ([]string, error) {
	return nil, nil
}

func (c *mockCompactorFailing) Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error) {
	if len(c.blocks) >= c.max {
		return ulid.ULID{}, fmt.Errorf("the compactor already did the maximum allowed blocks so it is time to fail")
	}

	block, err := OpenBlock(nil, createBlock(c.t, dest, genSeries(1, 1, 0, 1)), nil)
	testutil.Ok(c.t, err)
	testutil.Ok(c.t, block.Close()) // Close block as we won't be using it anywhere.
	c.blocks = append(c.blocks, block)

	// Now check that all expected blocks are actually persisted on disk.
	// This way we make sure that we have some blocks that are supposed to be removed.
	var expectedBlocks []string
	for _, b := range c.blocks {
		expectedBlocks = append(expectedBlocks, filepath.Join(dest, b.Meta().ULID.String()))
	}
	actualBlockDirs, err := blockDirs(dest)
	testutil.Ok(c.t, err)

	testutil.Equals(c.t, expectedBlocks, actualBlockDirs)

	return block.Meta().ULID, nil
}

func (*mockCompactorFailing) Compact(dest string, dirs []string, open []*Block) (ulid.ULID, error) {
	return ulid.ULID{}, nil
}

func TestTimeRetention(t *testing.T) {
	db, delete := openTestDB(t, &Options{
		BlockRanges: []int64{1000},
	})
	defer func() {
		testutil.Ok(t, db.Close())
		delete()
	}()

	blocks := []*BlockMeta{
		{MinTime: 500, MaxTime: 900}, // Oldest block.
		{MinTime: 1000, MaxTime: 1500},
		{MinTime: 1500, MaxTime: 2000}, // Newest block.
	}

	for _, m := range blocks {
		createBlock(t, db.Dir(), genSeries(10, 10, m.MinTime, m.MaxTime))
	}

	testutil.Ok(t, db.reload())                       // Reload the db to register the new blocks.
	testutil.Equals(t, len(blocks), len(db.Blocks())) // Ensure all blocks are registered.

	db.opts.RetentionDuration = uint64(blocks[2].MaxTime - blocks[1].MinTime)
	testutil.Ok(t, db.reload())

	expBlocks := blocks[1:]
	actBlocks := db.Blocks()

	testutil.Equals(t, 1, int(prom_testutil.ToFloat64(db.metrics.timeRetentionCount)), "metric retention count mismatch")
	testutil.Equals(t, len(expBlocks), len(actBlocks))
	testutil.Equals(t, expBlocks[0].MaxTime, actBlocks[0].meta.MaxTime)
	testutil.Equals(t, expBlocks[len(expBlocks)-1].MaxTime, actBlocks[len(actBlocks)-1].meta.MaxTime)
}

func TestSizeRetention(t *testing.T) {
	db, delete := openTestDB(t, &Options{
		BlockRanges: []int64{100},
	})
	defer func() {
		testutil.Ok(t, db.Close())
		delete()
	}()

	blocks := []*BlockMeta{
		{MinTime: 100, MaxTime: 200}, // Oldest block.
		{MinTime: 200, MaxTime: 300},
		{MinTime: 300, MaxTime: 400},
		{MinTime: 400, MaxTime: 500},
		{MinTime: 500, MaxTime: 600}, // Newest block.
	}

	for _, m := range blocks {
		createBlock(t, db.Dir(), genSeries(100, 10, m.MinTime, m.MaxTime))
	}

	headBlocks := []*BlockMeta{
		{MinTime: 700, MaxTime: 800},
	}

	// Add some data to the WAL.
	headApp := db.Head().Appender()
	for _, m := range headBlocks {
		series := genSeries(100, 10, m.MinTime, m.MaxTime)
		for _, s := range series {
			it := s.Iterator()
			for it.Next() {
				tim, v := it.At()
				_, err := headApp.Add(s.Labels(), tim, v)
				testutil.Ok(t, err)
			}
			testutil.Ok(t, it.Err())
		}
	}
	testutil.Ok(t, headApp.Commit())

	// Test that the registered size matches the actual disk size.
	testutil.Ok(t, db.reload())                                         // Reload the db to register the new db size.
	testutil.Equals(t, len(blocks), len(db.Blocks()))                   // Ensure all blocks are registered.
	blockSize := int64(prom_testutil.ToFloat64(db.metrics.blocksBytes)) // Use the actual internal metrics.
	walSize, err := db.Head().wal.Size()
	testutil.Ok(t, err)
	// The expected size should take into account block size + WAL size.
	expSize := blockSize + walSize
	actSize, err := fileutil.DirSize(db.Dir())
	testutil.Ok(t, err)
	testutil.Equals(t, expSize, actSize, "registered size doesn't match actual disk size")

	// Create a WAL checkpoint, and compare sizes.
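	// Checkpointing rewrites the WAL segments [first, last-1] into a checkpoint
	// directory; the registered size must still match the on-disk size afterwards.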
	first, last, err := db.Head().wal.Segments()
	testutil.Ok(t, err)
	_, err = wal.Checkpoint(db.Head().wal, first, last-1, func(x uint64) bool { return false }, 0)
	testutil.Ok(t, err)
	blockSize = int64(prom_testutil.ToFloat64(db.metrics.blocksBytes)) // Use the actual internal metrics.
	walSize, err = db.Head().wal.Size()
	testutil.Ok(t, err)
	expSize = blockSize + walSize
	actSize, err = fileutil.DirSize(db.Dir())
	testutil.Ok(t, err)
	testutil.Equals(t, expSize, actSize, "registered size doesn't match actual disk size")

	// Decrease the max bytes limit so that a delete is triggered.
	// Check the total size, the total count, and that the oldest block was deleted.
	firstBlockSize := db.Blocks()[0].Size()
	sizeLimit := actSize - firstBlockSize
	db.opts.MaxBytes = sizeLimit // Set the new db size limit one block smaller than the actual size.
	testutil.Ok(t, db.reload())  // Reload the db to register the new db size.

	expBlocks := blocks[1:]
	actBlocks := db.Blocks()
	blockSize = int64(prom_testutil.ToFloat64(db.metrics.blocksBytes))
	walSize, err = db.Head().wal.Size()
	testutil.Ok(t, err)
	// The expected size should take into account block size + WAL size.
	expSize = blockSize + walSize
	actRetentionCount := int(prom_testutil.ToFloat64(db.metrics.sizeRetentionCount))
	actSize, err = fileutil.DirSize(db.Dir())
	testutil.Ok(t, err)

	testutil.Equals(t, 1, actRetentionCount, "metric retention count mismatch")
	testutil.Equals(t, actSize, expSize, "metric db size doesn't match actual disk size")
	testutil.Assert(t, expSize <= sizeLimit, "actual size (%v) is expected to be less than or equal to limit (%v)", expSize, sizeLimit)
	testutil.Equals(t, len(blocks)-1, len(actBlocks), "new block count should be decreased from:%v to:%v", len(blocks), len(blocks)-1)
	testutil.Equals(t, expBlocks[0].MaxTime, actBlocks[0].meta.MaxTime, "maxT mismatch of the first block")
	testutil.Equals(t, expBlocks[len(expBlocks)-1].MaxTime, actBlocks[len(actBlocks)-1].meta.MaxTime, "maxT mismatch of the last block")
}

func TestSizeRetentionMetric(t *testing.T) {
	cases := []struct {
		maxBytes    int64
		expMaxBytes int64
	}{
		{maxBytes: 1000, expMaxBytes: 1000},
		{maxBytes: 0, expMaxBytes: 0},
		{maxBytes: -1000, expMaxBytes: 0},
	}

	for _, c := range cases {
		db, delete := openTestDB(t, &Options{
			BlockRanges: []int64{100},
			MaxBytes:    c.maxBytes,
		})

		actMaxBytes := int64(prom_testutil.ToFloat64(db.metrics.maxBytes))
		testutil.Equals(t, actMaxBytes, c.expMaxBytes, "metric retention limit bytes mismatch")

		testutil.Ok(t, db.Close())
		delete()
	}
}

func TestNotMatcherSelectsLabelsUnsetSeries(t *testing.T) {
	db, delete := openTestDB(t, nil)
	defer func() {
		testutil.Ok(t, db.Close())
		delete()
	}()

	labelpairs := []labels.Labels{
		labels.FromStrings("a", "abcd", "b", "abcde"),
		labels.FromStrings("labelname", "labelvalue"),
	}

	app := db.Appender()
	for _, lbls := range labelpairs {
		_, err := app.Add(lbls, 0, 1)
		testutil.Ok(t, err)
	}
	testutil.Ok(t, app.Commit())

	cases := []struct {
		selector labels.Selector
		series   []labels.Labels
	}{{
		selector: labels.Selector{
			labels.MustNewMatcher(labels.MatchNotEqual, "lname", "lvalue"),
		},
		series: labelpairs,
	}, {
		selector: labels.Selector{
			labels.MustNewMatcher(labels.MatchEqual, "a", "abcd"),
			labels.MustNewMatcher(labels.MatchNotEqual, "b", "abcde"),
		},
		series: []labels.Labels{},
	}, {
		selector: labels.Selector{
			labels.MustNewMatcher(labels.MatchEqual, "a", "abcd"),
			labels.MustNewMatcher(labels.MatchNotEqual, "b", "abc"),
		},
		series: []labels.Labels{labelpairs[0]},
	}, {
		selector: labels.Selector{
			labels.MustNewMatcher(labels.MatchNotRegexp, "a", "abd.*"),
		},
		series: labelpairs,
	}, {
		selector: labels.Selector{
			labels.MustNewMatcher(labels.MatchNotRegexp, "a", "abc.*"),
		},
		series: labelpairs[1:],
	}, {
		selector: labels.Selector{
			labels.MustNewMatcher(labels.MatchNotRegexp, "c", "abd.*"),
		},
		series: labelpairs,
	}, {
		selector: labels.Selector{
			labels.MustNewMatcher(labels.MatchNotRegexp, "labelname", "labelvalue"),
		},
		series: labelpairs[:1],
	}}

	q, err := db.Querier(0, 10)
	testutil.Ok(t, err)
	defer func() { testutil.Ok(t, q.Close()) }()

	for _, c := range cases {
		ss, err := q.Select(c.selector...)
		testutil.Ok(t, err)

		lres, err := expandSeriesSet(ss)
		testutil.Ok(t, err)

		testutil.Equals(t, c.series, lres)
	}
}

func expandSeriesSet(ss SeriesSet) ([]labels.Labels, error) {
	result := []labels.Labels{}
	for ss.Next() {
		result = append(result, ss.At().Labels())
	}

	return result, ss.Err()
}

func TestOverlappingBlocksDetectsAllOverlaps(t *testing.T) {
	// Create 11 blocks that do not overlap (0-10, 10-20, ..., 100-110) but in reverse order to ensure our algorithm
	// will handle that.
	var metas = make([]BlockMeta, 11)
	for i := 10; i >= 0; i-- {
		metas[i] = BlockMeta{MinTime: int64(i * 10), MaxTime: int64((i + 1) * 10)}
	}

	testutil.Assert(t, len(OverlappingBlocks(metas)) == 0, "we found unexpected overlaps")

	// Add overlapping blocks. We have to establish order again since we aren't interested
	// in trivial overlaps caused by unorderedness.
	add := func(ms ...BlockMeta) []BlockMeta {
		repl := append(append([]BlockMeta{}, metas...), ms...)
		sort.Slice(repl, func(i, j int) bool {
			return repl[i].MinTime < repl[j].MinTime
		})
		return repl
	}

	// o1 overlaps with 10-20.
	o1 := BlockMeta{MinTime: 15, MaxTime: 17}
	testutil.Equals(t, Overlaps{
		{Min: 15, Max: 17}: {metas[1], o1},
	}, OverlappingBlocks(add(o1)))

	// o2 overlaps with 20-30 and 30-40.
	o2 := BlockMeta{MinTime: 21, MaxTime: 31}
	testutil.Equals(t, Overlaps{
		{Min: 21, Max: 30}: {metas[2], o2},
		{Min: 30, Max: 31}: {o2, metas[3]},
	}, OverlappingBlocks(add(o2)))

	// o3a and o3b overlap with 30-40 and with each other.
	o3a := BlockMeta{MinTime: 33, MaxTime: 39}
	o3b := BlockMeta{MinTime: 34, MaxTime: 36}
	testutil.Equals(t, Overlaps{
		{Min: 34, Max: 36}: {metas[3], o3a, o3b},
	}, OverlappingBlocks(add(o3a, o3b)))

	// o4 is a 1:1 overlap with 50-60.
	o4 := BlockMeta{MinTime: 50, MaxTime: 60}
	testutil.Equals(t, Overlaps{
		{Min: 50, Max: 60}: {metas[5], o4},
	}, OverlappingBlocks(add(o4)))

	// o5 overlaps with 60-70, 70-80 and 80-90.
	o5 := BlockMeta{MinTime: 61, MaxTime: 85}
	testutil.Equals(t, Overlaps{
		{Min: 61, Max: 70}: {metas[6], o5},
		{Min: 70, Max: 80}: {o5, metas[7]},
		{Min: 80, Max: 85}: {o5, metas[8]},
	}, OverlappingBlocks(add(o5)))

	// o6a overlaps with 90-100, 100-110 and o6b; o6b overlaps with 90-100 and o6a.
	o6a := BlockMeta{MinTime: 92, MaxTime: 105}
	o6b := BlockMeta{MinTime: 94, MaxTime: 99}
	testutil.Equals(t, Overlaps{
		{Min: 94, Max: 99}:   {metas[9], o6a, o6b},
		{Min: 100, Max: 105}: {o6a, metas[10]},
	}, OverlappingBlocks(add(o6a, o6b)))

	// All together.
	testutil.Equals(t, Overlaps{
		{Min: 15, Max: 17}: {metas[1], o1},
		{Min: 21, Max: 30}: {metas[2], o2}, {Min: 30, Max: 31}: {o2, metas[3]},
		{Min: 34, Max: 36}: {metas[3], o3a, o3b},
		{Min: 50, Max: 60}: {metas[5], o4},
		{Min: 61, Max: 70}: {metas[6], o5}, {Min: 70, Max: 80}: {o5, metas[7]}, {Min: 80, Max: 85}: {o5, metas[8]},
		{Min: 94, Max: 99}: {metas[9], o6a, o6b}, {Min: 100, Max: 105}: {o6a, metas[10]},
	}, OverlappingBlocks(add(o1, o2, o3a, o3b, o4, o5, o6a, o6b)))

	// Additional case.
	var nc1 []BlockMeta
	nc1 = append(nc1, BlockMeta{MinTime: 1, MaxTime: 5})
	nc1 = append(nc1, BlockMeta{MinTime: 2, MaxTime: 3})
	nc1 = append(nc1, BlockMeta{MinTime: 2, MaxTime: 3})
	nc1 = append(nc1, BlockMeta{MinTime: 2, MaxTime: 3})
	nc1 = append(nc1, BlockMeta{MinTime: 2, MaxTime: 3})
	nc1 = append(nc1, BlockMeta{MinTime: 2, MaxTime: 6})
	nc1 = append(nc1, BlockMeta{MinTime: 3, MaxTime: 5})
	nc1 = append(nc1, BlockMeta{MinTime: 5, MaxTime: 7})
	nc1 = append(nc1, BlockMeta{MinTime: 7, MaxTime: 10})
	nc1 = append(nc1, BlockMeta{MinTime: 8, MaxTime: 9})
	testutil.Equals(t, Overlaps{
		{Min: 2, Max: 3}: {nc1[0], nc1[1], nc1[2], nc1[3], nc1[4], nc1[5]}, // 1-5, 2-3, 2-3, 2-3, 2-3, 2-6
		{Min: 3, Max: 5}: {nc1[0], nc1[5], nc1[6]},                        // 1-5, 2-6, 3-5
		{Min: 5, Max: 6}: {nc1[5], nc1[7]},                                // 2-6, 5-7
		{Min: 8, Max: 9}: {nc1[8], nc1[9]},                                // 7-10, 8-9
	}, OverlappingBlocks(nc1))
}

// Regression test for https://github.com/prometheus/prometheus/tsdb/issues/347
func TestChunkAtBlockBoundary(t *testing.T) {
	db, delete := openTestDB(t, nil)
	defer func() {
		testutil.Ok(t, db.Close())
		delete()
	}()

	app := db.Appender()

	blockRange := DefaultOptions.BlockRanges[0]
	label := labels.FromStrings("foo", "bar")

	for i := int64(0); i < 3; i++ {
		_, err := app.Add(label, i*blockRange, 0)
		testutil.Ok(t, err)
		_, err = app.Add(label, i*blockRange+1000, 0)
		testutil.Ok(t, err)
	}

	err := app.Commit()
	testutil.Ok(t, err)

	err = db.compact()
	testutil.Ok(t, err)

	for _, block := range db.Blocks() {
		r, err := block.Index()
		testutil.Ok(t, err)
		defer r.Close()

		meta := block.Meta()

		k, v := index.AllPostingsKey()
		p, err := r.Postings(k, v)
		testutil.Ok(t, err)

		var (
			lset labels.Labels
			chks []chunks.Meta
		)

		chunkCount := 0

		for p.Next() {
			err = r.Series(p.At(), &lset, &chks)
			testutil.Ok(t, err)
			for _, c := range chks {
				testutil.Assert(t, meta.MinTime <= c.MinTime && c.MaxTime <= meta.MaxTime,
					"chunk spans beyond block boundaries: [block.MinTime=%d, block.MaxTime=%d]; [chunk.MinTime=%d, chunk.MaxTime=%d]",
					meta.MinTime, meta.MaxTime, c.MinTime, c.MaxTime)
				chunkCount++
			}
		}
		testutil.Assert(t, chunkCount == 1, "expected 1 chunk in block %s, got %d", meta.ULID, chunkCount)
	}
}

func TestQuerierWithBoundaryChunks(t *testing.T) {
	db, delete := openTestDB(t, nil)
	defer func() {
		testutil.Ok(t, db.Close())
		delete()
	}()

	app := db.Appender()

	blockRange := DefaultOptions.BlockRanges[0]
	label := labels.FromStrings("foo", "bar")

	for i := int64(0); i < 5; i++ {
		_, err := app.Add(label, i*blockRange, 0)
		testutil.Ok(t, err)
	}

	err := app.Commit()
	testutil.Ok(t, err)

	err = db.compact()
	testutil.Ok(t, err)

	testutil.Assert(t, len(db.blocks) >= 3, "invalid test, less than three blocks in DB")

	q, err := db.Querier(blockRange, 2*blockRange)
	testutil.Ok(t, err)
	defer q.Close()

	// The requested interval covers 2 blocks, so the querier should contain 2 blocks.
	count := len(q.(*querier).blocks)
	testutil.Assert(t, count == 2, "expected 2 blocks in querier, got %d", count)
}

// TestInitializeHeadTimestamp ensures that the h.minTime is set properly.
// - no blocks no WAL: set to the time of the first appended sample
// - no blocks with WAL: set to the smallest sample from the WAL
// - with blocks no WAL: set to the last block maxT
// - with blocks with WAL: same as above
func TestInitializeHeadTimestamp(t *testing.T) {
	t.Run("clean", func(t *testing.T) {
		dir, err := ioutil.TempDir("", "test_head_init")
		testutil.Ok(t, err)
		defer func() {
			testutil.Ok(t, os.RemoveAll(dir))
		}()

		db, err := Open(dir, nil, nil, nil)
		testutil.Ok(t, err)
		defer db.Close()

		// Should be set to init values if no WAL or blocks exist so far.
		testutil.Equals(t, int64(math.MaxInt64), db.head.MinTime())
		testutil.Equals(t, int64(math.MinInt64), db.head.MaxTime())

		// First added sample initializes the writable range.
		app := db.Appender()
		_, err = app.Add(labels.FromStrings("a", "b"), 1000, 1)
		testutil.Ok(t, err)

		testutil.Equals(t, int64(1000), db.head.MinTime())
		testutil.Equals(t, int64(1000), db.head.MaxTime())
	})
	t.Run("wal-only", func(t *testing.T) {
		dir, err := ioutil.TempDir("", "test_head_init")
		testutil.Ok(t, err)
		defer func() {
			testutil.Ok(t, os.RemoveAll(dir))
		}()

		testutil.Ok(t, os.MkdirAll(path.Join(dir, "wal"), 0777))
		w, err := wal.New(nil, nil, path.Join(dir, "wal"), false)
		testutil.Ok(t, err)

		var enc record.Encoder
		err = w.Log(
			enc.Series([]record.RefSeries{
				{Ref: 123, Labels: labels.FromStrings("a", "1")},
				{Ref: 124, Labels: labels.FromStrings("a", "2")},
			}, nil),
			enc.Samples([]record.RefSample{
				{Ref: 123, T: 5000, V: 1},
				{Ref: 124, T: 15000, V: 1},
			}, nil),
		)
		testutil.Ok(t, err)
		testutil.Ok(t, w.Close())

		db, err := Open(dir, nil, nil, nil)
		testutil.Ok(t, err)
		defer db.Close()

		testutil.Equals(t, int64(5000), db.head.MinTime())
		testutil.Equals(t, int64(15000), db.head.MaxTime())
	})
	t.Run("existing-block", func(t *testing.T) {
		dir, err := ioutil.TempDir("", "test_head_init")
		testutil.Ok(t, err)
		defer func() {
			testutil.Ok(t, os.RemoveAll(dir))
		}()

		createBlock(t, dir, genSeries(1, 1, 1000, 2000))

		db, err := Open(dir, nil, nil, nil)
		testutil.Ok(t, err)
		defer db.Close()

		testutil.Equals(t, int64(2000), db.head.MinTime())
		testutil.Equals(t, int64(2000), db.head.MaxTime())
	})
	t.Run("existing-block-and-wal", func(t *testing.T) {
		dir, err := ioutil.TempDir("", "test_head_init")
		testutil.Ok(t, err)
		defer func() {
			testutil.Ok(t, os.RemoveAll(dir))
		}()

		createBlock(t, dir, genSeries(1, 1, 1000, 6000))

		testutil.Ok(t, os.MkdirAll(path.Join(dir, "wal"), 0777))
		w, err := wal.New(nil, nil, path.Join(dir, "wal"), false)
		testutil.Ok(t, err)

		var enc record.Encoder
		err = w.Log(
			enc.Series([]record.RefSeries{
				{Ref: 123, Labels: labels.FromStrings("a", "1")},
				{Ref: 124, Labels: labels.FromStrings("a", "2")},
			}, nil),
			enc.Samples([]record.RefSample{
				{Ref: 123, T: 5000, V: 1},
				{Ref: 124, T: 15000, V: 1},
			}, nil),
		)
		testutil.Ok(t, err)
		testutil.Ok(t, w.Close())

		r := prometheus.NewRegistry()

		db, err := Open(dir, nil, r, nil)
		testutil.Ok(t, err)
		defer db.Close()

		testutil.Equals(t, int64(6000), db.head.MinTime())
		testutil.Equals(t, int64(15000), db.head.MaxTime())
		// Check that old series has been GCed.
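		// Only the series whose WAL sample (t=15000) lies beyond the block's
		// maxt (6000) survives head truncation, so exactly one head series must remain.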
		testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.series))
	})
}

func TestNoEmptyBlocks(t *testing.T) {
	db, delete := openTestDB(t, &Options{
		BlockRanges: []int64{100},
	})
	defer func() {
		testutil.Ok(t, db.Close())
		delete()
	}()
	db.DisableCompactions()

	rangeToTriggerCompaction := db.opts.BlockRanges[0]/2*3 - 1
	defaultLabel := labels.FromStrings("foo", "bar")
	defaultMatcher := labels.MustNewMatcher(labels.MatchRegexp, "", ".*")

	t.Run("Test no blocks after compact with empty head.", func(t *testing.T) {
		testutil.Ok(t, db.compact())
		actBlocks, err := blockDirs(db.Dir())
		testutil.Ok(t, err)
		testutil.Equals(t, len(db.Blocks()), len(actBlocks))
		testutil.Equals(t, 0, len(actBlocks))
		testutil.Equals(t, 0, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "no compaction should be triggered here")
	})

	t.Run("Test no blocks after deleting all samples from head.", func(t *testing.T) {
		app := db.Appender()
		_, err := app.Add(defaultLabel, 1, 0)
		testutil.Ok(t, err)
		_, err = app.Add(defaultLabel, 2, 0)
		testutil.Ok(t, err)
		_, err = app.Add(defaultLabel, 3+rangeToTriggerCompaction, 0)
		testutil.Ok(t, err)
		testutil.Ok(t, app.Commit())
		testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, defaultMatcher))
		testutil.Ok(t, db.compact())
		testutil.Equals(t, 1, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here")

		actBlocks, err := blockDirs(db.Dir())
		testutil.Ok(t, err)
		testutil.Equals(t, len(db.Blocks()), len(actBlocks))
		testutil.Equals(t, 0, len(actBlocks))

		app = db.Appender()
		_, err = app.Add(defaultLabel, 1, 0)
		testutil.Assert(t, err == ErrOutOfBounds, "the head should be truncated so no samples in the past should be allowed")

		// Adding new blocks.
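		// Appends must start from the head's current max time, since after the
		// truncation above older timestamps are rejected with ErrOutOfBounds.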
		currentTime := db.Head().MaxTime()
		_, err = app.Add(defaultLabel, currentTime, 0)
		testutil.Ok(t, err)
		_, err = app.Add(defaultLabel, currentTime+1, 0)
		testutil.Ok(t, err)
		_, err = app.Add(defaultLabel, currentTime+rangeToTriggerCompaction, 0)
		testutil.Ok(t, err)
		testutil.Ok(t, app.Commit())

		testutil.Ok(t, db.compact())
		testutil.Equals(t, 2, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here")
		actBlocks, err = blockDirs(db.Dir())
		testutil.Ok(t, err)
		testutil.Equals(t, len(db.Blocks()), len(actBlocks))
		testutil.Assert(t, len(actBlocks) == 1, "No blocks created when compacting with >0 samples")
	})

	t.Run(`When no new block is created from head, and there are some blocks on disk
	compaction should not run into infinite loop (was seen during development).`, func(t *testing.T) {
		oldBlocks := db.Blocks()
		app := db.Appender()
		currentTime := db.Head().MaxTime()
		_, err := app.Add(defaultLabel, currentTime, 0)
		testutil.Ok(t, err)
		_, err = app.Add(defaultLabel, currentTime+1, 0)
		testutil.Ok(t, err)
		_, err = app.Add(defaultLabel, currentTime+rangeToTriggerCompaction, 0)
		testutil.Ok(t, err)
		testutil.Ok(t, app.Commit())
		testutil.Ok(t, db.head.Delete(math.MinInt64, math.MaxInt64, defaultMatcher))
		testutil.Ok(t, db.compact())
		testutil.Equals(t, 3, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here")
		testutil.Equals(t, oldBlocks, db.Blocks())
	})

	t.Run("Test no blocks remaining after deleting all samples from disk.", func(t *testing.T) {
		currentTime := db.Head().MaxTime()
		blocks := []*BlockMeta{
			{MinTime: currentTime, MaxTime: currentTime + db.opts.BlockRanges[0]},
			{MinTime: currentTime + 100, MaxTime: currentTime + 100 + db.opts.BlockRanges[0]},
		}
		for _, m := range blocks {
			createBlock(t, db.Dir(), genSeries(2, 2, m.MinTime, m.MaxTime))
		}

		oldBlocks := db.Blocks()
		testutil.Ok(t, db.reload())                                      // Reload the db to register the new blocks.
		testutil.Equals(t, len(blocks)+len(oldBlocks), len(db.Blocks())) // Ensure all blocks are registered.
		testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, defaultMatcher))
		testutil.Ok(t, db.compact())
		testutil.Equals(t, 5, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here once for each block that has tombstones")

		actBlocks, err := blockDirs(db.Dir())
		testutil.Ok(t, err)
		testutil.Equals(t, len(db.Blocks()), len(actBlocks))
		testutil.Equals(t, 1, len(actBlocks), "All samples are deleted. Only the most recent block should remain after compaction.")
	})
}

func TestDB_LabelNames(t *testing.T) {
	tests := []struct {
		// Add 'sampleLabels1' -> Test Head -> Compact -> Test Disk ->
		// -> Add 'sampleLabels2' -> Test Head+Disk.

		sampleLabels1 [][2]string // For checking head and disk separately.
		// To test Head+Disk, sampleLabels2 should have
		// at least 1 unique label name which is not in sampleLabels1.
		sampleLabels2 [][2]string // For checking head and disk together.
		exp1          []string    // After adding sampleLabels1.
		exp2          []string    // After adding sampleLabels1 and sampleLabels2.
	}{
		{
			sampleLabels1: [][2]string{
				{"name1", "1"},
				{"name3", "3"},
				{"name2", "2"},
			},
			sampleLabels2: [][2]string{
				{"name4", "4"},
				{"name1", "1"},
			},
			exp1: []string{"name1", "name2", "name3"},
			exp2: []string{"name1", "name2", "name3", "name4"},
		},
		{
			sampleLabels1: [][2]string{
				{"name2", "2"},
				{"name1", "1"},
				{"name2", "2"},
			},
			sampleLabels2: [][2]string{
				{"name6", "6"},
				{"name0", "0"},
			},
			exp1: []string{"name1", "name2"},
			exp2: []string{"name0", "name1", "name2", "name6"},
		},
	}

	blockRange := DefaultOptions.BlockRanges[0]
	// Appends samples into the database.
	appendSamples := func(db *DB, mint, maxt int64, sampleLabels [][2]string) {
		t.Helper()
		app := db.Appender()
		for i := mint; i <= maxt; i++ {
			for _, tuple := range sampleLabels {
				label := labels.FromStrings(tuple[0], tuple[1])
				_, err := app.Add(label, i*blockRange, 0)
				testutil.Ok(t, err)
			}
		}
		err := app.Commit()
		testutil.Ok(t, err)
	}
	for _, tst := range tests {
		db, delete := openTestDB(t, nil)
		defer func() {
			testutil.Ok(t, db.Close())
			delete()
		}()

		appendSamples(db, 0, 4, tst.sampleLabels1)

		// Testing head.
		headIndexr, err := db.head.Index()
		testutil.Ok(t, err)
		labelNames, err := headIndexr.LabelNames()
		testutil.Ok(t, err)
		testutil.Equals(t, tst.exp1, labelNames)
		testutil.Ok(t, headIndexr.Close())

		// Testing disk.
		err = db.compact()
		testutil.Ok(t, err)
		// All blocks have the same label names, hence check them individually.
		// No need to aggregate and check.
		for _, b := range db.Blocks() {
			blockIndexr, err := b.Index()
			testutil.Ok(t, err)
			labelNames, err = blockIndexr.LabelNames()
			testutil.Ok(t, err)
			testutil.Equals(t, tst.exp1, labelNames)
			testutil.Ok(t, blockIndexr.Close())
		}

		// Adding more samples to head with new label names
		// so that we can test (head+disk).LabelNames() (the union).
		appendSamples(db, 5, 9, tst.sampleLabels2)

		// Testing DB (union).
		q, err := db.Querier(math.MinInt64, math.MaxInt64)
		testutil.Ok(t, err)
		labelNames, err = q.LabelNames()
		testutil.Ok(t, err)
		testutil.Ok(t, q.Close())
		testutil.Equals(t, tst.exp2, labelNames)
	}
}

func TestCorrectNumTombstones(t *testing.T) {
	db, delete := openTestDB(t, nil)
	defer func() {
		testutil.Ok(t, db.Close())
		delete()
	}()

	blockRange := DefaultOptions.BlockRanges[0]
	defaultLabel := labels.FromStrings("foo", "bar")
	defaultMatcher := labels.MustNewMatcher(labels.MatchEqual, defaultLabel[0].Name, defaultLabel[0].Value)

	app := db.Appender()
	for i := int64(0); i < 3; i++ {
		for j := int64(0); j < 15; j++ {
			_, err := app.Add(defaultLabel, i*blockRange+j, 0)
			testutil.Ok(t, err)
		}
	}
	testutil.Ok(t, app.Commit())

	err := db.compact()
	testutil.Ok(t, err)
	testutil.Equals(t, 1, len(db.blocks))

	testutil.Ok(t, db.Delete(0, 1, defaultMatcher))
	testutil.Equals(t, uint64(1), db.blocks[0].meta.Stats.NumTombstones)

	// {0, 1} and {2, 3} are merged to form 1 tombstone.
	testutil.Ok(t, db.Delete(2, 3, defaultMatcher))
	testutil.Equals(t, uint64(1), db.blocks[0].meta.Stats.NumTombstones)

	testutil.Ok(t, db.Delete(5, 6, defaultMatcher))
	testutil.Equals(t, uint64(2), db.blocks[0].meta.Stats.NumTombstones)

	testutil.Ok(t, db.Delete(9, 11, defaultMatcher))
	testutil.Equals(t, uint64(3), db.blocks[0].meta.Stats.NumTombstones)
}

func TestVerticalCompaction(t *testing.T) {
	cases := []struct {
		blockSeries          [][]Series
		expSeries            map[string][]tsdbutil.Sample
		expBlockNum          int
		expOverlappingBlocks int
	}{
		// Case 0
		// |--------------|
		//      |----------------|
		{
			blockSeries: [][]Series{
				{
					newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
						sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
						sample{5, 0}, sample{7, 0}, sample{8, 0}, sample{9, 0},
					}),
				},
				{
					newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
						sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{7, 99},
						sample{8, 99}, sample{9, 99}, sample{10, 99}, sample{11, 99},
						sample{12, 99}, sample{13, 99}, sample{14, 99},
					}),
				},
			},
			expSeries: map[string][]tsdbutil.Sample{`{a="b"}`: {
				sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{3, 99},
				sample{4, 0}, sample{5, 99}, sample{6, 99}, sample{7, 99},
				sample{8, 99}, sample{9, 99}, sample{10, 99}, sample{11, 99},
				sample{12, 99}, sample{13, 99}, sample{14, 99},
			}},
			expBlockNum:          1,
			expOverlappingBlocks: 1,
		},
		// Case 1
		// |-------------------------------|
		//      |----------------|
		{
			blockSeries: [][]Series{
				{
					newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
						sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
						sample{5, 0}, sample{7, 0}, sample{8, 0}, sample{9, 0},
						sample{11, 0}, sample{13, 0}, sample{17, 0},
					}),
				},
				{
					newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
						sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{7, 99},
						sample{8, 99}, sample{9, 99}, sample{10, 99},
					}),
				},
			},
			expSeries: map[string][]tsdbutil.Sample{`{a="b"}`: {
				sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{3, 99},
				sample{4, 0}, sample{5, 99}, sample{6, 99}, sample{7, 99},
				sample{8, 99}, sample{9, 99}, sample{10, 99}, sample{11, 0},
				sample{13, 0}, sample{17, 0},
			}},
			expBlockNum:          1,
			expOverlappingBlocks: 1,
		},
		// Case 2
		// |-------------------------------|
		//      |------------|
		//                          |--------------------|
		{
			blockSeries: [][]Series{
				{
					newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
						sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
						sample{5, 0}, sample{7, 0}, sample{8, 0}, sample{9, 0},
						sample{11, 0}, sample{13, 0}, sample{17, 0},
					}),
				},
				{
					newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
						sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{7, 99},
						sample{8, 99}, sample{9, 99},
					}),
				},
				{
					newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
						sample{14, 59}, sample{15, 59}, sample{17, 59}, sample{20, 59},
						sample{21, 59}, sample{22, 59},
					}),
				},
			},
			expSeries: map[string][]tsdbutil.Sample{`{a="b"}`: {
				sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{3, 99},
				sample{4, 0}, sample{5, 99}, sample{6, 99}, sample{7, 99},
				sample{8, 99}, sample{9, 99}, sample{11, 0},
				sample{13, 0}, sample{14, 59}, sample{15, 59}, sample{17, 59},
				sample{20, 59}, sample{21, 59}, sample{22, 59},
			}},
			expBlockNum:          1,
			expOverlappingBlocks: 1,
		},
		// Case 3
		// |-------------------|
		//                          |--------------------|
		//        |----------------|
		{
			blockSeries: [][]Series{
				{
					newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
						sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
						sample{5, 0}, sample{8, 0}, sample{9, 0},
					}),
				},
				{
					newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
						sample{14, 59}, sample{15, 59}, sample{17, 59}, sample{20, 59},
						sample{21, 59}, sample{22, 59},
					}),
				},
				{
					newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
						sample{5, 99}, sample{6, 99}, sample{7, 99}, sample{8, 99},
						sample{9, 99}, sample{10, 99}, sample{13, 99}, sample{15, 99},
						sample{16, 99}, sample{17, 99},
					}),
				},
			},
			expSeries: map[string][]tsdbutil.Sample{`{a="b"}`: {
				sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
				sample{5, 99}, sample{6, 99}, sample{7, 99}, sample{8, 99},
				sample{9, 99}, sample{10, 99}, sample{13, 99}, sample{14, 59},
				sample{15, 59}, sample{16, 99}, sample{17, 59}, sample{20, 59},
				sample{21, 59}, sample{22, 59},
			}},
			expBlockNum:          1,
			expOverlappingBlocks: 1,
		},
		// Case 4
		// |-------------------------------------|
		//             |------------|
		//       |-------------------------|
		{
			blockSeries: [][]Series{
				{
					newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
						sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
						sample{5, 0}, sample{8, 0}, sample{9, 0}, sample{10, 0},
						sample{13, 0}, sample{15, 0}, sample{16, 0}, sample{17, 0},
						sample{20, 0}, sample{22, 0},
					}),
				},
				{
					newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
						sample{7, 59}, sample{8, 59}, sample{9, 59}, sample{10, 59},
						sample{11, 59},
					}),
				},
				{
					newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
						sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{8, 99},
						sample{9, 99}, sample{10, 99}, sample{13, 99}, sample{15, 99},
						sample{16, 99}, sample{17, 99},
					}),
				},
			},
			expSeries: map[string][]tsdbutil.Sample{`{a="b"}`: {
				sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{3, 99},
				sample{4, 0}, sample{5, 99}, sample{6, 99}, sample{7, 59},
				sample{8, 59}, sample{9, 59}, sample{10, 59}, sample{11, 59},
				sample{13, 99}, sample{15, 99}, sample{16, 99}, sample{17, 99},
				sample{20, 0}, sample{22, 0},
			}},
			expBlockNum:          1,
			expOverlappingBlocks: 1,
		},
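		// A note on the expected series above and below: where two
		// overlapping blocks both contain a sample for the same timestamp,
		// the merged output keeps exactly one of them (in these fixtures the
		// sample from the block with the larger MinTime wins), so overlapping
		// samples are deduplicated rather than double-counted.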
		// Case 5: series are merged properly when there are multiple series.
		// |-------------------------------------|
		//             |------------|
		//       |-------------------------|
		{
			blockSeries: [][]Series{
				{
					newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
						sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
						sample{5, 0}, sample{8, 0}, sample{9, 0}, sample{10, 0},
						sample{13, 0}, sample{15, 0}, sample{16, 0}, sample{17, 0},
						sample{20, 0}, sample{22, 0},
					}),
					newSeries(map[string]string{"b": "c"}, []tsdbutil.Sample{
						sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
						sample{5, 0}, sample{8, 0}, sample{9, 0}, sample{10, 0},
						sample{13, 0}, sample{15, 0}, sample{16, 0}, sample{17, 0},
						sample{20, 0}, sample{22, 0},
					}),
					newSeries(map[string]string{"c": "d"}, []tsdbutil.Sample{
						sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
						sample{5, 0}, sample{8, 0}, sample{9, 0}, sample{10, 0},
						sample{13, 0}, sample{15, 0}, sample{16, 0}, sample{17, 0},
						sample{20, 0}, sample{22, 0},
					}),
				},
				{
					newSeries(map[string]string{"__name__": "a"}, []tsdbutil.Sample{
						sample{7, 59}, sample{8, 59}, sample{9, 59}, sample{10, 59},
						sample{11, 59},
					}),
					newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
						sample{7, 59}, sample{8, 59}, sample{9, 59}, sample{10, 59},
						sample{11, 59},
					}),
					newSeries(map[string]string{"aa": "bb"}, []tsdbutil.Sample{
						sample{7, 59}, sample{8, 59}, sample{9, 59}, sample{10, 59},
						sample{11, 59},
					}),
					newSeries(map[string]string{"c": "d"}, []tsdbutil.Sample{
						sample{7, 59}, sample{8, 59}, sample{9, 59}, sample{10, 59},
						sample{11, 59},
					}),
				},
				{
					newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
						sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{8, 99},
						sample{9, 99}, sample{10, 99}, sample{13, 99}, sample{15, 99},
						sample{16, 99}, sample{17, 99},
					}),
					newSeries(map[string]string{"aa": "bb"}, []tsdbutil.Sample{
						sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{8, 99},
						sample{9, 99}, sample{10, 99}, sample{13, 99}, sample{15, 99},
						sample{16, 99}, sample{17, 99},
					}),
					newSeries(map[string]string{"c": "d"}, []tsdbutil.Sample{
						sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{8, 99},
						sample{9, 99}, sample{10, 99}, sample{13, 99}, sample{15, 99},
						sample{16, 99}, sample{17, 99},
					}),
				},
			},
			expSeries: map[string][]tsdbutil.Sample{
				`{__name__="a"}`: {
					sample{7, 59}, sample{8, 59}, sample{9, 59}, sample{10, 59},
					sample{11, 59},
				},
				`{a="b"}`: {
					sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{3, 99},
					sample{4, 0}, sample{5, 99}, sample{6, 99}, sample{7, 59},
					sample{8, 59}, sample{9, 59}, sample{10, 59}, sample{11, 59},
					sample{13, 99}, sample{15, 99}, sample{16, 99}, sample{17, 99},
					sample{20, 0}, sample{22, 0},
				},
				`{aa="bb"}`: {
					sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{7, 59},
					sample{8, 59}, sample{9, 59}, sample{10, 59}, sample{11, 59},
					sample{13, 99}, sample{15, 99}, sample{16, 99}, sample{17, 99},
				},
				`{b="c"}`: {
					sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
					sample{5, 0}, sample{8, 0}, sample{9, 0}, sample{10, 0},
					sample{13, 0}, sample{15, 0}, sample{16, 0}, sample{17, 0},
					sample{20, 0}, sample{22, 0},
				},
				`{c="d"}`: {
					sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{3, 99},
					sample{4, 0}, sample{5, 99}, sample{6, 99},
					sample{7, 59}, sample{8, 59}, sample{9, 59}, sample{10, 59},
					sample{11, 59}, sample{13, 99}, sample{15, 99}, sample{16, 99},
					sample{17, 99}, sample{20, 0}, sample{22, 0},
				},
			},
			expBlockNum:          1,
			expOverlappingBlocks: 1,
		},
		// Case 6
		// |--------------|
		//      |----------------|
		//                            |--------------|
		//                                 |----------------|
		{
			blockSeries: [][]Series{
				{
					newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
						sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
						sample{5, 0}, sample{7, 0}, sample{8, 0}, sample{9, 0},
					}),
				},
				{
					newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
						sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{7, 99},
						sample{8, 99}, sample{9, 99}, sample{10, 99}, sample{11, 99},
						sample{12, 99}, sample{13, 99}, sample{14, 99},
					}),
				},
				{
					newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
						sample{20, 0}, sample{21, 0}, sample{22, 0}, sample{24, 0},
						sample{25, 0}, sample{27, 0}, sample{28, 0}, sample{29, 0},
					}),
				},
				{
					newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
						sample{23, 99}, sample{25, 99}, sample{26, 99}, sample{27, 99},
						sample{28, 99}, sample{29, 99}, sample{30, 99}, sample{31, 99},
					}),
				},
			},
			expSeries: map[string][]tsdbutil.Sample{`{a="b"}`: {
				sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{3, 99},
				sample{4, 0}, sample{5, 99}, sample{6, 99}, sample{7, 99},
				sample{8, 99}, sample{9, 99}, sample{10, 99}, sample{11, 99},
				sample{12, 99}, sample{13, 99}, sample{14, 99},
				sample{20, 0}, sample{21, 0}, sample{22, 0}, sample{23, 99},
				sample{24, 0}, sample{25, 99}, sample{26, 99}, sample{27, 99},
				sample{28, 99}, sample{29, 99}, sample{30, 99}, sample{31, 99},
			}},
			expBlockNum:          2,
			expOverlappingBlocks: 2,
		},
	}

	defaultMatcher := labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*")
	for _, c := range cases {
		if ok := t.Run("", func(t *testing.T) {
			tmpdir, err := ioutil.TempDir("", "data")
			testutil.Ok(t, err)
			defer func() {
				testutil.Ok(t, os.RemoveAll(tmpdir))
			}()

			for _, series := range c.blockSeries {
				createBlock(t, tmpdir, series)
			}
			opts := *DefaultOptions
			opts.AllowOverlappingBlocks = true
			db, err := Open(tmpdir, nil, nil, &opts)
			testutil.Ok(t, err)
			defer func() {
				testutil.Ok(t, db.Close())
			}()
			db.DisableCompactions()
			testutil.Assert(t, len(db.blocks) == len(c.blockSeries), "Wrong number of blocks [before compact].")

			// Vertical query merging test.
			querier, err := db.Querier(0, 100)
			testutil.Ok(t, err)
			actSeries := query(t, querier, defaultMatcher)
			testutil.Equals(t, c.expSeries, actSeries)

			// Vertical compaction.
			lc := db.compactor.(*LeveledCompactor)
			testutil.Equals(t, 0, int(prom_testutil.ToFloat64(lc.metrics.overlappingBlocks)), "overlapping blocks count should still be 0 here")
			err = db.compact()
			testutil.Ok(t, err)
			testutil.Equals(t, c.expBlockNum, len(db.Blocks()), "Wrong number of blocks [after compact].")

			testutil.Equals(t, c.expOverlappingBlocks, int(prom_testutil.ToFloat64(lc.metrics.overlappingBlocks)), "overlapping blocks count mismatch")

			// Query test after merging the overlapping blocks.
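			// Querying again with the same matcher must return exactly the
			// same data as before the compaction; merging the overlapping
			// blocks has to be lossless from the reader's point of view.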
			querier, err = db.Querier(0, 100)
			testutil.Ok(t, err)
			actSeries = query(t, querier, defaultMatcher)
			testutil.Equals(t, c.expSeries, actSeries)
		}); !ok {
			return
		}
	}
}

// TestBlockRanges checks the following use cases:
// - No samples can be added with timestamps lower than the last block maxt.
// - The compactor doesn't create overlapping blocks
//   even when the last block is not within the default boundaries.
// - Lower boundary is based on the smallest sample in the head and
//   upper boundary is rounded to the configured block range.
//
// This ensures that a snapshot that includes the head and creates a block with a custom time range
// will not overlap with the first block created by the next compaction.
func TestBlockRanges(t *testing.T) {
	logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))

	dir, err := ioutil.TempDir("", "test_storage")
	if err != nil {
		t.Fatalf("Opening test dir failed: %s", err)
	}

	rangeToTriggerCompaction := DefaultOptions.BlockRanges[0]/2*3 + 1

	// Test that the compactor doesn't create overlapping blocks
	// when a non-standard block already exists.
	firstBlockMaxT := int64(3)
	createBlock(t, dir, genSeries(1, 1, 0, firstBlockMaxT))
	db, err := Open(dir, logger, nil, DefaultOptions)
	if err != nil {
		t.Fatalf("Opening test storage failed: %s", err)
	}
	defer func() {
		os.RemoveAll(dir)
	}()
	app := db.Appender()
	lbl := labels.Labels{{Name: "a", Value: "b"}}
	_, err = app.Add(lbl, firstBlockMaxT-1, rand.Float64())
	if err == nil {
		t.Fatalf("appending a sample with a timestamp covered by a previous block shouldn't be possible")
	}
	_, err = app.Add(lbl, firstBlockMaxT+1, rand.Float64())
	testutil.Ok(t, err)
	_, err = app.Add(lbl, firstBlockMaxT+2, rand.Float64())
	testutil.Ok(t, err)
	secondBlockMaxT := firstBlockMaxT + rangeToTriggerCompaction
	_, err = app.Add(lbl, secondBlockMaxT, rand.Float64()) // Add a sample to trigger a new compaction.
	testutil.Ok(t, err)
	testutil.Ok(t, app.Commit())
	for x := 0; x < 100; x++ {
		if len(db.Blocks()) == 2 {
			break
		}
		time.Sleep(100 * time.Millisecond)
	}
	testutil.Equals(t, 2, len(db.Blocks()), "no new block created after the set timeout")

	if db.Blocks()[0].Meta().MaxTime > db.Blocks()[1].Meta().MinTime {
		t.Fatalf("new block overlaps old:%v,new:%v", db.Blocks()[0].Meta(), db.Blocks()[1].Meta())
	}

	// Test that WAL records are skipped when an existing block covers the same time ranges
	// and that compaction doesn't create an overlapping block.
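	// On reopen the head replays its WAL and drops records whose timestamps
	// are already covered by a persisted block, so of the four samples
	// appended below only the ones beyond the new block's maxt should
	// survive the restart.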
	db.DisableCompactions()
	_, err = app.Add(lbl, secondBlockMaxT+1, rand.Float64())
	testutil.Ok(t, err)
	_, err = app.Add(lbl, secondBlockMaxT+2, rand.Float64())
	testutil.Ok(t, err)
	_, err = app.Add(lbl, secondBlockMaxT+3, rand.Float64())
	testutil.Ok(t, err)
	_, err = app.Add(lbl, secondBlockMaxT+4, rand.Float64())
	testutil.Ok(t, err)
	testutil.Ok(t, app.Commit())
	testutil.Ok(t, db.Close())

	thirdBlockMaxT := secondBlockMaxT + 2
	createBlock(t, dir, genSeries(1, 1, secondBlockMaxT+1, thirdBlockMaxT))

	db, err = Open(dir, logger, nil, DefaultOptions)
	if err != nil {
		t.Fatalf("Opening test storage failed: %s", err)
	}
	defer db.Close()
	testutil.Equals(t, 3, len(db.Blocks()), "db doesn't include expected number of blocks")
	testutil.Equals(t, thirdBlockMaxT, db.Blocks()[2].Meta().MaxTime, "unexpected maxt of the last block")

	app = db.Appender()
	_, err = app.Add(lbl, thirdBlockMaxT+rangeToTriggerCompaction, rand.Float64()) // Trigger a compaction.
	testutil.Ok(t, err)
	testutil.Ok(t, app.Commit())
	for x := 0; x < 100; x++ {
		if len(db.Blocks()) == 4 {
			break
		}
		time.Sleep(100 * time.Millisecond)
	}

	testutil.Equals(t, 4, len(db.Blocks()), "no new block created after the set timeout")

	if db.Blocks()[2].Meta().MaxTime > db.Blocks()[3].Meta().MinTime {
		t.Fatalf("new block overlaps old:%v,new:%v", db.Blocks()[2].Meta(), db.Blocks()[3].Meta())
	}
}

// TestDBReadOnly ensures that opening a DB in read-only mode doesn't modify any files on the disk.
// It also checks that the API calls return results equivalent to those of a normal db.Open() mode.
func TestDBReadOnly(t *testing.T) {
	var (
		dbDir          string
		logger         = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
		expBlocks      []*Block
		expSeries      map[string][]tsdbutil.Sample
		expSeriesCount int
		expDBHash      []byte
		matchAll       = labels.MustNewMatcher(labels.MatchEqual, "", "")
		err            error
	)

	// Bootstrap the db.
	{
		dbDir, err = ioutil.TempDir("", "test")
		testutil.Ok(t, err)

		defer func() {
			testutil.Ok(t, os.RemoveAll(dbDir))
		}()

		dbBlocks := []*BlockMeta{
			{MinTime: 10, MaxTime: 11},
			{MinTime: 11, MaxTime: 12},
			{MinTime: 12, MaxTime: 13},
		}

		for _, m := range dbBlocks {
			createBlock(t, dbDir, genSeries(1, 1, m.MinTime, m.MaxTime))
		}
		expSeriesCount++
	}

	// Open a normal db to use for a comparison.
	{
		dbWritable, err := Open(dbDir, logger, nil, nil)
		testutil.Ok(t, err)
		dbWritable.DisableCompactions()

		dbSizeBeforeAppend, err := fileutil.DirSize(dbWritable.Dir())
		testutil.Ok(t, err)
		app := dbWritable.Appender()
		_, err = app.Add(labels.FromStrings("foo", "bar"), dbWritable.Head().MaxTime()+1, 0)
		testutil.Ok(t, err)
		testutil.Ok(t, app.Commit())
		expSeriesCount++

		expBlocks = dbWritable.Blocks()
		expDbSize, err := fileutil.DirSize(dbWritable.Dir())
		testutil.Ok(t, err)
		testutil.Assert(t, expDbSize > dbSizeBeforeAppend, "db size didn't increase after an append")

		q, err := dbWritable.Querier(math.MinInt64, math.MaxInt64)
		testutil.Ok(t, err)
		expSeries = query(t, q, matchAll)

		testutil.Ok(t, dbWritable.Close()) // Close here to allow getting the dir hash on Windows.
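		// The directory hash captures the exact on-disk state of the closed
		// db; the read-only open below must leave it byte-for-byte unchanged.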
		expDBHash = testutil.DirHash(t, dbWritable.Dir())
	}

	// Open a read-only db and ensure that the API returns the same result as the normal DB.
	{
		dbReadOnly, err := OpenDBReadOnly(dbDir, logger)
		testutil.Ok(t, err)
		defer func() {
			testutil.Ok(t, dbReadOnly.Close())
		}()
		blocks, err := dbReadOnly.Blocks()
		testutil.Ok(t, err)
		testutil.Equals(t, len(expBlocks), len(blocks))

		for i, expBlock := range expBlocks {
			testutil.Equals(t, expBlock.Meta(), blocks[i].Meta(), "block meta mismatch")
		}

		q, err := dbReadOnly.Querier(math.MinInt64, math.MaxInt64)
		testutil.Ok(t, err)
		readOnlySeries := query(t, q, matchAll)
		readOnlyDBHash := testutil.DirHash(t, dbDir)

		testutil.Equals(t, expSeriesCount, len(readOnlySeries), "total series mismatch")
		testutil.Equals(t, expSeries, readOnlySeries, "series mismatch")
		testutil.Equals(t, expDBHash, readOnlyDBHash, "after all read operations the db hash should remain the same")
	}
}

// TestDBReadOnlyClosing ensures that after closing the db
// all API methods return ErrClosed.
func TestDBReadOnlyClosing(t *testing.T) {
	dbDir, err := ioutil.TempDir("", "test")
	testutil.Ok(t, err)

	defer func() {
		testutil.Ok(t, os.RemoveAll(dbDir))
	}()
	db, err := OpenDBReadOnly(dbDir, log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)))
	testutil.Ok(t, err)
	testutil.Ok(t, db.Close())
	testutil.Equals(t, ErrClosed, db.Close())
	_, err = db.Blocks()
	testutil.Equals(t, ErrClosed, err)
	_, err = db.Querier(0, 1)
	testutil.Equals(t, ErrClosed, err)
}

func TestDBReadOnly_FlushWAL(t *testing.T) {
	var (
		dbDir  string
		logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
		err    error
		maxt   int
	)

	// Bootstrap the db.
	{
		dbDir, err = ioutil.TempDir("", "test")
		testutil.Ok(t, err)

		defer func() {
			testutil.Ok(t, os.RemoveAll(dbDir))
		}()

		// Append data to the WAL.
		db, err := Open(dbDir, logger, nil, nil)
		testutil.Ok(t, err)
		db.DisableCompactions()
		app := db.Appender()
		maxt = 1000
		for i := 0; i < maxt; i++ {
			_, err := app.Add(labels.FromStrings(defaultLabelName, "flush"), int64(i), 1.0)
			testutil.Ok(t, err)
		}
		testutil.Ok(t, app.Commit())
		defer func() { testutil.Ok(t, db.Close()) }()
	}

	// Flush the WAL.
	db, err := OpenDBReadOnly(dbDir, logger)
	testutil.Ok(t, err)

	flush, err := ioutil.TempDir("", "flush")
	testutil.Ok(t, err)

	defer func() {
		testutil.Ok(t, os.RemoveAll(flush))
	}()
	testutil.Ok(t, db.FlushWAL(flush))
	testutil.Ok(t, db.Close())

	// Reopen the DB from the block that was flushed from the WAL.
	db, err = OpenDBReadOnly(flush, logger)
	testutil.Ok(t, err)
	defer func() { testutil.Ok(t, db.Close()) }()
	blocks, err := db.Blocks()
	testutil.Ok(t, err)
	testutil.Equals(t, 1, len(blocks))

	querier, err := db.Querier(0, int64(maxt)-1)
	testutil.Ok(t, err)
	defer func() { testutil.Ok(t, querier.Close()) }()

	// Sum the values.
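	// Every sample was appended with the value 1, so the sum over the series
	// must equal the number of samples flushed from the WAL.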
	seriesSet, err := querier.Select(labels.MustNewMatcher(labels.MatchEqual, defaultLabelName, "flush"))
	testutil.Ok(t, err)

	sum := 0.0
	for seriesSet.Next() {
		series := seriesSet.At().Iterator()
		for series.Next() {
			_, v := series.At()
			sum += v
		}
		testutil.Ok(t, series.Err())
	}
	testutil.Ok(t, seriesSet.Err())
	testutil.Equals(t, 1000.0, sum)
}

// TestChunkWriter_ReadAfterWrite ensures that chunk segments are cut at the configured segment size
// and that the resulting segments include the expected chunk data.
func TestChunkWriter_ReadAfterWrite(t *testing.T) {
	chk1 := tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 1}})
	chk2 := tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 2}})
	chk3 := tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 3}})
	chk4 := tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 4}})
	chk5 := tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 5}})
	chunkSize := len(chk1.Chunk.Bytes()) + chunks.MaxChunkLengthFieldSize + chunks.ChunkEncodingSize + crc32.Size

	tests := []struct {
		chks [][]chunks.Meta
		segmentSize, expSegmentsCount int
		expSegmentSizes []int
	}{
		// 0: Last chunk ends at the segment boundary so
		// all chunks should fit into a single segment.
		{
			chks: [][]chunks.Meta{
				{
					chk1,
					chk2,
					chk3,
				},
			},
			segmentSize:      3 * chunkSize,
			expSegmentSizes:  []int{3 * chunkSize},
			expSegmentsCount: 1,
		},
		// 1: Two chunks can fit in a single segment, so the last one should go into a new segment.
		{
			chks: [][]chunks.Meta{
				{
					chk1,
					chk2,
					chk3,
					chk4,
					chk5,
				},
			},
			segmentSize:      2 * chunkSize,
			expSegmentSizes:  []int{2 * chunkSize, 2 * chunkSize, chunkSize},
			expSegmentsCount: 3,
		},
		// 2: When the segment size is smaller than the size of 2 chunks,
		// each chunk after the first should still go into a new segment.
		{
			chks: [][]chunks.Meta{
				{
					chk1,
					chk2,
					chk3,
				},
			},
			segmentSize:      2*chunkSize - 1,
			expSegmentSizes:  []int{chunkSize, chunkSize, chunkSize},
			expSegmentsCount: 3,
		},
		// 3: When the segment is smaller than a single chunk,
		// the chunk should still be written by ignoring the max segment size.
		{
			chks: [][]chunks.Meta{
				{
					chk1,
				},
			},
			segmentSize:      chunkSize - 1,
			expSegmentSizes:  []int{chunkSize},
			expSegmentsCount: 1,
		},
		// 4: All chunks are bigger than the max segment size, but
		// they should still be written even though every segment ends up bigger than the set size.
		// Each segment will hold a single chunk.
		{
			chks: [][]chunks.Meta{
				{
					chk1,
					chk2,
					chk3,
				},
			},
			segmentSize:      1,
			expSegmentSizes:  []int{chunkSize, chunkSize, chunkSize},
			expSegmentsCount: 3,
		},
		// 5: Adding multiple batches of chunks.
		{
			chks: [][]chunks.Meta{
				{
					chk1,
					chk2,
					chk3,
				},
				{
					chk4,
					chk5,
				},
			},
			segmentSize:      3 * chunkSize,
			expSegmentSizes:  []int{3 * chunkSize, 2 * chunkSize},
			expSegmentsCount: 2,
		},
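		// Note that batch boundaries do not force a segment cut: in the case
		// below chunks from different WriteChunks calls end up sharing a
		// segment, so cutting remains purely size-based.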
		// 6: Adding multiple batches of chunks.
		{
			chks: [][]chunks.Meta{
				{
					chk1,
				},
				{
					chk2,
					chk3,
				},
				{
					chk4,
				},
			},
			segmentSize:      2 * chunkSize,
			expSegmentSizes:  []int{2 * chunkSize, 2 * chunkSize},
			expSegmentsCount: 2,
		},
	}

	for i, test := range tests {
		t.Run(strconv.Itoa(i), func(t *testing.T) {
			tempDir, err := ioutil.TempDir("", "test_chunk_writer")
			testutil.Ok(t, err)
			defer func() { testutil.Ok(t, os.RemoveAll(tempDir)) }()

			chunkw, err := chunks.NewWriterWithSegSize(tempDir, chunks.SegmentHeaderSize+int64(test.segmentSize))
			testutil.Ok(t, err)

			for _, chks := range test.chks {
				testutil.Ok(t, chunkw.WriteChunks(chks...))
			}
			testutil.Ok(t, chunkw.Close())

			files, err := ioutil.ReadDir(tempDir)
			testutil.Ok(t, err)
			testutil.Equals(t, test.expSegmentsCount, len(files), "expected segments count mismatch")

			// Verify that all data is written to the segments.
			sizeExp := 0
			sizeAct := 0

			for _, chks := range test.chks {
				for _, chk := range chks {
					l := make([]byte, binary.MaxVarintLen32)
					sizeExp += binary.PutUvarint(l, uint64(len(chk.Chunk.Bytes()))) // The length field.
					sizeExp += chunks.ChunkEncodingSize
					sizeExp += len(chk.Chunk.Bytes()) // The data itself.
					sizeExp += crc32.Size             // The 4 bytes of crc32.
				}
			}
			sizeExp += test.expSegmentsCount * chunks.SegmentHeaderSize // The segment header bytes.

			for i, f := range files {
				size := int(f.Size())
				// Verify that the segment is no bigger than the expected size.
				testutil.Assert(t, chunks.SegmentHeaderSize+test.expSegmentSizes[i] >= size, "Segment:%v should NOT be bigger than:%v actual:%v", i, chunks.SegmentHeaderSize+test.expSegmentSizes[i], size)

				sizeAct += size
			}
			testutil.Equals(t, sizeExp, sizeAct)

			// Check the content of the chunks.
			r, err := chunks.NewDirReader(tempDir, nil)
			testutil.Ok(t, err)
			defer func() { testutil.Ok(t, r.Close()) }()

			for _, chks := range test.chks {
				for _, chkExp := range chks {
					chkAct, err := r.Chunk(chkExp.Ref)
					testutil.Ok(t, err)
					testutil.Equals(t, chkExp.Chunk.Bytes(), chkAct.Bytes())
				}
			}
		})
	}
}
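
// For reference, the size accounting in TestChunkWriter_ReadAfterWrite above
// follows the on-disk layout of a chunk within a segment file:
//
//	len <uvarint> | encoding <1 byte> | data <bytes> | checksum <4 byte CRC32>
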
// TestChunkReader_ConcurrentReads checks that the chunk result can be read concurrently.
// Regression test for https://github.com/prometheus/prometheus/pull/6514.
func TestChunkReader_ConcurrentReads(t *testing.T) {
	chks := []chunks.Meta{
		tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 1}}),
		tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 2}}),
		tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 3}}),
		tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 4}}),
		tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 5}}),
	}

	tempDir, err := ioutil.TempDir("", "test_chunk_writer")
	testutil.Ok(t, err)
	defer func() { testutil.Ok(t, os.RemoveAll(tempDir)) }()

	chunkw, err := chunks.NewWriter(tempDir)
	testutil.Ok(t, err)

	testutil.Ok(t, chunkw.WriteChunks(chks...))
	testutil.Ok(t, chunkw.Close())

	r, err := chunks.NewDirReader(tempDir, nil)
	testutil.Ok(t, err)

	var wg sync.WaitGroup
	for _, chk := range chks {
		// Read each chunk from 100 goroutines at once; the reader must hand
		// every one of them the correct bytes.
		for i := 0; i < 100; i++ {
			wg.Add(1)
			go func(chunk chunks.Meta) {
				defer wg.Done()

				chkAct, err := r.Chunk(chunk.Ref)
				testutil.Ok(t, err)
				testutil.Equals(t, chunk.Chunk.Bytes(), chkAct.Bytes())
			}(chk)
		}
		wg.Wait()
	}
	testutil.Ok(t, r.Close())
}