1package tsm1_test 2 3import ( 4 "fmt" 5 "io" 6 "os" 7 "reflect" 8 "testing" 9 10 "github.com/golang/snappy" 11 "github.com/influxdata/influxdb/pkg/slices" 12 "github.com/influxdata/influxdb/tsdb/engine/tsm1" 13) 14 15func TestWALWriter_WriteMulti_Single(t *testing.T) { 16 dir := MustTempDir() 17 defer os.RemoveAll(dir) 18 f := MustTempFile(dir) 19 w := tsm1.NewWALSegmentWriter(f) 20 21 p1 := tsm1.NewValue(1, 1.1) 22 p2 := tsm1.NewValue(1, int64(1)) 23 p3 := tsm1.NewValue(1, true) 24 p4 := tsm1.NewValue(1, "string") 25 p5 := tsm1.NewValue(1, ^uint64(0)) 26 27 values := map[string][]tsm1.Value{ 28 "cpu,host=A#!~#float": []tsm1.Value{p1}, 29 "cpu,host=A#!~#int": []tsm1.Value{p2}, 30 "cpu,host=A#!~#bool": []tsm1.Value{p3}, 31 "cpu,host=A#!~#string": []tsm1.Value{p4}, 32 "cpu,host=A#!~#unsigned": []tsm1.Value{p5}, 33 } 34 35 entry := &tsm1.WriteWALEntry{ 36 Values: values, 37 } 38 39 if err := w.Write(mustMarshalEntry(entry)); err != nil { 40 fatal(t, "write points", err) 41 } 42 43 if err := w.Flush(); err != nil { 44 fatal(t, "flush", err) 45 } 46 47 if _, err := f.Seek(0, io.SeekStart); err != nil { 48 fatal(t, "seek", err) 49 } 50 51 r := tsm1.NewWALSegmentReader(f) 52 53 if !r.Next() { 54 t.Fatalf("expected next, got false") 55 } 56 57 we, err := r.Read() 58 if err != nil { 59 fatal(t, "read entry", err) 60 } 61 62 e, ok := we.(*tsm1.WriteWALEntry) 63 if !ok { 64 t.Fatalf("expected WriteWALEntry: got %#v", e) 65 } 66 67 for k, v := range e.Values { 68 for i, vv := range v { 69 if got, exp := vv.String(), values[k][i].String(); got != exp { 70 t.Fatalf("points mismatch: got %v, exp %v", got, exp) 71 } 72 } 73 } 74 75 if n := r.Count(); n != MustReadFileSize(f) { 76 t.Fatalf("wrong count of bytes read, got %d, exp %d", n, MustReadFileSize(f)) 77 } 78} 79 80func TestWALWriter_WriteMulti_LargeBatch(t *testing.T) { 81 dir := MustTempDir() 82 defer os.RemoveAll(dir) 83 f := MustTempFile(dir) 84 w := tsm1.NewWALSegmentWriter(f) 85 86 var points []tsm1.Value 87 for i := 0; i < 100000; i++ { 88 points = append(points, tsm1.NewValue(int64(i), int64(1))) 89 } 90 91 values := map[string][]tsm1.Value{ 92 "cpu,host=A,server=01,foo=bar,tag=really-long#!~#float": points, 93 "mem,host=A,server=01,foo=bar,tag=really-long#!~#float": points, 94 } 95 96 entry := &tsm1.WriteWALEntry{ 97 Values: values, 98 } 99 100 if err := w.Write(mustMarshalEntry(entry)); err != nil { 101 fatal(t, "write points", err) 102 } 103 104 if err := w.Flush(); err != nil { 105 fatal(t, "flush", err) 106 } 107 108 if _, err := f.Seek(0, io.SeekStart); err != nil { 109 fatal(t, "seek", err) 110 } 111 112 r := tsm1.NewWALSegmentReader(f) 113 114 if !r.Next() { 115 t.Fatalf("expected next, got false") 116 } 117 118 we, err := r.Read() 119 if err != nil { 120 fatal(t, "read entry", err) 121 } 122 123 e, ok := we.(*tsm1.WriteWALEntry) 124 if !ok { 125 t.Fatalf("expected WriteWALEntry: got %#v", e) 126 } 127 128 for k, v := range e.Values { 129 for i, vv := range v { 130 if got, exp := vv.String(), values[k][i].String(); got != exp { 131 t.Fatalf("points mismatch: got %v, exp %v", got, exp) 132 } 133 } 134 } 135 136 if n := r.Count(); n != MustReadFileSize(f) { 137 t.Fatalf("wrong count of bytes read, got %d, exp %d", n, MustReadFileSize(f)) 138 } 139} 140func TestWALWriter_WriteMulti_Multiple(t *testing.T) { 141 dir := MustTempDir() 142 defer os.RemoveAll(dir) 143 f := MustTempFile(dir) 144 w := tsm1.NewWALSegmentWriter(f) 145 146 p1 := tsm1.NewValue(1, int64(1)) 147 p2 := tsm1.NewValue(1, int64(2)) 148 149 exp := []struct { 150 key string 151 values []tsm1.Value 152 }{ 153 {"cpu,host=A#!~#value", []tsm1.Value{p1}}, 154 {"cpu,host=B#!~#value", []tsm1.Value{p2}}, 155 } 156 157 for _, v := range exp { 158 entry := &tsm1.WriteWALEntry{ 159 Values: map[string][]tsm1.Value{v.key: v.values}, 160 } 161 162 if err := w.Write(mustMarshalEntry(entry)); err != nil { 163 fatal(t, "write points", err) 164 } 165 if err := w.Flush(); err != nil { 166 fatal(t, "flush", err) 167 } 168 } 169 170 // Seek back to the beinning of the file for reading 171 if _, err := f.Seek(0, io.SeekStart); err != nil { 172 fatal(t, "seek", err) 173 } 174 175 r := tsm1.NewWALSegmentReader(f) 176 177 for _, ep := range exp { 178 if !r.Next() { 179 t.Fatalf("expected next, got false") 180 } 181 182 we, err := r.Read() 183 if err != nil { 184 fatal(t, "read entry", err) 185 } 186 187 e, ok := we.(*tsm1.WriteWALEntry) 188 if !ok { 189 t.Fatalf("expected WriteWALEntry: got %#v", e) 190 } 191 192 for k, v := range e.Values { 193 if got, exp := k, ep.key; got != exp { 194 t.Fatalf("key mismatch. got %v, exp %v", got, exp) 195 } 196 197 if got, exp := len(v), len(ep.values); got != exp { 198 t.Fatalf("values length mismatch: got %v, exp %v", got, exp) 199 } 200 201 for i, vv := range v { 202 if got, exp := vv.String(), ep.values[i].String(); got != exp { 203 t.Fatalf("points mismatch: got %v, exp %v", got, exp) 204 } 205 } 206 } 207 } 208 209 if n := r.Count(); n != MustReadFileSize(f) { 210 t.Fatalf("wrong count of bytes read, got %d, exp %d", n, MustReadFileSize(f)) 211 } 212} 213 214func TestWALWriter_WriteDelete_Single(t *testing.T) { 215 dir := MustTempDir() 216 defer os.RemoveAll(dir) 217 f := MustTempFile(dir) 218 w := tsm1.NewWALSegmentWriter(f) 219 220 entry := &tsm1.DeleteWALEntry{ 221 Keys: [][]byte{[]byte("cpu")}, 222 } 223 224 if err := w.Write(mustMarshalEntry(entry)); err != nil { 225 fatal(t, "write points", err) 226 } 227 228 if err := w.Flush(); err != nil { 229 fatal(t, "flush", err) 230 } 231 232 if _, err := f.Seek(0, io.SeekStart); err != nil { 233 fatal(t, "seek", err) 234 } 235 236 r := tsm1.NewWALSegmentReader(f) 237 238 if !r.Next() { 239 t.Fatalf("expected next, got false") 240 } 241 242 we, err := r.Read() 243 if err != nil { 244 fatal(t, "read entry", err) 245 } 246 247 e, ok := we.(*tsm1.DeleteWALEntry) 248 if !ok { 249 t.Fatalf("expected WriteWALEntry: got %#v", e) 250 } 251 252 if got, exp := len(e.Keys), len(entry.Keys); got != exp { 253 t.Fatalf("key length mismatch: got %v, exp %v", got, exp) 254 } 255 256 if got, exp := string(e.Keys[0]), string(entry.Keys[0]); got != exp { 257 t.Fatalf("key mismatch: got %v, exp %v", got, exp) 258 } 259} 260 261func TestWALWriter_WriteMultiDelete_Multiple(t *testing.T) { 262 dir := MustTempDir() 263 defer os.RemoveAll(dir) 264 f := MustTempFile(dir) 265 w := tsm1.NewWALSegmentWriter(f) 266 267 p1 := tsm1.NewValue(1, true) 268 values := map[string][]tsm1.Value{ 269 "cpu,host=A#!~#value": []tsm1.Value{p1}, 270 } 271 272 writeEntry := &tsm1.WriteWALEntry{ 273 Values: values, 274 } 275 276 if err := w.Write(mustMarshalEntry(writeEntry)); err != nil { 277 fatal(t, "write points", err) 278 } 279 280 if err := w.Flush(); err != nil { 281 fatal(t, "flush", err) 282 } 283 284 // Write the delete entry 285 deleteEntry := &tsm1.DeleteWALEntry{ 286 Keys: [][]byte{[]byte("cpu,host=A#!~value")}, 287 } 288 289 if err := w.Write(mustMarshalEntry(deleteEntry)); err != nil { 290 fatal(t, "write points", err) 291 } 292 293 if err := w.Flush(); err != nil { 294 fatal(t, "flush", err) 295 } 296 297 // Seek back to the beinning of the file for reading 298 if _, err := f.Seek(0, io.SeekStart); err != nil { 299 fatal(t, "seek", err) 300 } 301 302 r := tsm1.NewWALSegmentReader(f) 303 304 // Read the write points first 305 if !r.Next() { 306 t.Fatalf("expected next, got false") 307 } 308 309 we, err := r.Read() 310 if err != nil { 311 fatal(t, "read entry", err) 312 } 313 314 e, ok := we.(*tsm1.WriteWALEntry) 315 if !ok { 316 t.Fatalf("expected WriteWALEntry: got %#v", e) 317 } 318 319 for k, v := range e.Values { 320 if got, exp := len(v), len(values[k]); got != exp { 321 t.Fatalf("values length mismatch: got %v, exp %v", got, exp) 322 } 323 324 for i, vv := range v { 325 if got, exp := vv.String(), values[k][i].String(); got != exp { 326 t.Fatalf("points mismatch: got %v, exp %v", got, exp) 327 } 328 } 329 } 330 331 // Read the delete second 332 if !r.Next() { 333 t.Fatalf("expected next, got false") 334 } 335 336 we, err = r.Read() 337 if err != nil { 338 fatal(t, "read entry", err) 339 } 340 341 de, ok := we.(*tsm1.DeleteWALEntry) 342 if !ok { 343 t.Fatalf("expected DeleteWALEntry: got %#v", e) 344 } 345 346 if got, exp := len(de.Keys), len(deleteEntry.Keys); got != exp { 347 t.Fatalf("key length mismatch: got %v, exp %v", got, exp) 348 } 349 350 if got, exp := string(de.Keys[0]), string(deleteEntry.Keys[0]); got != exp { 351 t.Fatalf("key mismatch: got %v, exp %v", got, exp) 352 } 353} 354 355func TestWALWriter_WriteMultiDeleteRange_Multiple(t *testing.T) { 356 dir := MustTempDir() 357 defer os.RemoveAll(dir) 358 f := MustTempFile(dir) 359 w := tsm1.NewWALSegmentWriter(f) 360 361 p1 := tsm1.NewValue(1, 1.0) 362 p2 := tsm1.NewValue(2, 2.0) 363 p3 := tsm1.NewValue(3, 3.0) 364 365 values := map[string][]tsm1.Value{ 366 "cpu,host=A#!~#value": []tsm1.Value{p1, p2, p3}, 367 } 368 369 writeEntry := &tsm1.WriteWALEntry{ 370 Values: values, 371 } 372 373 if err := w.Write(mustMarshalEntry(writeEntry)); err != nil { 374 fatal(t, "write points", err) 375 } 376 377 if err := w.Flush(); err != nil { 378 fatal(t, "flush", err) 379 } 380 381 // Write the delete entry 382 deleteEntry := &tsm1.DeleteRangeWALEntry{ 383 Keys: [][]byte{[]byte("cpu,host=A#!~value")}, 384 Min: 2, 385 Max: 3, 386 } 387 388 if err := w.Write(mustMarshalEntry(deleteEntry)); err != nil { 389 fatal(t, "write points", err) 390 } 391 392 if err := w.Flush(); err != nil { 393 fatal(t, "flush", err) 394 } 395 396 // Seek back to the beinning of the file for reading 397 if _, err := f.Seek(0, io.SeekStart); err != nil { 398 fatal(t, "seek", err) 399 } 400 401 r := tsm1.NewWALSegmentReader(f) 402 403 // Read the write points first 404 if !r.Next() { 405 t.Fatalf("expected next, got false") 406 } 407 408 we, err := r.Read() 409 if err != nil { 410 fatal(t, "read entry", err) 411 } 412 413 e, ok := we.(*tsm1.WriteWALEntry) 414 if !ok { 415 t.Fatalf("expected WriteWALEntry: got %#v", e) 416 } 417 418 for k, v := range e.Values { 419 if got, exp := len(v), len(values[k]); got != exp { 420 t.Fatalf("values length mismatch: got %v, exp %v", got, exp) 421 } 422 423 for i, vv := range v { 424 if got, exp := vv.String(), values[k][i].String(); got != exp { 425 t.Fatalf("points mismatch: got %v, exp %v", got, exp) 426 } 427 } 428 } 429 430 // Read the delete second 431 if !r.Next() { 432 t.Fatalf("expected next, got false") 433 } 434 435 we, err = r.Read() 436 if err != nil { 437 fatal(t, "read entry", err) 438 } 439 440 de, ok := we.(*tsm1.DeleteRangeWALEntry) 441 if !ok { 442 t.Fatalf("expected DeleteWALEntry: got %#v", e) 443 } 444 445 if got, exp := len(de.Keys), len(deleteEntry.Keys); got != exp { 446 t.Fatalf("key length mismatch: got %v, exp %v", got, exp) 447 } 448 449 if got, exp := string(de.Keys[0]), string(deleteEntry.Keys[0]); got != exp { 450 t.Fatalf("key mismatch: got %v, exp %v", got, exp) 451 } 452 453 if got, exp := de.Min, int64(2); got != exp { 454 t.Fatalf("min time mismatch: got %v, exp %v", got, exp) 455 } 456 457 if got, exp := de.Max, int64(3); got != exp { 458 t.Fatalf("min time mismatch: got %v, exp %v", got, exp) 459 } 460 461} 462 463func TestWAL_ClosedSegments(t *testing.T) { 464 dir := MustTempDir() 465 defer os.RemoveAll(dir) 466 467 w := tsm1.NewWAL(dir) 468 if err := w.Open(); err != nil { 469 t.Fatalf("error opening WAL: %v", err) 470 } 471 472 files, err := w.ClosedSegments() 473 if err != nil { 474 t.Fatalf("error getting closed segments: %v", err) 475 } 476 477 if got, exp := len(files), 0; got != exp { 478 t.Fatalf("close segment length mismatch: got %v, exp %v", got, exp) 479 } 480 481 if _, err := w.WriteMulti(map[string][]tsm1.Value{ 482 "cpu,host=A#!~#value": []tsm1.Value{ 483 tsm1.NewValue(1, 1.1), 484 }, 485 }); err != nil { 486 t.Fatalf("error writing points: %v", err) 487 } 488 489 if err := w.Close(); err != nil { 490 t.Fatalf("error closing wal: %v", err) 491 } 492 493 // Re-open the WAL 494 w = tsm1.NewWAL(dir) 495 defer w.Close() 496 if err := w.Open(); err != nil { 497 t.Fatalf("error opening WAL: %v", err) 498 } 499 500 files, err = w.ClosedSegments() 501 if err != nil { 502 t.Fatalf("error getting closed segments: %v", err) 503 } 504 if got, exp := len(files), 0; got != exp { 505 t.Fatalf("close segment length mismatch: got %v, exp %v", got, exp) 506 } 507} 508 509func TestWAL_Delete(t *testing.T) { 510 dir := MustTempDir() 511 defer os.RemoveAll(dir) 512 513 w := tsm1.NewWAL(dir) 514 if err := w.Open(); err != nil { 515 t.Fatalf("error opening WAL: %v", err) 516 } 517 518 files, err := w.ClosedSegments() 519 if err != nil { 520 t.Fatalf("error getting closed segments: %v", err) 521 } 522 523 if got, exp := len(files), 0; got != exp { 524 t.Fatalf("close segment length mismatch: got %v, exp %v", got, exp) 525 } 526 527 if _, err := w.Delete([][]byte{[]byte("cpu")}); err != nil { 528 t.Fatalf("error writing points: %v", err) 529 } 530 531 if err := w.Close(); err != nil { 532 t.Fatalf("error closing wal: %v", err) 533 } 534 535 // Re-open the WAL 536 w = tsm1.NewWAL(dir) 537 defer w.Close() 538 if err := w.Open(); err != nil { 539 t.Fatalf("error opening WAL: %v", err) 540 } 541 542 files, err = w.ClosedSegments() 543 if err != nil { 544 t.Fatalf("error getting closed segments: %v", err) 545 } 546 if got, exp := len(files), 0; got != exp { 547 t.Fatalf("close segment length mismatch: got %v, exp %v", got, exp) 548 } 549} 550 551func TestWALWriter_Corrupt(t *testing.T) { 552 dir := MustTempDir() 553 defer os.RemoveAll(dir) 554 f := MustTempFile(dir) 555 w := tsm1.NewWALSegmentWriter(f) 556 corruption := []byte{1, 4, 0, 0, 0} 557 558 p1 := tsm1.NewValue(1, 1.1) 559 values := map[string][]tsm1.Value{ 560 "cpu,host=A#!~#float": []tsm1.Value{p1}, 561 } 562 563 entry := &tsm1.WriteWALEntry{ 564 Values: values, 565 } 566 if err := w.Write(mustMarshalEntry(entry)); err != nil { 567 fatal(t, "write points", err) 568 } 569 570 if err := w.Flush(); err != nil { 571 fatal(t, "flush", err) 572 } 573 574 // Write some random bytes to the file to simulate corruption. 575 if _, err := f.Write(corruption); err != nil { 576 fatal(t, "corrupt WAL segment", err) 577 } 578 579 // Create the WAL segment reader. 580 if _, err := f.Seek(0, io.SeekStart); err != nil { 581 fatal(t, "seek", err) 582 } 583 r := tsm1.NewWALSegmentReader(f) 584 585 // Try to decode two entries. 586 587 if !r.Next() { 588 t.Fatalf("expected next, got false") 589 } 590 if _, err := r.Read(); err != nil { 591 fatal(t, "read entry", err) 592 } 593 594 if !r.Next() { 595 t.Fatalf("expected next, got false") 596 } 597 if _, err := r.Read(); err == nil { 598 fatal(t, "read entry did not return err", nil) 599 } 600 601 // Count should only return size of valid data. 602 expCount := MustReadFileSize(f) - int64(len(corruption)) 603 if n := r.Count(); n != expCount { 604 t.Fatalf("wrong count of bytes read, got %d, exp %d", n, expCount) 605 } 606} 607 608// Reproduces a `panic: runtime error: makeslice: cap out of range` when run with 609// GOARCH=386 go test -run TestWALSegmentReader_Corrupt -v ./tsdb/engine/tsm1/ 610func TestWALSegmentReader_Corrupt(t *testing.T) { 611 dir := MustTempDir() 612 defer os.RemoveAll(dir) 613 f := MustTempFile(dir) 614 w := tsm1.NewWALSegmentWriter(f) 615 616 p4 := tsm1.NewValue(1, "string") 617 618 values := map[string][]tsm1.Value{ 619 "cpu,host=A#!~#string": []tsm1.Value{p4, p4}, 620 } 621 622 entry := &tsm1.WriteWALEntry{ 623 Values: values, 624 } 625 626 typ, b := mustMarshalEntry(entry) 627 628 // This causes the nvals field to overflow on 32 bit systems which produces a 629 // negative count and a panic when reading the segment. 630 b[25] = 255 631 632 if err := w.Write(typ, b); err != nil { 633 fatal(t, "write points", err) 634 } 635 636 if err := w.Flush(); err != nil { 637 fatal(t, "flush", err) 638 } 639 640 // Create the WAL segment reader. 641 if _, err := f.Seek(0, io.SeekStart); err != nil { 642 fatal(t, "seek", err) 643 } 644 645 r := tsm1.NewWALSegmentReader(f) 646 defer r.Close() 647 648 // Try to decode two entries. 649 for r.Next() { 650 r.Read() 651 } 652} 653 654func TestWriteWALSegment_UnmarshalBinary_WriteWALCorrupt(t *testing.T) { 655 p1 := tsm1.NewValue(1, 1.1) 656 p2 := tsm1.NewValue(1, int64(1)) 657 p3 := tsm1.NewValue(1, true) 658 p4 := tsm1.NewValue(1, "string") 659 p5 := tsm1.NewValue(1, uint64(1)) 660 661 values := map[string][]tsm1.Value{ 662 "cpu,host=A#!~#float": []tsm1.Value{p1, p1}, 663 "cpu,host=A#!~#int": []tsm1.Value{p2, p2}, 664 "cpu,host=A#!~#bool": []tsm1.Value{p3, p3}, 665 "cpu,host=A#!~#string": []tsm1.Value{p4, p4}, 666 "cpu,host=A#!~#unsigned": []tsm1.Value{p5, p5}, 667 } 668 669 w := &tsm1.WriteWALEntry{ 670 Values: values, 671 } 672 673 b, err := w.MarshalBinary() 674 if err != nil { 675 t.Fatalf("unexpected error, got %v", err) 676 } 677 678 // Test every possible truncation of a write WAL entry 679 for i := 0; i < len(b); i++ { 680 // re-allocated to ensure capacity would be exceed if slicing 681 truncated := make([]byte, i) 682 copy(truncated, b[:i]) 683 err := w.UnmarshalBinary(truncated) 684 if err != nil && err != tsm1.ErrWALCorrupt { 685 t.Fatalf("unexpected error: %v", err) 686 } 687 } 688} 689 690func TestDeleteWALEntry_UnmarshalBinary(t *testing.T) { 691 examples := []struct { 692 In []string 693 Out [][]byte 694 }{ 695 { 696 In: []string{""}, 697 Out: nil, 698 }, 699 { 700 In: []string{"foo"}, 701 Out: [][]byte{[]byte("foo")}, 702 }, 703 { 704 In: []string{"foo", "bar"}, 705 Out: [][]byte{[]byte("foo"), []byte("bar")}, 706 }, 707 { 708 In: []string{"foo", "bar", "z", "abc"}, 709 Out: [][]byte{[]byte("foo"), []byte("bar"), []byte("z"), []byte("abc")}, 710 }, 711 { 712 In: []string{"foo", "bar", "z", "a"}, 713 Out: [][]byte{[]byte("foo"), []byte("bar"), []byte("z"), []byte("a")}, 714 }, 715 } 716 717 for i, example := range examples { 718 w := &tsm1.DeleteWALEntry{Keys: slices.StringsToBytes(example.In...)} 719 b, err := w.MarshalBinary() 720 if err != nil { 721 t.Fatalf("[example %d] unexpected error, got %v", i, err) 722 } 723 724 out := &tsm1.DeleteWALEntry{} 725 if err := out.UnmarshalBinary(b); err != nil { 726 t.Fatalf("[example %d] %v", i, err) 727 } 728 729 if !reflect.DeepEqual(example.Out, out.Keys) { 730 t.Errorf("[example %d] got %v, expected %v", i, out.Keys, example.Out) 731 } 732 } 733} 734 735func TestWriteWALSegment_UnmarshalBinary_DeleteWALCorrupt(t *testing.T) { 736 w := &tsm1.DeleteWALEntry{ 737 Keys: [][]byte{[]byte("foo"), []byte("bar")}, 738 } 739 740 b, err := w.MarshalBinary() 741 if err != nil { 742 t.Fatalf("unexpected error, got %v", err) 743 } 744 745 // Test every possible truncation of a write WAL entry 746 for i := 0; i < len(b); i++ { 747 // re-allocated to ensure capacity would be exceed if slicing 748 truncated := make([]byte, i) 749 copy(truncated, b[:i]) 750 err := w.UnmarshalBinary(truncated) 751 if err != nil && err != tsm1.ErrWALCorrupt { 752 t.Fatalf("unexpected error: %v", err) 753 } 754 } 755} 756 757func TestWriteWALSegment_UnmarshalBinary_DeleteRangeWALCorrupt(t *testing.T) { 758 w := &tsm1.DeleteRangeWALEntry{ 759 Keys: [][]byte{[]byte("foo"), []byte("bar")}, 760 Min: 1, 761 Max: 2, 762 } 763 764 b, err := w.MarshalBinary() 765 if err != nil { 766 t.Fatalf("unexpected error, got %v", err) 767 } 768 769 // Test every possible truncation of a write WAL entry 770 for i := 0; i < len(b); i++ { 771 // re-allocated to ensure capacity would be exceed if slicing 772 truncated := make([]byte, i) 773 copy(truncated, b[:i]) 774 err := w.UnmarshalBinary(truncated) 775 if err != nil && err != tsm1.ErrWALCorrupt { 776 t.Fatalf("unexpected error: %v", err) 777 } 778 } 779} 780 781func BenchmarkWALSegmentWriter(b *testing.B) { 782 points := map[string][]tsm1.Value{} 783 for i := 0; i < 5000; i++ { 784 k := "cpu,host=A#!~#value" 785 points[k] = append(points[k], tsm1.NewValue(int64(i), 1.1)) 786 } 787 788 dir := MustTempDir() 789 defer os.RemoveAll(dir) 790 791 f := MustTempFile(dir) 792 w := tsm1.NewWALSegmentWriter(f) 793 794 write := &tsm1.WriteWALEntry{ 795 Values: points, 796 } 797 798 b.ResetTimer() 799 for i := 0; i < b.N; i++ { 800 if err := w.Write(mustMarshalEntry(write)); err != nil { 801 b.Fatalf("unexpected error writing entry: %v", err) 802 } 803 } 804} 805 806func BenchmarkWALSegmentReader(b *testing.B) { 807 points := map[string][]tsm1.Value{} 808 for i := 0; i < 5000; i++ { 809 k := "cpu,host=A#!~#value" 810 points[k] = append(points[k], tsm1.NewValue(int64(i), 1.1)) 811 } 812 813 dir := MustTempDir() 814 defer os.RemoveAll(dir) 815 816 f := MustTempFile(dir) 817 w := tsm1.NewWALSegmentWriter(f) 818 819 write := &tsm1.WriteWALEntry{ 820 Values: points, 821 } 822 823 for i := 0; i < 100; i++ { 824 if err := w.Write(mustMarshalEntry(write)); err != nil { 825 b.Fatalf("unexpected error writing entry: %v", err) 826 } 827 } 828 829 r := tsm1.NewWALSegmentReader(f) 830 b.ResetTimer() 831 832 for i := 0; i < b.N; i++ { 833 b.StopTimer() 834 f.Seek(0, io.SeekStart) 835 b.StartTimer() 836 837 for r.Next() { 838 _, err := r.Read() 839 if err != nil { 840 b.Fatalf("unexpected error reading entry: %v", err) 841 } 842 } 843 } 844} 845 846// MustReadFileSize returns the size of the file, or panics. 847func MustReadFileSize(f *os.File) int64 { 848 stat, err := os.Stat(f.Name()) 849 if err != nil { 850 panic(fmt.Sprintf("failed to get size of file at %s: %s", f.Name(), err.Error())) 851 } 852 return stat.Size() 853} 854 855func mustMarshalEntry(entry tsm1.WALEntry) (tsm1.WalEntryType, []byte) { 856 bytes := make([]byte, 1024<<2) 857 858 b, err := entry.Encode(bytes) 859 if err != nil { 860 panic(fmt.Sprintf("error encoding: %v", err)) 861 } 862 863 return entry.Type(), snappy.Encode(b, b) 864} 865