1// Copyright 2015 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package wal 16 17import ( 18 "bytes" 19 "fmt" 20 "io" 21 "io/ioutil" 22 "math" 23 "os" 24 "path" 25 "path/filepath" 26 "reflect" 27 "regexp" 28 "testing" 29 30 "github.com/stretchr/testify/assert" 31 "go.etcd.io/etcd/client/pkg/v3/fileutil" 32 "go.etcd.io/etcd/pkg/v3/pbutil" 33 "go.etcd.io/etcd/raft/v3/raftpb" 34 "go.etcd.io/etcd/server/v3/wal/walpb" 35 "go.uber.org/zap/zaptest" 36 37 "go.uber.org/zap" 38) 39 40var ( 41 confState = raftpb.ConfState{ 42 Voters: []uint64{0x00ffca74}, 43 AutoLeave: false, 44 } 45) 46 47func TestNew(t *testing.T) { 48 p, err := ioutil.TempDir(t.TempDir(), "waltest") 49 if err != nil { 50 t.Fatal(err) 51 } 52 defer os.RemoveAll(p) 53 54 w, err := Create(zap.NewExample(), p, []byte("somedata")) 55 if err != nil { 56 t.Fatalf("err = %v, want nil", err) 57 } 58 if g := filepath.Base(w.tail().Name()); g != walName(0, 0) { 59 t.Errorf("name = %+v, want %+v", g, walName(0, 0)) 60 } 61 defer w.Close() 62 63 // file is preallocated to segment size; only read data written by wal 64 off, err := w.tail().Seek(0, io.SeekCurrent) 65 if err != nil { 66 t.Fatal(err) 67 } 68 gd := make([]byte, off) 69 f, err := os.Open(filepath.Join(p, filepath.Base(w.tail().Name()))) 70 if err != nil { 71 t.Fatal(err) 72 } 73 defer f.Close() 74 if _, err = io.ReadFull(f, gd); err != nil { 75 t.Fatalf("err = %v, want nil", err) 76 } 77 78 var wb bytes.Buffer 79 e := newEncoder(&wb, 0, 0) 80 err = e.encode(&walpb.Record{Type: crcType, Crc: 0}) 81 if err != nil { 82 t.Fatalf("err = %v, want nil", err) 83 } 84 err = e.encode(&walpb.Record{Type: metadataType, Data: []byte("somedata")}) 85 if err != nil { 86 t.Fatalf("err = %v, want nil", err) 87 } 88 r := &walpb.Record{ 89 Type: snapshotType, 90 Data: pbutil.MustMarshal(&walpb.Snapshot{}), 91 } 92 if err = e.encode(r); err != nil { 93 t.Fatalf("err = %v, want nil", err) 94 } 95 e.flush() 96 if !bytes.Equal(gd, wb.Bytes()) { 97 t.Errorf("data = %v, want %v", gd, wb.Bytes()) 98 } 99} 100 101func TestCreateFailFromPollutedDir(t *testing.T) { 102 p, err := ioutil.TempDir(t.TempDir(), "waltest") 103 if err != nil { 104 t.Fatal(err) 105 } 106 defer os.RemoveAll(p) 107 ioutil.WriteFile(filepath.Join(p, "test.wal"), []byte("data"), os.ModeTemporary) 108 109 _, err = Create(zap.NewExample(), p, []byte("data")) 110 if err != os.ErrExist { 111 t.Fatalf("expected %v, got %v", os.ErrExist, err) 112 } 113} 114 115func TestWalCleanup(t *testing.T) { 116 testRoot, err := ioutil.TempDir(t.TempDir(), "waltestroot") 117 if err != nil { 118 t.Fatal(err) 119 } 120 p, err := ioutil.TempDir(testRoot, "waltest") 121 if err != nil { 122 t.Fatal(err) 123 } 124 defer os.RemoveAll(testRoot) 125 126 logger := zap.NewExample() 127 w, err := Create(logger, p, []byte("")) 128 if err != nil { 129 t.Fatalf("err = %v, want nil", err) 130 } 131 w.cleanupWAL(logger) 132 fnames, err := fileutil.ReadDir(testRoot) 133 if err != nil { 134 t.Fatalf("err = %v, want nil", err) 135 } 136 if len(fnames) != 1 { 137 t.Fatalf("expected 1 file under %v, got %v", testRoot, len(fnames)) 138 } 139 pattern := fmt.Sprintf(`%s.broken\.[\d]{8}\.[\d]{6}\.[\d]{1,6}?`, filepath.Base(p)) 140 match, _ := regexp.MatchString(pattern, fnames[0]) 141 if !match { 142 t.Errorf("match = false, expected true for %v with pattern %v", fnames[0], pattern) 143 } 144} 145 146func TestCreateFailFromNoSpaceLeft(t *testing.T) { 147 p, err := ioutil.TempDir(t.TempDir(), "waltest") 148 if err != nil { 149 t.Fatal(err) 150 } 151 defer os.RemoveAll(p) 152 153 oldSegmentSizeBytes := SegmentSizeBytes 154 defer func() { 155 SegmentSizeBytes = oldSegmentSizeBytes 156 }() 157 SegmentSizeBytes = math.MaxInt64 158 159 _, err = Create(zap.NewExample(), p, []byte("data")) 160 if err == nil { // no space left on device 161 t.Fatalf("expected error 'no space left on device', got nil") 162 } 163} 164 165func TestNewForInitedDir(t *testing.T) { 166 p, err := ioutil.TempDir(t.TempDir(), "waltest") 167 if err != nil { 168 t.Fatal(err) 169 } 170 defer os.RemoveAll(p) 171 172 os.Create(filepath.Join(p, walName(0, 0))) 173 if _, err = Create(zap.NewExample(), p, nil); err == nil || err != os.ErrExist { 174 t.Errorf("err = %v, want %v", err, os.ErrExist) 175 } 176} 177 178func TestOpenAtIndex(t *testing.T) { 179 dir, err := ioutil.TempDir(t.TempDir(), "waltest") 180 if err != nil { 181 t.Fatal(err) 182 } 183 defer os.RemoveAll(dir) 184 185 f, err := os.Create(filepath.Join(dir, walName(0, 0))) 186 if err != nil { 187 t.Fatal(err) 188 } 189 f.Close() 190 191 w, err := Open(zap.NewExample(), dir, walpb.Snapshot{}) 192 if err != nil { 193 t.Fatalf("err = %v, want nil", err) 194 } 195 if g := filepath.Base(w.tail().Name()); g != walName(0, 0) { 196 t.Errorf("name = %+v, want %+v", g, walName(0, 0)) 197 } 198 if w.seq() != 0 { 199 t.Errorf("seq = %d, want %d", w.seq(), 0) 200 } 201 w.Close() 202 203 wname := walName(2, 10) 204 f, err = os.Create(filepath.Join(dir, wname)) 205 if err != nil { 206 t.Fatal(err) 207 } 208 f.Close() 209 210 w, err = Open(zap.NewExample(), dir, walpb.Snapshot{Index: 5}) 211 if err != nil { 212 t.Fatalf("err = %v, want nil", err) 213 } 214 if g := filepath.Base(w.tail().Name()); g != wname { 215 t.Errorf("name = %+v, want %+v", g, wname) 216 } 217 if w.seq() != 2 { 218 t.Errorf("seq = %d, want %d", w.seq(), 2) 219 } 220 w.Close() 221 222 emptydir, err := ioutil.TempDir(t.TempDir(), "waltestempty") 223 if err != nil { 224 t.Fatal(err) 225 } 226 defer os.RemoveAll(emptydir) 227 if _, err = Open(zap.NewExample(), emptydir, walpb.Snapshot{}); err != ErrFileNotFound { 228 t.Errorf("err = %v, want %v", err, ErrFileNotFound) 229 } 230} 231 232// TestVerify tests that Verify throws a non-nil error when the WAL is corrupted. 233// The test creates a WAL directory and cuts out multiple WAL files. Then 234// it corrupts one of the files by completely truncating it. 235func TestVerify(t *testing.T) { 236 lg := zaptest.NewLogger(t) 237 walDir, err := ioutil.TempDir(t.TempDir(), "waltest") 238 if err != nil { 239 t.Fatal(err) 240 } 241 242 // create WAL 243 w, err := Create(lg, walDir, nil) 244 if err != nil { 245 t.Fatal(err) 246 } 247 defer w.Close() 248 249 // make 5 separate files 250 for i := 0; i < 5; i++ { 251 es := []raftpb.Entry{{Index: uint64(i), Data: []byte(fmt.Sprintf("waldata%d", i+1))}} 252 if err = w.Save(raftpb.HardState{}, es); err != nil { 253 t.Fatal(err) 254 } 255 if err = w.cut(); err != nil { 256 t.Fatal(err) 257 } 258 } 259 260 hs := raftpb.HardState{Term: 1, Vote: 3, Commit: 5} 261 assert.NoError(t, w.Save(hs, nil)) 262 263 // to verify the WAL is not corrupted at this point 264 hardstate, err := Verify(lg, walDir, walpb.Snapshot{}) 265 if err != nil { 266 t.Errorf("expected a nil error, got %v", err) 267 } 268 assert.Equal(t, hs, *hardstate) 269 270 walFiles, err := ioutil.ReadDir(walDir) 271 if err != nil { 272 t.Fatal(err) 273 } 274 275 // corrupt the WAL by truncating one of the WAL files completely 276 err = os.Truncate(path.Join(walDir, walFiles[2].Name()), 0) 277 if err != nil { 278 t.Fatal(err) 279 } 280 281 _, err = Verify(lg, walDir, walpb.Snapshot{}) 282 if err == nil { 283 t.Error("expected a non-nil error, got nil") 284 } 285} 286 287// TODO: split it into smaller tests for better readability 288func TestCut(t *testing.T) { 289 p, err := ioutil.TempDir(t.TempDir(), "waltest") 290 if err != nil { 291 t.Fatal(err) 292 } 293 defer os.RemoveAll(p) 294 295 w, err := Create(zap.NewExample(), p, nil) 296 if err != nil { 297 t.Fatal(err) 298 } 299 defer w.Close() 300 301 state := raftpb.HardState{Term: 1} 302 if err = w.Save(state, nil); err != nil { 303 t.Fatal(err) 304 } 305 if err = w.cut(); err != nil { 306 t.Fatal(err) 307 } 308 wname := walName(1, 1) 309 if g := filepath.Base(w.tail().Name()); g != wname { 310 t.Errorf("name = %s, want %s", g, wname) 311 } 312 313 es := []raftpb.Entry{{Index: 1, Term: 1, Data: []byte{1}}} 314 if err = w.Save(raftpb.HardState{}, es); err != nil { 315 t.Fatal(err) 316 } 317 if err = w.cut(); err != nil { 318 t.Fatal(err) 319 } 320 snap := walpb.Snapshot{Index: 2, Term: 1, ConfState: &confState} 321 if err = w.SaveSnapshot(snap); err != nil { 322 t.Fatal(err) 323 } 324 wname = walName(2, 2) 325 if g := filepath.Base(w.tail().Name()); g != wname { 326 t.Errorf("name = %s, want %s", g, wname) 327 } 328 329 // check the state in the last WAL 330 // We do check before closing the WAL to ensure that Cut syncs the data 331 // into the disk. 332 f, err := os.Open(filepath.Join(p, wname)) 333 if err != nil { 334 t.Fatal(err) 335 } 336 defer f.Close() 337 nw := &WAL{ 338 decoder: newDecoder(f), 339 start: snap, 340 } 341 _, gst, _, err := nw.ReadAll() 342 if err != nil { 343 t.Fatal(err) 344 } 345 if !reflect.DeepEqual(gst, state) { 346 t.Errorf("state = %+v, want %+v", gst, state) 347 } 348} 349 350func TestSaveWithCut(t *testing.T) { 351 p, err := ioutil.TempDir(t.TempDir(), "waltest") 352 if err != nil { 353 t.Fatal(err) 354 } 355 defer os.RemoveAll(p) 356 357 w, err := Create(zap.NewExample(), p, []byte("metadata")) 358 if err != nil { 359 t.Fatal(err) 360 } 361 362 state := raftpb.HardState{Term: 1} 363 if err = w.Save(state, nil); err != nil { 364 t.Fatal(err) 365 } 366 bigData := make([]byte, 500) 367 strdata := "Hello World!!" 368 copy(bigData, strdata) 369 // set a lower value for SegmentSizeBytes, else the test takes too long to complete 370 restoreLater := SegmentSizeBytes 371 const EntrySize int = 500 372 SegmentSizeBytes = 2 * 1024 373 defer func() { SegmentSizeBytes = restoreLater }() 374 index := uint64(0) 375 for totalSize := 0; totalSize < int(SegmentSizeBytes); totalSize += EntrySize { 376 ents := []raftpb.Entry{{Index: index, Term: 1, Data: bigData}} 377 if err = w.Save(state, ents); err != nil { 378 t.Fatal(err) 379 } 380 index++ 381 } 382 383 w.Close() 384 385 neww, err := Open(zap.NewExample(), p, walpb.Snapshot{}) 386 if err != nil { 387 t.Fatalf("err = %v, want nil", err) 388 } 389 defer neww.Close() 390 wname := walName(1, index) 391 if g := filepath.Base(neww.tail().Name()); g != wname { 392 t.Errorf("name = %s, want %s", g, wname) 393 } 394 395 _, newhardstate, entries, err := neww.ReadAll() 396 if err != nil { 397 t.Fatal(err) 398 } 399 400 if !reflect.DeepEqual(newhardstate, state) { 401 t.Errorf("Hard State = %+v, want %+v", newhardstate, state) 402 } 403 if len(entries) != int(SegmentSizeBytes/int64(EntrySize)) { 404 t.Errorf("Number of entries = %d, expected = %d", len(entries), int(SegmentSizeBytes/int64(EntrySize))) 405 } 406 for _, oneent := range entries { 407 if !bytes.Equal(oneent.Data, bigData) { 408 t.Errorf("the saved data does not match at Index %d : found: %s , want :%s", oneent.Index, oneent.Data, bigData) 409 } 410 } 411} 412 413func TestRecover(t *testing.T) { 414 p, err := ioutil.TempDir(t.TempDir(), "waltest") 415 if err != nil { 416 t.Fatal(err) 417 } 418 defer os.RemoveAll(p) 419 420 w, err := Create(zap.NewExample(), p, []byte("metadata")) 421 if err != nil { 422 t.Fatal(err) 423 } 424 if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil { 425 t.Fatal(err) 426 } 427 ents := []raftpb.Entry{{Index: 1, Term: 1, Data: []byte{1}}, {Index: 2, Term: 2, Data: []byte{2}}} 428 if err = w.Save(raftpb.HardState{}, ents); err != nil { 429 t.Fatal(err) 430 } 431 sts := []raftpb.HardState{{Term: 1, Vote: 1, Commit: 1}, {Term: 2, Vote: 2, Commit: 2}} 432 for _, s := range sts { 433 if err = w.Save(s, nil); err != nil { 434 t.Fatal(err) 435 } 436 } 437 w.Close() 438 439 if w, err = Open(zap.NewExample(), p, walpb.Snapshot{}); err != nil { 440 t.Fatal(err) 441 } 442 metadata, state, entries, err := w.ReadAll() 443 if err != nil { 444 t.Fatal(err) 445 } 446 447 if !bytes.Equal(metadata, []byte("metadata")) { 448 t.Errorf("metadata = %s, want %s", metadata, "metadata") 449 } 450 if !reflect.DeepEqual(entries, ents) { 451 t.Errorf("ents = %+v, want %+v", entries, ents) 452 } 453 // only the latest state is recorded 454 s := sts[len(sts)-1] 455 if !reflect.DeepEqual(state, s) { 456 t.Errorf("state = %+v, want %+v", state, s) 457 } 458 w.Close() 459} 460 461func TestSearchIndex(t *testing.T) { 462 tests := []struct { 463 names []string 464 index uint64 465 widx int 466 wok bool 467 }{ 468 { 469 []string{ 470 "0000000000000000-0000000000000000.wal", 471 "0000000000000001-0000000000001000.wal", 472 "0000000000000002-0000000000002000.wal", 473 }, 474 0x1000, 1, true, 475 }, 476 { 477 []string{ 478 "0000000000000001-0000000000004000.wal", 479 "0000000000000002-0000000000003000.wal", 480 "0000000000000003-0000000000005000.wal", 481 }, 482 0x4000, 1, true, 483 }, 484 { 485 []string{ 486 "0000000000000001-0000000000002000.wal", 487 "0000000000000002-0000000000003000.wal", 488 "0000000000000003-0000000000005000.wal", 489 }, 490 0x1000, -1, false, 491 }, 492 } 493 for i, tt := range tests { 494 idx, ok := searchIndex(zap.NewExample(), tt.names, tt.index) 495 if idx != tt.widx { 496 t.Errorf("#%d: idx = %d, want %d", i, idx, tt.widx) 497 } 498 if ok != tt.wok { 499 t.Errorf("#%d: ok = %v, want %v", i, ok, tt.wok) 500 } 501 } 502} 503 504func TestScanWalName(t *testing.T) { 505 tests := []struct { 506 str string 507 wseq, windex uint64 508 wok bool 509 }{ 510 {"0000000000000000-0000000000000000.wal", 0, 0, true}, 511 {"0000000000000000.wal", 0, 0, false}, 512 {"0000000000000000-0000000000000000.snap", 0, 0, false}, 513 } 514 for i, tt := range tests { 515 s, index, err := parseWALName(tt.str) 516 if g := err == nil; g != tt.wok { 517 t.Errorf("#%d: ok = %v, want %v", i, g, tt.wok) 518 } 519 if s != tt.wseq { 520 t.Errorf("#%d: seq = %d, want %d", i, s, tt.wseq) 521 } 522 if index != tt.windex { 523 t.Errorf("#%d: index = %d, want %d", i, index, tt.windex) 524 } 525 } 526} 527 528func TestRecoverAfterCut(t *testing.T) { 529 p, err := ioutil.TempDir(t.TempDir(), "waltest") 530 if err != nil { 531 t.Fatal(err) 532 } 533 defer os.RemoveAll(p) 534 535 md, err := Create(zap.NewExample(), p, []byte("metadata")) 536 if err != nil { 537 t.Fatal(err) 538 } 539 for i := 0; i < 10; i++ { 540 if err = md.SaveSnapshot(walpb.Snapshot{Index: uint64(i), Term: 1, ConfState: &confState}); err != nil { 541 t.Fatal(err) 542 } 543 es := []raftpb.Entry{{Index: uint64(i)}} 544 if err = md.Save(raftpb.HardState{}, es); err != nil { 545 t.Fatal(err) 546 } 547 if err = md.cut(); err != nil { 548 t.Fatal(err) 549 } 550 } 551 md.Close() 552 553 if err := os.Remove(filepath.Join(p, walName(4, 4))); err != nil { 554 t.Fatal(err) 555 } 556 557 for i := 0; i < 10; i++ { 558 w, err := Open(zap.NewExample(), p, walpb.Snapshot{Index: uint64(i), Term: 1}) 559 if err != nil { 560 if i <= 4 { 561 if err != ErrFileNotFound { 562 t.Errorf("#%d: err = %v, want %v", i, err, ErrFileNotFound) 563 } 564 } else { 565 t.Errorf("#%d: err = %v, want nil", i, err) 566 } 567 continue 568 } 569 metadata, _, entries, err := w.ReadAll() 570 if err != nil { 571 t.Errorf("#%d: err = %v, want nil", i, err) 572 continue 573 } 574 if !bytes.Equal(metadata, []byte("metadata")) { 575 t.Errorf("#%d: metadata = %s, want %s", i, metadata, "metadata") 576 } 577 for j, e := range entries { 578 if e.Index != uint64(j+i+1) { 579 t.Errorf("#%d: ents[%d].Index = %+v, want %+v", i, j, e.Index, j+i+1) 580 } 581 } 582 w.Close() 583 } 584} 585 586func TestOpenAtUncommittedIndex(t *testing.T) { 587 p, err := ioutil.TempDir(t.TempDir(), "waltest") 588 if err != nil { 589 t.Fatal(err) 590 } 591 defer os.RemoveAll(p) 592 593 w, err := Create(zap.NewExample(), p, nil) 594 if err != nil { 595 t.Fatal(err) 596 } 597 if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil { 598 t.Fatal(err) 599 } 600 if err = w.Save(raftpb.HardState{}, []raftpb.Entry{{Index: 0}}); err != nil { 601 t.Fatal(err) 602 } 603 w.Close() 604 605 w, err = Open(zap.NewExample(), p, walpb.Snapshot{}) 606 if err != nil { 607 t.Fatal(err) 608 } 609 // commit up to index 0, try to read index 1 610 if _, _, _, err = w.ReadAll(); err != nil { 611 t.Errorf("err = %v, want nil", err) 612 } 613 w.Close() 614} 615 616// TestOpenForRead tests that OpenForRead can load all files. 617// The tests creates WAL directory, and cut out multiple WAL files. Then 618// it releases the lock of part of data, and excepts that OpenForRead 619// can read out all files even if some are locked for write. 620func TestOpenForRead(t *testing.T) { 621 p, err := ioutil.TempDir(t.TempDir(), "waltest") 622 if err != nil { 623 t.Fatal(err) 624 } 625 defer os.RemoveAll(p) 626 // create WAL 627 w, err := Create(zap.NewExample(), p, nil) 628 if err != nil { 629 t.Fatal(err) 630 } 631 defer w.Close() 632 // make 10 separate files 633 for i := 0; i < 10; i++ { 634 es := []raftpb.Entry{{Index: uint64(i)}} 635 if err = w.Save(raftpb.HardState{}, es); err != nil { 636 t.Fatal(err) 637 } 638 if err = w.cut(); err != nil { 639 t.Fatal(err) 640 } 641 } 642 // release the lock to 5 643 unlockIndex := uint64(5) 644 w.ReleaseLockTo(unlockIndex) 645 646 // All are available for read 647 w2, err := OpenForRead(zap.NewExample(), p, walpb.Snapshot{}) 648 if err != nil { 649 t.Fatal(err) 650 } 651 defer w2.Close() 652 _, _, ents, err := w2.ReadAll() 653 if err != nil { 654 t.Fatalf("err = %v, want nil", err) 655 } 656 if g := ents[len(ents)-1].Index; g != 9 { 657 t.Errorf("last index read = %d, want %d", g, 9) 658 } 659} 660 661func TestOpenWithMaxIndex(t *testing.T) { 662 p, err := ioutil.TempDir(t.TempDir(), "waltest") 663 if err != nil { 664 t.Fatal(err) 665 } 666 defer os.RemoveAll(p) 667 // create WAL 668 w, err := Create(zap.NewExample(), p, nil) 669 if err != nil { 670 t.Fatal(err) 671 } 672 defer w.Close() 673 674 es := []raftpb.Entry{{Index: uint64(math.MaxInt64)}} 675 if err = w.Save(raftpb.HardState{}, es); err != nil { 676 t.Fatal(err) 677 } 678 w.Close() 679 680 w, err = Open(zap.NewExample(), p, walpb.Snapshot{}) 681 if err != nil { 682 t.Fatal(err) 683 } 684 _, _, _, err = w.ReadAll() 685 if err == nil || err != ErrSliceOutOfRange { 686 t.Fatalf("err = %v, want ErrSliceOutOfRange", err) 687 } 688} 689 690func TestSaveEmpty(t *testing.T) { 691 var buf bytes.Buffer 692 var est raftpb.HardState 693 w := WAL{ 694 encoder: newEncoder(&buf, 0, 0), 695 } 696 if err := w.saveState(&est); err != nil { 697 t.Errorf("err = %v, want nil", err) 698 } 699 if len(buf.Bytes()) != 0 { 700 t.Errorf("buf.Bytes = %d, want 0", len(buf.Bytes())) 701 } 702} 703 704func TestReleaseLockTo(t *testing.T) { 705 p, err := ioutil.TempDir(t.TempDir(), "waltest") 706 if err != nil { 707 t.Fatal(err) 708 } 709 defer os.RemoveAll(p) 710 // create WAL 711 w, err := Create(zap.NewExample(), p, nil) 712 defer func() { 713 if err = w.Close(); err != nil { 714 t.Fatal(err) 715 } 716 }() 717 if err != nil { 718 t.Fatal(err) 719 } 720 721 // release nothing if no files 722 err = w.ReleaseLockTo(10) 723 if err != nil { 724 t.Errorf("err = %v, want nil", err) 725 } 726 727 // make 10 separate files 728 for i := 0; i < 10; i++ { 729 es := []raftpb.Entry{{Index: uint64(i)}} 730 if err = w.Save(raftpb.HardState{}, es); err != nil { 731 t.Fatal(err) 732 } 733 if err = w.cut(); err != nil { 734 t.Fatal(err) 735 } 736 } 737 // release the lock to 5 738 unlockIndex := uint64(5) 739 w.ReleaseLockTo(unlockIndex) 740 741 // expected remaining are 4,5,6,7,8,9,10 742 if len(w.locks) != 7 { 743 t.Errorf("len(w.locks) = %d, want %d", len(w.locks), 7) 744 } 745 for i, l := range w.locks { 746 var lockIndex uint64 747 _, lockIndex, err = parseWALName(filepath.Base(l.Name())) 748 if err != nil { 749 t.Fatal(err) 750 } 751 752 if lockIndex != uint64(i+4) { 753 t.Errorf("#%d: lockindex = %d, want %d", i, lockIndex, uint64(i+4)) 754 } 755 } 756 757 // release the lock to 15 758 unlockIndex = uint64(15) 759 w.ReleaseLockTo(unlockIndex) 760 761 // expected remaining is 10 762 if len(w.locks) != 1 { 763 t.Errorf("len(w.locks) = %d, want %d", len(w.locks), 1) 764 } 765 _, lockIndex, err := parseWALName(filepath.Base(w.locks[0].Name())) 766 if err != nil { 767 t.Fatal(err) 768 } 769 770 if lockIndex != uint64(10) { 771 t.Errorf("lockindex = %d, want %d", lockIndex, 10) 772 } 773} 774 775// TestTailWriteNoSlackSpace ensures that tail writes append if there's no preallocated space. 776func TestTailWriteNoSlackSpace(t *testing.T) { 777 p, err := ioutil.TempDir(t.TempDir(), "waltest") 778 if err != nil { 779 t.Fatal(err) 780 } 781 defer os.RemoveAll(p) 782 783 // create initial WAL 784 w, err := Create(zap.NewExample(), p, []byte("metadata")) 785 if err != nil { 786 t.Fatal(err) 787 } 788 // write some entries 789 for i := 1; i <= 5; i++ { 790 es := []raftpb.Entry{{Index: uint64(i), Term: 1, Data: []byte{byte(i)}}} 791 if err = w.Save(raftpb.HardState{Term: 1}, es); err != nil { 792 t.Fatal(err) 793 } 794 } 795 // get rid of slack space by truncating file 796 off, serr := w.tail().Seek(0, io.SeekCurrent) 797 if serr != nil { 798 t.Fatal(serr) 799 } 800 if terr := w.tail().Truncate(off); terr != nil { 801 t.Fatal(terr) 802 } 803 w.Close() 804 805 // open, write more 806 w, err = Open(zap.NewExample(), p, walpb.Snapshot{}) 807 if err != nil { 808 t.Fatal(err) 809 } 810 _, _, ents, rerr := w.ReadAll() 811 if rerr != nil { 812 t.Fatal(rerr) 813 } 814 if len(ents) != 5 { 815 t.Fatalf("got entries %+v, expected 5 entries", ents) 816 } 817 // write more entries 818 for i := 6; i <= 10; i++ { 819 es := []raftpb.Entry{{Index: uint64(i), Term: 1, Data: []byte{byte(i)}}} 820 if err = w.Save(raftpb.HardState{Term: 1}, es); err != nil { 821 t.Fatal(err) 822 } 823 } 824 w.Close() 825 826 // confirm all writes 827 w, err = Open(zap.NewExample(), p, walpb.Snapshot{}) 828 if err != nil { 829 t.Fatal(err) 830 } 831 _, _, ents, rerr = w.ReadAll() 832 if rerr != nil { 833 t.Fatal(rerr) 834 } 835 if len(ents) != 10 { 836 t.Fatalf("got entries %+v, expected 10 entries", ents) 837 } 838 w.Close() 839} 840 841// TestRestartCreateWal ensures that an interrupted WAL initialization is clobbered on restart 842func TestRestartCreateWal(t *testing.T) { 843 p, err := ioutil.TempDir(t.TempDir(), "waltest") 844 if err != nil { 845 t.Fatal(err) 846 } 847 defer os.RemoveAll(p) 848 849 // make temporary directory so it looks like initialization is interrupted 850 tmpdir := filepath.Clean(p) + ".tmp" 851 if err = os.Mkdir(tmpdir, fileutil.PrivateDirMode); err != nil { 852 t.Fatal(err) 853 } 854 if _, err = os.OpenFile(filepath.Join(tmpdir, "test"), os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode); err != nil { 855 t.Fatal(err) 856 } 857 858 w, werr := Create(zap.NewExample(), p, []byte("abc")) 859 if werr != nil { 860 t.Fatal(werr) 861 } 862 w.Close() 863 if Exist(tmpdir) { 864 t.Fatalf("got %q exists, expected it to not exist", tmpdir) 865 } 866 867 if w, err = OpenForRead(zap.NewExample(), p, walpb.Snapshot{}); err != nil { 868 t.Fatal(err) 869 } 870 defer w.Close() 871 872 if meta, _, _, rerr := w.ReadAll(); rerr != nil || string(meta) != "abc" { 873 t.Fatalf("got error %v and meta %q, expected nil and %q", rerr, meta, "abc") 874 } 875} 876 877// TestOpenOnTornWrite ensures that entries past the torn write are truncated. 878func TestOpenOnTornWrite(t *testing.T) { 879 maxEntries := 40 880 clobberIdx := 20 881 overwriteEntries := 5 882 883 p, err := ioutil.TempDir(t.TempDir(), "waltest") 884 if err != nil { 885 t.Fatal(err) 886 } 887 defer os.RemoveAll(p) 888 w, err := Create(zap.NewExample(), p, nil) 889 defer func() { 890 if err = w.Close(); err != nil && err != os.ErrInvalid { 891 t.Fatal(err) 892 } 893 }() 894 if err != nil { 895 t.Fatal(err) 896 } 897 898 // get offset of end of each saved entry 899 offsets := make([]int64, maxEntries) 900 for i := range offsets { 901 es := []raftpb.Entry{{Index: uint64(i)}} 902 if err = w.Save(raftpb.HardState{}, es); err != nil { 903 t.Fatal(err) 904 } 905 if offsets[i], err = w.tail().Seek(0, io.SeekCurrent); err != nil { 906 t.Fatal(err) 907 } 908 } 909 910 fn := filepath.Join(p, filepath.Base(w.tail().Name())) 911 w.Close() 912 913 // clobber some entry with 0's to simulate a torn write 914 f, ferr := os.OpenFile(fn, os.O_WRONLY, fileutil.PrivateFileMode) 915 if ferr != nil { 916 t.Fatal(ferr) 917 } 918 defer f.Close() 919 _, err = f.Seek(offsets[clobberIdx], io.SeekStart) 920 if err != nil { 921 t.Fatal(err) 922 } 923 zeros := make([]byte, offsets[clobberIdx+1]-offsets[clobberIdx]) 924 _, err = f.Write(zeros) 925 if err != nil { 926 t.Fatal(err) 927 } 928 f.Close() 929 930 w, err = Open(zap.NewExample(), p, walpb.Snapshot{}) 931 if err != nil { 932 t.Fatal(err) 933 } 934 // seek up to clobbered entry 935 _, _, _, err = w.ReadAll() 936 if err != nil { 937 t.Fatal(err) 938 } 939 940 // write a few entries past the clobbered entry 941 for i := 0; i < overwriteEntries; i++ { 942 // Index is different from old, truncated entries 943 es := []raftpb.Entry{{Index: uint64(i + clobberIdx), Data: []byte("new")}} 944 if err = w.Save(raftpb.HardState{}, es); err != nil { 945 t.Fatal(err) 946 } 947 } 948 w.Close() 949 950 // read back the entries, confirm number of entries matches expectation 951 w, err = OpenForRead(zap.NewExample(), p, walpb.Snapshot{}) 952 if err != nil { 953 t.Fatal(err) 954 } 955 956 _, _, ents, rerr := w.ReadAll() 957 if rerr != nil { 958 // CRC error? the old entries were likely never truncated away 959 t.Fatal(rerr) 960 } 961 wEntries := (clobberIdx - 1) + overwriteEntries 962 if len(ents) != wEntries { 963 t.Fatalf("expected len(ents) = %d, got %d", wEntries, len(ents)) 964 } 965} 966 967func TestRenameFail(t *testing.T) { 968 p, err := ioutil.TempDir(t.TempDir(), "waltest") 969 if err != nil { 970 t.Fatal(err) 971 } 972 defer os.RemoveAll(p) 973 974 oldSegmentSizeBytes := SegmentSizeBytes 975 defer func() { 976 SegmentSizeBytes = oldSegmentSizeBytes 977 }() 978 SegmentSizeBytes = math.MaxInt64 979 980 tp, terr := ioutil.TempDir(t.TempDir(), "waltest") 981 if terr != nil { 982 t.Fatal(terr) 983 } 984 os.RemoveAll(tp) 985 986 w := &WAL{ 987 lg: zap.NewExample(), 988 dir: p, 989 } 990 w2, werr := w.renameWAL(tp) 991 if w2 != nil || werr == nil { // os.Rename should fail from 'no such file or directory' 992 t.Fatalf("expected error, got %v", werr) 993 } 994} 995 996// TestReadAllFail ensure ReadAll error if used without opening the WAL 997func TestReadAllFail(t *testing.T) { 998 dir, err := ioutil.TempDir(t.TempDir(), "waltest") 999 if err != nil { 1000 t.Fatal(err) 1001 } 1002 defer os.RemoveAll(dir) 1003 1004 // create initial WAL 1005 f, err := Create(zap.NewExample(), dir, []byte("metadata")) 1006 if err != nil { 1007 t.Fatal(err) 1008 } 1009 f.Close() 1010 // try to read without opening the WAL 1011 _, _, _, err = f.ReadAll() 1012 if err == nil || err != ErrDecoderNotFound { 1013 t.Fatalf("err = %v, want ErrDecoderNotFound", err) 1014 } 1015} 1016 1017// TestValidSnapshotEntries ensures ValidSnapshotEntries returns all valid wal snapshot entries, accounting 1018// for hardstate 1019func TestValidSnapshotEntries(t *testing.T) { 1020 p, err := ioutil.TempDir(t.TempDir(), "waltest") 1021 if err != nil { 1022 t.Fatal(err) 1023 } 1024 defer os.RemoveAll(p) 1025 snap0 := walpb.Snapshot{} 1026 snap1 := walpb.Snapshot{Index: 1, Term: 1, ConfState: &confState} 1027 state1 := raftpb.HardState{Commit: 1, Term: 1} 1028 snap2 := walpb.Snapshot{Index: 2, Term: 1, ConfState: &confState} 1029 snap3 := walpb.Snapshot{Index: 3, Term: 2, ConfState: &confState} 1030 state2 := raftpb.HardState{Commit: 3, Term: 2} 1031 snap4 := walpb.Snapshot{Index: 4, Term: 2, ConfState: &confState} // will be orphaned since the last committed entry will be snap3 1032 func() { 1033 w, err := Create(zap.NewExample(), p, nil) 1034 if err != nil { 1035 t.Fatal(err) 1036 } 1037 defer w.Close() 1038 1039 // snap0 is implicitly created at index 0, term 0 1040 if err = w.SaveSnapshot(snap1); err != nil { 1041 t.Fatal(err) 1042 } 1043 if err = w.Save(state1, nil); err != nil { 1044 t.Fatal(err) 1045 } 1046 if err = w.SaveSnapshot(snap2); err != nil { 1047 t.Fatal(err) 1048 } 1049 if err = w.SaveSnapshot(snap3); err != nil { 1050 t.Fatal(err) 1051 } 1052 if err = w.Save(state2, nil); err != nil { 1053 t.Fatal(err) 1054 } 1055 if err = w.SaveSnapshot(snap4); err != nil { 1056 t.Fatal(err) 1057 } 1058 }() 1059 walSnaps, err := ValidSnapshotEntries(zap.NewExample(), p) 1060 if err != nil { 1061 t.Fatal(err) 1062 } 1063 expected := []walpb.Snapshot{snap0, snap1, snap2, snap3} 1064 if !reflect.DeepEqual(walSnaps, expected) { 1065 t.Errorf("expected walSnaps %+v, got %+v", expected, walSnaps) 1066 } 1067} 1068 1069// TestValidSnapshotEntriesAfterPurgeWal ensure that there are many wal files, and after cleaning the first wal file, 1070// it can work well. 1071func TestValidSnapshotEntriesAfterPurgeWal(t *testing.T) { 1072 oldSegmentSizeBytes := SegmentSizeBytes 1073 SegmentSizeBytes = 64 1074 defer func() { 1075 SegmentSizeBytes = oldSegmentSizeBytes 1076 }() 1077 p, err := ioutil.TempDir(t.TempDir(), "waltest") 1078 if err != nil { 1079 t.Fatal(err) 1080 } 1081 defer os.RemoveAll(p) 1082 snap0 := walpb.Snapshot{} 1083 snap1 := walpb.Snapshot{Index: 1, Term: 1, ConfState: &confState} 1084 state1 := raftpb.HardState{Commit: 1, Term: 1} 1085 snap2 := walpb.Snapshot{Index: 2, Term: 1, ConfState: &confState} 1086 snap3 := walpb.Snapshot{Index: 3, Term: 2, ConfState: &confState} 1087 state2 := raftpb.HardState{Commit: 3, Term: 2} 1088 func() { 1089 w, err := Create(zap.NewExample(), p, nil) 1090 if err != nil { 1091 t.Fatal(err) 1092 } 1093 defer w.Close() 1094 1095 // snap0 is implicitly created at index 0, term 0 1096 if err = w.SaveSnapshot(snap1); err != nil { 1097 t.Fatal(err) 1098 } 1099 if err = w.Save(state1, nil); err != nil { 1100 t.Fatal(err) 1101 } 1102 if err = w.SaveSnapshot(snap2); err != nil { 1103 t.Fatal(err) 1104 } 1105 if err = w.SaveSnapshot(snap3); err != nil { 1106 t.Fatal(err) 1107 } 1108 for i := 0; i < 128; i++ { 1109 if err = w.Save(state2, nil); err != nil { 1110 t.Fatal(err) 1111 } 1112 } 1113 1114 }() 1115 files, _, err := selectWALFiles(nil, p, snap0) 1116 if err != nil { 1117 t.Fatal(err) 1118 } 1119 os.Remove(p + "/" + files[0]) 1120 _, err = ValidSnapshotEntries(zap.NewExample(), p) 1121 if err != nil { 1122 t.Fatal(err) 1123 } 1124} 1125