1// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package wal
16
17import (
18	"bytes"
19	"fmt"
20	"io"
21	"io/ioutil"
22	"math"
23	"os"
24	"path"
25	"path/filepath"
26	"reflect"
27	"regexp"
28	"testing"
29
30	"go.etcd.io/etcd/pkg/fileutil"
31	"go.etcd.io/etcd/pkg/pbutil"
32	"go.etcd.io/etcd/raft/raftpb"
33	"go.etcd.io/etcd/wal/walpb"
34
35	"go.uber.org/zap"
36)
37
38func TestNew(t *testing.T) {
39	p, err := ioutil.TempDir(os.TempDir(), "waltest")
40	if err != nil {
41		t.Fatal(err)
42	}
43	defer os.RemoveAll(p)
44
45	w, err := Create(zap.NewExample(), p, []byte("somedata"))
46	if err != nil {
47		t.Fatalf("err = %v, want nil", err)
48	}
49	if g := filepath.Base(w.tail().Name()); g != walName(0, 0) {
50		t.Errorf("name = %+v, want %+v", g, walName(0, 0))
51	}
52	defer w.Close()
53
54	// file is preallocated to segment size; only read data written by wal
55	off, err := w.tail().Seek(0, io.SeekCurrent)
56	if err != nil {
57		t.Fatal(err)
58	}
59	gd := make([]byte, off)
60	f, err := os.Open(filepath.Join(p, filepath.Base(w.tail().Name())))
61	if err != nil {
62		t.Fatal(err)
63	}
64	defer f.Close()
65	if _, err = io.ReadFull(f, gd); err != nil {
66		t.Fatalf("err = %v, want nil", err)
67	}
68
69	var wb bytes.Buffer
70	e := newEncoder(&wb, 0, 0)
71	err = e.encode(&walpb.Record{Type: crcType, Crc: 0})
72	if err != nil {
73		t.Fatalf("err = %v, want nil", err)
74	}
75	err = e.encode(&walpb.Record{Type: metadataType, Data: []byte("somedata")})
76	if err != nil {
77		t.Fatalf("err = %v, want nil", err)
78	}
79	r := &walpb.Record{
80		Type: snapshotType,
81		Data: pbutil.MustMarshal(&walpb.Snapshot{}),
82	}
83	if err = e.encode(r); err != nil {
84		t.Fatalf("err = %v, want nil", err)
85	}
86	e.flush()
87	if !bytes.Equal(gd, wb.Bytes()) {
88		t.Errorf("data = %v, want %v", gd, wb.Bytes())
89	}
90}
91
92func TestCreateFailFromPollutedDir(t *testing.T) {
93	p, err := ioutil.TempDir(os.TempDir(), "waltest")
94	if err != nil {
95		t.Fatal(err)
96	}
97	defer os.RemoveAll(p)
98	ioutil.WriteFile(filepath.Join(p, "test.wal"), []byte("data"), os.ModeTemporary)
99
100	_, err = Create(zap.NewExample(), p, []byte("data"))
101	if err != os.ErrExist {
102		t.Fatalf("expected %v, got %v", os.ErrExist, err)
103	}
104}
105
106func TestWalCleanup(t *testing.T) {
107	testRoot, err := ioutil.TempDir(os.TempDir(), "waltestroot")
108	if err != nil {
109		t.Fatal(err)
110	}
111	p, err := ioutil.TempDir(testRoot, "waltest")
112	if err != nil {
113		t.Fatal(err)
114	}
115	defer os.RemoveAll(testRoot)
116
117	logger := zap.NewExample()
118	w, err := Create(logger, p, []byte(""))
119	if err != nil {
120		t.Fatalf("err = %v, want nil", err)
121	}
122	w.cleanupWAL(logger)
123	fnames, err := fileutil.ReadDir(testRoot)
124	if err != nil {
125		t.Fatalf("err = %v, want nil", err)
126	}
127	if len(fnames) != 1 {
128		t.Fatalf("expected 1 file under %v, got %v", testRoot, len(fnames))
129	}
130	pattern := fmt.Sprintf(`%s.broken\.[\d]{8}\.[\d]{6}\.[\d]{1,6}?`, filepath.Base(p))
131	match, _ := regexp.MatchString(pattern, fnames[0])
132	if !match {
133		t.Errorf("match = false, expected true for %v with pattern %v", fnames[0], pattern)
134	}
135}
136
137func TestCreateFailFromNoSpaceLeft(t *testing.T) {
138	p, err := ioutil.TempDir(os.TempDir(), "waltest")
139	if err != nil {
140		t.Fatal(err)
141	}
142	defer os.RemoveAll(p)
143
144	oldSegmentSizeBytes := SegmentSizeBytes
145	defer func() {
146		SegmentSizeBytes = oldSegmentSizeBytes
147	}()
148	SegmentSizeBytes = math.MaxInt64
149
150	_, err = Create(zap.NewExample(), p, []byte("data"))
151	if err == nil { // no space left on device
152		t.Fatalf("expected error 'no space left on device', got nil")
153	}
154}
155
156func TestNewForInitedDir(t *testing.T) {
157	p, err := ioutil.TempDir(os.TempDir(), "waltest")
158	if err != nil {
159		t.Fatal(err)
160	}
161	defer os.RemoveAll(p)
162
163	os.Create(filepath.Join(p, walName(0, 0)))
164	if _, err = Create(zap.NewExample(), p, nil); err == nil || err != os.ErrExist {
165		t.Errorf("err = %v, want %v", err, os.ErrExist)
166	}
167}
168
169func TestOpenAtIndex(t *testing.T) {
170	dir, err := ioutil.TempDir(os.TempDir(), "waltest")
171	if err != nil {
172		t.Fatal(err)
173	}
174	defer os.RemoveAll(dir)
175
176	f, err := os.Create(filepath.Join(dir, walName(0, 0)))
177	if err != nil {
178		t.Fatal(err)
179	}
180	f.Close()
181
182	w, err := Open(zap.NewExample(), dir, walpb.Snapshot{})
183	if err != nil {
184		t.Fatalf("err = %v, want nil", err)
185	}
186	if g := filepath.Base(w.tail().Name()); g != walName(0, 0) {
187		t.Errorf("name = %+v, want %+v", g, walName(0, 0))
188	}
189	if w.seq() != 0 {
190		t.Errorf("seq = %d, want %d", w.seq(), 0)
191	}
192	w.Close()
193
194	wname := walName(2, 10)
195	f, err = os.Create(filepath.Join(dir, wname))
196	if err != nil {
197		t.Fatal(err)
198	}
199	f.Close()
200
201	w, err = Open(zap.NewExample(), dir, walpb.Snapshot{Index: 5})
202	if err != nil {
203		t.Fatalf("err = %v, want nil", err)
204	}
205	if g := filepath.Base(w.tail().Name()); g != wname {
206		t.Errorf("name = %+v, want %+v", g, wname)
207	}
208	if w.seq() != 2 {
209		t.Errorf("seq = %d, want %d", w.seq(), 2)
210	}
211	w.Close()
212
213	emptydir, err := ioutil.TempDir(os.TempDir(), "waltestempty")
214	if err != nil {
215		t.Fatal(err)
216	}
217	defer os.RemoveAll(emptydir)
218	if _, err = Open(zap.NewExample(), emptydir, walpb.Snapshot{}); err != ErrFileNotFound {
219		t.Errorf("err = %v, want %v", err, ErrFileNotFound)
220	}
221}
222
223// TestVerify tests that Verify throws a non-nil error when the WAL is corrupted.
224// The test creates a WAL directory and cuts out multiple WAL files. Then
225// it corrupts one of the files by completely truncating it.
226func TestVerify(t *testing.T) {
227	walDir, err := ioutil.TempDir(os.TempDir(), "waltest")
228	if err != nil {
229		t.Fatal(err)
230	}
231	defer os.RemoveAll(walDir)
232
233	// create WAL
234	w, err := Create(zap.NewExample(), walDir, nil)
235	if err != nil {
236		t.Fatal(err)
237	}
238	defer w.Close()
239
240	// make 5 separate files
241	for i := 0; i < 5; i++ {
242		es := []raftpb.Entry{{Index: uint64(i), Data: []byte("waldata" + string(rune(i+1)))}}
243		if err = w.Save(raftpb.HardState{}, es); err != nil {
244			t.Fatal(err)
245		}
246		if err = w.cut(); err != nil {
247			t.Fatal(err)
248		}
249	}
250
251	// to verify the WAL is not corrupted at this point
252	err = Verify(zap.NewExample(), walDir, walpb.Snapshot{})
253	if err != nil {
254		t.Errorf("expected a nil error, got %v", err)
255	}
256
257	walFiles, err := ioutil.ReadDir(walDir)
258	if err != nil {
259		t.Fatal(err)
260	}
261
262	// corrupt the WAL by truncating one of the WAL files completely
263	err = os.Truncate(path.Join(walDir, walFiles[2].Name()), 0)
264	if err != nil {
265		t.Fatal(err)
266	}
267
268	err = Verify(zap.NewExample(), walDir, walpb.Snapshot{})
269	if err == nil {
270		t.Error("expected a non-nil error, got nil")
271	}
272}
273
274// TODO: split it into smaller tests for better readability
275func TestCut(t *testing.T) {
276	p, err := ioutil.TempDir(os.TempDir(), "waltest")
277	if err != nil {
278		t.Fatal(err)
279	}
280	defer os.RemoveAll(p)
281
282	w, err := Create(zap.NewExample(), p, nil)
283	if err != nil {
284		t.Fatal(err)
285	}
286	defer w.Close()
287
288	state := raftpb.HardState{Term: 1}
289	if err = w.Save(state, nil); err != nil {
290		t.Fatal(err)
291	}
292	if err = w.cut(); err != nil {
293		t.Fatal(err)
294	}
295	wname := walName(1, 1)
296	if g := filepath.Base(w.tail().Name()); g != wname {
297		t.Errorf("name = %s, want %s", g, wname)
298	}
299
300	es := []raftpb.Entry{{Index: 1, Term: 1, Data: []byte{1}}}
301	if err = w.Save(raftpb.HardState{}, es); err != nil {
302		t.Fatal(err)
303	}
304	if err = w.cut(); err != nil {
305		t.Fatal(err)
306	}
307	snap := walpb.Snapshot{Index: 2, Term: 1}
308	if err = w.SaveSnapshot(snap); err != nil {
309		t.Fatal(err)
310	}
311	wname = walName(2, 2)
312	if g := filepath.Base(w.tail().Name()); g != wname {
313		t.Errorf("name = %s, want %s", g, wname)
314	}
315
316	// check the state in the last WAL
317	// We do check before closing the WAL to ensure that Cut syncs the data
318	// into the disk.
319	f, err := os.Open(filepath.Join(p, wname))
320	if err != nil {
321		t.Fatal(err)
322	}
323	defer f.Close()
324	nw := &WAL{
325		decoder: newDecoder(f),
326		start:   snap,
327	}
328	_, gst, _, err := nw.ReadAll()
329	if err != nil {
330		t.Fatal(err)
331	}
332	if !reflect.DeepEqual(gst, state) {
333		t.Errorf("state = %+v, want %+v", gst, state)
334	}
335}
336
337func TestSaveWithCut(t *testing.T) {
338	p, err := ioutil.TempDir(os.TempDir(), "waltest")
339	if err != nil {
340		t.Fatal(err)
341	}
342	defer os.RemoveAll(p)
343
344	w, err := Create(zap.NewExample(), p, []byte("metadata"))
345	if err != nil {
346		t.Fatal(err)
347	}
348
349	state := raftpb.HardState{Term: 1}
350	if err = w.Save(state, nil); err != nil {
351		t.Fatal(err)
352	}
353	bigData := make([]byte, 500)
354	strdata := "Hello World!!"
355	copy(bigData, strdata)
356	// set a lower value for SegmentSizeBytes, else the test takes too long to complete
357	restoreLater := SegmentSizeBytes
358	const EntrySize int = 500
359	SegmentSizeBytes = 2 * 1024
360	defer func() { SegmentSizeBytes = restoreLater }()
361	index := uint64(0)
362	for totalSize := 0; totalSize < int(SegmentSizeBytes); totalSize += EntrySize {
363		ents := []raftpb.Entry{{Index: index, Term: 1, Data: bigData}}
364		if err = w.Save(state, ents); err != nil {
365			t.Fatal(err)
366		}
367		index++
368	}
369
370	w.Close()
371
372	neww, err := Open(zap.NewExample(), p, walpb.Snapshot{})
373	if err != nil {
374		t.Fatalf("err = %v, want nil", err)
375	}
376	defer neww.Close()
377	wname := walName(1, index)
378	if g := filepath.Base(neww.tail().Name()); g != wname {
379		t.Errorf("name = %s, want %s", g, wname)
380	}
381
382	_, newhardstate, entries, err := neww.ReadAll()
383	if err != nil {
384		t.Fatal(err)
385	}
386
387	if !reflect.DeepEqual(newhardstate, state) {
388		t.Errorf("Hard State = %+v, want %+v", newhardstate, state)
389	}
390	if len(entries) != int(SegmentSizeBytes/int64(EntrySize)) {
391		t.Errorf("Number of entries = %d, expected = %d", len(entries), int(SegmentSizeBytes/int64(EntrySize)))
392	}
393	for _, oneent := range entries {
394		if !bytes.Equal(oneent.Data, bigData) {
395			t.Errorf("the saved data does not match at Index %d : found: %s , want :%s", oneent.Index, oneent.Data, bigData)
396		}
397	}
398}
399
400func TestRecover(t *testing.T) {
401	p, err := ioutil.TempDir(os.TempDir(), "waltest")
402	if err != nil {
403		t.Fatal(err)
404	}
405	defer os.RemoveAll(p)
406
407	w, err := Create(zap.NewExample(), p, []byte("metadata"))
408	if err != nil {
409		t.Fatal(err)
410	}
411	if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil {
412		t.Fatal(err)
413	}
414	ents := []raftpb.Entry{{Index: 1, Term: 1, Data: []byte{1}}, {Index: 2, Term: 2, Data: []byte{2}}}
415	if err = w.Save(raftpb.HardState{}, ents); err != nil {
416		t.Fatal(err)
417	}
418	sts := []raftpb.HardState{{Term: 1, Vote: 1, Commit: 1}, {Term: 2, Vote: 2, Commit: 2}}
419	for _, s := range sts {
420		if err = w.Save(s, nil); err != nil {
421			t.Fatal(err)
422		}
423	}
424	w.Close()
425
426	if w, err = Open(zap.NewExample(), p, walpb.Snapshot{}); err != nil {
427		t.Fatal(err)
428	}
429	metadata, state, entries, err := w.ReadAll()
430	if err != nil {
431		t.Fatal(err)
432	}
433
434	if !bytes.Equal(metadata, []byte("metadata")) {
435		t.Errorf("metadata = %s, want %s", metadata, "metadata")
436	}
437	if !reflect.DeepEqual(entries, ents) {
438		t.Errorf("ents = %+v, want %+v", entries, ents)
439	}
440	// only the latest state is recorded
441	s := sts[len(sts)-1]
442	if !reflect.DeepEqual(state, s) {
443		t.Errorf("state = %+v, want %+v", state, s)
444	}
445	w.Close()
446}
447
448func TestSearchIndex(t *testing.T) {
449	tests := []struct {
450		names []string
451		index uint64
452		widx  int
453		wok   bool
454	}{
455		{
456			[]string{
457				"0000000000000000-0000000000000000.wal",
458				"0000000000000001-0000000000001000.wal",
459				"0000000000000002-0000000000002000.wal",
460			},
461			0x1000, 1, true,
462		},
463		{
464			[]string{
465				"0000000000000001-0000000000004000.wal",
466				"0000000000000002-0000000000003000.wal",
467				"0000000000000003-0000000000005000.wal",
468			},
469			0x4000, 1, true,
470		},
471		{
472			[]string{
473				"0000000000000001-0000000000002000.wal",
474				"0000000000000002-0000000000003000.wal",
475				"0000000000000003-0000000000005000.wal",
476			},
477			0x1000, -1, false,
478		},
479	}
480	for i, tt := range tests {
481		idx, ok := searchIndex(zap.NewExample(), tt.names, tt.index)
482		if idx != tt.widx {
483			t.Errorf("#%d: idx = %d, want %d", i, idx, tt.widx)
484		}
485		if ok != tt.wok {
486			t.Errorf("#%d: ok = %v, want %v", i, ok, tt.wok)
487		}
488	}
489}
490
491func TestScanWalName(t *testing.T) {
492	tests := []struct {
493		str          string
494		wseq, windex uint64
495		wok          bool
496	}{
497		{"0000000000000000-0000000000000000.wal", 0, 0, true},
498		{"0000000000000000.wal", 0, 0, false},
499		{"0000000000000000-0000000000000000.snap", 0, 0, false},
500	}
501	for i, tt := range tests {
502		s, index, err := parseWALName(tt.str)
503		if g := err == nil; g != tt.wok {
504			t.Errorf("#%d: ok = %v, want %v", i, g, tt.wok)
505		}
506		if s != tt.wseq {
507			t.Errorf("#%d: seq = %d, want %d", i, s, tt.wseq)
508		}
509		if index != tt.windex {
510			t.Errorf("#%d: index = %d, want %d", i, index, tt.windex)
511		}
512	}
513}
514
515func TestRecoverAfterCut(t *testing.T) {
516	p, err := ioutil.TempDir(os.TempDir(), "waltest")
517	if err != nil {
518		t.Fatal(err)
519	}
520	defer os.RemoveAll(p)
521
522	md, err := Create(zap.NewExample(), p, []byte("metadata"))
523	if err != nil {
524		t.Fatal(err)
525	}
526	for i := 0; i < 10; i++ {
527		if err = md.SaveSnapshot(walpb.Snapshot{Index: uint64(i)}); err != nil {
528			t.Fatal(err)
529		}
530		es := []raftpb.Entry{{Index: uint64(i)}}
531		if err = md.Save(raftpb.HardState{}, es); err != nil {
532			t.Fatal(err)
533		}
534		if err = md.cut(); err != nil {
535			t.Fatal(err)
536		}
537	}
538	md.Close()
539
540	if err := os.Remove(filepath.Join(p, walName(4, 4))); err != nil {
541		t.Fatal(err)
542	}
543
544	for i := 0; i < 10; i++ {
545		w, err := Open(zap.NewExample(), p, walpb.Snapshot{Index: uint64(i)})
546		if err != nil {
547			if i <= 4 {
548				if err != ErrFileNotFound {
549					t.Errorf("#%d: err = %v, want %v", i, err, ErrFileNotFound)
550				}
551			} else {
552				t.Errorf("#%d: err = %v, want nil", i, err)
553			}
554			continue
555		}
556		metadata, _, entries, err := w.ReadAll()
557		if err != nil {
558			t.Errorf("#%d: err = %v, want nil", i, err)
559			continue
560		}
561		if !bytes.Equal(metadata, []byte("metadata")) {
562			t.Errorf("#%d: metadata = %s, want %s", i, metadata, "metadata")
563		}
564		for j, e := range entries {
565			if e.Index != uint64(j+i+1) {
566				t.Errorf("#%d: ents[%d].Index = %+v, want %+v", i, j, e.Index, j+i+1)
567			}
568		}
569		w.Close()
570	}
571}
572
573func TestOpenAtUncommittedIndex(t *testing.T) {
574	p, err := ioutil.TempDir(os.TempDir(), "waltest")
575	if err != nil {
576		t.Fatal(err)
577	}
578	defer os.RemoveAll(p)
579
580	w, err := Create(zap.NewExample(), p, nil)
581	if err != nil {
582		t.Fatal(err)
583	}
584	if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil {
585		t.Fatal(err)
586	}
587	if err = w.Save(raftpb.HardState{}, []raftpb.Entry{{Index: 0}}); err != nil {
588		t.Fatal(err)
589	}
590	w.Close()
591
592	w, err = Open(zap.NewExample(), p, walpb.Snapshot{})
593	if err != nil {
594		t.Fatal(err)
595	}
596	// commit up to index 0, try to read index 1
597	if _, _, _, err = w.ReadAll(); err != nil {
598		t.Errorf("err = %v, want nil", err)
599	}
600	w.Close()
601}
602
603// TestOpenForRead tests that OpenForRead can load all files.
604// The tests creates WAL directory, and cut out multiple WAL files. Then
605// it releases the lock of part of data, and excepts that OpenForRead
606// can read out all files even if some are locked for write.
607func TestOpenForRead(t *testing.T) {
608	p, err := ioutil.TempDir(os.TempDir(), "waltest")
609	if err != nil {
610		t.Fatal(err)
611	}
612	defer os.RemoveAll(p)
613	// create WAL
614	w, err := Create(zap.NewExample(), p, nil)
615	if err != nil {
616		t.Fatal(err)
617	}
618	defer w.Close()
619	// make 10 separate files
620	for i := 0; i < 10; i++ {
621		es := []raftpb.Entry{{Index: uint64(i)}}
622		if err = w.Save(raftpb.HardState{}, es); err != nil {
623			t.Fatal(err)
624		}
625		if err = w.cut(); err != nil {
626			t.Fatal(err)
627		}
628	}
629	// release the lock to 5
630	unlockIndex := uint64(5)
631	w.ReleaseLockTo(unlockIndex)
632
633	// All are available for read
634	w2, err := OpenForRead(zap.NewExample(), p, walpb.Snapshot{})
635	if err != nil {
636		t.Fatal(err)
637	}
638	defer w2.Close()
639	_, _, ents, err := w2.ReadAll()
640	if err != nil {
641		t.Fatalf("err = %v, want nil", err)
642	}
643	if g := ents[len(ents)-1].Index; g != 9 {
644		t.Errorf("last index read = %d, want %d", g, 9)
645	}
646}
647
648func TestOpenWithMaxIndex(t *testing.T) {
649	p, err := ioutil.TempDir(os.TempDir(), "waltest")
650	if err != nil {
651		t.Fatal(err)
652	}
653	defer os.RemoveAll(p)
654	// create WAL
655	w, err := Create(zap.NewExample(), p, nil)
656	if err != nil {
657		t.Fatal(err)
658	}
659	defer w.Close()
660
661	es := []raftpb.Entry{{Index: uint64(math.MaxInt64)}}
662	if err = w.Save(raftpb.HardState{}, es); err != nil {
663		t.Fatal(err)
664	}
665	w.Close()
666
667	w, err = Open(zap.NewExample(), p, walpb.Snapshot{})
668	if err != nil {
669		t.Fatal(err)
670	}
671	_, _, _, err = w.ReadAll()
672	if err == nil || err != ErrSliceOutOfRange {
673		t.Fatalf("err = %v, want ErrSliceOutOfRange", err)
674	}
675}
676
677func TestSaveEmpty(t *testing.T) {
678	var buf bytes.Buffer
679	var est raftpb.HardState
680	w := WAL{
681		encoder: newEncoder(&buf, 0, 0),
682	}
683	if err := w.saveState(&est); err != nil {
684		t.Errorf("err = %v, want nil", err)
685	}
686	if len(buf.Bytes()) != 0 {
687		t.Errorf("buf.Bytes = %d, want 0", len(buf.Bytes()))
688	}
689}
690
691func TestReleaseLockTo(t *testing.T) {
692	p, err := ioutil.TempDir(os.TempDir(), "waltest")
693	if err != nil {
694		t.Fatal(err)
695	}
696	defer os.RemoveAll(p)
697	// create WAL
698	w, err := Create(zap.NewExample(), p, nil)
699	defer func() {
700		if err = w.Close(); err != nil {
701			t.Fatal(err)
702		}
703	}()
704	if err != nil {
705		t.Fatal(err)
706	}
707
708	// release nothing if no files
709	err = w.ReleaseLockTo(10)
710	if err != nil {
711		t.Errorf("err = %v, want nil", err)
712	}
713
714	// make 10 separate files
715	for i := 0; i < 10; i++ {
716		es := []raftpb.Entry{{Index: uint64(i)}}
717		if err = w.Save(raftpb.HardState{}, es); err != nil {
718			t.Fatal(err)
719		}
720		if err = w.cut(); err != nil {
721			t.Fatal(err)
722		}
723	}
724	// release the lock to 5
725	unlockIndex := uint64(5)
726	w.ReleaseLockTo(unlockIndex)
727
728	// expected remaining are 4,5,6,7,8,9,10
729	if len(w.locks) != 7 {
730		t.Errorf("len(w.locks) = %d, want %d", len(w.locks), 7)
731	}
732	for i, l := range w.locks {
733		var lockIndex uint64
734		_, lockIndex, err = parseWALName(filepath.Base(l.Name()))
735		if err != nil {
736			t.Fatal(err)
737		}
738
739		if lockIndex != uint64(i+4) {
740			t.Errorf("#%d: lockindex = %d, want %d", i, lockIndex, uint64(i+4))
741		}
742	}
743
744	// release the lock to 15
745	unlockIndex = uint64(15)
746	w.ReleaseLockTo(unlockIndex)
747
748	// expected remaining is 10
749	if len(w.locks) != 1 {
750		t.Errorf("len(w.locks) = %d, want %d", len(w.locks), 1)
751	}
752	_, lockIndex, err := parseWALName(filepath.Base(w.locks[0].Name()))
753	if err != nil {
754		t.Fatal(err)
755	}
756
757	if lockIndex != uint64(10) {
758		t.Errorf("lockindex = %d, want %d", lockIndex, 10)
759	}
760}
761
762// TestTailWriteNoSlackSpace ensures that tail writes append if there's no preallocated space.
763func TestTailWriteNoSlackSpace(t *testing.T) {
764	p, err := ioutil.TempDir(os.TempDir(), "waltest")
765	if err != nil {
766		t.Fatal(err)
767	}
768	defer os.RemoveAll(p)
769
770	// create initial WAL
771	w, err := Create(zap.NewExample(), p, []byte("metadata"))
772	if err != nil {
773		t.Fatal(err)
774	}
775	// write some entries
776	for i := 1; i <= 5; i++ {
777		es := []raftpb.Entry{{Index: uint64(i), Term: 1, Data: []byte{byte(i)}}}
778		if err = w.Save(raftpb.HardState{Term: 1}, es); err != nil {
779			t.Fatal(err)
780		}
781	}
782	// get rid of slack space by truncating file
783	off, serr := w.tail().Seek(0, io.SeekCurrent)
784	if serr != nil {
785		t.Fatal(serr)
786	}
787	if terr := w.tail().Truncate(off); terr != nil {
788		t.Fatal(terr)
789	}
790	w.Close()
791
792	// open, write more
793	w, err = Open(zap.NewExample(), p, walpb.Snapshot{})
794	if err != nil {
795		t.Fatal(err)
796	}
797	_, _, ents, rerr := w.ReadAll()
798	if rerr != nil {
799		t.Fatal(rerr)
800	}
801	if len(ents) != 5 {
802		t.Fatalf("got entries %+v, expected 5 entries", ents)
803	}
804	// write more entries
805	for i := 6; i <= 10; i++ {
806		es := []raftpb.Entry{{Index: uint64(i), Term: 1, Data: []byte{byte(i)}}}
807		if err = w.Save(raftpb.HardState{Term: 1}, es); err != nil {
808			t.Fatal(err)
809		}
810	}
811	w.Close()
812
813	// confirm all writes
814	w, err = Open(zap.NewExample(), p, walpb.Snapshot{})
815	if err != nil {
816		t.Fatal(err)
817	}
818	_, _, ents, rerr = w.ReadAll()
819	if rerr != nil {
820		t.Fatal(rerr)
821	}
822	if len(ents) != 10 {
823		t.Fatalf("got entries %+v, expected 10 entries", ents)
824	}
825	w.Close()
826}
827
828// TestRestartCreateWal ensures that an interrupted WAL initialization is clobbered on restart
829func TestRestartCreateWal(t *testing.T) {
830	p, err := ioutil.TempDir(os.TempDir(), "waltest")
831	if err != nil {
832		t.Fatal(err)
833	}
834	defer os.RemoveAll(p)
835
836	// make temporary directory so it looks like initialization is interrupted
837	tmpdir := filepath.Clean(p) + ".tmp"
838	if err = os.Mkdir(tmpdir, fileutil.PrivateDirMode); err != nil {
839		t.Fatal(err)
840	}
841	if _, err = os.OpenFile(filepath.Join(tmpdir, "test"), os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode); err != nil {
842		t.Fatal(err)
843	}
844
845	w, werr := Create(zap.NewExample(), p, []byte("abc"))
846	if werr != nil {
847		t.Fatal(werr)
848	}
849	w.Close()
850	if Exist(tmpdir) {
851		t.Fatalf("got %q exists, expected it to not exist", tmpdir)
852	}
853
854	if w, err = OpenForRead(zap.NewExample(), p, walpb.Snapshot{}); err != nil {
855		t.Fatal(err)
856	}
857	defer w.Close()
858
859	if meta, _, _, rerr := w.ReadAll(); rerr != nil || string(meta) != "abc" {
860		t.Fatalf("got error %v and meta %q, expected nil and %q", rerr, meta, "abc")
861	}
862}
863
864// TestOpenOnTornWrite ensures that entries past the torn write are truncated.
865func TestOpenOnTornWrite(t *testing.T) {
866	maxEntries := 40
867	clobberIdx := 20
868	overwriteEntries := 5
869
870	p, err := ioutil.TempDir(os.TempDir(), "waltest")
871	if err != nil {
872		t.Fatal(err)
873	}
874	defer os.RemoveAll(p)
875	w, err := Create(zap.NewExample(), p, nil)
876	defer func() {
877		if err = w.Close(); err != nil && err != os.ErrInvalid {
878			t.Fatal(err)
879		}
880	}()
881	if err != nil {
882		t.Fatal(err)
883	}
884
885	// get offset of end of each saved entry
886	offsets := make([]int64, maxEntries)
887	for i := range offsets {
888		es := []raftpb.Entry{{Index: uint64(i)}}
889		if err = w.Save(raftpb.HardState{}, es); err != nil {
890			t.Fatal(err)
891		}
892		if offsets[i], err = w.tail().Seek(0, io.SeekCurrent); err != nil {
893			t.Fatal(err)
894		}
895	}
896
897	fn := filepath.Join(p, filepath.Base(w.tail().Name()))
898	w.Close()
899
900	// clobber some entry with 0's to simulate a torn write
901	f, ferr := os.OpenFile(fn, os.O_WRONLY, fileutil.PrivateFileMode)
902	if ferr != nil {
903		t.Fatal(ferr)
904	}
905	defer f.Close()
906	_, err = f.Seek(offsets[clobberIdx], io.SeekStart)
907	if err != nil {
908		t.Fatal(err)
909	}
910	zeros := make([]byte, offsets[clobberIdx+1]-offsets[clobberIdx])
911	_, err = f.Write(zeros)
912	if err != nil {
913		t.Fatal(err)
914	}
915	f.Close()
916
917	w, err = Open(zap.NewExample(), p, walpb.Snapshot{})
918	if err != nil {
919		t.Fatal(err)
920	}
921	// seek up to clobbered entry
922	_, _, _, err = w.ReadAll()
923	if err != nil {
924		t.Fatal(err)
925	}
926
927	// write a few entries past the clobbered entry
928	for i := 0; i < overwriteEntries; i++ {
929		// Index is different from old, truncated entries
930		es := []raftpb.Entry{{Index: uint64(i + clobberIdx), Data: []byte("new")}}
931		if err = w.Save(raftpb.HardState{}, es); err != nil {
932			t.Fatal(err)
933		}
934	}
935	w.Close()
936
937	// read back the entries, confirm number of entries matches expectation
938	w, err = OpenForRead(zap.NewExample(), p, walpb.Snapshot{})
939	if err != nil {
940		t.Fatal(err)
941	}
942
943	_, _, ents, rerr := w.ReadAll()
944	if rerr != nil {
945		// CRC error? the old entries were likely never truncated away
946		t.Fatal(rerr)
947	}
948	wEntries := (clobberIdx - 1) + overwriteEntries
949	if len(ents) != wEntries {
950		t.Fatalf("expected len(ents) = %d, got %d", wEntries, len(ents))
951	}
952}
953
954func TestRenameFail(t *testing.T) {
955	p, err := ioutil.TempDir(os.TempDir(), "waltest")
956	if err != nil {
957		t.Fatal(err)
958	}
959	defer os.RemoveAll(p)
960
961	oldSegmentSizeBytes := SegmentSizeBytes
962	defer func() {
963		SegmentSizeBytes = oldSegmentSizeBytes
964	}()
965	SegmentSizeBytes = math.MaxInt64
966
967	tp, terr := ioutil.TempDir(os.TempDir(), "waltest")
968	if terr != nil {
969		t.Fatal(terr)
970	}
971	os.RemoveAll(tp)
972
973	w := &WAL{
974		lg:  zap.NewExample(),
975		dir: p,
976	}
977	w2, werr := w.renameWAL(tp)
978	if w2 != nil || werr == nil { // os.Rename should fail from 'no such file or directory'
979		t.Fatalf("expected error, got %v", werr)
980	}
981}
982
983// TestValidSnapshotEntries ensures ValidSnapshotEntries returns all valid wal snapshot entries, accounting
984// for hardstate
985func TestValidSnapshotEntries(t *testing.T) {
986	p, err := ioutil.TempDir(os.TempDir(), "waltest")
987	if err != nil {
988		t.Fatal(err)
989	}
990	defer os.RemoveAll(p)
991	snap0 := walpb.Snapshot{Index: 0, Term: 0}
992	snap1 := walpb.Snapshot{Index: 1, Term: 1}
993	state1 := raftpb.HardState{Commit: 1, Term: 1}
994	snap2 := walpb.Snapshot{Index: 2, Term: 1}
995	snap3 := walpb.Snapshot{Index: 3, Term: 2}
996	state2 := raftpb.HardState{Commit: 3, Term: 2}
997	snap4 := walpb.Snapshot{Index: 4, Term: 2} // will be orphaned since the last committed entry will be snap3
998	func() {
999		var w *WAL
1000		w, err = Create(zap.NewExample(), p, nil)
1001		if err != nil {
1002			t.Fatal(err)
1003		}
1004		defer w.Close()
1005
1006		// snap0 is implicitly created at index 0, term 0
1007		if err = w.SaveSnapshot(snap1); err != nil {
1008			t.Fatal(err)
1009		}
1010		if err = w.Save(state1, nil); err != nil {
1011			t.Fatal(err)
1012		}
1013		if err = w.SaveSnapshot(snap2); err != nil {
1014			t.Fatal(err)
1015		}
1016		if err = w.SaveSnapshot(snap3); err != nil {
1017			t.Fatal(err)
1018		}
1019		if err = w.Save(state2, nil); err != nil {
1020			t.Fatal(err)
1021		}
1022		if err = w.SaveSnapshot(snap4); err != nil {
1023			t.Fatal(err)
1024		}
1025	}()
1026	walSnaps, serr := ValidSnapshotEntries(zap.NewExample(), p)
1027	if serr != nil {
1028		t.Fatal(serr)
1029	}
1030	expected := []walpb.Snapshot{snap0, snap1, snap2, snap3}
1031	if !reflect.DeepEqual(walSnaps, expected) {
1032		t.Errorf("expected walSnaps %+v, got %+v", expected, walSnaps)
1033	}
1034}
1035
1036// TestValidSnapshotEntriesAfterPurgeWal ensure that there are many wal files, and after cleaning the first wal file,
1037// it can work well.
1038func TestValidSnapshotEntriesAfterPurgeWal(t *testing.T) {
1039	oldSegmentSizeBytes := SegmentSizeBytes
1040	SegmentSizeBytes = 64
1041	defer func() {
1042		SegmentSizeBytes = oldSegmentSizeBytes
1043	}()
1044	p, err := ioutil.TempDir(os.TempDir(), "waltest")
1045	if err != nil {
1046		t.Fatal(err)
1047	}
1048	defer os.RemoveAll(p)
1049	snap0 := walpb.Snapshot{Index: 0, Term: 0}
1050	snap1 := walpb.Snapshot{Index: 1, Term: 1}
1051	state1 := raftpb.HardState{Commit: 1, Term: 1}
1052	snap2 := walpb.Snapshot{Index: 2, Term: 1}
1053	snap3 := walpb.Snapshot{Index: 3, Term: 2}
1054	state2 := raftpb.HardState{Commit: 3, Term: 2}
1055	func() {
1056		w, werr := Create(zap.NewExample(), p, nil)
1057		if werr != nil {
1058			t.Fatal(werr)
1059		}
1060		defer w.Close()
1061
1062		// snap0 is implicitly created at index 0, term 0
1063		if err = w.SaveSnapshot(snap1); err != nil {
1064			t.Fatal(err)
1065		}
1066		if err = w.Save(state1, nil); err != nil {
1067			t.Fatal(err)
1068		}
1069		if err = w.SaveSnapshot(snap2); err != nil {
1070			t.Fatal(err)
1071		}
1072		if err = w.SaveSnapshot(snap3); err != nil {
1073			t.Fatal(err)
1074		}
1075		for i := 0; i < 128; i++ {
1076			if err = w.Save(state2, nil); err != nil {
1077				t.Fatal(err)
1078			}
1079		}
1080	}()
1081	files, _, ferr := selectWALFiles(nil, p, snap0)
1082	if ferr != nil {
1083		t.Fatal(ferr)
1084	}
1085	os.Remove(p + "/" + files[0])
1086	_, err = ValidSnapshotEntries(zap.NewExample(), p)
1087	if err != nil {
1088		t.Fatal(err)
1089	}
1090}
1091
1092// TestReadAllFail ensure ReadAll error if used without opening the WAL
1093func TestReadAllFail(t *testing.T) {
1094	dir, err := ioutil.TempDir(os.TempDir(), "waltest")
1095	if err != nil {
1096		t.Fatal(err)
1097	}
1098	defer os.RemoveAll(dir)
1099
1100	// create initial WAL
1101	f, err := Create(zap.NewExample(), dir, []byte("metadata"))
1102	if err != nil {
1103		t.Fatal(err)
1104	}
1105	f.Close()
1106	// try to read without opening the WAL
1107	_, _, _, err = f.ReadAll()
1108	if err == nil || err != ErrDecoderNotFound {
1109		t.Fatalf("err = %v, want ErrDecoderNotFound", err)
1110	}
1111}
1112