1// Copyright 2015 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package etcdserver 16 17import ( 18 "io" 19 20 "go.etcd.io/etcd/etcdserver/api/snap" 21 pb "go.etcd.io/etcd/etcdserver/etcdserverpb" 22 "go.etcd.io/etcd/pkg/pbutil" 23 "go.etcd.io/etcd/pkg/types" 24 "go.etcd.io/etcd/raft/raftpb" 25 "go.etcd.io/etcd/wal" 26 "go.etcd.io/etcd/wal/walpb" 27 28 "go.uber.org/zap" 29) 30 31type Storage interface { 32 // Save function saves ents and state to the underlying stable storage. 33 // Save MUST block until st and ents are on stable storage. 34 Save(st raftpb.HardState, ents []raftpb.Entry) error 35 // SaveSnap function saves snapshot to the underlying stable storage. 36 SaveSnap(snap raftpb.Snapshot) error 37 // Close closes the Storage and performs finalization. 38 Close() error 39 // Release releases the locked wal files older than the provided snapshot. 40 Release(snap raftpb.Snapshot) error 41 // Sync WAL 42 Sync() error 43} 44 45type storage struct { 46 *wal.WAL 47 *snap.Snapshotter 48} 49 50func NewStorage(w *wal.WAL, s *snap.Snapshotter) Storage { 51 return &storage{w, s} 52} 53 54// SaveSnap saves the snapshot file to disk and writes the WAL snapshot entry. 55func (st *storage) SaveSnap(snap raftpb.Snapshot) error { 56 walsnap := walpb.Snapshot{ 57 Index: snap.Metadata.Index, 58 Term: snap.Metadata.Term, 59 } 60 // save the snapshot file before writing the snapshot to the wal. 61 // This makes it possible for the snapshot file to become orphaned, but prevents 62 // a WAL snapshot entry from having no corresponding snapshot file. 63 err := st.Snapshotter.SaveSnap(snap) 64 if err != nil { 65 return err 66 } 67 // gofail: var raftBeforeWALSaveSnaphot struct{} 68 69 return st.WAL.SaveSnapshot(walsnap) 70} 71 72// Release releases resources older than the given snap and are no longer needed: 73// - releases the locks to the wal files that are older than the provided wal for the given snap. 74// - deletes any .snap.db files that are older than the given snap. 75func (st *storage) Release(snap raftpb.Snapshot) error { 76 if err := st.WAL.ReleaseLockTo(snap.Metadata.Index); err != nil { 77 return err 78 } 79 return st.Snapshotter.ReleaseSnapDBs(snap) 80} 81 82// readWAL reads the WAL at the given snap and returns the wal, its latest HardState and cluster ID, and all entries that appear 83// after the position of the given snap in the WAL. 84// The snap must have been previously saved to the WAL, or this call will panic. 85func readWAL(lg *zap.Logger, waldir string, snap walpb.Snapshot, unsafeNoFsync bool) (w *wal.WAL, id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry) { 86 var ( 87 err error 88 wmetadata []byte 89 ) 90 91 repaired := false 92 for { 93 if w, err = wal.Open(lg, waldir, snap); err != nil { 94 if lg != nil { 95 lg.Fatal("failed to open WAL", zap.Error(err)) 96 } else { 97 plog.Fatalf("open wal error: %v", err) 98 } 99 } 100 if unsafeNoFsync { 101 w.SetUnsafeNoFsync() 102 } 103 if wmetadata, st, ents, err = w.ReadAll(); err != nil { 104 w.Close() 105 // we can only repair ErrUnexpectedEOF and we never repair twice. 106 if repaired || err != io.ErrUnexpectedEOF { 107 if lg != nil { 108 lg.Fatal("failed to read WAL, cannot be repaired", zap.Error(err)) 109 } else { 110 plog.Fatalf("read wal error (%v) and cannot be repaired", err) 111 } 112 } 113 if !wal.Repair(lg, waldir) { 114 if lg != nil { 115 lg.Fatal("failed to repair WAL", zap.Error(err)) 116 } else { 117 plog.Fatalf("WAL error (%v) cannot be repaired", err) 118 } 119 } else { 120 if lg != nil { 121 lg.Info("repaired WAL", zap.Error(err)) 122 } else { 123 plog.Infof("repaired WAL error (%v)", err) 124 } 125 repaired = true 126 } 127 continue 128 } 129 break 130 } 131 var metadata pb.Metadata 132 pbutil.MustUnmarshal(&metadata, wmetadata) 133 id = types.ID(metadata.NodeID) 134 cid = types.ID(metadata.ClusterID) 135 return w, id, cid, st, ents 136} 137