package raft

import (
	"context"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"sync"

	log "github.com/hashicorp/go-hclog"

	"github.com/hashicorp/raft"
)

const (
	// boltSnapshotID is the stable ID for any boltDB snapshot. Keeping the ID
	// stable means there is only ever one bolt snapshot in the system: Create
	// stamps it on new sinks, List reports it, and Open recognizes it.
	boltSnapshotID = "bolt-snapshot"
)

// BoltSnapshotStore implements the raft SnapshotStore interface (Create,
// List, Open) and allows snapshots to be made on the local disk. The main
// difference between this store and the file store is that we distinguish
// between snapshots written by the FSM and those written by internal Raft
// operations. The former are treated as noop snapshots on Persist and are
// read in full from the FSM on Open. The latter are treated like normal file
// snapshots and can be opened and applied as usual.
type BoltSnapshotStore struct {
	// path is the directory in which to store file based snapshots.
	path string
	// retain is the number of file based snapshots to keep.
	retain int

	// fsm is held so snapshots can be streamed straight out of the database
	// on Open, and so Create can fast-forward the FSM's latest index/term.
	fsm *FSM

	// fileSnapStore is used to fall back to file snapshots when the data is
	// being written from the raft library. This currently only happens on a
	// follower during a snapshot install RPC.
	fileSnapStore *raft.FileSnapshotStore
	// logger reports errors hit while opening snapshots and is handed to
	// every sink produced by Create.
	logger log.Logger
}

// BoltSnapshotSink implements SnapshotSink, optionally choosing to write to a
// file. A sink that is never written to represents the bolt snapshot; the
// first Write switches it to a file sink in the store's file snapshot store.
type BoltSnapshotSink struct {
	store  *BoltSnapshotStore // owning store; provides the file store for lazy file sinks
	logger log.Logger
	meta   raft.SnapshotMeta // metadata captured when the sink was created
	trans  raft.Transport    // forwarded to the file store if a file sink is created

	// fileSink is nil until the first Write creates it.
	fileSink raft.SnapshotSink
	// l guards fileSink and closed.
	l sync.Mutex
	// closed is set by the first Close or Cancel; later calls are no-ops.
	closed bool
}

// NewBoltSnapshotStore creates a new BoltSnapshotStore based on a base
// directory, which is handed to a raft file snapshot store. The `retain`
// parameter controls how many file based snapshots are retained and must be
// at least 1; a nil logger is rejected. File based snapshots already on disk
// are reaped (via ReapSnapshots) before the store is returned.
62func NewBoltSnapshotStore(base string, retain int, logger log.Logger, fsm *FSM) (*BoltSnapshotStore, error) { 63 if retain < 1 { 64 return nil, fmt.Errorf("must retain at least one snapshot") 65 } 66 if logger == nil { 67 return nil, fmt.Errorf("no logger provided") 68 } 69 70 fileStore, err := raft.NewFileSnapshotStore(base, retain, nil) 71 if err != nil { 72 return nil, err 73 } 74 75 // Setup the store 76 store := &BoltSnapshotStore{ 77 logger: logger, 78 fsm: fsm, 79 fileSnapStore: fileStore, 80 } 81 82 { 83 // TODO: I think this needs to be done before every NewRaft and 84 // RecoverCluster call. Not just on Factory method. 85 86 // Here we delete all the existing file based snapshots. This is necessary 87 // because we do not issue a restore on NewRaft. If a previous file snapshot 88 // had failed to apply we will be incorrectly setting the indexes. It's 89 // safer to simply delete all file snapshots on startup and rely on Raft to 90 // reconcile the FSM state. 91 if err := store.ReapSnapshots(); err != nil { 92 return nil, err 93 } 94 } 95 96 return store, nil 97} 98 99// Create is used to start a new snapshot 100func (f *BoltSnapshotStore) Create(version raft.SnapshotVersion, index, term uint64, 101 configuration raft.Configuration, configurationIndex uint64, trans raft.Transport) (raft.SnapshotSink, error) { 102 // We only support version 1 snapshots at this time. 103 if version != 1 { 104 return nil, fmt.Errorf("unsupported snapshot version %d", version) 105 } 106 107 // We are processing a snapshot, fastforward the index, term, and 108 // configuration to the latest seen by the raft system. This could include 109 // log indexes for operation types that are never sent to the FSM. 
110 if err := f.fsm.witnessSnapshot(index, term, configurationIndex, configuration); err != nil { 111 return nil, err 112 } 113 114 // Create the sink 115 sink := &BoltSnapshotSink{ 116 store: f, 117 logger: f.logger, 118 meta: raft.SnapshotMeta{ 119 Version: version, 120 ID: boltSnapshotID, 121 Index: index, 122 Term: term, 123 Configuration: configuration, 124 ConfigurationIndex: configurationIndex, 125 }, 126 trans: trans, 127 } 128 129 // Done 130 return sink, nil 131} 132 133// List returns available snapshots in the store. It only returns bolt 134// snapshots. No snapshot will be returned if there are no indexes in the 135// FSM. 136func (f *BoltSnapshotStore) List() ([]*raft.SnapshotMeta, error) { 137 meta, err := f.getBoltSnapshotMeta() 138 if err != nil { 139 return nil, err 140 } 141 142 // If we haven't seen any data yet do not return a snapshot 143 if meta.Index == 0 { 144 return nil, nil 145 } 146 147 return []*raft.SnapshotMeta{meta}, nil 148} 149 150// getBoltSnapshotMeta returns the fsm's latest state and configuration. 151func (f *BoltSnapshotStore) getBoltSnapshotMeta() (*raft.SnapshotMeta, error) { 152 latestIndex, latestConfig := f.fsm.LatestState() 153 meta := &raft.SnapshotMeta{ 154 Version: 1, 155 ID: boltSnapshotID, 156 Index: latestIndex.Index, 157 Term: latestIndex.Term, 158 } 159 160 if latestConfig != nil { 161 index, configuration := protoConfigurationToRaftConfiguration(latestConfig) 162 meta.Configuration = configuration 163 meta.ConfigurationIndex = index 164 } 165 166 return meta, nil 167} 168 169// Open takes a snapshot ID and returns a ReadCloser for that snapshot. 
170func (f *BoltSnapshotStore) Open(id string) (*raft.SnapshotMeta, io.ReadCloser, error) { 171 var readCloser io.ReadCloser 172 var meta *raft.SnapshotMeta 173 switch id { 174 case boltSnapshotID: 175 176 var err error 177 meta, err = f.getBoltSnapshotMeta() 178 if err != nil { 179 return nil, nil, err 180 } 181 // If we don't have any data return an error 182 if meta.Index == 0 { 183 return nil, nil, errors.New("no snapshot data") 184 } 185 186 // Stream data out of the FSM to calculate the size 187 var writeCloser *io.PipeWriter 188 readCloser, writeCloser = io.Pipe() 189 metaReadCloser, metaWriteCloser := io.Pipe() 190 go func() { 191 f.fsm.writeTo(context.Background(), metaWriteCloser, writeCloser) 192 }() 193 194 // Compute the size 195 n, err := io.Copy(ioutil.Discard, metaReadCloser) 196 if err != nil { 197 f.logger.Error("failed to read state file", "error", err) 198 metaReadCloser.Close() 199 readCloser.Close() 200 return nil, nil, err 201 } 202 203 meta.Size = n 204 205 default: 206 var err error 207 meta, readCloser, err = f.fileSnapStore.Open(id) 208 if err != nil { 209 return nil, nil, err 210 } 211 } 212 213 return meta, readCloser, nil 214} 215 216// ReapSnapshots reaps any snapshots beyond the retain count. 217func (f *BoltSnapshotStore) ReapSnapshots() error { 218 return f.fileSnapStore.ReapSnapshots() 219} 220 221// ID returns the ID of the snapshot, can be used with Open() 222// after the snapshot is finalized. 223func (s *BoltSnapshotSink) ID() string { 224 s.l.Lock() 225 defer s.l.Unlock() 226 227 if s.fileSink != nil { 228 return s.fileSink.ID() 229 } 230 231 return s.meta.ID 232} 233 234// Write is used to append to the state file. We write to the 235// buffered IO object to reduce the amount of context switches. 236func (s *BoltSnapshotSink) Write(b []byte) (int, error) { 237 s.l.Lock() 238 defer s.l.Unlock() 239 240 // If someone is writing to this sink then we need to create a file sink to 241 // capture the data. 
This currently only happens when a follower is being 242 // sent a snapshot. 243 if s.fileSink == nil { 244 fileSink, err := s.store.fileSnapStore.Create(s.meta.Version, s.meta.Index, s.meta.Term, s.meta.Configuration, s.meta.ConfigurationIndex, s.trans) 245 if err != nil { 246 return 0, err 247 } 248 s.fileSink = fileSink 249 } 250 251 return s.fileSink.Write(b) 252} 253 254// Close is used to indicate a successful end. 255func (s *BoltSnapshotSink) Close() error { 256 s.l.Lock() 257 defer s.l.Unlock() 258 259 // Make sure close is idempotent 260 if s.closed { 261 return nil 262 } 263 s.closed = true 264 265 if s.fileSink != nil { 266 return s.fileSink.Close() 267 } 268 269 return nil 270} 271 272// Cancel is used to indicate an unsuccessful end. 273func (s *BoltSnapshotSink) Cancel() error { 274 s.l.Lock() 275 defer s.l.Unlock() 276 277 // Make sure close is idempotent 278 if s.closed { 279 return nil 280 } 281 s.closed = true 282 283 if s.fileSink != nil { 284 return s.fileSink.Cancel() 285 } 286 287 return nil 288} 289