1package tsm1_test 2 3import ( 4 "archive/tar" 5 "bytes" 6 "context" 7 "fmt" 8 "io" 9 "io/ioutil" 10 "math" 11 "math/rand" 12 "os" 13 "path" 14 "path/filepath" 15 "reflect" 16 "runtime" 17 "strings" 18 "sync" 19 "testing" 20 "time" 21 22 "github.com/google/go-cmp/cmp" 23 "github.com/influxdata/influxdb/logger" 24 "github.com/influxdata/influxdb/models" 25 "github.com/influxdata/influxdb/pkg/deep" 26 "github.com/influxdata/influxdb/query" 27 "github.com/influxdata/influxdb/tsdb" 28 "github.com/influxdata/influxdb/tsdb/engine/tsm1" 29 "github.com/influxdata/influxdb/tsdb/index/inmem" 30 "github.com/influxdata/influxql" 31) 32 33// Ensure that deletes only sent to the WAL will clear out the data from the cache on restart 34func TestEngine_DeleteWALLoadMetadata(t *testing.T) { 35 for _, index := range tsdb.RegisteredIndexes() { 36 t.Run(index, func(t *testing.T) { 37 e := MustOpenEngine(index) 38 defer e.Close() 39 40 if err := e.WritePointsString( 41 `cpu,host=A value=1.1 1000000000`, 42 `cpu,host=B value=1.2 2000000000`, 43 ); err != nil { 44 t.Fatalf("failed to write points: %s", err.Error()) 45 } 46 47 // Remove series. 48 itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=A")}} 49 if err := e.DeleteSeriesRange(itr, math.MinInt64, math.MaxInt64); err != nil { 50 t.Fatalf("failed to delete series: %s", err.Error()) 51 } 52 53 // Ensure we can close and load index from the WAL 54 if err := e.Reopen(); err != nil { 55 t.Fatal(err) 56 } 57 58 if exp, got := 0, len(e.Cache.Values(tsm1.SeriesFieldKeyBytes("cpu,host=A", "value"))); exp != got { 59 t.Fatalf("unexpected number of values: got: %d. exp: %d", got, exp) 60 } 61 62 if exp, got := 1, len(e.Cache.Values(tsm1.SeriesFieldKeyBytes("cpu,host=B", "value"))); exp != got { 63 t.Fatalf("unexpected number of values: got: %d. exp: %d", got, exp) 64 } 65 }) 66 } 67} 68 69// See https://github.com/influxdata/influxdb/issues/14229 70func TestEngine_DeleteSeriesAfterCacheSnapshot(t *testing.T) { 71 for _, index := range tsdb.RegisteredIndexes() { 72 t.Run(index, func(t *testing.T) { 73 e := MustOpenEngine(index) 74 defer e.Close() 75 76 if err := e.WritePointsString( 77 `cpu,host=A value=1.1 1000000000`, 78 `cpu,host=B value=1.2 2000000000`, 79 ); err != nil { 80 t.Fatalf("failed to write points: %s", err.Error()) 81 } 82 83 e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float) 84 e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"})) 85 e.CreateSeriesIfNotExists([]byte("cpu,host=B"), []byte("cpu"), models.NewTags(map[string]string{"host": "B"})) 86 87 // Verify series exist. 88 n, err := seriesExist(e, "cpu", []string{"host"}) 89 if err != nil { 90 t.Fatal(err) 91 } else if got, exp := n, 2; got != exp { 92 t.Fatalf("got %d points, expected %d", got, exp) 93 } 94 95 // Simulate restart of server 96 if err := e.Reopen(); err != nil { 97 t.Fatal(err) 98 } 99 100 // Snapshot the cache 101 if err := e.WriteSnapshot(); err != nil { 102 t.Fatalf("failed to snapshot: %s", err.Error()) 103 } 104 105 // Verify series exist. 106 n, err = seriesExist(e, "cpu", []string{"host"}) 107 if err != nil { 108 t.Fatal(err) 109 } else if got, exp := n, 2; got != exp { 110 t.Fatalf("got %d points, expected %d", got, exp) 111 } 112 113 // Delete the series 114 itr := &seriesIterator{keys: [][]byte{ 115 []byte("cpu,host=A"), 116 []byte("cpu,host=B"), 117 }, 118 } 119 if err := e.DeleteSeriesRange(itr, math.MinInt64, math.MaxInt64); err != nil { 120 t.Fatalf("failed to delete series: %s", err.Error()) 121 } 122 123 // Verify the series are no longer present. 124 n, err = seriesExist(e, "cpu", []string{"host"}) 125 if err != nil { 126 t.Fatal(err) 127 } else if got, exp := n, 0; got != exp { 128 t.Fatalf("got %d points, expected %d", got, exp) 129 } 130 131 // Simulate restart of server 132 if err := e.Reopen(); err != nil { 133 t.Fatal(err) 134 } 135 136 // Verify the series are no longer present. 137 n, err = seriesExist(e, "cpu", []string{"host"}) 138 if err != nil { 139 t.Fatal(err) 140 } else if got, exp := n, 0; got != exp { 141 t.Fatalf("got %d points, expected %d", got, exp) 142 } 143 }) 144 } 145} 146 147func seriesExist(e *Engine, m string, dims []string) (int, error) { 148 itr, err := e.CreateIterator(context.Background(), "cpu", query.IteratorOptions{ 149 Expr: influxql.MustParseExpr(`value`), 150 Dimensions: []string{"host"}, 151 StartTime: influxql.MinTime, 152 EndTime: influxql.MaxTime, 153 Ascending: false, 154 }) 155 if err != nil { 156 return 0, err 157 } else if itr == nil { 158 return 0, nil 159 } 160 defer itr.Close() 161 fitr := itr.(query.FloatIterator) 162 163 var n int 164 for { 165 p, err := fitr.Next() 166 if err != nil { 167 return 0, err 168 } else if p == nil { 169 return n, nil 170 } 171 n++ 172 } 173} 174 175// Ensure that the engine can write & read shard digest files. 176func TestEngine_Digest(t *testing.T) { 177 e := MustOpenEngine(inmem.IndexName) 178 defer e.Close() 179 180 if err := e.Open(); err != nil { 181 t.Fatalf("failed to open tsm1 engine: %s", err.Error()) 182 } 183 184 // Create a few points. 185 points := []models.Point{ 186 MustParsePointString("cpu,host=A value=1.1 1000000000"), 187 MustParsePointString("cpu,host=B value=1.2 2000000000"), 188 } 189 190 if err := e.WritePoints(points); err != nil { 191 t.Fatalf("failed to write points: %s", err.Error()) 192 } 193 194 // Force a compaction. 195 e.ScheduleFullCompaction() 196 197 digest := func() ([]span, error) { 198 // Get a reader for the shard's digest. 199 r, sz, err := e.Digest() 200 if err != nil { 201 return nil, err 202 } 203 204 if sz <= 0 { 205 t.Fatalf("expected digest size > 0") 206 } 207 208 // Make sure the digest can be read. 209 dr, err := tsm1.NewDigestReader(r) 210 if err != nil { 211 r.Close() 212 return nil, err 213 } 214 defer dr.Close() 215 216 _, err = dr.ReadManifest() 217 if err != nil { 218 t.Fatal(err) 219 } 220 221 got := []span{} 222 223 for { 224 k, s, err := dr.ReadTimeSpan() 225 if err == io.EOF { 226 break 227 } else if err != nil { 228 return nil, err 229 } 230 231 got = append(got, span{ 232 key: k, 233 tspan: s, 234 }) 235 } 236 237 return got, nil 238 } 239 240 exp := []span{ 241 span{ 242 key: "cpu,host=A#!~#value", 243 tspan: &tsm1.DigestTimeSpan{ 244 Ranges: []tsm1.DigestTimeRange{ 245 tsm1.DigestTimeRange{ 246 Min: 1000000000, 247 Max: 1000000000, 248 N: 1, 249 CRC: 1048747083, 250 }, 251 }, 252 }, 253 }, 254 span{ 255 key: "cpu,host=B#!~#value", 256 tspan: &tsm1.DigestTimeSpan{ 257 Ranges: []tsm1.DigestTimeRange{ 258 tsm1.DigestTimeRange{ 259 Min: 2000000000, 260 Max: 2000000000, 261 N: 1, 262 CRC: 734984746, 263 }, 264 }, 265 }, 266 }, 267 } 268 269 for n := 0; n < 2; n++ { 270 got, err := digest() 271 if err != nil { 272 t.Fatalf("n = %d: %s", n, err) 273 } 274 275 // Make sure the data in the digest was valid. 276 if !reflect.DeepEqual(exp, got) { 277 t.Fatalf("n = %d\nexp = %v\ngot = %v\n", n, exp, got) 278 } 279 } 280 281 // Test that writing more points causes the digest to be updated. 282 points = []models.Point{ 283 MustParsePointString("cpu,host=C value=1.1 3000000000"), 284 } 285 286 if err := e.WritePoints(points); err != nil { 287 t.Fatalf("failed to write points: %s", err.Error()) 288 } 289 290 // Force a compaction. 291 e.ScheduleFullCompaction() 292 293 // Get new digest. 294 got, err := digest() 295 if err != nil { 296 t.Fatal(err) 297 } 298 299 exp = append(exp, span{ 300 key: "cpu,host=C#!~#value", 301 tspan: &tsm1.DigestTimeSpan{ 302 Ranges: []tsm1.DigestTimeRange{ 303 tsm1.DigestTimeRange{ 304 Min: 3000000000, 305 Max: 3000000000, 306 N: 1, 307 CRC: 2553233514, 308 }, 309 }, 310 }, 311 }) 312 313 if !reflect.DeepEqual(exp, got) { 314 t.Fatalf("\nexp = %v\ngot = %v\n", exp, got) 315 } 316} 317 318type span struct { 319 key string 320 tspan *tsm1.DigestTimeSpan 321} 322 323// Ensure engine handles concurrent calls to Digest(). 324func TestEngine_Digest_Concurrent(t *testing.T) { 325 e := MustOpenEngine(inmem.IndexName) 326 defer e.Close() 327 328 if err := e.Open(); err != nil { 329 t.Fatalf("failed to open tsm1 engine: %s", err.Error()) 330 } 331 332 // Create a few points. 333 points := []models.Point{ 334 MustParsePointString("cpu,host=A value=1.1 1000000000"), 335 MustParsePointString("cpu,host=B value=1.2 2000000000"), 336 } 337 338 if err := e.WritePoints(points); err != nil { 339 t.Fatalf("failed to write points: %s", err.Error()) 340 } 341 342 // Force a compaction. 343 e.ScheduleFullCompaction() 344 345 // Start multiple waiting goroutines, ready to call Digest(). 346 start := make(chan struct{}) 347 errs := make(chan error) 348 wg := &sync.WaitGroup{} 349 for n := 0; n < 100; n++ { 350 wg.Add(1) 351 go func() { 352 defer wg.Done() 353 <-start 354 if _, _, err := e.Digest(); err != nil { 355 errs <- err 356 } 357 }() 358 } 359 360 // Goroutine to close errs channel after all routines have finished. 361 go func() { wg.Wait(); close(errs) }() 362 363 // Signal all goroutines to call Digest(). 364 close(start) 365 366 // Check for digest errors. 367 for err := range errs { 368 if err != nil { 369 t.Fatal(err) 370 } 371 } 372} 373 374// Ensure that the engine will backup any TSM files created since the passed in time 375func TestEngine_Backup(t *testing.T) { 376 sfile := MustOpenSeriesFile() 377 defer sfile.Close() 378 379 // Generate temporary file. 380 f, _ := ioutil.TempFile("", "tsm") 381 f.Close() 382 os.Remove(f.Name()) 383 walPath := filepath.Join(f.Name(), "wal") 384 os.MkdirAll(walPath, 0777) 385 defer os.RemoveAll(f.Name()) 386 387 // Create a few points. 388 p1 := MustParsePointString("cpu,host=A value=1.1 1000000000") 389 p2 := MustParsePointString("cpu,host=B value=1.2 2000000000") 390 p3 := MustParsePointString("cpu,host=C value=1.3 3000000000") 391 392 // Write those points to the engine. 393 db := path.Base(f.Name()) 394 opt := tsdb.NewEngineOptions() 395 opt.InmemIndex = inmem.NewIndex(db, sfile.SeriesFile) 396 idx := tsdb.MustOpenIndex(1, db, filepath.Join(f.Name(), "index"), tsdb.NewSeriesIDSet(), sfile.SeriesFile, opt) 397 defer idx.Close() 398 399 e := tsm1.NewEngine(1, idx, f.Name(), walPath, sfile.SeriesFile, opt).(*tsm1.Engine) 400 401 // mock the planner so compactions don't run during the test 402 e.CompactionPlan = &mockPlanner{} 403 404 if err := e.Open(); err != nil { 405 t.Fatalf("failed to open tsm1 engine: %s", err.Error()) 406 } 407 408 if err := e.WritePoints([]models.Point{p1}); err != nil { 409 t.Fatalf("failed to write points: %s", err.Error()) 410 } 411 if err := e.WriteSnapshot(); err != nil { 412 t.Fatalf("failed to snapshot: %s", err.Error()) 413 } 414 415 if err := e.WritePoints([]models.Point{p2}); err != nil { 416 t.Fatalf("failed to write points: %s", err.Error()) 417 } 418 419 b := bytes.NewBuffer(nil) 420 if err := e.Backup(b, "", time.Unix(0, 0)); err != nil { 421 t.Fatalf("failed to backup: %s", err.Error()) 422 } 423 424 tr := tar.NewReader(b) 425 if len(e.FileStore.Files()) != 2 { 426 t.Fatalf("file count wrong: exp: %d, got: %d", 2, len(e.FileStore.Files())) 427 } 428 429 fileNames := map[string]bool{} 430 for _, f := range e.FileStore.Files() { 431 fileNames[filepath.Base(f.Path())] = true 432 } 433 434 th, err := tr.Next() 435 for err == nil { 436 if !fileNames[th.Name] { 437 t.Errorf("Extra file in backup: %q", th.Name) 438 } 439 delete(fileNames, th.Name) 440 th, err = tr.Next() 441 } 442 443 if err != nil && err != io.EOF { 444 t.Fatalf("Problem reading tar header: %s", err) 445 } 446 447 for f := range fileNames { 448 t.Errorf("File missing from backup: %s", f) 449 } 450 451 if t.Failed() { 452 t.FailNow() 453 } 454 455 lastBackup := time.Now() 456 457 // we have to sleep for a second because last modified times only have second level precision. 458 // so this test won't work properly unless the file is at least a second past the last one 459 time.Sleep(time.Second) 460 461 if err := e.WritePoints([]models.Point{p3}); err != nil { 462 t.Fatalf("failed to write points: %s", err.Error()) 463 } 464 465 b = bytes.NewBuffer(nil) 466 if err := e.Backup(b, "", lastBackup); err != nil { 467 t.Fatalf("failed to backup: %s", err.Error()) 468 } 469 470 tr = tar.NewReader(b) 471 th, err = tr.Next() 472 if err != nil { 473 t.Fatalf("error getting next tar header: %s", err.Error()) 474 } 475 476 mostRecentFile := e.FileStore.Files()[e.FileStore.Count()-1].Path() 477 if !strings.Contains(mostRecentFile, th.Name) || th.Name == "" { 478 t.Fatalf("file name doesn't match:\n\tgot: %s\n\texp: %s", th.Name, mostRecentFile) 479 } 480} 481 482func TestEngine_Export(t *testing.T) { 483 // Generate temporary file. 484 f, _ := ioutil.TempFile("", "tsm") 485 f.Close() 486 os.Remove(f.Name()) 487 walPath := filepath.Join(f.Name(), "wal") 488 os.MkdirAll(walPath, 0777) 489 defer os.RemoveAll(f.Name()) 490 491 // Create a few points. 492 p1 := MustParsePointString("cpu,host=A value=1.1 1000000000") 493 p2 := MustParsePointString("cpu,host=B value=1.2 2000000000") 494 p3 := MustParsePointString("cpu,host=C value=1.3 3000000000") 495 496 sfile := MustOpenSeriesFile() 497 defer sfile.Close() 498 499 // Write those points to the engine. 500 db := path.Base(f.Name()) 501 opt := tsdb.NewEngineOptions() 502 opt.InmemIndex = inmem.NewIndex(db, sfile.SeriesFile) 503 idx := tsdb.MustOpenIndex(1, db, filepath.Join(f.Name(), "index"), tsdb.NewSeriesIDSet(), sfile.SeriesFile, opt) 504 defer idx.Close() 505 506 e := tsm1.NewEngine(1, idx, f.Name(), walPath, sfile.SeriesFile, opt).(*tsm1.Engine) 507 508 // mock the planner so compactions don't run during the test 509 e.CompactionPlan = &mockPlanner{} 510 511 if err := e.Open(); err != nil { 512 t.Fatalf("failed to open tsm1 engine: %s", err.Error()) 513 } 514 515 if err := e.WritePoints([]models.Point{p1}); err != nil { 516 t.Fatalf("failed to write points: %s", err.Error()) 517 } 518 if err := e.WriteSnapshot(); err != nil { 519 t.Fatalf("failed to snapshot: %s", err.Error()) 520 } 521 522 if err := e.WritePoints([]models.Point{p2}); err != nil { 523 t.Fatalf("failed to write points: %s", err.Error()) 524 } 525 if err := e.WriteSnapshot(); err != nil { 526 t.Fatalf("failed to snapshot: %s", err.Error()) 527 } 528 529 if err := e.WritePoints([]models.Point{p3}); err != nil { 530 t.Fatalf("failed to write points: %s", err.Error()) 531 } 532 533 // export the whole DB 534 var exBuf bytes.Buffer 535 if err := e.Export(&exBuf, "", time.Unix(0, 0), time.Unix(0, 4000000000)); err != nil { 536 t.Fatalf("failed to export: %s", err.Error()) 537 } 538 539 var bkBuf bytes.Buffer 540 if err := e.Backup(&bkBuf, "", time.Unix(0, 0)); err != nil { 541 t.Fatalf("failed to backup: %s", err.Error()) 542 } 543 544 if len(e.FileStore.Files()) != 3 { 545 t.Fatalf("file count wrong: exp: %d, got: %d", 3, len(e.FileStore.Files())) 546 } 547 548 fileNames := map[string]bool{} 549 for _, f := range e.FileStore.Files() { 550 fileNames[filepath.Base(f.Path())] = true 551 } 552 553 fileData, err := getExportData(&exBuf) 554 if err != nil { 555 t.Errorf("Error extracting data from export: %s", err.Error()) 556 } 557 558 // TEST 1: did we get any extra files not found in the store? 559 for k := range fileData { 560 if _, ok := fileNames[k]; !ok { 561 t.Errorf("exported a file not in the store: %s", k) 562 } 563 } 564 565 // TEST 2: did we miss any files that the store had? 566 for k := range fileNames { 567 if _, ok := fileData[k]; !ok { 568 t.Errorf("failed to export a file from the store: %s", k) 569 } 570 } 571 572 // TEST 3: Does 'backup' get the same files + bits? 573 tr := tar.NewReader(&bkBuf) 574 575 th, err := tr.Next() 576 for err == nil { 577 expData, ok := fileData[th.Name] 578 if !ok { 579 t.Errorf("Extra file in backup: %q", th.Name) 580 continue 581 } 582 583 buf := new(bytes.Buffer) 584 if _, err := io.Copy(buf, tr); err != nil { 585 t.Fatal(err) 586 } 587 588 if !equalBuffers(expData, buf) { 589 t.Errorf("2Difference in data between backup and Export for file %s", th.Name) 590 } 591 592 th, err = tr.Next() 593 } 594 595 if t.Failed() { 596 t.FailNow() 597 } 598 599 // TEST 4: Are subsets (1), (2), (3), (1,2), (2,3) accurately found in the larger export? 600 // export the whole DB 601 var ex1 bytes.Buffer 602 if err := e.Export(&ex1, "", time.Unix(0, 0), time.Unix(0, 1000000000)); err != nil { 603 t.Fatalf("failed to export: %s", err.Error()) 604 } 605 ex1Data, err := getExportData(&ex1) 606 if err != nil { 607 t.Errorf("Error extracting data from export: %s", err.Error()) 608 } 609 610 for k, v := range ex1Data { 611 fullExp, ok := fileData[k] 612 if !ok { 613 t.Errorf("Extracting subset resulted in file not found in full export: %s", err.Error()) 614 continue 615 } 616 if !equalBuffers(fullExp, v) { 617 t.Errorf("2Difference in data between backup and Export for file %s", th.Name) 618 } 619 620 } 621 622 var ex2 bytes.Buffer 623 if err := e.Export(&ex2, "", time.Unix(0, 1000000001), time.Unix(0, 2000000000)); err != nil { 624 t.Fatalf("failed to export: %s", err.Error()) 625 } 626 627 ex2Data, err := getExportData(&ex2) 628 if err != nil { 629 t.Errorf("Error extracting data from export: %s", err.Error()) 630 } 631 632 for k, v := range ex2Data { 633 fullExp, ok := fileData[k] 634 if !ok { 635 t.Errorf("Extracting subset resulted in file not found in full export: %s", err.Error()) 636 continue 637 } 638 if !equalBuffers(fullExp, v) { 639 t.Errorf("2Difference in data between backup and Export for file %s", th.Name) 640 } 641 642 } 643 644 var ex3 bytes.Buffer 645 if err := e.Export(&ex3, "", time.Unix(0, 2000000001), time.Unix(0, 3000000000)); err != nil { 646 t.Fatalf("failed to export: %s", err.Error()) 647 } 648 649 ex3Data, err := getExportData(&ex3) 650 if err != nil { 651 t.Errorf("Error extracting data from export: %s", err.Error()) 652 } 653 654 for k, v := range ex3Data { 655 fullExp, ok := fileData[k] 656 if !ok { 657 t.Errorf("Extracting subset resulted in file not found in full export: %s", err.Error()) 658 continue 659 } 660 if !equalBuffers(fullExp, v) { 661 t.Errorf("2Difference in data between backup and Export for file %s", th.Name) 662 } 663 664 } 665 666 var ex12 bytes.Buffer 667 if err := e.Export(&ex12, "", time.Unix(0, 0), time.Unix(0, 2000000000)); err != nil { 668 t.Fatalf("failed to export: %s", err.Error()) 669 } 670 671 ex12Data, err := getExportData(&ex12) 672 if err != nil { 673 t.Errorf("Error extracting data from export: %s", err.Error()) 674 } 675 676 for k, v := range ex12Data { 677 fullExp, ok := fileData[k] 678 if !ok { 679 t.Errorf("Extracting subset resulted in file not found in full export: %s", err.Error()) 680 continue 681 } 682 if !equalBuffers(fullExp, v) { 683 t.Errorf("2Difference in data between backup and Export for file %s", th.Name) 684 } 685 686 } 687 688 var ex23 bytes.Buffer 689 if err := e.Export(&ex23, "", time.Unix(0, 1000000001), time.Unix(0, 3000000000)); err != nil { 690 t.Fatalf("failed to export: %s", err.Error()) 691 } 692 693 ex23Data, err := getExportData(&ex23) 694 if err != nil { 695 t.Errorf("Error extracting data from export: %s", err.Error()) 696 } 697 698 for k, v := range ex23Data { 699 fullExp, ok := fileData[k] 700 if !ok { 701 t.Errorf("Extracting subset resulted in file not found in full export: %s", err.Error()) 702 continue 703 } 704 if !equalBuffers(fullExp, v) { 705 t.Errorf("2Difference in data between backup and Export for file %s", th.Name) 706 } 707 708 } 709} 710 711func equalBuffers(bufA, bufB *bytes.Buffer) bool { 712 for i, v := range bufA.Bytes() { 713 if v != bufB.Bytes()[i] { 714 return false 715 } 716 } 717 return true 718} 719 720func getExportData(exBuf *bytes.Buffer) (map[string]*bytes.Buffer, error) { 721 722 tr := tar.NewReader(exBuf) 723 724 fileData := make(map[string]*bytes.Buffer) 725 726 // TEST 1: Get the bits for each file. If we got a file the store doesn't know about, report error 727 for { 728 th, err := tr.Next() 729 if err == io.EOF { 730 break 731 } 732 if err != nil { 733 return nil, err 734 } 735 736 buf := new(bytes.Buffer) 737 if _, err := io.Copy(buf, tr); err != nil { 738 return nil, err 739 } 740 fileData[th.Name] = buf 741 742 } 743 744 return fileData, nil 745} 746 747// Ensure engine can create an ascending iterator for cached values. 748func TestEngine_CreateIterator_Cache_Ascending(t *testing.T) { 749 t.Parallel() 750 751 for _, index := range tsdb.RegisteredIndexes() { 752 t.Run(index, func(t *testing.T) { 753 e := MustOpenEngine(index) 754 defer e.Close() 755 756 e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float) 757 e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"})) 758 759 if err := e.WritePointsString( 760 `cpu,host=A value=1.1 1000000000`, 761 `cpu,host=A value=1.2 2000000000`, 762 `cpu,host=A value=1.3 3000000000`, 763 ); err != nil { 764 t.Fatalf("failed to write points: %s", err.Error()) 765 } 766 767 itr, err := e.CreateIterator(context.Background(), "cpu", query.IteratorOptions{ 768 Expr: influxql.MustParseExpr(`value`), 769 Dimensions: []string{"host"}, 770 StartTime: influxql.MinTime, 771 EndTime: influxql.MaxTime, 772 Ascending: true, 773 }) 774 if err != nil { 775 t.Fatal(err) 776 } 777 fitr := itr.(query.FloatIterator) 778 779 if p, err := fitr.Next(); err != nil { 780 t.Fatalf("unexpected error(0): %v", err) 781 } else if !reflect.DeepEqual(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 1000000000, Value: 1.1}) { 782 t.Fatalf("unexpected point(0): %v", p) 783 } 784 if p, err := fitr.Next(); err != nil { 785 t.Fatalf("unexpected error(1): %v", err) 786 } else if !reflect.DeepEqual(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 2000000000, Value: 1.2}) { 787 t.Fatalf("unexpected point(1): %v", p) 788 } 789 if p, err := fitr.Next(); err != nil { 790 t.Fatalf("unexpected error(2): %v", err) 791 } else if !reflect.DeepEqual(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 3000000000, Value: 1.3}) { 792 t.Fatalf("unexpected point(2): %v", p) 793 } 794 if p, err := fitr.Next(); err != nil { 795 t.Fatalf("expected eof, got error: %v", err) 796 } else if p != nil { 797 t.Fatalf("expected eof: %v", p) 798 } 799 }) 800 } 801} 802 803// Ensure engine can create an descending iterator for cached values. 804func TestEngine_CreateIterator_Cache_Descending(t *testing.T) { 805 t.Parallel() 806 807 for _, index := range tsdb.RegisteredIndexes() { 808 t.Run(index, func(t *testing.T) { 809 810 e := MustOpenEngine(index) 811 defer e.Close() 812 813 e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float) 814 e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"})) 815 816 if err := e.WritePointsString( 817 `cpu,host=A value=1.1 1000000000`, 818 `cpu,host=A value=1.2 2000000000`, 819 `cpu,host=A value=1.3 3000000000`, 820 ); err != nil { 821 t.Fatalf("failed to write points: %s", err.Error()) 822 } 823 824 itr, err := e.CreateIterator(context.Background(), "cpu", query.IteratorOptions{ 825 Expr: influxql.MustParseExpr(`value`), 826 Dimensions: []string{"host"}, 827 StartTime: influxql.MinTime, 828 EndTime: influxql.MaxTime, 829 Ascending: false, 830 }) 831 if err != nil { 832 t.Fatal(err) 833 } 834 fitr := itr.(query.FloatIterator) 835 836 if p, err := fitr.Next(); err != nil { 837 t.Fatalf("unexpected error(0): %v", err) 838 } else if !reflect.DeepEqual(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 3000000000, Value: 1.3}) { 839 t.Fatalf("unexpected point(0): %v", p) 840 } 841 if p, err := fitr.Next(); err != nil { 842 t.Fatalf("unepxected error(1): %v", err) 843 } else if !reflect.DeepEqual(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 2000000000, Value: 1.2}) { 844 t.Fatalf("unexpected point(1): %v", p) 845 } 846 if p, err := fitr.Next(); err != nil { 847 t.Fatalf("unexpected error(2): %v", err) 848 } else if !reflect.DeepEqual(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 1000000000, Value: 1.1}) { 849 t.Fatalf("unexpected point(2): %v", p) 850 } 851 if p, err := fitr.Next(); err != nil { 852 t.Fatalf("expected eof, got error: %v", err) 853 } else if p != nil { 854 t.Fatalf("expected eof: %v", p) 855 } 856 }) 857 } 858} 859 860// Ensure engine can create an ascending iterator for tsm values. 861func TestEngine_CreateIterator_TSM_Ascending(t *testing.T) { 862 t.Parallel() 863 864 for _, index := range tsdb.RegisteredIndexes() { 865 t.Run(index, func(t *testing.T) { 866 e := MustOpenEngine(index) 867 defer e.Close() 868 869 e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float) 870 e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"})) 871 872 if err := e.WritePointsString( 873 `cpu,host=A value=1.1 1000000000`, 874 `cpu,host=A value=1.2 2000000000`, 875 `cpu,host=A value=1.3 3000000000`, 876 ); err != nil { 877 t.Fatalf("failed to write points: %s", err.Error()) 878 } 879 e.MustWriteSnapshot() 880 881 itr, err := e.CreateIterator(context.Background(), "cpu", query.IteratorOptions{ 882 Expr: influxql.MustParseExpr(`value`), 883 Dimensions: []string{"host"}, 884 StartTime: 1000000000, 885 EndTime: 3000000000, 886 Ascending: true, 887 }) 888 if err != nil { 889 t.Fatal(err) 890 } 891 defer itr.Close() 892 fitr := itr.(query.FloatIterator) 893 894 if p, err := fitr.Next(); err != nil { 895 t.Fatalf("unexpected error(0): %v", err) 896 } else if !reflect.DeepEqual(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 1000000000, Value: 1.1}) { 897 t.Fatalf("unexpected point(0): %v", p) 898 } 899 if p, err := fitr.Next(); err != nil { 900 t.Fatalf("unexpected error(1): %v", err) 901 } else if !reflect.DeepEqual(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 2000000000, Value: 1.2}) { 902 t.Fatalf("unexpected point(1): %v", p) 903 } 904 if p, err := fitr.Next(); err != nil { 905 t.Fatalf("unexpected error(2): %v", err) 906 } else if !reflect.DeepEqual(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 3000000000, Value: 1.3}) { 907 t.Fatalf("unexpected point(2): %v", p) 908 } 909 if p, err := fitr.Next(); err != nil { 910 t.Fatalf("expected eof, got error: %v", err) 911 } else if p != nil { 912 t.Fatalf("expected eof: %v", p) 913 } 914 }) 915 } 916} 917 918// Ensure engine can create an descending iterator for cached values. 919func TestEngine_CreateIterator_TSM_Descending(t *testing.T) { 920 t.Parallel() 921 922 for _, index := range tsdb.RegisteredIndexes() { 923 t.Run(index, func(t *testing.T) { 924 e := MustOpenEngine(index) 925 defer e.Close() 926 927 e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float) 928 e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"})) 929 930 if err := e.WritePointsString( 931 `cpu,host=A value=1.1 1000000000`, 932 `cpu,host=A value=1.2 2000000000`, 933 `cpu,host=A value=1.3 3000000000`, 934 ); err != nil { 935 t.Fatalf("failed to write points: %s", err.Error()) 936 } 937 e.MustWriteSnapshot() 938 939 itr, err := e.CreateIterator(context.Background(), "cpu", query.IteratorOptions{ 940 Expr: influxql.MustParseExpr(`value`), 941 Dimensions: []string{"host"}, 942 StartTime: influxql.MinTime, 943 EndTime: influxql.MaxTime, 944 Ascending: false, 945 }) 946 if err != nil { 947 t.Fatal(err) 948 } 949 defer itr.Close() 950 fitr := itr.(query.FloatIterator) 951 952 if p, err := fitr.Next(); err != nil { 953 t.Fatalf("unexpected error(0): %v", err) 954 } else if !reflect.DeepEqual(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 3000000000, Value: 1.3}) { 955 t.Fatalf("unexpected point(0): %v", p) 956 } 957 if p, err := fitr.Next(); err != nil { 958 t.Fatalf("unexpected error(1): %v", err) 959 } else if !reflect.DeepEqual(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 2000000000, Value: 1.2}) { 960 t.Fatalf("unexpected point(1): %v", p) 961 } 962 if p, err := fitr.Next(); err != nil { 963 t.Fatalf("unexpected error(2): %v", err) 964 } else if !reflect.DeepEqual(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 1000000000, Value: 1.1}) { 965 t.Fatalf("unexpected point(2): %v", p) 966 } 967 if p, err := fitr.Next(); err != nil { 968 t.Fatalf("expected eof, got error: %v", err) 969 } else if p != nil { 970 t.Fatalf("expected eof: %v", p) 971 } 972 }) 973 } 974} 975 976// Ensure engine can create an iterator with auxiliary fields. 977func TestEngine_CreateIterator_Aux(t *testing.T) { 978 t.Parallel() 979 980 for _, index := range tsdb.RegisteredIndexes() { 981 t.Run(index, func(t *testing.T) { 982 e := MustOpenEngine(index) 983 defer e.Close() 984 985 e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float) 986 e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("F"), influxql.Float) 987 e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"})) 988 989 if err := e.WritePointsString( 990 `cpu,host=A value=1.1 1000000000`, 991 `cpu,host=A F=100 1000000000`, 992 `cpu,host=A value=1.2 2000000000`, 993 `cpu,host=A value=1.3 3000000000`, 994 `cpu,host=A F=200 3000000000`, 995 ); err != nil { 996 t.Fatalf("failed to write points: %s", err.Error()) 997 } 998 999 itr, err := e.CreateIterator(context.Background(), "cpu", query.IteratorOptions{ 1000 Expr: influxql.MustParseExpr(`value`), 1001 Aux: []influxql.VarRef{{Val: "F"}}, 1002 Dimensions: []string{"host"}, 1003 StartTime: influxql.MinTime, 1004 EndTime: influxql.MaxTime, 1005 Ascending: true, 1006 }) 1007 if err != nil { 1008 t.Fatal(err) 1009 } 1010 fitr := itr.(query.FloatIterator) 1011 1012 if p, err := fitr.Next(); err != nil { 1013 t.Fatalf("unexpected error(0): %v", err) 1014 } else if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 1000000000, Value: 1.1, Aux: []interface{}{float64(100)}}) { 1015 t.Fatalf("unexpected point(0): %v", p) 1016 } 1017 if p, err := fitr.Next(); err != nil { 1018 t.Fatalf("unexpected error(1): %v", err) 1019 } else if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 2000000000, Value: 1.2, Aux: []interface{}{(*float64)(nil)}}) { 1020 t.Fatalf("unexpected point(1): %v", p) 1021 } 1022 if p, err := fitr.Next(); err != nil { 1023 t.Fatalf("unexpected error(2): %v", err) 1024 } else if !deep.Equal(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 3000000000, Value: 1.3, Aux: []interface{}{float64(200)}}) { 1025 t.Fatalf("unexpected point(2): %v", p) 1026 } 1027 if p, err := fitr.Next(); err != nil { 1028 t.Fatalf("expected eof, got error: %v", err) 1029 } else if p != nil { 1030 t.Fatalf("expected eof: %v", p) 1031 } 1032 }) 1033 } 1034} 1035 1036// Ensure engine can create an iterator with a condition. 1037func TestEngine_CreateIterator_Condition(t *testing.T) { 1038 t.Parallel() 1039 1040 for _, index := range tsdb.RegisteredIndexes() { 1041 t.Run(index, func(t *testing.T) { 1042 e := MustOpenEngine(index) 1043 defer e.Close() 1044 1045 e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float) 1046 e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("X"), influxql.Float) 1047 e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("Y"), influxql.Float) 1048 e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"})) 1049 e.SetFieldName([]byte("cpu"), "X") 1050 e.SetFieldName([]byte("cpu"), "Y") 1051 1052 if err := e.WritePointsString( 1053 `cpu,host=A value=1.1 1000000000`, 1054 `cpu,host=A X=10 1000000000`, 1055 `cpu,host=A Y=100 1000000000`, 1056 1057 `cpu,host=A value=1.2 2000000000`, 1058 1059 `cpu,host=A value=1.3 3000000000`, 1060 `cpu,host=A X=20 3000000000`, 1061 `cpu,host=A Y=200 3000000000`, 1062 ); err != nil { 1063 t.Fatalf("failed to write points: %s", err.Error()) 1064 } 1065 1066 itr, err := e.CreateIterator(context.Background(), "cpu", query.IteratorOptions{ 1067 Expr: influxql.MustParseExpr(`value`), 1068 Dimensions: []string{"host"}, 1069 Condition: influxql.MustParseExpr(`X = 10 OR Y > 150`), 1070 StartTime: influxql.MinTime, 1071 EndTime: influxql.MaxTime, 1072 Ascending: true, 1073 }) 1074 if err != nil { 1075 t.Fatal(err) 1076 } 1077 fitr := itr.(query.FloatIterator) 1078 1079 if p, err := fitr.Next(); err != nil { 1080 t.Fatalf("unexpected error(0): %v", err) 1081 } else if !reflect.DeepEqual(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 1000000000, Value: 1.1}) { 1082 t.Fatalf("unexpected point(0): %v", p) 1083 } 1084 if p, err := fitr.Next(); err != nil { 1085 t.Fatalf("unexpected point(1): %v", err) 1086 } else if !reflect.DeepEqual(p, &query.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 3000000000, Value: 1.3}) { 1087 t.Fatalf("unexpected point(1): %v", p) 1088 } 1089 if p, err := fitr.Next(); err != nil { 1090 t.Fatalf("expected eof, got error: %v", err) 1091 } else if p != nil { 1092 t.Fatalf("expected eof: %v", p) 1093 } 1094 }) 1095 } 1096} 1097 1098// Test that series id set gets updated and returned appropriately. 1099func TestIndex_SeriesIDSet(t *testing.T) { 1100 test := func(index string) error { 1101 engine := MustOpenEngine(index) 1102 defer engine.Close() 1103 1104 // Add some series. 1105 engine.MustAddSeries("cpu", map[string]string{"host": "a", "region": "west"}) 1106 engine.MustAddSeries("cpu", map[string]string{"host": "b", "region": "west"}) 1107 engine.MustAddSeries("cpu", map[string]string{"host": "b"}) 1108 engine.MustAddSeries("gpu", nil) 1109 engine.MustAddSeries("gpu", map[string]string{"host": "b"}) 1110 engine.MustAddSeries("mem", map[string]string{"host": "z"}) 1111 1112 // Collect series IDs. 1113 seriesIDMap := map[string]uint64{} 1114 var e tsdb.SeriesIDElem 1115 var err error 1116 1117 itr := engine.sfile.SeriesIDIterator() 1118 for e, err = itr.Next(); ; e, err = itr.Next() { 1119 if err != nil { 1120 return err 1121 } else if e.SeriesID == 0 { 1122 break 1123 } 1124 1125 name, tags := tsdb.ParseSeriesKey(engine.sfile.SeriesKey(e.SeriesID)) 1126 key := fmt.Sprintf("%s%s", name, tags.HashKey()) 1127 seriesIDMap[key] = e.SeriesID 1128 } 1129 1130 for _, id := range seriesIDMap { 1131 if !engine.SeriesIDSet().Contains(id) { 1132 return fmt.Errorf("bitmap does not contain ID: %d", id) 1133 } 1134 } 1135 1136 // Drop all the series for the gpu measurement and they should no longer 1137 // be in the series ID set. 1138 if err := engine.DeleteMeasurement([]byte("gpu")); err != nil { 1139 return err 1140 } 1141 1142 if engine.SeriesIDSet().Contains(seriesIDMap["gpu"]) { 1143 return fmt.Errorf("bitmap does not contain ID: %d for key %s, but should", seriesIDMap["gpu"], "gpu") 1144 } else if engine.SeriesIDSet().Contains(seriesIDMap["gpu,host=b"]) { 1145 return fmt.Errorf("bitmap does not contain ID: %d for key %s, but should", seriesIDMap["gpu,host=b"], "gpu,host=b") 1146 } 1147 delete(seriesIDMap, "gpu") 1148 delete(seriesIDMap, "gpu,host=b") 1149 1150 // Drop the specific mem series 1151 ditr := &seriesIterator{keys: [][]byte{[]byte("mem,host=z")}} 1152 if err := engine.DeleteSeriesRange(ditr, math.MinInt64, math.MaxInt64); err != nil { 1153 return err 1154 } 1155 1156 if engine.SeriesIDSet().Contains(seriesIDMap["mem,host=z"]) { 1157 return fmt.Errorf("bitmap does not contain ID: %d for key %s, but should", seriesIDMap["mem,host=z"], "mem,host=z") 1158 } 1159 delete(seriesIDMap, "mem,host=z") 1160 1161 // The rest of the keys should still be in the set. 1162 for key, id := range seriesIDMap { 1163 if !engine.SeriesIDSet().Contains(id) { 1164 return fmt.Errorf("bitmap does not contain ID: %d for key %s, but should", id, key) 1165 } 1166 } 1167 1168 // Reopen the engine, and the series should be re-added to the bitmap. 1169 if err := engine.Reopen(); err != nil { 1170 panic(err) 1171 } 1172 1173 // Check bitset is expected. 1174 expected := tsdb.NewSeriesIDSet() 1175 for _, id := range seriesIDMap { 1176 expected.Add(id) 1177 } 1178 1179 if !engine.SeriesIDSet().Equals(expected) { 1180 return fmt.Errorf("got bitset %s, expected %s", engine.SeriesIDSet().String(), expected.String()) 1181 } 1182 return nil 1183 } 1184 1185 for _, index := range tsdb.RegisteredIndexes() { 1186 t.Run(index, func(t *testing.T) { 1187 if err := test(index); err != nil { 1188 t.Error(err) 1189 } 1190 }) 1191 } 1192} 1193 1194// Ensures that deleting series from TSM files with multiple fields removes all the 1195/// series 1196func TestEngine_DeleteSeries(t *testing.T) { 1197 for _, index := range tsdb.RegisteredIndexes() { 1198 t.Run(index, func(t *testing.T) { 1199 // Create a few points. 1200 p1 := MustParsePointString("cpu,host=A value=1.1 1000000000") 1201 p2 := MustParsePointString("cpu,host=B value=1.2 2000000000") 1202 p3 := MustParsePointString("cpu,host=A sum=1.3 3000000000") 1203 1204 e, err := NewEngine(index) 1205 if err != nil { 1206 t.Fatal(err) 1207 } 1208 1209 // mock the planner so compactions don't run during the test 1210 e.CompactionPlan = &mockPlanner{} 1211 if err := e.Open(); err != nil { 1212 t.Fatal(err) 1213 } 1214 defer e.Close() 1215 1216 if err := e.writePoints(p1, p2, p3); err != nil { 1217 t.Fatalf("failed to write points: %s", err.Error()) 1218 } 1219 if err := e.WriteSnapshot(); err != nil { 1220 t.Fatalf("failed to snapshot: %s", err.Error()) 1221 } 1222 1223 keys := e.FileStore.Keys() 1224 if exp, got := 3, len(keys); exp != got { 1225 t.Fatalf("series count mismatch: exp %v, got %v", exp, got) 1226 } 1227 1228 itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=A")}} 1229 if err := e.DeleteSeriesRange(itr, math.MinInt64, math.MaxInt64); err != nil { 1230 t.Fatalf("failed to delete series: %v", err) 1231 } 1232 1233 keys = e.FileStore.Keys() 1234 if exp, got := 1, len(keys); exp != got { 1235 t.Fatalf("series count mismatch: exp %v, got %v", exp, got) 1236 } 1237 1238 exp := "cpu,host=B#!~#value" 1239 if _, ok := keys[exp]; !ok { 1240 t.Fatalf("wrong series deleted: exp %v, got %v", exp, keys) 1241 } 1242 }) 1243 } 1244} 1245 1246func TestEngine_DeleteSeriesRange(t *testing.T) { 1247 for _, index := range tsdb.RegisteredIndexes() { 1248 t.Run(index, func(t *testing.T) { 1249 // Create a few points. 1250 p1 := MustParsePointString("cpu,host=0 value=1.1 6000000000") // Should not be deleted 1251 p2 := MustParsePointString("cpu,host=A value=1.2 2000000000") 1252 p3 := MustParsePointString("cpu,host=A value=1.3 3000000000") 1253 p4 := MustParsePointString("cpu,host=B value=1.3 4000000000") // Should not be deleted 1254 p5 := MustParsePointString("cpu,host=B value=1.3 5000000000") // Should not be deleted 1255 p6 := MustParsePointString("cpu,host=C value=1.3 1000000000") 1256 p7 := MustParsePointString("mem,host=C value=1.3 1000000000") // Should not be deleted 1257 p8 := MustParsePointString("disk,host=C value=1.3 1000000000") // Should not be deleted 1258 1259 e, err := NewEngine(index) 1260 if err != nil { 1261 t.Fatal(err) 1262 } 1263 1264 // mock the planner so compactions don't run during the test 1265 e.CompactionPlan = &mockPlanner{} 1266 if err := e.Open(); err != nil { 1267 t.Fatal(err) 1268 } 1269 defer e.Close() 1270 1271 for _, p := range []models.Point{p1, p2, p3, p4, p5, p6, p7, p8} { 1272 if err := e.CreateSeriesIfNotExists(p.Key(), p.Name(), p.Tags()); err != nil { 1273 t.Fatalf("create series index error: %v", err) 1274 } 1275 } 1276 1277 if err := e.WritePoints([]models.Point{p1, p2, p3, p4, p5, p6, p7, p8}); err != nil { 1278 t.Fatalf("failed to write points: %s", err.Error()) 1279 } 1280 if err := e.WriteSnapshot(); err != nil { 1281 t.Fatalf("failed to snapshot: %s", err.Error()) 1282 } 1283 1284 keys := e.FileStore.Keys() 1285 if exp, got := 6, len(keys); exp != got { 1286 t.Fatalf("series count mismatch: exp %v, got %v", exp, got) 1287 } 1288 1289 itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=0"), []byte("cpu,host=A"), []byte("cpu,host=B"), []byte("cpu,host=C")}} 1290 if err := e.DeleteSeriesRange(itr, 0, 3000000000); err != nil { 1291 t.Fatalf("failed to delete series: %v", err) 1292 } 1293 1294 keys = e.FileStore.Keys() 1295 if exp, got := 4, len(keys); exp != got { 1296 t.Fatalf("series count mismatch: exp %v, got %v", exp, got) 1297 } 1298 1299 exp := "cpu,host=B#!~#value" 1300 if _, ok := keys[exp]; !ok { 1301 t.Fatalf("wrong series deleted: exp %v, got %v", exp, keys) 1302 } 1303 1304 // Check that the series still exists in the index 1305 indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile} 1306 iter, err := indexSet.MeasurementSeriesIDIterator([]byte("cpu")) 1307 if err != nil { 1308 t.Fatalf("iterator error: %v", err) 1309 } 1310 defer iter.Close() 1311 1312 elem, err := iter.Next() 1313 if err != nil { 1314 t.Fatal(err) 1315 } 1316 if elem.SeriesID == 0 { 1317 t.Fatalf("series index mismatch: EOF, exp 2 series") 1318 } 1319 1320 // Lookup series. 1321 name, tags := e.sfile.Series(elem.SeriesID) 1322 if got, exp := name, []byte("cpu"); !bytes.Equal(got, exp) { 1323 t.Fatalf("series mismatch: got %s, exp %s", got, exp) 1324 } 1325 1326 if !tags.Equal(models.NewTags(map[string]string{"host": "0"})) && !tags.Equal(models.NewTags(map[string]string{"host": "B"})) { 1327 t.Fatalf(`series mismatch: got %s, exp either "host=0" or "host=B"`, tags) 1328 } 1329 iter.Close() 1330 1331 // Deleting remaining series should remove them from the series. 1332 itr = &seriesIterator{keys: [][]byte{[]byte("cpu,host=0"), []byte("cpu,host=B")}} 1333 if err := e.DeleteSeriesRange(itr, 0, 9000000000); err != nil { 1334 t.Fatalf("failed to delete series: %v", err) 1335 } 1336 1337 indexSet = tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile} 1338 if iter, err = indexSet.MeasurementSeriesIDIterator([]byte("cpu")); err != nil { 1339 t.Fatalf("iterator error: %v", err) 1340 } 1341 if iter == nil { 1342 return 1343 } 1344 1345 defer iter.Close() 1346 if elem, err = iter.Next(); err != nil { 1347 t.Fatal(err) 1348 } 1349 if elem.SeriesID != 0 { 1350 t.Fatalf("got an undeleted series id, but series should be dropped from index") 1351 } 1352 }) 1353 } 1354} 1355 1356func TestEngine_DeleteSeriesRangeWithPredicate(t *testing.T) { 1357 for _, index := range tsdb.RegisteredIndexes() { 1358 t.Run(index, func(t *testing.T) { 1359 // Create a few points. 1360 p1 := MustParsePointString("cpu,host=A value=1.1 6000000000") // Should not be deleted 1361 p2 := MustParsePointString("cpu,host=A value=1.2 2000000000") // Should not be deleted 1362 p3 := MustParsePointString("cpu,host=B value=1.3 3000000000") 1363 p4 := MustParsePointString("cpu,host=B value=1.3 4000000000") 1364 p5 := MustParsePointString("cpu,host=C value=1.3 5000000000") // Should not be deleted 1365 p6 := MustParsePointString("mem,host=B value=1.3 1000000000") 1366 p7 := MustParsePointString("mem,host=C value=1.3 1000000000") 1367 p8 := MustParsePointString("disk,host=C value=1.3 1000000000") // Should not be deleted 1368 1369 e, err := NewEngine(index) 1370 if err != nil { 1371 t.Fatal(err) 1372 } 1373 1374 // mock the planner so compactions don't run during the test 1375 e.CompactionPlan = &mockPlanner{} 1376 if err := e.Open(); err != nil { 1377 t.Fatal(err) 1378 } 1379 defer e.Close() 1380 1381 for _, p := range []models.Point{p1, p2, p3, p4, p5, p6, p7, p8} { 1382 if err := e.CreateSeriesIfNotExists(p.Key(), p.Name(), p.Tags()); err != nil { 1383 t.Fatalf("create series index error: %v", err) 1384 } 1385 } 1386 1387 if err := e.WritePoints([]models.Point{p1, p2, p3, p4, p5, p6, p7, p8}); err != nil { 1388 t.Fatalf("failed to write points: %s", err.Error()) 1389 } 1390 if err := e.WriteSnapshot(); err != nil { 1391 t.Fatalf("failed to snapshot: %s", err.Error()) 1392 } 1393 1394 keys := e.FileStore.Keys() 1395 if exp, got := 6, len(keys); exp != got { 1396 t.Fatalf("series count mismatch: exp %v, got %v", exp, got) 1397 } 1398 1399 itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=A"), []byte("cpu,host=B"), []byte("cpu,host=C"), []byte("mem,host=B"), []byte("mem,host=C")}} 1400 predicate := func(name []byte, tags models.Tags) (int64, int64, bool) { 1401 if bytes.Equal(name, []byte("mem")) { 1402 return math.MinInt64, math.MaxInt64, true 1403 } 1404 if bytes.Equal(name, []byte("cpu")) { 1405 for _, tag := range tags { 1406 if bytes.Equal(tag.Key, []byte("host")) && bytes.Equal(tag.Value, []byte("B")) { 1407 return math.MinInt64, math.MaxInt64, true 1408 } 1409 } 1410 } 1411 return math.MinInt64, math.MaxInt64, false 1412 } 1413 if err := e.DeleteSeriesRangeWithPredicate(itr, predicate); err != nil { 1414 t.Fatalf("failed to delete series: %v", err) 1415 } 1416 1417 keys = e.FileStore.Keys() 1418 if exp, got := 3, len(keys); exp != got { 1419 t.Fatalf("series count mismatch: exp %v, got %v", exp, got) 1420 } 1421 1422 exps := []string{"cpu,host=A#!~#value", "cpu,host=C#!~#value", "disk,host=C#!~#value"} 1423 for _, exp := range exps { 1424 if _, ok := keys[exp]; !ok { 1425 t.Fatalf("wrong series deleted: exp %v, got %v", exps, keys) 1426 } 1427 } 1428 1429 // Check that the series still exists in the index 1430 indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile} 1431 iter, err := indexSet.MeasurementSeriesIDIterator([]byte("cpu")) 1432 if err != nil { 1433 t.Fatalf("iterator error: %v", err) 1434 } 1435 defer iter.Close() 1436 1437 elem, err := iter.Next() 1438 if err != nil { 1439 t.Fatal(err) 1440 } 1441 if elem.SeriesID == 0 { 1442 t.Fatalf("series index mismatch: EOF, exp 2 series") 1443 } 1444 1445 // Lookup series. 1446 name, tags := e.sfile.Series(elem.SeriesID) 1447 if got, exp := name, []byte("cpu"); !bytes.Equal(got, exp) { 1448 t.Fatalf("series mismatch: got %s, exp %s", got, exp) 1449 } 1450 1451 if !tags.Equal(models.NewTags(map[string]string{"host": "A"})) && !tags.Equal(models.NewTags(map[string]string{"host": "C"})) { 1452 t.Fatalf(`series mismatch: got %s, exp either "host=A" or "host=C"`, tags) 1453 } 1454 iter.Close() 1455 1456 // Deleting remaining series should remove them from the series. 1457 itr = &seriesIterator{keys: [][]byte{[]byte("cpu,host=A"), []byte("cpu,host=C")}} 1458 if err := e.DeleteSeriesRange(itr, 0, 9000000000); err != nil { 1459 t.Fatalf("failed to delete series: %v", err) 1460 } 1461 1462 indexSet = tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile} 1463 if iter, err = indexSet.MeasurementSeriesIDIterator([]byte("cpu")); err != nil { 1464 t.Fatalf("iterator error: %v", err) 1465 } 1466 if iter == nil { 1467 return 1468 } 1469 1470 defer iter.Close() 1471 if elem, err = iter.Next(); err != nil { 1472 t.Fatal(err) 1473 } 1474 if elem.SeriesID != 0 { 1475 t.Fatalf("got an undeleted series id, but series should be dropped from index") 1476 } 1477 }) 1478 } 1479} 1480 1481// Tests that a nil predicate deletes all values returned from the series iterator. 1482func TestEngine_DeleteSeriesRangeWithPredicate_Nil(t *testing.T) { 1483 for _, index := range tsdb.RegisteredIndexes() { 1484 t.Run(index, func(t *testing.T) { 1485 // Create a few points. 1486 p1 := MustParsePointString("cpu,host=A value=1.1 6000000000") // Should not be deleted 1487 p2 := MustParsePointString("cpu,host=A value=1.2 2000000000") // Should not be deleted 1488 p3 := MustParsePointString("cpu,host=B value=1.3 3000000000") 1489 p4 := MustParsePointString("cpu,host=B value=1.3 4000000000") 1490 p5 := MustParsePointString("cpu,host=C value=1.3 5000000000") // Should not be deleted 1491 p6 := MustParsePointString("mem,host=B value=1.3 1000000000") 1492 p7 := MustParsePointString("mem,host=C value=1.3 1000000000") 1493 p8 := MustParsePointString("disk,host=C value=1.3 1000000000") // Should not be deleted 1494 1495 e, err := NewEngine(index) 1496 if err != nil { 1497 t.Fatal(err) 1498 } 1499 1500 // mock the planner so compactions don't run during the test 1501 e.CompactionPlan = &mockPlanner{} 1502 if err := e.Open(); err != nil { 1503 t.Fatal(err) 1504 } 1505 defer e.Close() 1506 1507 for _, p := range []models.Point{p1, p2, p3, p4, p5, p6, p7, p8} { 1508 if err := e.CreateSeriesIfNotExists(p.Key(), p.Name(), p.Tags()); err != nil { 1509 t.Fatalf("create series index error: %v", err) 1510 } 1511 } 1512 1513 if err := e.WritePoints([]models.Point{p1, p2, p3, p4, p5, p6, p7, p8}); err != nil { 1514 t.Fatalf("failed to write points: %s", err.Error()) 1515 } 1516 if err := e.WriteSnapshot(); err != nil { 1517 t.Fatalf("failed to snapshot: %s", err.Error()) 1518 } 1519 1520 keys := e.FileStore.Keys() 1521 if exp, got := 6, len(keys); exp != got { 1522 t.Fatalf("series count mismatch: exp %v, got %v", exp, got) 1523 } 1524 1525 itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=A"), []byte("cpu,host=B"), []byte("cpu,host=C"), []byte("mem,host=B"), []byte("mem,host=C")}} 1526 if err := e.DeleteSeriesRangeWithPredicate(itr, nil); err != nil { 1527 t.Fatalf("failed to delete series: %v", err) 1528 } 1529 1530 keys = e.FileStore.Keys() 1531 if exp, got := 1, len(keys); exp != got { 1532 t.Fatalf("series count mismatch: exp %v, got %v", exp, got) 1533 } 1534 1535 // Check that the series still exists in the index 1536 indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile} 1537 iter, err := indexSet.MeasurementSeriesIDIterator([]byte("cpu")) 1538 if err != nil { 1539 t.Fatalf("iterator error: %v", err) 1540 } else if iter == nil { 1541 return 1542 } 1543 defer iter.Close() 1544 1545 if elem, err := iter.Next(); err != nil { 1546 t.Fatal(err) 1547 } else if elem.SeriesID != 0 { 1548 t.Fatalf("got an undeleted series id, but series should be dropped from index") 1549 } 1550 1551 // Check that disk series still exists 1552 iter, err = indexSet.MeasurementSeriesIDIterator([]byte("disk")) 1553 if err != nil { 1554 t.Fatalf("iterator error: %v", err) 1555 } else if iter == nil { 1556 return 1557 } 1558 defer iter.Close() 1559 1560 if elem, err := iter.Next(); err != nil { 1561 t.Fatal(err) 1562 } else if elem.SeriesID == 0 { 1563 t.Fatalf("got an undeleted series id, but series should be dropped from index") 1564 } 1565 }) 1566 } 1567} 1568func TestEngine_DeleteSeriesRangeWithPredicate_FlushBatch(t *testing.T) { 1569 for _, index := range tsdb.RegisteredIndexes() { 1570 t.Run(index, func(t *testing.T) { 1571 // Create a few points. 1572 p1 := MustParsePointString("cpu,host=A value=1.1 6000000000") // Should not be deleted 1573 p2 := MustParsePointString("cpu,host=A value=1.2 2000000000") // Should not be deleted 1574 p3 := MustParsePointString("cpu,host=B value=1.3 3000000000") 1575 p4 := MustParsePointString("cpu,host=B value=1.3 4000000000") 1576 p5 := MustParsePointString("cpu,host=C value=1.3 5000000000") // Should not be deleted 1577 p6 := MustParsePointString("mem,host=B value=1.3 1000000000") 1578 p7 := MustParsePointString("mem,host=C value=1.3 1000000000") 1579 p8 := MustParsePointString("disk,host=C value=1.3 1000000000") // Should not be deleted 1580 1581 e, err := NewEngine(index) 1582 if err != nil { 1583 t.Fatal(err) 1584 } 1585 1586 // mock the planner so compactions don't run during the test 1587 e.CompactionPlan = &mockPlanner{} 1588 if err := e.Open(); err != nil { 1589 t.Fatal(err) 1590 } 1591 defer e.Close() 1592 1593 for _, p := range []models.Point{p1, p2, p3, p4, p5, p6, p7, p8} { 1594 if err := e.CreateSeriesIfNotExists(p.Key(), p.Name(), p.Tags()); err != nil { 1595 t.Fatalf("create series index error: %v", err) 1596 } 1597 } 1598 1599 if err := e.WritePoints([]models.Point{p1, p2, p3, p4, p5, p6, p7, p8}); err != nil { 1600 t.Fatalf("failed to write points: %s", err.Error()) 1601 } 1602 if err := e.WriteSnapshot(); err != nil { 1603 t.Fatalf("failed to snapshot: %s", err.Error()) 1604 } 1605 1606 keys := e.FileStore.Keys() 1607 if exp, got := 6, len(keys); exp != got { 1608 t.Fatalf("series count mismatch: exp %v, got %v", exp, got) 1609 } 1610 1611 itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=A"), []byte("cpu,host=B"), []byte("cpu,host=C"), []byte("mem,host=B"), []byte("mem,host=C")}} 1612 predicate := func(name []byte, tags models.Tags) (int64, int64, bool) { 1613 if bytes.Equal(name, []byte("mem")) { 1614 return 1000000000, 1000000000, true 1615 } 1616 1617 if bytes.Equal(name, []byte("cpu")) { 1618 for _, tag := range tags { 1619 if bytes.Equal(tag.Key, []byte("host")) && bytes.Equal(tag.Value, []byte("B")) { 1620 return 3000000000, 4000000000, true 1621 } 1622 } 1623 } 1624 return math.MinInt64, math.MaxInt64, false 1625 } 1626 if err := e.DeleteSeriesRangeWithPredicate(itr, predicate); err != nil { 1627 t.Fatalf("failed to delete series: %v", err) 1628 } 1629 1630 keys = e.FileStore.Keys() 1631 if exp, got := 3, len(keys); exp != got { 1632 t.Fatalf("series count mismatch: exp %v, got %v", exp, got) 1633 } 1634 1635 exps := []string{"cpu,host=A#!~#value", "cpu,host=C#!~#value", "disk,host=C#!~#value"} 1636 for _, exp := range exps { 1637 if _, ok := keys[exp]; !ok { 1638 t.Fatalf("wrong series deleted: exp %v, got %v", exps, keys) 1639 } 1640 } 1641 1642 // Check that the series still exists in the index 1643 indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile} 1644 iter, err := indexSet.MeasurementSeriesIDIterator([]byte("cpu")) 1645 if err != nil { 1646 t.Fatalf("iterator error: %v", err) 1647 } 1648 defer iter.Close() 1649 1650 elem, err := iter.Next() 1651 if err != nil { 1652 t.Fatal(err) 1653 } 1654 if elem.SeriesID == 0 { 1655 t.Fatalf("series index mismatch: EOF, exp 2 series") 1656 } 1657 1658 // Lookup series. 1659 name, tags := e.sfile.Series(elem.SeriesID) 1660 if got, exp := name, []byte("cpu"); !bytes.Equal(got, exp) { 1661 t.Fatalf("series mismatch: got %s, exp %s", got, exp) 1662 } 1663 1664 if !tags.Equal(models.NewTags(map[string]string{"host": "A"})) && !tags.Equal(models.NewTags(map[string]string{"host": "C"})) { 1665 t.Fatalf(`series mismatch: got %s, exp either "host=A" or "host=C"`, tags) 1666 } 1667 iter.Close() 1668 1669 // Deleting remaining series should remove them from the series. 1670 itr = &seriesIterator{keys: [][]byte{[]byte("cpu,host=A"), []byte("cpu,host=C")}} 1671 if err := e.DeleteSeriesRange(itr, 0, 9000000000); err != nil { 1672 t.Fatalf("failed to delete series: %v", err) 1673 } 1674 1675 indexSet = tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile} 1676 if iter, err = indexSet.MeasurementSeriesIDIterator([]byte("cpu")); err != nil { 1677 t.Fatalf("iterator error: %v", err) 1678 } 1679 if iter == nil { 1680 return 1681 } 1682 1683 defer iter.Close() 1684 if elem, err = iter.Next(); err != nil { 1685 t.Fatal(err) 1686 } 1687 if elem.SeriesID != 0 { 1688 t.Fatalf("got an undeleted series id, but series should be dropped from index") 1689 } 1690 }) 1691 } 1692} 1693 1694func TestEngine_DeleteSeriesRange_OutsideTime(t *testing.T) { 1695 for _, index := range tsdb.RegisteredIndexes() { 1696 t.Run(index, func(t *testing.T) { 1697 // Create a few points. 1698 p1 := MustParsePointString("cpu,host=A value=1.1 1000000000") // Should not be deleted 1699 1700 e, err := NewEngine(index) 1701 if err != nil { 1702 t.Fatal(err) 1703 } 1704 1705 // mock the planner so compactions don't run during the test 1706 e.CompactionPlan = &mockPlanner{} 1707 if err := e.Open(); err != nil { 1708 t.Fatal(err) 1709 } 1710 defer e.Close() 1711 1712 for _, p := range []models.Point{p1} { 1713 if err := e.CreateSeriesIfNotExists(p.Key(), p.Name(), p.Tags()); err != nil { 1714 t.Fatalf("create series index error: %v", err) 1715 } 1716 } 1717 1718 if err := e.WritePoints([]models.Point{p1}); err != nil { 1719 t.Fatalf("failed to write points: %s", err.Error()) 1720 } 1721 if err := e.WriteSnapshot(); err != nil { 1722 t.Fatalf("failed to snapshot: %s", err.Error()) 1723 } 1724 1725 keys := e.FileStore.Keys() 1726 if exp, got := 1, len(keys); exp != got { 1727 t.Fatalf("series count mismatch: exp %v, got %v", exp, got) 1728 } 1729 1730 itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=A")}} 1731 if err := e.DeleteSeriesRange(itr, 0, 0); err != nil { 1732 t.Fatalf("failed to delete series: %v", err) 1733 } 1734 1735 keys = e.FileStore.Keys() 1736 if exp, got := 1, len(keys); exp != got { 1737 t.Fatalf("series count mismatch: exp %v, got %v", exp, got) 1738 } 1739 1740 exp := "cpu,host=A#!~#value" 1741 if _, ok := keys[exp]; !ok { 1742 t.Fatalf("wrong series deleted: exp %v, got %v", exp, keys) 1743 } 1744 1745 // Check that the series still exists in the index 1746 iter, err := e.index.MeasurementSeriesIDIterator([]byte("cpu")) 1747 if err != nil { 1748 t.Fatalf("iterator error: %v", err) 1749 } 1750 defer iter.Close() 1751 1752 elem, err := iter.Next() 1753 if err != nil { 1754 t.Fatal(err) 1755 } 1756 if elem.SeriesID == 0 { 1757 t.Fatalf("series index mismatch: EOF, exp 1 series") 1758 } 1759 1760 // Lookup series. 1761 name, tags := e.sfile.Series(elem.SeriesID) 1762 if got, exp := name, []byte("cpu"); !bytes.Equal(got, exp) { 1763 t.Fatalf("series mismatch: got %s, exp %s", got, exp) 1764 } 1765 1766 if got, exp := tags, models.NewTags(map[string]string{"host": "A"}); !got.Equal(exp) { 1767 t.Fatalf("series mismatch: got %s, exp %s", got, exp) 1768 } 1769 }) 1770 } 1771} 1772 1773func TestEngine_LastModified(t *testing.T) { 1774 for _, index := range tsdb.RegisteredIndexes() { 1775 t.Run(index, func(t *testing.T) { 1776 // Create a few points. 1777 p1 := MustParsePointString("cpu,host=A value=1.1 1000000000") 1778 p2 := MustParsePointString("cpu,host=B value=1.2 2000000000") 1779 p3 := MustParsePointString("cpu,host=A sum=1.3 3000000000") 1780 1781 e, err := NewEngine(index) 1782 if err != nil { 1783 t.Fatal(err) 1784 } 1785 1786 // mock the planner so compactions don't run during the test 1787 e.CompactionPlan = &mockPlanner{} 1788 e.SetEnabled(false) 1789 if err := e.Open(); err != nil { 1790 t.Fatal(err) 1791 } 1792 defer e.Close() 1793 1794 if err := e.writePoints(p1, p2, p3); err != nil { 1795 t.Fatalf("failed to write points: %s", err.Error()) 1796 } 1797 1798 lm := e.LastModified() 1799 if lm.IsZero() { 1800 t.Fatalf("expected non-zero time, got %v", lm.UTC()) 1801 } 1802 e.SetEnabled(true) 1803 1804 // Artificial sleep added due to filesystems caching the mod time 1805 // of files. This prevents the WAL last modified time from being 1806 // returned and newer than the filestore's mod time. 1807 time.Sleep(2 * time.Second) // Covers most filesystems. 1808 1809 if err := e.WriteSnapshot(); err != nil { 1810 t.Fatalf("failed to snapshot: %s", err.Error()) 1811 } 1812 1813 lm2 := e.LastModified() 1814 1815 if got, exp := lm.Equal(lm2), false; exp != got { 1816 t.Fatalf("expected time change, got %v, exp %v: %s == %s", got, exp, lm.String(), lm2.String()) 1817 } 1818 1819 itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=A")}} 1820 if err := e.DeleteSeriesRange(itr, math.MinInt64, math.MaxInt64); err != nil { 1821 t.Fatalf("failed to delete series: %v", err) 1822 } 1823 1824 lm3 := e.LastModified() 1825 if got, exp := lm2.Equal(lm3), false; exp != got { 1826 t.Fatalf("expected time change, got %v, exp %v", got, exp) 1827 } 1828 }) 1829 } 1830} 1831 1832func TestEngine_SnapshotsDisabled(t *testing.T) { 1833 sfile := MustOpenSeriesFile() 1834 defer sfile.Close() 1835 1836 // Generate temporary file. 1837 dir, _ := ioutil.TempDir("", "tsm") 1838 walPath := filepath.Join(dir, "wal") 1839 os.MkdirAll(walPath, 0777) 1840 defer os.RemoveAll(dir) 1841 1842 // Create a tsm1 engine. 1843 db := path.Base(dir) 1844 opt := tsdb.NewEngineOptions() 1845 opt.InmemIndex = inmem.NewIndex(db, sfile.SeriesFile) 1846 idx := tsdb.MustOpenIndex(1, db, filepath.Join(dir, "index"), tsdb.NewSeriesIDSet(), sfile.SeriesFile, opt) 1847 defer idx.Close() 1848 1849 e := tsm1.NewEngine(1, idx, dir, walPath, sfile.SeriesFile, opt).(*tsm1.Engine) 1850 1851 // mock the planner so compactions don't run during the test 1852 e.CompactionPlan = &mockPlanner{} 1853 1854 e.SetEnabled(false) 1855 if err := e.Open(); err != nil { 1856 t.Fatalf("failed to open tsm1 engine: %s", err.Error()) 1857 } 1858 1859 // Make sure Snapshots are disabled. 1860 e.SetCompactionsEnabled(false) 1861 e.Compactor.DisableSnapshots() 1862 1863 // Writing a snapshot should not fail when the snapshot is empty 1864 // even if snapshots are disabled. 1865 if err := e.WriteSnapshot(); err != nil { 1866 t.Fatalf("failed to snapshot: %s", err.Error()) 1867 } 1868} 1869 1870func TestEngine_ShouldCompactCache(t *testing.T) { 1871 nowTime := time.Now() 1872 1873 e, err := NewEngine(inmem.IndexName) 1874 if err != nil { 1875 t.Fatal(err) 1876 } 1877 1878 // mock the planner so compactions don't run during the test 1879 e.CompactionPlan = &mockPlanner{} 1880 e.SetEnabled(false) 1881 if err := e.Open(); err != nil { 1882 t.Fatalf("failed to open tsm1 engine: %s", err.Error()) 1883 } 1884 defer e.Close() 1885 1886 e.CacheFlushMemorySizeThreshold = 1024 1887 e.CacheFlushWriteColdDuration = time.Minute 1888 1889 if e.ShouldCompactCache(nowTime) { 1890 t.Fatal("nothing written to cache, so should not compact") 1891 } 1892 1893 if err := e.WritePointsString("m,k=v f=3i"); err != nil { 1894 t.Fatal(err) 1895 } 1896 1897 if e.ShouldCompactCache(nowTime) { 1898 t.Fatal("cache size < flush threshold and nothing written to FileStore, so should not compact") 1899 } 1900 1901 if !e.ShouldCompactCache(nowTime.Add(time.Hour)) { 1902 t.Fatal("last compaction was longer than flush write cold threshold, so should compact") 1903 } 1904 1905 e.CacheFlushMemorySizeThreshold = 1 1906 if !e.ShouldCompactCache(nowTime) { 1907 t.Fatal("cache size > flush threshold, so should compact") 1908 } 1909} 1910 1911// Ensure engine can create an ascending cursor for cache and tsm values. 1912func TestEngine_CreateCursor_Ascending(t *testing.T) { 1913 t.Parallel() 1914 1915 for _, index := range tsdb.RegisteredIndexes() { 1916 t.Run(index, func(t *testing.T) { 1917 1918 e := MustOpenEngine(index) 1919 defer e.Close() 1920 1921 e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float) 1922 e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"})) 1923 1924 if err := e.WritePointsString( 1925 `cpu,host=A value=1.1 1`, 1926 `cpu,host=A value=1.2 2`, 1927 `cpu,host=A value=1.3 3`, 1928 ); err != nil { 1929 t.Fatalf("failed to write points: %s", err.Error()) 1930 } 1931 e.MustWriteSnapshot() 1932 1933 if err := e.WritePointsString( 1934 `cpu,host=A value=10.1 10`, 1935 `cpu,host=A value=11.2 11`, 1936 `cpu,host=A value=12.3 12`, 1937 ); err != nil { 1938 t.Fatalf("failed to write points: %s", err.Error()) 1939 } 1940 1941 q, err := e.CreateCursorIterator(context.Background()) 1942 if err != nil { 1943 t.Fatal(err) 1944 } 1945 1946 cur, err := q.Next(context.Background(), &tsdb.CursorRequest{ 1947 Name: []byte("cpu"), 1948 Tags: models.ParseTags([]byte("cpu,host=A")), 1949 Field: "value", 1950 Ascending: true, 1951 StartTime: 2, 1952 EndTime: 11, 1953 }) 1954 if err != nil { 1955 t.Fatal(err) 1956 } 1957 defer cur.Close() 1958 1959 fcur := cur.(tsdb.FloatArrayCursor) 1960 a := fcur.Next() 1961 if !cmp.Equal([]int64{2, 3, 10, 11}, a.Timestamps) { 1962 t.Fatal("unexpect timestamps") 1963 } 1964 if !cmp.Equal([]float64{1.2, 1.3, 10.1, 11.2}, a.Values) { 1965 t.Fatal("unexpect timestamps") 1966 } 1967 }) 1968 } 1969} 1970 1971// Ensure engine can create an ascending cursor for tsm values. 1972func TestEngine_CreateCursor_Descending(t *testing.T) { 1973 t.Parallel() 1974 1975 for _, index := range tsdb.RegisteredIndexes() { 1976 t.Run(index, func(t *testing.T) { 1977 1978 e := MustOpenEngine(index) 1979 defer e.Close() 1980 1981 e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float) 1982 e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"})) 1983 1984 if err := e.WritePointsString( 1985 `cpu,host=A value=1.1 1`, 1986 `cpu,host=A value=1.2 2`, 1987 `cpu,host=A value=1.3 3`, 1988 ); err != nil { 1989 t.Fatalf("failed to write points: %s", err.Error()) 1990 } 1991 e.MustWriteSnapshot() 1992 1993 if err := e.WritePointsString( 1994 `cpu,host=A value=10.1 10`, 1995 `cpu,host=A value=11.2 11`, 1996 `cpu,host=A value=12.3 12`, 1997 ); err != nil { 1998 t.Fatalf("failed to write points: %s", err.Error()) 1999 } 2000 2001 q, err := e.CreateCursorIterator(context.Background()) 2002 if err != nil { 2003 t.Fatal(err) 2004 } 2005 2006 cur, err := q.Next(context.Background(), &tsdb.CursorRequest{ 2007 Name: []byte("cpu"), 2008 Tags: models.ParseTags([]byte("cpu,host=A")), 2009 Field: "value", 2010 Ascending: false, 2011 StartTime: 2, 2012 EndTime: 11, 2013 }) 2014 if err != nil { 2015 t.Fatal(err) 2016 } 2017 defer cur.Close() 2018 2019 fcur := cur.(tsdb.FloatArrayCursor) 2020 a := fcur.Next() 2021 if !cmp.Equal([]int64{11, 10, 3, 2}, a.Timestamps) { 2022 t.Fatal("unexpect timestamps") 2023 } 2024 if !cmp.Equal([]float64{11.2, 10.1, 1.3, 1.2}, a.Values) { 2025 t.Fatal("unexpect timestamps") 2026 } 2027 }) 2028 } 2029} 2030 2031func makeBlockTypeSlice(n int) []byte { 2032 r := make([]byte, n) 2033 b := tsm1.BlockFloat64 2034 m := tsm1.BlockUnsigned + 1 2035 for i := 0; i < len(r); i++ { 2036 r[i] = b % m 2037 } 2038 return r 2039} 2040 2041var blockType = influxql.Unknown 2042 2043func BenchmarkBlockTypeToInfluxQLDataType(b *testing.B) { 2044 t := makeBlockTypeSlice(1000) 2045 for i := 0; i < b.N; i++ { 2046 for j := 0; j < len(t); j++ { 2047 blockType = tsm1.BlockTypeToInfluxQLDataType(t[j]) 2048 } 2049 } 2050} 2051 2052// This test ensures that "sync: WaitGroup is reused before previous Wait has returned" is 2053// is not raised. 2054func TestEngine_DisableEnableCompactions_Concurrent(t *testing.T) { 2055 t.Parallel() 2056 2057 for _, index := range tsdb.RegisteredIndexes() { 2058 t.Run(index, func(t *testing.T) { 2059 2060 e := MustOpenEngine(index) 2061 defer e.Close() 2062 2063 var wg sync.WaitGroup 2064 wg.Add(2) 2065 2066 go func() { 2067 defer wg.Done() 2068 for i := 0; i < 1000; i++ { 2069 e.SetCompactionsEnabled(true) 2070 e.SetCompactionsEnabled(false) 2071 } 2072 }() 2073 2074 go func() { 2075 defer wg.Done() 2076 for i := 0; i < 1000; i++ { 2077 e.SetCompactionsEnabled(false) 2078 e.SetCompactionsEnabled(true) 2079 } 2080 }() 2081 2082 done := make(chan struct{}) 2083 go func() { 2084 wg.Wait() 2085 close(done) 2086 }() 2087 2088 // Wait for waitgroup or fail if it takes too long. 2089 select { 2090 case <-time.NewTimer(30 * time.Second).C: 2091 t.Fatalf("timed out after 30 seconds waiting for waitgroup") 2092 case <-done: 2093 } 2094 }) 2095 } 2096 2097} 2098 2099func TestEngine_WritePointsWithContext(t *testing.T) { 2100 // Create a few points. 2101 points := []models.Point{ 2102 MustParsePointString("cpu,host=A value=1.1 1000000000"), 2103 MustParsePointString("cpu,host=B value=1.2,value2=8 2000000000"), 2104 } 2105 2106 expectedPoints, expectedValues := int64(2), int64(3) 2107 2108 for _, index := range tsdb.RegisteredIndexes() { 2109 t.Run(index, func(t *testing.T) { 2110 e := MustOpenEngine(index) 2111 2112 var numPoints, numValues int64 2113 2114 ctx := context.WithValue(context.Background(), tsdb.StatPointsWritten, &numPoints) 2115 ctx = context.WithValue(ctx, tsdb.StatValuesWritten, &numValues) 2116 2117 if err := e.WritePointsWithContext(ctx, points); err != nil { 2118 t.Fatalf("failed to write points: %v", err) 2119 } 2120 2121 if got, expected := numPoints, expectedPoints; got != expected { 2122 t.Fatalf("Expected stats to return %d points; got %d", expected, got) 2123 } 2124 2125 if got, expected := numValues, expectedValues; got != expected { 2126 t.Fatalf("Expected stats to return %d points; got %d", expected, got) 2127 } 2128 }) 2129 } 2130} 2131 2132func TestEngine_WritePoints_TypeConflict(t *testing.T) { 2133 os.Setenv("INFLUXDB_SERIES_TYPE_CHECK_ENABLED", "1") 2134 defer os.Unsetenv("INFLUXDB_SERIES_TYPE_CHECK_ENABLED") 2135 2136 for _, index := range tsdb.RegisteredIndexes() { 2137 t.Run(index, func(t *testing.T) { 2138 2139 e := MustOpenEngine(index) 2140 defer e.Close() 2141 2142 if err := e.WritePointsString( 2143 `cpu,host=A value=1.1 1`, 2144 `cpu,host=A value=1i 2`, 2145 ); err == nil { 2146 t.Fatalf("expected field type conflict") 2147 } else if err != tsdb.ErrFieldTypeConflict { 2148 t.Fatalf("error mismatch: got %v, exp %v", err, tsdb.ErrFieldTypeConflict) 2149 } 2150 2151 // Series type should be a float 2152 got, err := e.Type([]byte(tsm1.SeriesFieldKey("cpu,host=A", "value"))) 2153 if err != nil { 2154 t.Fatalf("unexpected error getting field type: %v", err) 2155 } 2156 2157 if exp := models.Float; got != exp { 2158 t.Fatalf("field type mismatch: got %v, exp %v", got, exp) 2159 } 2160 2161 values := e.Cache.Values([]byte(tsm1.SeriesFieldKey("cpu,host=A", "value"))) 2162 if got, exp := len(values), 1; got != exp { 2163 t.Fatalf("values len mismatch: got %v, exp %v", got, exp) 2164 } 2165 }) 2166 } 2167} 2168 2169func TestEngine_WritePoints_Reload(t *testing.T) { 2170 t.Skip("Disabled until INFLUXDB_SERIES_TYPE_CHECK_ENABLED is enabled by default") 2171 2172 for _, index := range tsdb.RegisteredIndexes() { 2173 t.Run(index, func(t *testing.T) { 2174 2175 e := MustOpenEngine(index) 2176 defer e.Close() 2177 2178 if err := e.WritePointsString( 2179 `cpu,host=A value=1.1 1`, 2180 ); err != nil { 2181 t.Fatalf("expected field type conflict") 2182 } 2183 2184 // Series type should be a float 2185 got, err := e.Type([]byte(tsm1.SeriesFieldKey("cpu,host=A", "value"))) 2186 if err != nil { 2187 t.Fatalf("unexpected error getting field type: %v", err) 2188 } 2189 2190 if exp := models.Float; got != exp { 2191 t.Fatalf("field type mismatch: got %v, exp %v", got, exp) 2192 } 2193 2194 if err := e.WriteSnapshot(); err != nil { 2195 t.Fatalf("unexpected error writing snapshot: %v", err) 2196 } 2197 2198 if err := e.Reopen(); err != nil { 2199 t.Fatalf("unexpected error reopning engine: %v", err) 2200 } 2201 2202 if err := e.WritePointsString( 2203 `cpu,host=A value=1i 1`, 2204 ); err != tsdb.ErrFieldTypeConflict { 2205 t.Fatalf("expected field type conflict: got %v", err) 2206 } 2207 }) 2208 } 2209} 2210 2211func TestEngine_Invalid_UTF8(t *testing.T) { 2212 for _, index := range tsdb.RegisteredIndexes() { 2213 t.Run(index, func(t *testing.T) { 2214 name := []byte{255, 112, 114, 111, 99} // A known invalid UTF-8 string 2215 field := []byte{255, 110, 101, 116} // A known invalid UTF-8 string 2216 p := MustParsePointString(fmt.Sprintf("%s,host=A %s=1.1 6000000000", name, field)) 2217 2218 e, err := NewEngine(index) 2219 if err != nil { 2220 t.Fatal(err) 2221 } 2222 2223 // mock the planner so compactions don't run during the test 2224 e.CompactionPlan = &mockPlanner{} 2225 if err := e.Open(); err != nil { 2226 t.Fatal(err) 2227 } 2228 defer e.Close() 2229 2230 if err := e.CreateSeriesIfNotExists(p.Key(), p.Name(), p.Tags()); err != nil { 2231 t.Fatalf("create series index error: %v", err) 2232 } 2233 2234 if err := e.WritePoints([]models.Point{p}); err != nil { 2235 t.Fatalf("failed to write points: %s", err.Error()) 2236 } 2237 2238 // Re-open the engine 2239 if err := e.Reopen(); err != nil { 2240 t.Fatal(err) 2241 } 2242 }) 2243 } 2244} 2245func BenchmarkEngine_WritePoints(b *testing.B) { 2246 batchSizes := []int{10, 100, 1000, 5000, 10000} 2247 for _, sz := range batchSizes { 2248 for _, index := range tsdb.RegisteredIndexes() { 2249 e := MustOpenEngine(index) 2250 e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float) 2251 pp := make([]models.Point, 0, sz) 2252 for i := 0; i < sz; i++ { 2253 p := MustParsePointString(fmt.Sprintf("cpu,host=%d value=1.2", i)) 2254 pp = append(pp, p) 2255 } 2256 2257 b.Run(fmt.Sprintf("%s_%d", index, sz), func(b *testing.B) { 2258 b.ReportAllocs() 2259 for i := 0; i < b.N; i++ { 2260 err := e.WritePoints(pp) 2261 if err != nil { 2262 b.Fatal(err) 2263 } 2264 } 2265 }) 2266 e.Close() 2267 } 2268 } 2269} 2270 2271func BenchmarkEngine_WritePoints_Parallel(b *testing.B) { 2272 batchSizes := []int{1000, 5000, 10000, 25000, 50000, 75000, 100000, 200000} 2273 for _, sz := range batchSizes { 2274 for _, index := range tsdb.RegisteredIndexes() { 2275 e := MustOpenEngine(index) 2276 e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float) 2277 2278 cpus := runtime.GOMAXPROCS(0) 2279 pp := make([]models.Point, 0, sz*cpus) 2280 for i := 0; i < sz*cpus; i++ { 2281 p := MustParsePointString(fmt.Sprintf("cpu,host=%d value=1.2,other=%di", i, i)) 2282 pp = append(pp, p) 2283 } 2284 2285 b.Run(fmt.Sprintf("%s_%d", index, sz), func(b *testing.B) { 2286 b.ReportAllocs() 2287 for i := 0; i < b.N; i++ { 2288 var wg sync.WaitGroup 2289 errC := make(chan error) 2290 for i := 0; i < cpus; i++ { 2291 wg.Add(1) 2292 go func(i int) { 2293 defer wg.Done() 2294 from, to := i*sz, (i+1)*sz 2295 err := e.WritePoints(pp[from:to]) 2296 if err != nil { 2297 errC <- err 2298 return 2299 } 2300 }(i) 2301 } 2302 2303 go func() { 2304 wg.Wait() 2305 close(errC) 2306 }() 2307 2308 for err := range errC { 2309 if err != nil { 2310 b.Error(err) 2311 } 2312 } 2313 } 2314 }) 2315 e.Close() 2316 } 2317 } 2318} 2319 2320var benchmarks = []struct { 2321 name string 2322 opt query.IteratorOptions 2323}{ 2324 { 2325 name: "Count", 2326 opt: query.IteratorOptions{ 2327 Expr: influxql.MustParseExpr("count(value)"), 2328 Ascending: true, 2329 StartTime: influxql.MinTime, 2330 EndTime: influxql.MaxTime, 2331 }, 2332 }, 2333 { 2334 name: "First", 2335 opt: query.IteratorOptions{ 2336 Expr: influxql.MustParseExpr("first(value)"), 2337 Ascending: true, 2338 StartTime: influxql.MinTime, 2339 EndTime: influxql.MaxTime, 2340 }, 2341 }, 2342 { 2343 name: "Last", 2344 opt: query.IteratorOptions{ 2345 Expr: influxql.MustParseExpr("last(value)"), 2346 Ascending: true, 2347 StartTime: influxql.MinTime, 2348 EndTime: influxql.MaxTime, 2349 }, 2350 }, 2351 { 2352 name: "Limit", 2353 opt: query.IteratorOptions{ 2354 Expr: influxql.MustParseExpr("value"), 2355 Ascending: true, 2356 StartTime: influxql.MinTime, 2357 EndTime: influxql.MaxTime, 2358 Limit: 10, 2359 }, 2360 }, 2361} 2362 2363var benchmarkVariants = []struct { 2364 name string 2365 modify func(opt query.IteratorOptions) query.IteratorOptions 2366}{ 2367 { 2368 name: "All", 2369 modify: func(opt query.IteratorOptions) query.IteratorOptions { 2370 return opt 2371 }, 2372 }, 2373 { 2374 name: "GroupByTime_1m-1h", 2375 modify: func(opt query.IteratorOptions) query.IteratorOptions { 2376 opt.StartTime = 0 2377 opt.EndTime = int64(time.Hour) - 1 2378 opt.Interval = query.Interval{ 2379 Duration: time.Minute, 2380 } 2381 return opt 2382 }, 2383 }, 2384 { 2385 name: "GroupByTime_1h-1d", 2386 modify: func(opt query.IteratorOptions) query.IteratorOptions { 2387 opt.StartTime = 0 2388 opt.EndTime = int64(24*time.Hour) - 1 2389 opt.Interval = query.Interval{ 2390 Duration: time.Hour, 2391 } 2392 return opt 2393 }, 2394 }, 2395 { 2396 name: "GroupByTime_1m-1d", 2397 modify: func(opt query.IteratorOptions) query.IteratorOptions { 2398 opt.StartTime = 0 2399 opt.EndTime = int64(24*time.Hour) - 1 2400 opt.Interval = query.Interval{ 2401 Duration: time.Minute, 2402 } 2403 return opt 2404 }, 2405 }, 2406 { 2407 name: "GroupByHost", 2408 modify: func(opt query.IteratorOptions) query.IteratorOptions { 2409 opt.Dimensions = []string{"host"} 2410 return opt 2411 }, 2412 }, 2413 { 2414 name: "GroupByHostAndTime_1m-1h", 2415 modify: func(opt query.IteratorOptions) query.IteratorOptions { 2416 opt.Dimensions = []string{"host"} 2417 opt.StartTime = 0 2418 opt.EndTime = int64(time.Hour) - 1 2419 opt.Interval = query.Interval{ 2420 Duration: time.Minute, 2421 } 2422 return opt 2423 }, 2424 }, 2425 { 2426 name: "GroupByHostAndTime_1h-1d", 2427 modify: func(opt query.IteratorOptions) query.IteratorOptions { 2428 opt.Dimensions = []string{"host"} 2429 opt.StartTime = 0 2430 opt.EndTime = int64(24*time.Hour) - 1 2431 opt.Interval = query.Interval{ 2432 Duration: time.Hour, 2433 } 2434 return opt 2435 }, 2436 }, 2437 { 2438 name: "GroupByHostAndTime_1m-1d", 2439 modify: func(opt query.IteratorOptions) query.IteratorOptions { 2440 opt.Dimensions = []string{"host"} 2441 opt.StartTime = 0 2442 opt.EndTime = int64(24*time.Hour) - 1 2443 opt.Interval = query.Interval{ 2444 Duration: time.Hour, 2445 } 2446 return opt 2447 }, 2448 }, 2449} 2450 2451func BenchmarkEngine_CreateIterator(b *testing.B) { 2452 engines := make([]*benchmarkEngine, len(sizes)) 2453 for i, size := range sizes { 2454 engines[i] = MustInitDefaultBenchmarkEngine(size.name, size.sz) 2455 } 2456 2457 for _, tt := range benchmarks { 2458 for _, variant := range benchmarkVariants { 2459 name := tt.name + "_" + variant.name 2460 opt := variant.modify(tt.opt) 2461 b.Run(name, func(b *testing.B) { 2462 for _, e := range engines { 2463 b.Run(e.Name, func(b *testing.B) { 2464 b.ReportAllocs() 2465 for i := 0; i < b.N; i++ { 2466 itr, err := e.CreateIterator(context.Background(), "cpu", opt) 2467 if err != nil { 2468 b.Fatal(err) 2469 } 2470 query.DrainIterator(itr) 2471 } 2472 }) 2473 } 2474 }) 2475 } 2476 } 2477} 2478 2479type benchmarkEngine struct { 2480 *Engine 2481 Name string 2482 PointN int 2483} 2484 2485var ( 2486 hostNames = []string{"A", "B", "C", "D", "E", "F", "G", "H", "I", "J"} 2487 sizes = []struct { 2488 name string 2489 sz int 2490 }{ 2491 {name: "1K", sz: 1000}, 2492 {name: "100K", sz: 100000}, 2493 {name: "1M", sz: 1000000}, 2494 } 2495) 2496 2497// MustInitDefaultBenchmarkEngine creates a new engine using the default index 2498// and fills it with points. Reuses previous engine if the same parameters 2499// were used. 2500func MustInitDefaultBenchmarkEngine(name string, pointN int) *benchmarkEngine { 2501 const batchSize = 1000 2502 if pointN%batchSize != 0 { 2503 panic(fmt.Sprintf("point count (%d) must be a multiple of batch size (%d)", pointN, batchSize)) 2504 } 2505 2506 e := MustOpenEngine(tsdb.DefaultIndex) 2507 2508 // Initialize metadata. 2509 e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float) 2510 e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"})) 2511 2512 // Generate time ascending points with jitterred time & value. 2513 rand := rand.New(rand.NewSource(0)) 2514 for i := 0; i < pointN; i += batchSize { 2515 var buf bytes.Buffer 2516 for j := 0; j < batchSize; j++ { 2517 fmt.Fprintf(&buf, "cpu,host=%s value=%d %d", 2518 hostNames[j%len(hostNames)], 2519 100+rand.Intn(50)-25, 2520 (time.Duration(i+j)*time.Second)+(time.Duration(rand.Intn(500)-250)*time.Millisecond), 2521 ) 2522 if j != pointN-1 { 2523 fmt.Fprint(&buf, "\n") 2524 } 2525 } 2526 2527 if err := e.WritePointsString(buf.String()); err != nil { 2528 panic(err) 2529 } 2530 } 2531 2532 if err := e.WriteSnapshot(); err != nil { 2533 panic(err) 2534 } 2535 2536 // Force garbage collection. 2537 runtime.GC() 2538 2539 // Save engine reference for reuse. 2540 return &benchmarkEngine{ 2541 Engine: e, 2542 Name: name, 2543 PointN: pointN, 2544 } 2545} 2546 2547// Engine is a test wrapper for tsm1.Engine. 2548type Engine struct { 2549 *tsm1.Engine 2550 root string 2551 indexPath string 2552 indexType string 2553 index tsdb.Index 2554 sfile *tsdb.SeriesFile 2555} 2556 2557// NewEngine returns a new instance of Engine at a temporary location. 2558func NewEngine(index string) (*Engine, error) { 2559 root, err := ioutil.TempDir("", "tsm1-") 2560 if err != nil { 2561 panic(err) 2562 } 2563 2564 db := "db0" 2565 dbPath := filepath.Join(root, "data", db) 2566 2567 if err := os.MkdirAll(dbPath, os.ModePerm); err != nil { 2568 return nil, err 2569 } 2570 2571 // Setup series file. 2572 sfile := tsdb.NewSeriesFile(filepath.Join(dbPath, tsdb.SeriesFileDirectory)) 2573 sfile.Logger = logger.New(os.Stdout) 2574 if err = sfile.Open(); err != nil { 2575 return nil, err 2576 } 2577 2578 opt := tsdb.NewEngineOptions() 2579 opt.IndexVersion = index 2580 if index == tsdb.InmemIndexName { 2581 opt.InmemIndex = inmem.NewIndex(db, sfile) 2582 } 2583 // Initialise series id sets. Need to do this as it's normally done at the 2584 // store level. 2585 seriesIDs := tsdb.NewSeriesIDSet() 2586 opt.SeriesIDSets = seriesIDSets([]*tsdb.SeriesIDSet{seriesIDs}) 2587 2588 idxPath := filepath.Join(dbPath, "index") 2589 idx := tsdb.MustOpenIndex(1, db, idxPath, seriesIDs, sfile, opt) 2590 2591 tsm1Engine := tsm1.NewEngine(1, idx, filepath.Join(root, "data"), filepath.Join(root, "wal"), sfile, opt).(*tsm1.Engine) 2592 2593 return &Engine{ 2594 Engine: tsm1Engine, 2595 root: root, 2596 indexPath: idxPath, 2597 indexType: index, 2598 index: idx, 2599 sfile: sfile, 2600 }, nil 2601} 2602 2603// MustOpenEngine returns a new, open instance of Engine. 2604func MustOpenEngine(index string) *Engine { 2605 e, err := NewEngine(index) 2606 if err != nil { 2607 panic(err) 2608 } 2609 2610 if err := e.Open(); err != nil { 2611 panic(err) 2612 } 2613 return e 2614} 2615 2616// Close closes the engine and removes all underlying data. 2617func (e *Engine) Close() error { 2618 return e.close(true) 2619} 2620 2621func (e *Engine) close(cleanup bool) error { 2622 if e.index != nil { 2623 e.index.Close() 2624 } 2625 2626 if e.sfile != nil { 2627 e.sfile.Close() 2628 } 2629 2630 defer func() { 2631 if cleanup { 2632 os.RemoveAll(e.root) 2633 } 2634 }() 2635 return e.Engine.Close() 2636} 2637 2638// Reopen closes and reopens the engine. 2639func (e *Engine) Reopen() error { 2640 // Close engine without removing underlying engine data. 2641 if err := e.close(false); err != nil { 2642 return err 2643 } 2644 2645 // Re-open series file. Must create a new series file using the same data. 2646 e.sfile = tsdb.NewSeriesFile(e.sfile.Path()) 2647 if err := e.sfile.Open(); err != nil { 2648 return err 2649 } 2650 2651 db := path.Base(e.root) 2652 opt := tsdb.NewEngineOptions() 2653 opt.InmemIndex = inmem.NewIndex(db, e.sfile) 2654 2655 // Re-initialise the series id set 2656 seriesIDSet := tsdb.NewSeriesIDSet() 2657 opt.SeriesIDSets = seriesIDSets([]*tsdb.SeriesIDSet{seriesIDSet}) 2658 2659 // Re-open index. 2660 e.index = tsdb.MustOpenIndex(1, db, e.indexPath, seriesIDSet, e.sfile, opt) 2661 2662 // Re-initialize engine. 2663 e.Engine = tsm1.NewEngine(1, e.index, filepath.Join(e.root, "data"), filepath.Join(e.root, "wal"), e.sfile, opt).(*tsm1.Engine) 2664 2665 // Reopen engine 2666 if err := e.Engine.Open(); err != nil { 2667 return err 2668 } 2669 2670 // Reload series data into index (no-op on TSI). 2671 return e.LoadMetadataIndex(1, e.index) 2672} 2673 2674// SeriesIDSet provides access to the underlying series id bitset in the engine's 2675// index. It will panic if the underlying index does not have a SeriesIDSet 2676// method. 2677func (e *Engine) SeriesIDSet() *tsdb.SeriesIDSet { 2678 return e.index.SeriesIDSet() 2679} 2680 2681// AddSeries adds the provided series data to the index and writes a point to 2682// the engine with default values for a field and a time of now. 2683func (e *Engine) AddSeries(name string, tags map[string]string) error { 2684 point, err := models.NewPoint(name, models.NewTags(tags), models.Fields{"v": 1.0}, time.Now()) 2685 if err != nil { 2686 return err 2687 } 2688 return e.writePoints(point) 2689} 2690 2691// WritePointsString calls WritePointsString on the underlying engine, but also 2692// adds the associated series to the index. 2693func (e *Engine) WritePointsString(ptstr ...string) error { 2694 points, err := models.ParsePointsString(strings.Join(ptstr, "\n")) 2695 if err != nil { 2696 return err 2697 } 2698 return e.writePoints(points...) 2699} 2700 2701// writePoints adds the series for the provided points to the index, and writes 2702// the point data to the engine. 2703func (e *Engine) writePoints(points ...models.Point) error { 2704 for _, point := range points { 2705 // Write into the index. 2706 if err := e.Engine.CreateSeriesIfNotExists(point.Key(), point.Name(), point.Tags()); err != nil { 2707 return err 2708 } 2709 } 2710 // Write the points into the cache/wal. 2711 return e.WritePoints(points) 2712} 2713 2714// MustAddSeries calls AddSeries, panicking if there is an error. 2715func (e *Engine) MustAddSeries(name string, tags map[string]string) { 2716 if err := e.AddSeries(name, tags); err != nil { 2717 panic(err) 2718 } 2719} 2720 2721// MustWriteSnapshot forces a snapshot of the engine. Panic on error. 2722func (e *Engine) MustWriteSnapshot() { 2723 if err := e.WriteSnapshot(); err != nil { 2724 panic(err) 2725 } 2726} 2727 2728// SeriesFile is a test wrapper for tsdb.SeriesFile. 2729type SeriesFile struct { 2730 *tsdb.SeriesFile 2731} 2732 2733// NewSeriesFile returns a new instance of SeriesFile with a temporary file path. 2734func NewSeriesFile() *SeriesFile { 2735 dir, err := ioutil.TempDir("", "tsdb-series-file-") 2736 if err != nil { 2737 panic(err) 2738 } 2739 return &SeriesFile{SeriesFile: tsdb.NewSeriesFile(dir)} 2740} 2741 2742// MustOpenSeriesFile returns a new, open instance of SeriesFile. Panic on error. 2743func MustOpenSeriesFile() *SeriesFile { 2744 f := NewSeriesFile() 2745 if err := f.Open(); err != nil { 2746 panic(err) 2747 } 2748 return f 2749} 2750 2751// Close closes the log file and removes it from disk. 2752func (f *SeriesFile) Close() { 2753 defer os.RemoveAll(f.Path()) 2754 if err := f.SeriesFile.Close(); err != nil { 2755 panic(err) 2756 } 2757} 2758 2759// MustParsePointsString parses points from a string. Panic on error. 2760func MustParsePointsString(buf string) []models.Point { 2761 a, err := models.ParsePointsString(buf) 2762 if err != nil { 2763 panic(err) 2764 } 2765 return a 2766} 2767 2768// MustParsePointString parses the first point from a string. Panic on error. 2769func MustParsePointString(buf string) models.Point { return MustParsePointsString(buf)[0] } 2770 2771type mockPlanner struct{} 2772 2773func (m *mockPlanner) Plan(lastWrite time.Time) []tsm1.CompactionGroup { return nil } 2774func (m *mockPlanner) PlanLevel(level int) []tsm1.CompactionGroup { return nil } 2775func (m *mockPlanner) PlanOptimize() []tsm1.CompactionGroup { return nil } 2776func (m *mockPlanner) Release(groups []tsm1.CompactionGroup) {} 2777func (m *mockPlanner) FullyCompacted() bool { return false } 2778func (m *mockPlanner) ForceFull() {} 2779func (m *mockPlanner) SetFileStore(fs *tsm1.FileStore) {} 2780 2781// ParseTags returns an instance of Tags for a comma-delimited list of key/values. 2782func ParseTags(s string) query.Tags { 2783 m := make(map[string]string) 2784 for _, kv := range strings.Split(s, ",") { 2785 a := strings.Split(kv, "=") 2786 m[a[0]] = a[1] 2787 } 2788 return query.NewTags(m) 2789} 2790 2791type seriesIterator struct { 2792 keys [][]byte 2793} 2794 2795type series struct { 2796 name []byte 2797 tags models.Tags 2798 deleted bool 2799} 2800 2801func (s series) Name() []byte { return s.name } 2802func (s series) Tags() models.Tags { return s.tags } 2803func (s series) Deleted() bool { return s.deleted } 2804func (s series) Expr() influxql.Expr { return nil } 2805 2806func (itr *seriesIterator) Close() error { return nil } 2807 2808func (itr *seriesIterator) Next() (tsdb.SeriesElem, error) { 2809 if len(itr.keys) == 0 { 2810 return nil, nil 2811 } 2812 name, tags := models.ParseKeyBytes(itr.keys[0]) 2813 s := series{name: name, tags: tags} 2814 itr.keys = itr.keys[1:] 2815 return s, nil 2816} 2817 2818type seriesIDSets []*tsdb.SeriesIDSet 2819 2820func (a seriesIDSets) ForEach(f func(ids *tsdb.SeriesIDSet)) error { 2821 for _, v := range a { 2822 f(v) 2823 } 2824 return nil 2825} 2826