1// Copyright 2015 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package main 16 17import ( 18 "context" 19 "fmt" 20 "log" 21 "net/http" 22 "net/url" 23 "os" 24 "strconv" 25 "time" 26 27 "go.etcd.io/etcd/etcdserver/api/rafthttp" 28 "go.etcd.io/etcd/etcdserver/api/snap" 29 stats "go.etcd.io/etcd/etcdserver/api/v2stats" 30 "go.etcd.io/etcd/pkg/fileutil" 31 "go.etcd.io/etcd/pkg/types" 32 "go.etcd.io/etcd/raft" 33 "go.etcd.io/etcd/raft/raftpb" 34 "go.etcd.io/etcd/wal" 35 "go.etcd.io/etcd/wal/walpb" 36 37 "go.uber.org/zap" 38) 39 40// A key-value stream backed by raft 41type raftNode struct { 42 proposeC <-chan string // proposed messages (k,v) 43 confChangeC <-chan raftpb.ConfChange // proposed cluster config changes 44 commitC chan<- *string // entries committed to log (k,v) 45 errorC chan<- error // errors from raft session 46 47 id int // client ID for raft session 48 peers []string // raft peer URLs 49 join bool // node is joining an existing cluster 50 waldir string // path to WAL directory 51 snapdir string // path to snapshot directory 52 getSnapshot func() ([]byte, error) 53 lastIndex uint64 // index of log at start 54 55 confState raftpb.ConfState 56 snapshotIndex uint64 57 appliedIndex uint64 58 59 // raft backing for the commit/error channel 60 node raft.Node 61 raftStorage *raft.MemoryStorage 62 wal *wal.WAL 63 64 snapshotter *snap.Snapshotter 65 snapshotterReady chan *snap.Snapshotter // signals when snapshotter is ready 66 67 snapCount uint64 68 transport *rafthttp.Transport 69 stopc chan struct{} // signals proposal channel closed 70 httpstopc chan struct{} // signals http server to shutdown 71 httpdonec chan struct{} // signals http server shutdown complete 72} 73 74var defaultSnapshotCount uint64 = 10000 75 76// newRaftNode initiates a raft instance and returns a committed log entry 77// channel and error channel. Proposals for log updates are sent over the 78// provided the proposal channel. All log entries are replayed over the 79// commit channel, followed by a nil message (to indicate the channel is 80// current), then new log entries. To shutdown, close proposeC and read errorC. 81func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte, error), proposeC <-chan string, 82 confChangeC <-chan raftpb.ConfChange) (<-chan *string, <-chan error, <-chan *snap.Snapshotter) { 83 84 commitC := make(chan *string) 85 errorC := make(chan error) 86 87 rc := &raftNode{ 88 proposeC: proposeC, 89 confChangeC: confChangeC, 90 commitC: commitC, 91 errorC: errorC, 92 id: id, 93 peers: peers, 94 join: join, 95 waldir: fmt.Sprintf("raftexample-%d", id), 96 snapdir: fmt.Sprintf("raftexample-%d-snap", id), 97 getSnapshot: getSnapshot, 98 snapCount: defaultSnapshotCount, 99 stopc: make(chan struct{}), 100 httpstopc: make(chan struct{}), 101 httpdonec: make(chan struct{}), 102 103 snapshotterReady: make(chan *snap.Snapshotter, 1), 104 // rest of structure populated after WAL replay 105 } 106 go rc.startRaft() 107 return commitC, errorC, rc.snapshotterReady 108} 109 110func (rc *raftNode) saveSnap(snap raftpb.Snapshot) error { 111 // must save the snapshot index to the WAL before saving the 112 // snapshot to maintain the invariant that we only Open the 113 // wal at previously-saved snapshot indexes. 114 walSnap := walpb.Snapshot{ 115 Index: snap.Metadata.Index, 116 Term: snap.Metadata.Term, 117 } 118 if err := rc.wal.SaveSnapshot(walSnap); err != nil { 119 return err 120 } 121 if err := rc.snapshotter.SaveSnap(snap); err != nil { 122 return err 123 } 124 return rc.wal.ReleaseLockTo(snap.Metadata.Index) 125} 126 127func (rc *raftNode) entriesToApply(ents []raftpb.Entry) (nents []raftpb.Entry) { 128 if len(ents) == 0 { 129 return ents 130 } 131 firstIdx := ents[0].Index 132 if firstIdx > rc.appliedIndex+1 { 133 log.Fatalf("first index of committed entry[%d] should <= progress.appliedIndex[%d]+1", firstIdx, rc.appliedIndex) 134 } 135 if rc.appliedIndex-firstIdx+1 < uint64(len(ents)) { 136 nents = ents[rc.appliedIndex-firstIdx+1:] 137 } 138 return nents 139} 140 141// publishEntries writes committed log entries to commit channel and returns 142// whether all entries could be published. 143func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool { 144 for i := range ents { 145 switch ents[i].Type { 146 case raftpb.EntryNormal: 147 if len(ents[i].Data) == 0 { 148 // ignore empty messages 149 break 150 } 151 s := string(ents[i].Data) 152 select { 153 case rc.commitC <- &s: 154 case <-rc.stopc: 155 return false 156 } 157 158 case raftpb.EntryConfChange: 159 var cc raftpb.ConfChange 160 cc.Unmarshal(ents[i].Data) 161 rc.confState = *rc.node.ApplyConfChange(cc) 162 switch cc.Type { 163 case raftpb.ConfChangeAddNode: 164 if len(cc.Context) > 0 { 165 rc.transport.AddPeer(types.ID(cc.NodeID), []string{string(cc.Context)}) 166 } 167 case raftpb.ConfChangeRemoveNode: 168 if cc.NodeID == uint64(rc.id) { 169 log.Println("I've been removed from the cluster! Shutting down.") 170 return false 171 } 172 rc.transport.RemovePeer(types.ID(cc.NodeID)) 173 } 174 } 175 176 // after commit, update appliedIndex 177 rc.appliedIndex = ents[i].Index 178 179 // special nil commit to signal replay has finished 180 if ents[i].Index == rc.lastIndex { 181 select { 182 case rc.commitC <- nil: 183 case <-rc.stopc: 184 return false 185 } 186 } 187 } 188 return true 189} 190 191func (rc *raftNode) loadSnapshot() *raftpb.Snapshot { 192 snapshot, err := rc.snapshotter.Load() 193 if err != nil && err != snap.ErrNoSnapshot { 194 log.Fatalf("raftexample: error loading snapshot (%v)", err) 195 } 196 return snapshot 197} 198 199// openWAL returns a WAL ready for reading. 200func (rc *raftNode) openWAL(snapshot *raftpb.Snapshot) *wal.WAL { 201 if !wal.Exist(rc.waldir) { 202 if err := os.Mkdir(rc.waldir, 0750); err != nil { 203 log.Fatalf("raftexample: cannot create dir for wal (%v)", err) 204 } 205 206 w, err := wal.Create(zap.NewExample(), rc.waldir, nil) 207 if err != nil { 208 log.Fatalf("raftexample: create wal error (%v)", err) 209 } 210 w.Close() 211 } 212 213 walsnap := walpb.Snapshot{} 214 if snapshot != nil { 215 walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term 216 } 217 log.Printf("loading WAL at term %d and index %d", walsnap.Term, walsnap.Index) 218 w, err := wal.Open(zap.NewExample(), rc.waldir, walsnap) 219 if err != nil { 220 log.Fatalf("raftexample: error loading wal (%v)", err) 221 } 222 223 return w 224} 225 226// replayWAL replays WAL entries into the raft instance. 227func (rc *raftNode) replayWAL() *wal.WAL { 228 log.Printf("replaying WAL of member %d", rc.id) 229 snapshot := rc.loadSnapshot() 230 w := rc.openWAL(snapshot) 231 _, st, ents, err := w.ReadAll() 232 if err != nil { 233 log.Fatalf("raftexample: failed to read WAL (%v)", err) 234 } 235 rc.raftStorage = raft.NewMemoryStorage() 236 if snapshot != nil { 237 rc.raftStorage.ApplySnapshot(*snapshot) 238 } 239 rc.raftStorage.SetHardState(st) 240 241 // append to storage so raft starts at the right place in log 242 rc.raftStorage.Append(ents) 243 // send nil once lastIndex is published so client knows commit channel is current 244 if len(ents) > 0 { 245 rc.lastIndex = ents[len(ents)-1].Index 246 } else { 247 rc.commitC <- nil 248 } 249 return w 250} 251 252func (rc *raftNode) writeError(err error) { 253 rc.stopHTTP() 254 close(rc.commitC) 255 rc.errorC <- err 256 close(rc.errorC) 257 rc.node.Stop() 258} 259 260func (rc *raftNode) startRaft() { 261 if !fileutil.Exist(rc.snapdir) { 262 if err := os.Mkdir(rc.snapdir, 0750); err != nil { 263 log.Fatalf("raftexample: cannot create dir for snapshot (%v)", err) 264 } 265 } 266 rc.snapshotter = snap.New(zap.NewExample(), rc.snapdir) 267 rc.snapshotterReady <- rc.snapshotter 268 269 oldwal := wal.Exist(rc.waldir) 270 rc.wal = rc.replayWAL() 271 272 rpeers := make([]raft.Peer, len(rc.peers)) 273 for i := range rpeers { 274 rpeers[i] = raft.Peer{ID: uint64(i + 1)} 275 } 276 c := &raft.Config{ 277 ID: uint64(rc.id), 278 ElectionTick: 10, 279 HeartbeatTick: 1, 280 Storage: rc.raftStorage, 281 MaxSizePerMsg: 1024 * 1024, 282 MaxInflightMsgs: 256, 283 MaxUncommittedEntriesSize: 1 << 30, 284 } 285 286 if oldwal { 287 rc.node = raft.RestartNode(c) 288 } else { 289 startPeers := rpeers 290 if rc.join { 291 startPeers = nil 292 } 293 rc.node = raft.StartNode(c, startPeers) 294 } 295 296 rc.transport = &rafthttp.Transport{ 297 Logger: zap.NewExample(), 298 ID: types.ID(rc.id), 299 ClusterID: 0x1000, 300 Raft: rc, 301 ServerStats: stats.NewServerStats("", ""), 302 LeaderStats: stats.NewLeaderStats(strconv.Itoa(rc.id)), 303 ErrorC: make(chan error), 304 } 305 306 rc.transport.Start() 307 for i := range rc.peers { 308 if i+1 != rc.id { 309 rc.transport.AddPeer(types.ID(i+1), []string{rc.peers[i]}) 310 } 311 } 312 313 go rc.serveRaft() 314 go rc.serveChannels() 315} 316 317// stop closes http, closes all channels, and stops raft. 318func (rc *raftNode) stop() { 319 rc.stopHTTP() 320 close(rc.commitC) 321 close(rc.errorC) 322 rc.node.Stop() 323} 324 325func (rc *raftNode) stopHTTP() { 326 rc.transport.Stop() 327 close(rc.httpstopc) 328 <-rc.httpdonec 329} 330 331func (rc *raftNode) publishSnapshot(snapshotToSave raftpb.Snapshot) { 332 if raft.IsEmptySnap(snapshotToSave) { 333 return 334 } 335 336 log.Printf("publishing snapshot at index %d", rc.snapshotIndex) 337 defer log.Printf("finished publishing snapshot at index %d", rc.snapshotIndex) 338 339 if snapshotToSave.Metadata.Index <= rc.appliedIndex { 340 log.Fatalf("snapshot index [%d] should > progress.appliedIndex [%d]", snapshotToSave.Metadata.Index, rc.appliedIndex) 341 } 342 rc.commitC <- nil // trigger kvstore to load snapshot 343 344 rc.confState = snapshotToSave.Metadata.ConfState 345 rc.snapshotIndex = snapshotToSave.Metadata.Index 346 rc.appliedIndex = snapshotToSave.Metadata.Index 347} 348 349var snapshotCatchUpEntriesN uint64 = 10000 350 351func (rc *raftNode) maybeTriggerSnapshot() { 352 if rc.appliedIndex-rc.snapshotIndex <= rc.snapCount { 353 return 354 } 355 356 log.Printf("start snapshot [applied index: %d | last snapshot index: %d]", rc.appliedIndex, rc.snapshotIndex) 357 data, err := rc.getSnapshot() 358 if err != nil { 359 log.Panic(err) 360 } 361 snap, err := rc.raftStorage.CreateSnapshot(rc.appliedIndex, &rc.confState, data) 362 if err != nil { 363 panic(err) 364 } 365 if err := rc.saveSnap(snap); err != nil { 366 panic(err) 367 } 368 369 compactIndex := uint64(1) 370 if rc.appliedIndex > snapshotCatchUpEntriesN { 371 compactIndex = rc.appliedIndex - snapshotCatchUpEntriesN 372 } 373 if err := rc.raftStorage.Compact(compactIndex); err != nil { 374 panic(err) 375 } 376 377 log.Printf("compacted log at index %d", compactIndex) 378 rc.snapshotIndex = rc.appliedIndex 379} 380 381func (rc *raftNode) serveChannels() { 382 snap, err := rc.raftStorage.Snapshot() 383 if err != nil { 384 panic(err) 385 } 386 rc.confState = snap.Metadata.ConfState 387 rc.snapshotIndex = snap.Metadata.Index 388 rc.appliedIndex = snap.Metadata.Index 389 390 defer rc.wal.Close() 391 392 ticker := time.NewTicker(100 * time.Millisecond) 393 defer ticker.Stop() 394 395 // send proposals over raft 396 go func() { 397 confChangeCount := uint64(0) 398 399 for rc.proposeC != nil && rc.confChangeC != nil { 400 select { 401 case prop, ok := <-rc.proposeC: 402 if !ok { 403 rc.proposeC = nil 404 } else { 405 // blocks until accepted by raft state machine 406 rc.node.Propose(context.TODO(), []byte(prop)) 407 } 408 409 case cc, ok := <-rc.confChangeC: 410 if !ok { 411 rc.confChangeC = nil 412 } else { 413 confChangeCount++ 414 cc.ID = confChangeCount 415 rc.node.ProposeConfChange(context.TODO(), cc) 416 } 417 } 418 } 419 // client closed channel; shutdown raft if not already 420 close(rc.stopc) 421 }() 422 423 // event loop on raft state machine updates 424 for { 425 select { 426 case <-ticker.C: 427 rc.node.Tick() 428 429 // store raft entries to wal, then publish over commit channel 430 case rd := <-rc.node.Ready(): 431 rc.wal.Save(rd.HardState, rd.Entries) 432 if !raft.IsEmptySnap(rd.Snapshot) { 433 rc.saveSnap(rd.Snapshot) 434 rc.raftStorage.ApplySnapshot(rd.Snapshot) 435 rc.publishSnapshot(rd.Snapshot) 436 } 437 rc.raftStorage.Append(rd.Entries) 438 rc.transport.Send(rd.Messages) 439 if ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries)); !ok { 440 rc.stop() 441 return 442 } 443 rc.maybeTriggerSnapshot() 444 rc.node.Advance() 445 446 case err := <-rc.transport.ErrorC: 447 rc.writeError(err) 448 return 449 450 case <-rc.stopc: 451 rc.stop() 452 return 453 } 454 } 455} 456 457func (rc *raftNode) serveRaft() { 458 url, err := url.Parse(rc.peers[rc.id-1]) 459 if err != nil { 460 log.Fatalf("raftexample: Failed parsing URL (%v)", err) 461 } 462 463 ln, err := newStoppableListener(url.Host, rc.httpstopc) 464 if err != nil { 465 log.Fatalf("raftexample: Failed to listen rafthttp (%v)", err) 466 } 467 468 err = (&http.Server{Handler: rc.transport.Handler()}).Serve(ln) 469 select { 470 case <-rc.httpstopc: 471 default: 472 log.Fatalf("raftexample: Failed to serve rafthttp (%v)", err) 473 } 474 close(rc.httpdonec) 475} 476 477func (rc *raftNode) Process(ctx context.Context, m raftpb.Message) error { 478 return rc.node.Step(ctx, m) 479} 480func (rc *raftNode) IsIDRemoved(id uint64) bool { return false } 481func (rc *raftNode) ReportUnreachable(id uint64) {} 482func (rc *raftNode) ReportSnapshot(id uint64, status raft.SnapshotStatus) {} 483