1// Copyright 2015 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package wal 16 17import ( 18 "bytes" 19 "fmt" 20 "io" 21 "io/ioutil" 22 "math" 23 "os" 24 "path" 25 "path/filepath" 26 "reflect" 27 "regexp" 28 "testing" 29 30 "go.etcd.io/etcd/pkg/fileutil" 31 "go.etcd.io/etcd/pkg/pbutil" 32 "go.etcd.io/etcd/raft/raftpb" 33 "go.etcd.io/etcd/wal/walpb" 34 35 "go.uber.org/zap" 36) 37 38func TestNew(t *testing.T) { 39 p, err := ioutil.TempDir(os.TempDir(), "waltest") 40 if err != nil { 41 t.Fatal(err) 42 } 43 defer os.RemoveAll(p) 44 45 w, err := Create(zap.NewExample(), p, []byte("somedata")) 46 if err != nil { 47 t.Fatalf("err = %v, want nil", err) 48 } 49 if g := filepath.Base(w.tail().Name()); g != walName(0, 0) { 50 t.Errorf("name = %+v, want %+v", g, walName(0, 0)) 51 } 52 defer w.Close() 53 54 // file is preallocated to segment size; only read data written by wal 55 off, err := w.tail().Seek(0, io.SeekCurrent) 56 if err != nil { 57 t.Fatal(err) 58 } 59 gd := make([]byte, off) 60 f, err := os.Open(filepath.Join(p, filepath.Base(w.tail().Name()))) 61 if err != nil { 62 t.Fatal(err) 63 } 64 defer f.Close() 65 if _, err = io.ReadFull(f, gd); err != nil { 66 t.Fatalf("err = %v, want nil", err) 67 } 68 69 var wb bytes.Buffer 70 e := newEncoder(&wb, 0, 0) 71 err = e.encode(&walpb.Record{Type: crcType, Crc: 0}) 72 if err != nil { 73 t.Fatalf("err = %v, want nil", err) 74 } 75 err = e.encode(&walpb.Record{Type: metadataType, Data: []byte("somedata")}) 76 if err != nil { 77 t.Fatalf("err = %v, want nil", err) 78 } 79 r := &walpb.Record{ 80 Type: snapshotType, 81 Data: pbutil.MustMarshal(&walpb.Snapshot{}), 82 } 83 if err = e.encode(r); err != nil { 84 t.Fatalf("err = %v, want nil", err) 85 } 86 e.flush() 87 if !bytes.Equal(gd, wb.Bytes()) { 88 t.Errorf("data = %v, want %v", gd, wb.Bytes()) 89 } 90} 91 92func TestCreateFailFromPollutedDir(t *testing.T) { 93 p, err := ioutil.TempDir(os.TempDir(), "waltest") 94 if err != nil { 95 t.Fatal(err) 96 } 97 defer os.RemoveAll(p) 98 ioutil.WriteFile(filepath.Join(p, "test.wal"), []byte("data"), os.ModeTemporary) 99 100 _, err = Create(zap.NewExample(), p, []byte("data")) 101 if err != os.ErrExist { 102 t.Fatalf("expected %v, got %v", os.ErrExist, err) 103 } 104} 105 106func TestWalCleanup(t *testing.T) { 107 testRoot, err := ioutil.TempDir(os.TempDir(), "waltestroot") 108 if err != nil { 109 t.Fatal(err) 110 } 111 p, err := ioutil.TempDir(testRoot, "waltest") 112 if err != nil { 113 t.Fatal(err) 114 } 115 defer os.RemoveAll(testRoot) 116 117 logger := zap.NewExample() 118 w, err := Create(logger, p, []byte("")) 119 if err != nil { 120 t.Fatalf("err = %v, want nil", err) 121 } 122 w.cleanupWAL(logger) 123 fnames, err := fileutil.ReadDir(testRoot) 124 if err != nil { 125 t.Fatalf("err = %v, want nil", err) 126 } 127 if len(fnames) != 1 { 128 t.Fatalf("expected 1 file under %v, got %v", testRoot, len(fnames)) 129 } 130 pattern := fmt.Sprintf(`%s.broken\.[\d]{8}\.[\d]{6}\.[\d]{1,6}?`, filepath.Base(p)) 131 match, _ := regexp.MatchString(pattern, fnames[0]) 132 if !match { 133 t.Errorf("match = false, expected true for %v with pattern %v", fnames[0], pattern) 134 } 135} 136 137func TestCreateFailFromNoSpaceLeft(t *testing.T) { 138 p, err := ioutil.TempDir(os.TempDir(), "waltest") 139 if err != nil { 140 t.Fatal(err) 141 } 142 defer os.RemoveAll(p) 143 144 oldSegmentSizeBytes := SegmentSizeBytes 145 defer func() { 146 SegmentSizeBytes = oldSegmentSizeBytes 147 }() 148 SegmentSizeBytes = math.MaxInt64 149 150 _, err = Create(zap.NewExample(), p, []byte("data")) 151 if err == nil { // no space left on device 152 t.Fatalf("expected error 'no space left on device', got nil") 153 } 154} 155 156func TestNewForInitedDir(t *testing.T) { 157 p, err := ioutil.TempDir(os.TempDir(), "waltest") 158 if err != nil { 159 t.Fatal(err) 160 } 161 defer os.RemoveAll(p) 162 163 os.Create(filepath.Join(p, walName(0, 0))) 164 if _, err = Create(zap.NewExample(), p, nil); err == nil || err != os.ErrExist { 165 t.Errorf("err = %v, want %v", err, os.ErrExist) 166 } 167} 168 169func TestOpenAtIndex(t *testing.T) { 170 dir, err := ioutil.TempDir(os.TempDir(), "waltest") 171 if err != nil { 172 t.Fatal(err) 173 } 174 defer os.RemoveAll(dir) 175 176 f, err := os.Create(filepath.Join(dir, walName(0, 0))) 177 if err != nil { 178 t.Fatal(err) 179 } 180 f.Close() 181 182 w, err := Open(zap.NewExample(), dir, walpb.Snapshot{}) 183 if err != nil { 184 t.Fatalf("err = %v, want nil", err) 185 } 186 if g := filepath.Base(w.tail().Name()); g != walName(0, 0) { 187 t.Errorf("name = %+v, want %+v", g, walName(0, 0)) 188 } 189 if w.seq() != 0 { 190 t.Errorf("seq = %d, want %d", w.seq(), 0) 191 } 192 w.Close() 193 194 wname := walName(2, 10) 195 f, err = os.Create(filepath.Join(dir, wname)) 196 if err != nil { 197 t.Fatal(err) 198 } 199 f.Close() 200 201 w, err = Open(zap.NewExample(), dir, walpb.Snapshot{Index: 5}) 202 if err != nil { 203 t.Fatalf("err = %v, want nil", err) 204 } 205 if g := filepath.Base(w.tail().Name()); g != wname { 206 t.Errorf("name = %+v, want %+v", g, wname) 207 } 208 if w.seq() != 2 { 209 t.Errorf("seq = %d, want %d", w.seq(), 2) 210 } 211 w.Close() 212 213 emptydir, err := ioutil.TempDir(os.TempDir(), "waltestempty") 214 if err != nil { 215 t.Fatal(err) 216 } 217 defer os.RemoveAll(emptydir) 218 if _, err = Open(zap.NewExample(), emptydir, walpb.Snapshot{}); err != ErrFileNotFound { 219 t.Errorf("err = %v, want %v", err, ErrFileNotFound) 220 } 221} 222 223// TestVerify tests that Verify throws a non-nil error when the WAL is corrupted. 224// The test creates a WAL directory and cuts out multiple WAL files. Then 225// it corrupts one of the files by completely truncating it. 226func TestVerify(t *testing.T) { 227 walDir, err := ioutil.TempDir(os.TempDir(), "waltest") 228 if err != nil { 229 t.Fatal(err) 230 } 231 defer os.RemoveAll(walDir) 232 233 // create WAL 234 w, err := Create(zap.NewExample(), walDir, nil) 235 if err != nil { 236 t.Fatal(err) 237 } 238 defer w.Close() 239 240 // make 5 separate files 241 for i := 0; i < 5; i++ { 242 es := []raftpb.Entry{{Index: uint64(i), Data: []byte("waldata" + string(rune(i+1)))}} 243 if err = w.Save(raftpb.HardState{}, es); err != nil { 244 t.Fatal(err) 245 } 246 if err = w.cut(); err != nil { 247 t.Fatal(err) 248 } 249 } 250 251 // to verify the WAL is not corrupted at this point 252 err = Verify(zap.NewExample(), walDir, walpb.Snapshot{}) 253 if err != nil { 254 t.Errorf("expected a nil error, got %v", err) 255 } 256 257 walFiles, err := ioutil.ReadDir(walDir) 258 if err != nil { 259 t.Fatal(err) 260 } 261 262 // corrupt the WAL by truncating one of the WAL files completely 263 err = os.Truncate(path.Join(walDir, walFiles[2].Name()), 0) 264 if err != nil { 265 t.Fatal(err) 266 } 267 268 err = Verify(zap.NewExample(), walDir, walpb.Snapshot{}) 269 if err == nil { 270 t.Error("expected a non-nil error, got nil") 271 } 272} 273 274// TODO: split it into smaller tests for better readability 275func TestCut(t *testing.T) { 276 p, err := ioutil.TempDir(os.TempDir(), "waltest") 277 if err != nil { 278 t.Fatal(err) 279 } 280 defer os.RemoveAll(p) 281 282 w, err := Create(zap.NewExample(), p, nil) 283 if err != nil { 284 t.Fatal(err) 285 } 286 defer w.Close() 287 288 state := raftpb.HardState{Term: 1} 289 if err = w.Save(state, nil); err != nil { 290 t.Fatal(err) 291 } 292 if err = w.cut(); err != nil { 293 t.Fatal(err) 294 } 295 wname := walName(1, 1) 296 if g := filepath.Base(w.tail().Name()); g != wname { 297 t.Errorf("name = %s, want %s", g, wname) 298 } 299 300 es := []raftpb.Entry{{Index: 1, Term: 1, Data: []byte{1}}} 301 if err = w.Save(raftpb.HardState{}, es); err != nil { 302 t.Fatal(err) 303 } 304 if err = w.cut(); err != nil { 305 t.Fatal(err) 306 } 307 snap := walpb.Snapshot{Index: 2, Term: 1} 308 if err = w.SaveSnapshot(snap); err != nil { 309 t.Fatal(err) 310 } 311 wname = walName(2, 2) 312 if g := filepath.Base(w.tail().Name()); g != wname { 313 t.Errorf("name = %s, want %s", g, wname) 314 } 315 316 // check the state in the last WAL 317 // We do check before closing the WAL to ensure that Cut syncs the data 318 // into the disk. 319 f, err := os.Open(filepath.Join(p, wname)) 320 if err != nil { 321 t.Fatal(err) 322 } 323 defer f.Close() 324 nw := &WAL{ 325 decoder: newDecoder(f), 326 start: snap, 327 } 328 _, gst, _, err := nw.ReadAll() 329 if err != nil { 330 t.Fatal(err) 331 } 332 if !reflect.DeepEqual(gst, state) { 333 t.Errorf("state = %+v, want %+v", gst, state) 334 } 335} 336 337func TestSaveWithCut(t *testing.T) { 338 p, err := ioutil.TempDir(os.TempDir(), "waltest") 339 if err != nil { 340 t.Fatal(err) 341 } 342 defer os.RemoveAll(p) 343 344 w, err := Create(zap.NewExample(), p, []byte("metadata")) 345 if err != nil { 346 t.Fatal(err) 347 } 348 349 state := raftpb.HardState{Term: 1} 350 if err = w.Save(state, nil); err != nil { 351 t.Fatal(err) 352 } 353 bigData := make([]byte, 500) 354 strdata := "Hello World!!" 355 copy(bigData, strdata) 356 // set a lower value for SegmentSizeBytes, else the test takes too long to complete 357 restoreLater := SegmentSizeBytes 358 const EntrySize int = 500 359 SegmentSizeBytes = 2 * 1024 360 defer func() { SegmentSizeBytes = restoreLater }() 361 index := uint64(0) 362 for totalSize := 0; totalSize < int(SegmentSizeBytes); totalSize += EntrySize { 363 ents := []raftpb.Entry{{Index: index, Term: 1, Data: bigData}} 364 if err = w.Save(state, ents); err != nil { 365 t.Fatal(err) 366 } 367 index++ 368 } 369 370 w.Close() 371 372 neww, err := Open(zap.NewExample(), p, walpb.Snapshot{}) 373 if err != nil { 374 t.Fatalf("err = %v, want nil", err) 375 } 376 defer neww.Close() 377 wname := walName(1, index) 378 if g := filepath.Base(neww.tail().Name()); g != wname { 379 t.Errorf("name = %s, want %s", g, wname) 380 } 381 382 _, newhardstate, entries, err := neww.ReadAll() 383 if err != nil { 384 t.Fatal(err) 385 } 386 387 if !reflect.DeepEqual(newhardstate, state) { 388 t.Errorf("Hard State = %+v, want %+v", newhardstate, state) 389 } 390 if len(entries) != int(SegmentSizeBytes/int64(EntrySize)) { 391 t.Errorf("Number of entries = %d, expected = %d", len(entries), int(SegmentSizeBytes/int64(EntrySize))) 392 } 393 for _, oneent := range entries { 394 if !bytes.Equal(oneent.Data, bigData) { 395 t.Errorf("the saved data does not match at Index %d : found: %s , want :%s", oneent.Index, oneent.Data, bigData) 396 } 397 } 398} 399 400func TestRecover(t *testing.T) { 401 p, err := ioutil.TempDir(os.TempDir(), "waltest") 402 if err != nil { 403 t.Fatal(err) 404 } 405 defer os.RemoveAll(p) 406 407 w, err := Create(zap.NewExample(), p, []byte("metadata")) 408 if err != nil { 409 t.Fatal(err) 410 } 411 if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil { 412 t.Fatal(err) 413 } 414 ents := []raftpb.Entry{{Index: 1, Term: 1, Data: []byte{1}}, {Index: 2, Term: 2, Data: []byte{2}}} 415 if err = w.Save(raftpb.HardState{}, ents); err != nil { 416 t.Fatal(err) 417 } 418 sts := []raftpb.HardState{{Term: 1, Vote: 1, Commit: 1}, {Term: 2, Vote: 2, Commit: 2}} 419 for _, s := range sts { 420 if err = w.Save(s, nil); err != nil { 421 t.Fatal(err) 422 } 423 } 424 w.Close() 425 426 if w, err = Open(zap.NewExample(), p, walpb.Snapshot{}); err != nil { 427 t.Fatal(err) 428 } 429 metadata, state, entries, err := w.ReadAll() 430 if err != nil { 431 t.Fatal(err) 432 } 433 434 if !bytes.Equal(metadata, []byte("metadata")) { 435 t.Errorf("metadata = %s, want %s", metadata, "metadata") 436 } 437 if !reflect.DeepEqual(entries, ents) { 438 t.Errorf("ents = %+v, want %+v", entries, ents) 439 } 440 // only the latest state is recorded 441 s := sts[len(sts)-1] 442 if !reflect.DeepEqual(state, s) { 443 t.Errorf("state = %+v, want %+v", state, s) 444 } 445 w.Close() 446} 447 448func TestSearchIndex(t *testing.T) { 449 tests := []struct { 450 names []string 451 index uint64 452 widx int 453 wok bool 454 }{ 455 { 456 []string{ 457 "0000000000000000-0000000000000000.wal", 458 "0000000000000001-0000000000001000.wal", 459 "0000000000000002-0000000000002000.wal", 460 }, 461 0x1000, 1, true, 462 }, 463 { 464 []string{ 465 "0000000000000001-0000000000004000.wal", 466 "0000000000000002-0000000000003000.wal", 467 "0000000000000003-0000000000005000.wal", 468 }, 469 0x4000, 1, true, 470 }, 471 { 472 []string{ 473 "0000000000000001-0000000000002000.wal", 474 "0000000000000002-0000000000003000.wal", 475 "0000000000000003-0000000000005000.wal", 476 }, 477 0x1000, -1, false, 478 }, 479 } 480 for i, tt := range tests { 481 idx, ok := searchIndex(zap.NewExample(), tt.names, tt.index) 482 if idx != tt.widx { 483 t.Errorf("#%d: idx = %d, want %d", i, idx, tt.widx) 484 } 485 if ok != tt.wok { 486 t.Errorf("#%d: ok = %v, want %v", i, ok, tt.wok) 487 } 488 } 489} 490 491func TestScanWalName(t *testing.T) { 492 tests := []struct { 493 str string 494 wseq, windex uint64 495 wok bool 496 }{ 497 {"0000000000000000-0000000000000000.wal", 0, 0, true}, 498 {"0000000000000000.wal", 0, 0, false}, 499 {"0000000000000000-0000000000000000.snap", 0, 0, false}, 500 } 501 for i, tt := range tests { 502 s, index, err := parseWALName(tt.str) 503 if g := err == nil; g != tt.wok { 504 t.Errorf("#%d: ok = %v, want %v", i, g, tt.wok) 505 } 506 if s != tt.wseq { 507 t.Errorf("#%d: seq = %d, want %d", i, s, tt.wseq) 508 } 509 if index != tt.windex { 510 t.Errorf("#%d: index = %d, want %d", i, index, tt.windex) 511 } 512 } 513} 514 515func TestRecoverAfterCut(t *testing.T) { 516 p, err := ioutil.TempDir(os.TempDir(), "waltest") 517 if err != nil { 518 t.Fatal(err) 519 } 520 defer os.RemoveAll(p) 521 522 md, err := Create(zap.NewExample(), p, []byte("metadata")) 523 if err != nil { 524 t.Fatal(err) 525 } 526 for i := 0; i < 10; i++ { 527 if err = md.SaveSnapshot(walpb.Snapshot{Index: uint64(i)}); err != nil { 528 t.Fatal(err) 529 } 530 es := []raftpb.Entry{{Index: uint64(i)}} 531 if err = md.Save(raftpb.HardState{}, es); err != nil { 532 t.Fatal(err) 533 } 534 if err = md.cut(); err != nil { 535 t.Fatal(err) 536 } 537 } 538 md.Close() 539 540 if err := os.Remove(filepath.Join(p, walName(4, 4))); err != nil { 541 t.Fatal(err) 542 } 543 544 for i := 0; i < 10; i++ { 545 w, err := Open(zap.NewExample(), p, walpb.Snapshot{Index: uint64(i)}) 546 if err != nil { 547 if i <= 4 { 548 if err != ErrFileNotFound { 549 t.Errorf("#%d: err = %v, want %v", i, err, ErrFileNotFound) 550 } 551 } else { 552 t.Errorf("#%d: err = %v, want nil", i, err) 553 } 554 continue 555 } 556 metadata, _, entries, err := w.ReadAll() 557 if err != nil { 558 t.Errorf("#%d: err = %v, want nil", i, err) 559 continue 560 } 561 if !bytes.Equal(metadata, []byte("metadata")) { 562 t.Errorf("#%d: metadata = %s, want %s", i, metadata, "metadata") 563 } 564 for j, e := range entries { 565 if e.Index != uint64(j+i+1) { 566 t.Errorf("#%d: ents[%d].Index = %+v, want %+v", i, j, e.Index, j+i+1) 567 } 568 } 569 w.Close() 570 } 571} 572 573func TestOpenAtUncommittedIndex(t *testing.T) { 574 p, err := ioutil.TempDir(os.TempDir(), "waltest") 575 if err != nil { 576 t.Fatal(err) 577 } 578 defer os.RemoveAll(p) 579 580 w, err := Create(zap.NewExample(), p, nil) 581 if err != nil { 582 t.Fatal(err) 583 } 584 if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil { 585 t.Fatal(err) 586 } 587 if err = w.Save(raftpb.HardState{}, []raftpb.Entry{{Index: 0}}); err != nil { 588 t.Fatal(err) 589 } 590 w.Close() 591 592 w, err = Open(zap.NewExample(), p, walpb.Snapshot{}) 593 if err != nil { 594 t.Fatal(err) 595 } 596 // commit up to index 0, try to read index 1 597 if _, _, _, err = w.ReadAll(); err != nil { 598 t.Errorf("err = %v, want nil", err) 599 } 600 w.Close() 601} 602 603// TestOpenForRead tests that OpenForRead can load all files. 604// The tests creates WAL directory, and cut out multiple WAL files. Then 605// it releases the lock of part of data, and excepts that OpenForRead 606// can read out all files even if some are locked for write. 607func TestOpenForRead(t *testing.T) { 608 p, err := ioutil.TempDir(os.TempDir(), "waltest") 609 if err != nil { 610 t.Fatal(err) 611 } 612 defer os.RemoveAll(p) 613 // create WAL 614 w, err := Create(zap.NewExample(), p, nil) 615 if err != nil { 616 t.Fatal(err) 617 } 618 defer w.Close() 619 // make 10 separate files 620 for i := 0; i < 10; i++ { 621 es := []raftpb.Entry{{Index: uint64(i)}} 622 if err = w.Save(raftpb.HardState{}, es); err != nil { 623 t.Fatal(err) 624 } 625 if err = w.cut(); err != nil { 626 t.Fatal(err) 627 } 628 } 629 // release the lock to 5 630 unlockIndex := uint64(5) 631 w.ReleaseLockTo(unlockIndex) 632 633 // All are available for read 634 w2, err := OpenForRead(zap.NewExample(), p, walpb.Snapshot{}) 635 if err != nil { 636 t.Fatal(err) 637 } 638 defer w2.Close() 639 _, _, ents, err := w2.ReadAll() 640 if err != nil { 641 t.Fatalf("err = %v, want nil", err) 642 } 643 if g := ents[len(ents)-1].Index; g != 9 { 644 t.Errorf("last index read = %d, want %d", g, 9) 645 } 646} 647 648func TestOpenWithMaxIndex(t *testing.T) { 649 p, err := ioutil.TempDir(os.TempDir(), "waltest") 650 if err != nil { 651 t.Fatal(err) 652 } 653 defer os.RemoveAll(p) 654 // create WAL 655 w, err := Create(zap.NewExample(), p, nil) 656 if err != nil { 657 t.Fatal(err) 658 } 659 defer w.Close() 660 661 es := []raftpb.Entry{{Index: uint64(math.MaxInt64)}} 662 if err = w.Save(raftpb.HardState{}, es); err != nil { 663 t.Fatal(err) 664 } 665 w.Close() 666 667 w, err = Open(zap.NewExample(), p, walpb.Snapshot{}) 668 if err != nil { 669 t.Fatal(err) 670 } 671 _, _, _, err = w.ReadAll() 672 if err == nil || err != ErrSliceOutOfRange { 673 t.Fatalf("err = %v, want ErrSliceOutOfRange", err) 674 } 675} 676 677func TestSaveEmpty(t *testing.T) { 678 var buf bytes.Buffer 679 var est raftpb.HardState 680 w := WAL{ 681 encoder: newEncoder(&buf, 0, 0), 682 } 683 if err := w.saveState(&est); err != nil { 684 t.Errorf("err = %v, want nil", err) 685 } 686 if len(buf.Bytes()) != 0 { 687 t.Errorf("buf.Bytes = %d, want 0", len(buf.Bytes())) 688 } 689} 690 691func TestReleaseLockTo(t *testing.T) { 692 p, err := ioutil.TempDir(os.TempDir(), "waltest") 693 if err != nil { 694 t.Fatal(err) 695 } 696 defer os.RemoveAll(p) 697 // create WAL 698 w, err := Create(zap.NewExample(), p, nil) 699 defer func() { 700 if err = w.Close(); err != nil { 701 t.Fatal(err) 702 } 703 }() 704 if err != nil { 705 t.Fatal(err) 706 } 707 708 // release nothing if no files 709 err = w.ReleaseLockTo(10) 710 if err != nil { 711 t.Errorf("err = %v, want nil", err) 712 } 713 714 // make 10 separate files 715 for i := 0; i < 10; i++ { 716 es := []raftpb.Entry{{Index: uint64(i)}} 717 if err = w.Save(raftpb.HardState{}, es); err != nil { 718 t.Fatal(err) 719 } 720 if err = w.cut(); err != nil { 721 t.Fatal(err) 722 } 723 } 724 // release the lock to 5 725 unlockIndex := uint64(5) 726 w.ReleaseLockTo(unlockIndex) 727 728 // expected remaining are 4,5,6,7,8,9,10 729 if len(w.locks) != 7 { 730 t.Errorf("len(w.locks) = %d, want %d", len(w.locks), 7) 731 } 732 for i, l := range w.locks { 733 var lockIndex uint64 734 _, lockIndex, err = parseWALName(filepath.Base(l.Name())) 735 if err != nil { 736 t.Fatal(err) 737 } 738 739 if lockIndex != uint64(i+4) { 740 t.Errorf("#%d: lockindex = %d, want %d", i, lockIndex, uint64(i+4)) 741 } 742 } 743 744 // release the lock to 15 745 unlockIndex = uint64(15) 746 w.ReleaseLockTo(unlockIndex) 747 748 // expected remaining is 10 749 if len(w.locks) != 1 { 750 t.Errorf("len(w.locks) = %d, want %d", len(w.locks), 1) 751 } 752 _, lockIndex, err := parseWALName(filepath.Base(w.locks[0].Name())) 753 if err != nil { 754 t.Fatal(err) 755 } 756 757 if lockIndex != uint64(10) { 758 t.Errorf("lockindex = %d, want %d", lockIndex, 10) 759 } 760} 761 762// TestTailWriteNoSlackSpace ensures that tail writes append if there's no preallocated space. 763func TestTailWriteNoSlackSpace(t *testing.T) { 764 p, err := ioutil.TempDir(os.TempDir(), "waltest") 765 if err != nil { 766 t.Fatal(err) 767 } 768 defer os.RemoveAll(p) 769 770 // create initial WAL 771 w, err := Create(zap.NewExample(), p, []byte("metadata")) 772 if err != nil { 773 t.Fatal(err) 774 } 775 // write some entries 776 for i := 1; i <= 5; i++ { 777 es := []raftpb.Entry{{Index: uint64(i), Term: 1, Data: []byte{byte(i)}}} 778 if err = w.Save(raftpb.HardState{Term: 1}, es); err != nil { 779 t.Fatal(err) 780 } 781 } 782 // get rid of slack space by truncating file 783 off, serr := w.tail().Seek(0, io.SeekCurrent) 784 if serr != nil { 785 t.Fatal(serr) 786 } 787 if terr := w.tail().Truncate(off); terr != nil { 788 t.Fatal(terr) 789 } 790 w.Close() 791 792 // open, write more 793 w, err = Open(zap.NewExample(), p, walpb.Snapshot{}) 794 if err != nil { 795 t.Fatal(err) 796 } 797 _, _, ents, rerr := w.ReadAll() 798 if rerr != nil { 799 t.Fatal(rerr) 800 } 801 if len(ents) != 5 { 802 t.Fatalf("got entries %+v, expected 5 entries", ents) 803 } 804 // write more entries 805 for i := 6; i <= 10; i++ { 806 es := []raftpb.Entry{{Index: uint64(i), Term: 1, Data: []byte{byte(i)}}} 807 if err = w.Save(raftpb.HardState{Term: 1}, es); err != nil { 808 t.Fatal(err) 809 } 810 } 811 w.Close() 812 813 // confirm all writes 814 w, err = Open(zap.NewExample(), p, walpb.Snapshot{}) 815 if err != nil { 816 t.Fatal(err) 817 } 818 _, _, ents, rerr = w.ReadAll() 819 if rerr != nil { 820 t.Fatal(rerr) 821 } 822 if len(ents) != 10 { 823 t.Fatalf("got entries %+v, expected 10 entries", ents) 824 } 825 w.Close() 826} 827 828// TestRestartCreateWal ensures that an interrupted WAL initialization is clobbered on restart 829func TestRestartCreateWal(t *testing.T) { 830 p, err := ioutil.TempDir(os.TempDir(), "waltest") 831 if err != nil { 832 t.Fatal(err) 833 } 834 defer os.RemoveAll(p) 835 836 // make temporary directory so it looks like initialization is interrupted 837 tmpdir := filepath.Clean(p) + ".tmp" 838 if err = os.Mkdir(tmpdir, fileutil.PrivateDirMode); err != nil { 839 t.Fatal(err) 840 } 841 if _, err = os.OpenFile(filepath.Join(tmpdir, "test"), os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode); err != nil { 842 t.Fatal(err) 843 } 844 845 w, werr := Create(zap.NewExample(), p, []byte("abc")) 846 if werr != nil { 847 t.Fatal(werr) 848 } 849 w.Close() 850 if Exist(tmpdir) { 851 t.Fatalf("got %q exists, expected it to not exist", tmpdir) 852 } 853 854 if w, err = OpenForRead(zap.NewExample(), p, walpb.Snapshot{}); err != nil { 855 t.Fatal(err) 856 } 857 defer w.Close() 858 859 if meta, _, _, rerr := w.ReadAll(); rerr != nil || string(meta) != "abc" { 860 t.Fatalf("got error %v and meta %q, expected nil and %q", rerr, meta, "abc") 861 } 862} 863 864// TestOpenOnTornWrite ensures that entries past the torn write are truncated. 865func TestOpenOnTornWrite(t *testing.T) { 866 maxEntries := 40 867 clobberIdx := 20 868 overwriteEntries := 5 869 870 p, err := ioutil.TempDir(os.TempDir(), "waltest") 871 if err != nil { 872 t.Fatal(err) 873 } 874 defer os.RemoveAll(p) 875 w, err := Create(zap.NewExample(), p, nil) 876 defer func() { 877 if err = w.Close(); err != nil && err != os.ErrInvalid { 878 t.Fatal(err) 879 } 880 }() 881 if err != nil { 882 t.Fatal(err) 883 } 884 885 // get offset of end of each saved entry 886 offsets := make([]int64, maxEntries) 887 for i := range offsets { 888 es := []raftpb.Entry{{Index: uint64(i)}} 889 if err = w.Save(raftpb.HardState{}, es); err != nil { 890 t.Fatal(err) 891 } 892 if offsets[i], err = w.tail().Seek(0, io.SeekCurrent); err != nil { 893 t.Fatal(err) 894 } 895 } 896 897 fn := filepath.Join(p, filepath.Base(w.tail().Name())) 898 w.Close() 899 900 // clobber some entry with 0's to simulate a torn write 901 f, ferr := os.OpenFile(fn, os.O_WRONLY, fileutil.PrivateFileMode) 902 if ferr != nil { 903 t.Fatal(ferr) 904 } 905 defer f.Close() 906 _, err = f.Seek(offsets[clobberIdx], io.SeekStart) 907 if err != nil { 908 t.Fatal(err) 909 } 910 zeros := make([]byte, offsets[clobberIdx+1]-offsets[clobberIdx]) 911 _, err = f.Write(zeros) 912 if err != nil { 913 t.Fatal(err) 914 } 915 f.Close() 916 917 w, err = Open(zap.NewExample(), p, walpb.Snapshot{}) 918 if err != nil { 919 t.Fatal(err) 920 } 921 // seek up to clobbered entry 922 _, _, _, err = w.ReadAll() 923 if err != nil { 924 t.Fatal(err) 925 } 926 927 // write a few entries past the clobbered entry 928 for i := 0; i < overwriteEntries; i++ { 929 // Index is different from old, truncated entries 930 es := []raftpb.Entry{{Index: uint64(i + clobberIdx), Data: []byte("new")}} 931 if err = w.Save(raftpb.HardState{}, es); err != nil { 932 t.Fatal(err) 933 } 934 } 935 w.Close() 936 937 // read back the entries, confirm number of entries matches expectation 938 w, err = OpenForRead(zap.NewExample(), p, walpb.Snapshot{}) 939 if err != nil { 940 t.Fatal(err) 941 } 942 943 _, _, ents, rerr := w.ReadAll() 944 if rerr != nil { 945 // CRC error? the old entries were likely never truncated away 946 t.Fatal(rerr) 947 } 948 wEntries := (clobberIdx - 1) + overwriteEntries 949 if len(ents) != wEntries { 950 t.Fatalf("expected len(ents) = %d, got %d", wEntries, len(ents)) 951 } 952} 953 954func TestRenameFail(t *testing.T) { 955 p, err := ioutil.TempDir(os.TempDir(), "waltest") 956 if err != nil { 957 t.Fatal(err) 958 } 959 defer os.RemoveAll(p) 960 961 oldSegmentSizeBytes := SegmentSizeBytes 962 defer func() { 963 SegmentSizeBytes = oldSegmentSizeBytes 964 }() 965 SegmentSizeBytes = math.MaxInt64 966 967 tp, terr := ioutil.TempDir(os.TempDir(), "waltest") 968 if terr != nil { 969 t.Fatal(terr) 970 } 971 os.RemoveAll(tp) 972 973 w := &WAL{ 974 lg: zap.NewExample(), 975 dir: p, 976 } 977 w2, werr := w.renameWAL(tp) 978 if w2 != nil || werr == nil { // os.Rename should fail from 'no such file or directory' 979 t.Fatalf("expected error, got %v", werr) 980 } 981} 982 983// TestValidSnapshotEntries ensures ValidSnapshotEntries returns all valid wal snapshot entries, accounting 984// for hardstate 985func TestValidSnapshotEntries(t *testing.T) { 986 p, err := ioutil.TempDir(os.TempDir(), "waltest") 987 if err != nil { 988 t.Fatal(err) 989 } 990 defer os.RemoveAll(p) 991 snap0 := walpb.Snapshot{Index: 0, Term: 0} 992 snap1 := walpb.Snapshot{Index: 1, Term: 1} 993 state1 := raftpb.HardState{Commit: 1, Term: 1} 994 snap2 := walpb.Snapshot{Index: 2, Term: 1} 995 snap3 := walpb.Snapshot{Index: 3, Term: 2} 996 state2 := raftpb.HardState{Commit: 3, Term: 2} 997 snap4 := walpb.Snapshot{Index: 4, Term: 2} // will be orphaned since the last committed entry will be snap3 998 func() { 999 var w *WAL 1000 w, err = Create(zap.NewExample(), p, nil) 1001 if err != nil { 1002 t.Fatal(err) 1003 } 1004 defer w.Close() 1005 1006 // snap0 is implicitly created at index 0, term 0 1007 if err = w.SaveSnapshot(snap1); err != nil { 1008 t.Fatal(err) 1009 } 1010 if err = w.Save(state1, nil); err != nil { 1011 t.Fatal(err) 1012 } 1013 if err = w.SaveSnapshot(snap2); err != nil { 1014 t.Fatal(err) 1015 } 1016 if err = w.SaveSnapshot(snap3); err != nil { 1017 t.Fatal(err) 1018 } 1019 if err = w.Save(state2, nil); err != nil { 1020 t.Fatal(err) 1021 } 1022 if err = w.SaveSnapshot(snap4); err != nil { 1023 t.Fatal(err) 1024 } 1025 }() 1026 walSnaps, serr := ValidSnapshotEntries(zap.NewExample(), p) 1027 if serr != nil { 1028 t.Fatal(serr) 1029 } 1030 expected := []walpb.Snapshot{snap0, snap1, snap2, snap3} 1031 if !reflect.DeepEqual(walSnaps, expected) { 1032 t.Errorf("expected walSnaps %+v, got %+v", expected, walSnaps) 1033 } 1034} 1035 1036// TestValidSnapshotEntriesAfterPurgeWal ensure that there are many wal files, and after cleaning the first wal file, 1037// it can work well. 1038func TestValidSnapshotEntriesAfterPurgeWal(t *testing.T) { 1039 oldSegmentSizeBytes := SegmentSizeBytes 1040 SegmentSizeBytes = 64 1041 defer func() { 1042 SegmentSizeBytes = oldSegmentSizeBytes 1043 }() 1044 p, err := ioutil.TempDir(os.TempDir(), "waltest") 1045 if err != nil { 1046 t.Fatal(err) 1047 } 1048 defer os.RemoveAll(p) 1049 snap0 := walpb.Snapshot{Index: 0, Term: 0} 1050 snap1 := walpb.Snapshot{Index: 1, Term: 1} 1051 state1 := raftpb.HardState{Commit: 1, Term: 1} 1052 snap2 := walpb.Snapshot{Index: 2, Term: 1} 1053 snap3 := walpb.Snapshot{Index: 3, Term: 2} 1054 state2 := raftpb.HardState{Commit: 3, Term: 2} 1055 func() { 1056 w, werr := Create(zap.NewExample(), p, nil) 1057 if werr != nil { 1058 t.Fatal(werr) 1059 } 1060 defer w.Close() 1061 1062 // snap0 is implicitly created at index 0, term 0 1063 if err = w.SaveSnapshot(snap1); err != nil { 1064 t.Fatal(err) 1065 } 1066 if err = w.Save(state1, nil); err != nil { 1067 t.Fatal(err) 1068 } 1069 if err = w.SaveSnapshot(snap2); err != nil { 1070 t.Fatal(err) 1071 } 1072 if err = w.SaveSnapshot(snap3); err != nil { 1073 t.Fatal(err) 1074 } 1075 for i := 0; i < 128; i++ { 1076 if err = w.Save(state2, nil); err != nil { 1077 t.Fatal(err) 1078 } 1079 } 1080 }() 1081 files, _, ferr := selectWALFiles(nil, p, snap0) 1082 if ferr != nil { 1083 t.Fatal(ferr) 1084 } 1085 os.Remove(p + "/" + files[0]) 1086 _, err = ValidSnapshotEntries(zap.NewExample(), p) 1087 if err != nil { 1088 t.Fatal(err) 1089 } 1090} 1091 1092// TestReadAllFail ensure ReadAll error if used without opening the WAL 1093func TestReadAllFail(t *testing.T) { 1094 dir, err := ioutil.TempDir(os.TempDir(), "waltest") 1095 if err != nil { 1096 t.Fatal(err) 1097 } 1098 defer os.RemoveAll(dir) 1099 1100 // create initial WAL 1101 f, err := Create(zap.NewExample(), dir, []byte("metadata")) 1102 if err != nil { 1103 t.Fatal(err) 1104 } 1105 f.Close() 1106 // try to read without opening the WAL 1107 _, _, _, err = f.ReadAll() 1108 if err == nil || err != ErrDecoderNotFound { 1109 t.Fatalf("err = %v, want ErrDecoderNotFound", err) 1110 } 1111} 1112