1// Copyright 2015 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package raft 16 17import ( 18 "errors" 19 20 pb "github.com/coreos/etcd/raft/raftpb" 21 "golang.org/x/net/context" 22) 23 24type SnapshotStatus int 25 26const ( 27 SnapshotFinish SnapshotStatus = 1 28 SnapshotFailure SnapshotStatus = 2 29) 30 31var ( 32 emptyState = pb.HardState{} 33 34 // ErrStopped is returned by methods on Nodes that have been stopped. 35 ErrStopped = errors.New("raft: stopped") 36) 37 38// SoftState provides state that is useful for logging and debugging. 39// The state is volatile and does not need to be persisted to the WAL. 40type SoftState struct { 41 Lead uint64 // must use atomic operations to access; keep 64-bit aligned. 42 RaftState StateType 43} 44 45func (a *SoftState) equal(b *SoftState) bool { 46 return a.Lead == b.Lead && a.RaftState == b.RaftState 47} 48 49// Ready encapsulates the entries and messages that are ready to read, 50// be saved to stable storage, committed or sent to other peers. 51// All fields in Ready are read-only. 52type Ready struct { 53 // The current volatile state of a Node. 54 // SoftState will be nil if there is no update. 55 // It is not required to consume or store SoftState. 56 *SoftState 57 58 // The current state of a Node to be saved to stable storage BEFORE 59 // Messages are sent. 60 // HardState will be equal to empty state if there is no update. 61 pb.HardState 62 63 // ReadStates can be used for node to serve linearizable read requests locally 64 // when its applied index is greater than the index in ReadState. 65 // Note that the readState will be returned when raft receives msgReadIndex. 66 // The returned is only valid for the request that requested to read. 67 ReadStates []ReadState 68 69 // Entries specifies entries to be saved to stable storage BEFORE 70 // Messages are sent. 71 Entries []pb.Entry 72 73 // Snapshot specifies the snapshot to be saved to stable storage. 74 Snapshot pb.Snapshot 75 76 // CommittedEntries specifies entries to be committed to a 77 // store/state-machine. These have previously been committed to stable 78 // store. 79 CommittedEntries []pb.Entry 80 81 // Messages specifies outbound messages to be sent AFTER Entries are 82 // committed to stable storage. 83 // If it contains a MsgSnap message, the application MUST report back to raft 84 // when the snapshot has been received or has failed by calling ReportSnapshot. 85 Messages []pb.Message 86} 87 88func isHardStateEqual(a, b pb.HardState) bool { 89 return a.Term == b.Term && a.Vote == b.Vote && a.Commit == b.Commit 90} 91 92// IsEmptyHardState returns true if the given HardState is empty. 93func IsEmptyHardState(st pb.HardState) bool { 94 return isHardStateEqual(st, emptyState) 95} 96 97// IsEmptySnap returns true if the given Snapshot is empty. 98func IsEmptySnap(sp pb.Snapshot) bool { 99 return sp.Metadata.Index == 0 100} 101 102func (rd Ready) containsUpdates() bool { 103 return rd.SoftState != nil || !IsEmptyHardState(rd.HardState) || 104 !IsEmptySnap(rd.Snapshot) || len(rd.Entries) > 0 || 105 len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0 || len(rd.ReadStates) != 0 106} 107 108// Node represents a node in a raft cluster. 109type Node interface { 110 // Tick increments the internal logical clock for the Node by a single tick. Election 111 // timeouts and heartbeat timeouts are in units of ticks. 112 Tick() 113 // Campaign causes the Node to transition to candidate state and start campaigning to become leader. 114 Campaign(ctx context.Context) error 115 // Propose proposes that data be appended to the log. 116 Propose(ctx context.Context, data []byte) error 117 // ProposeConfChange proposes config change. 118 // At most one ConfChange can be in the process of going through consensus. 119 // Application needs to call ApplyConfChange when applying EntryConfChange type entry. 120 ProposeConfChange(ctx context.Context, cc pb.ConfChange) error 121 // Step advances the state machine using the given message. ctx.Err() will be returned, if any. 122 Step(ctx context.Context, msg pb.Message) error 123 124 // Ready returns a channel that returns the current point-in-time state. 125 // Users of the Node must call Advance after retrieving the state returned by Ready. 126 // 127 // NOTE: No committed entries from the next Ready may be applied until all committed entries 128 // and snapshots from the previous one have finished. 129 Ready() <-chan Ready 130 131 // Advance notifies the Node that the application has saved progress up to the last Ready. 132 // It prepares the node to return the next available Ready. 133 // 134 // The application should generally call Advance after it applies the entries in last Ready. 135 // 136 // However, as an optimization, the application may call Advance while it is applying the 137 // commands. For example. when the last Ready contains a snapshot, the application might take 138 // a long time to apply the snapshot data. To continue receiving Ready without blocking raft 139 // progress, it can call Advance before finishing applying the last ready. 140 Advance() 141 // ApplyConfChange applies config change to the local node. 142 // Returns an opaque ConfState protobuf which must be recorded 143 // in snapshots. Will never return nil; it returns a pointer only 144 // to match MemoryStorage.Compact. 145 ApplyConfChange(cc pb.ConfChange) *pb.ConfState 146 147 // TransferLeadership attempts to transfer leadership to the given transferee. 148 TransferLeadership(ctx context.Context, lead, transferee uint64) 149 150 // ReadIndex request a read state. The read state will be set in the ready. 151 // Read state has a read index. Once the application advances further than the read 152 // index, any linearizable read requests issued before the read request can be 153 // processed safely. The read state will have the same rctx attached. 154 ReadIndex(ctx context.Context, rctx []byte) error 155 156 // Status returns the current status of the raft state machine. 157 Status() Status 158 // ReportUnreachable reports the given node is not reachable for the last send. 159 ReportUnreachable(id uint64) 160 // ReportSnapshot reports the status of the sent snapshot. 161 ReportSnapshot(id uint64, status SnapshotStatus) 162 // Stop performs any necessary termination of the Node. 163 Stop() 164} 165 166type Peer struct { 167 ID uint64 168 Context []byte 169} 170 171// StartNode returns a new Node given configuration and a list of raft peers. 172// It appends a ConfChangeAddNode entry for each given peer to the initial log. 173func StartNode(c *Config, peers []Peer) Node { 174 r := newRaft(c) 175 // become the follower at term 1 and apply initial configuration 176 // entries of term 1 177 r.becomeFollower(1, None) 178 for _, peer := range peers { 179 cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context} 180 d, err := cc.Marshal() 181 if err != nil { 182 panic("unexpected marshal error") 183 } 184 e := pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: r.raftLog.lastIndex() + 1, Data: d} 185 r.raftLog.append(e) 186 } 187 // Mark these initial entries as committed. 188 // TODO(bdarnell): These entries are still unstable; do we need to preserve 189 // the invariant that committed < unstable? 190 r.raftLog.committed = r.raftLog.lastIndex() 191 // Now apply them, mainly so that the application can call Campaign 192 // immediately after StartNode in tests. Note that these nodes will 193 // be added to raft twice: here and when the application's Ready 194 // loop calls ApplyConfChange. The calls to addNode must come after 195 // all calls to raftLog.append so progress.next is set after these 196 // bootstrapping entries (it is an error if we try to append these 197 // entries since they have already been committed). 198 // We do not set raftLog.applied so the application will be able 199 // to observe all conf changes via Ready.CommittedEntries. 200 for _, peer := range peers { 201 r.addNode(peer.ID) 202 } 203 204 n := newNode() 205 n.logger = c.Logger 206 go n.run(r) 207 return &n 208} 209 210// RestartNode is similar to StartNode but does not take a list of peers. 211// The current membership of the cluster will be restored from the Storage. 212// If the caller has an existing state machine, pass in the last log index that 213// has been applied to it; otherwise use zero. 214func RestartNode(c *Config) Node { 215 r := newRaft(c) 216 217 n := newNode() 218 n.logger = c.Logger 219 go n.run(r) 220 return &n 221} 222 223// node is the canonical implementation of the Node interface 224type node struct { 225 propc chan pb.Message 226 recvc chan pb.Message 227 confc chan pb.ConfChange 228 confstatec chan pb.ConfState 229 readyc chan Ready 230 advancec chan struct{} 231 tickc chan struct{} 232 done chan struct{} 233 stop chan struct{} 234 status chan chan Status 235 236 logger Logger 237} 238 239func newNode() node { 240 return node{ 241 propc: make(chan pb.Message), 242 recvc: make(chan pb.Message), 243 confc: make(chan pb.ConfChange), 244 confstatec: make(chan pb.ConfState), 245 readyc: make(chan Ready), 246 advancec: make(chan struct{}), 247 // make tickc a buffered chan, so raft node can buffer some ticks when the node 248 // is busy processing raft messages. Raft node will resume process buffered 249 // ticks when it becomes idle. 250 tickc: make(chan struct{}, 128), 251 done: make(chan struct{}), 252 stop: make(chan struct{}), 253 status: make(chan chan Status), 254 } 255} 256 257func (n *node) Stop() { 258 select { 259 case n.stop <- struct{}{}: 260 // Not already stopped, so trigger it 261 case <-n.done: 262 // Node has already been stopped - no need to do anything 263 return 264 } 265 // Block until the stop has been acknowledged by run() 266 <-n.done 267} 268 269func (n *node) run(r *raft) { 270 var propc chan pb.Message 271 var readyc chan Ready 272 var advancec chan struct{} 273 var prevLastUnstablei, prevLastUnstablet uint64 274 var havePrevLastUnstablei bool 275 var prevSnapi uint64 276 var rd Ready 277 278 lead := None 279 prevSoftSt := r.softState() 280 prevHardSt := emptyState 281 282 for { 283 if advancec != nil { 284 readyc = nil 285 } else { 286 rd = newReady(r, prevSoftSt, prevHardSt) 287 if rd.containsUpdates() { 288 readyc = n.readyc 289 } else { 290 readyc = nil 291 } 292 } 293 294 if lead != r.lead { 295 if r.hasLeader() { 296 if lead == None { 297 r.logger.Infof("raft.node: %x elected leader %x at term %d", r.id, r.lead, r.Term) 298 } else { 299 r.logger.Infof("raft.node: %x changed leader from %x to %x at term %d", r.id, lead, r.lead, r.Term) 300 } 301 propc = n.propc 302 } else { 303 r.logger.Infof("raft.node: %x lost leader %x at term %d", r.id, lead, r.Term) 304 propc = nil 305 } 306 lead = r.lead 307 } 308 309 select { 310 // TODO: maybe buffer the config propose if there exists one (the way 311 // described in raft dissertation) 312 // Currently it is dropped in Step silently. 313 case m := <-propc: 314 m.From = r.id 315 r.Step(m) 316 case m := <-n.recvc: 317 // filter out response message from unknown From. 318 if _, ok := r.prs[m.From]; ok || !IsResponseMsg(m.Type) { 319 r.Step(m) // raft never returns an error 320 } 321 case cc := <-n.confc: 322 if cc.NodeID == None { 323 r.resetPendingConf() 324 select { 325 case n.confstatec <- pb.ConfState{Nodes: r.nodes()}: 326 case <-n.done: 327 } 328 break 329 } 330 switch cc.Type { 331 case pb.ConfChangeAddNode: 332 r.addNode(cc.NodeID) 333 case pb.ConfChangeRemoveNode: 334 // block incoming proposal when local node is 335 // removed 336 if cc.NodeID == r.id { 337 propc = nil 338 } 339 r.removeNode(cc.NodeID) 340 case pb.ConfChangeUpdateNode: 341 r.resetPendingConf() 342 default: 343 panic("unexpected conf type") 344 } 345 select { 346 case n.confstatec <- pb.ConfState{Nodes: r.nodes()}: 347 case <-n.done: 348 } 349 case <-n.tickc: 350 r.tick() 351 case readyc <- rd: 352 if rd.SoftState != nil { 353 prevSoftSt = rd.SoftState 354 } 355 if len(rd.Entries) > 0 { 356 prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index 357 prevLastUnstablet = rd.Entries[len(rd.Entries)-1].Term 358 havePrevLastUnstablei = true 359 } 360 if !IsEmptyHardState(rd.HardState) { 361 prevHardSt = rd.HardState 362 } 363 if !IsEmptySnap(rd.Snapshot) { 364 prevSnapi = rd.Snapshot.Metadata.Index 365 } 366 367 r.msgs = nil 368 r.readStates = nil 369 advancec = n.advancec 370 case <-advancec: 371 if prevHardSt.Commit != 0 { 372 r.raftLog.appliedTo(prevHardSt.Commit) 373 } 374 if havePrevLastUnstablei { 375 r.raftLog.stableTo(prevLastUnstablei, prevLastUnstablet) 376 havePrevLastUnstablei = false 377 } 378 r.raftLog.stableSnapTo(prevSnapi) 379 advancec = nil 380 case c := <-n.status: 381 c <- getStatus(r) 382 case <-n.stop: 383 close(n.done) 384 return 385 } 386 } 387} 388 389// Tick increments the internal logical clock for this Node. Election timeouts 390// and heartbeat timeouts are in units of ticks. 391func (n *node) Tick() { 392 select { 393 case n.tickc <- struct{}{}: 394 case <-n.done: 395 default: 396 n.logger.Warningf("A tick missed to fire. Node blocks too long!") 397 } 398} 399 400func (n *node) Campaign(ctx context.Context) error { return n.step(ctx, pb.Message{Type: pb.MsgHup}) } 401 402func (n *node) Propose(ctx context.Context, data []byte) error { 403 return n.step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}}) 404} 405 406func (n *node) Step(ctx context.Context, m pb.Message) error { 407 // ignore unexpected local messages receiving over network 408 if IsLocalMsg(m.Type) { 409 // TODO: return an error? 410 return nil 411 } 412 return n.step(ctx, m) 413} 414 415func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChange) error { 416 data, err := cc.Marshal() 417 if err != nil { 418 return err 419 } 420 return n.Step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange, Data: data}}}) 421} 422 423// Step advances the state machine using msgs. The ctx.Err() will be returned, 424// if any. 425func (n *node) step(ctx context.Context, m pb.Message) error { 426 ch := n.recvc 427 if m.Type == pb.MsgProp { 428 ch = n.propc 429 } 430 431 select { 432 case ch <- m: 433 return nil 434 case <-ctx.Done(): 435 return ctx.Err() 436 case <-n.done: 437 return ErrStopped 438 } 439} 440 441func (n *node) Ready() <-chan Ready { return n.readyc } 442 443func (n *node) Advance() { 444 select { 445 case n.advancec <- struct{}{}: 446 case <-n.done: 447 } 448} 449 450func (n *node) ApplyConfChange(cc pb.ConfChange) *pb.ConfState { 451 var cs pb.ConfState 452 select { 453 case n.confc <- cc: 454 case <-n.done: 455 } 456 select { 457 case cs = <-n.confstatec: 458 case <-n.done: 459 } 460 return &cs 461} 462 463func (n *node) Status() Status { 464 c := make(chan Status) 465 select { 466 case n.status <- c: 467 return <-c 468 case <-n.done: 469 return Status{} 470 } 471} 472 473func (n *node) ReportUnreachable(id uint64) { 474 select { 475 case n.recvc <- pb.Message{Type: pb.MsgUnreachable, From: id}: 476 case <-n.done: 477 } 478} 479 480func (n *node) ReportSnapshot(id uint64, status SnapshotStatus) { 481 rej := status == SnapshotFailure 482 483 select { 484 case n.recvc <- pb.Message{Type: pb.MsgSnapStatus, From: id, Reject: rej}: 485 case <-n.done: 486 } 487} 488 489func (n *node) TransferLeadership(ctx context.Context, lead, transferee uint64) { 490 select { 491 // manually set 'from' and 'to', so that leader can voluntarily transfers its leadership 492 case n.recvc <- pb.Message{Type: pb.MsgTransferLeader, From: transferee, To: lead}: 493 case <-n.done: 494 case <-ctx.Done(): 495 } 496} 497 498func (n *node) ReadIndex(ctx context.Context, rctx []byte) error { 499 return n.step(ctx, pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}}) 500} 501 502func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready { 503 rd := Ready{ 504 Entries: r.raftLog.unstableEntries(), 505 CommittedEntries: r.raftLog.nextEnts(), 506 Messages: r.msgs, 507 } 508 if softSt := r.softState(); !softSt.equal(prevSoftSt) { 509 rd.SoftState = softSt 510 } 511 if hardSt := r.hardState(); !isHardStateEqual(hardSt, prevHardSt) { 512 rd.HardState = hardSt 513 } 514 if r.raftLog.unstable.snapshot != nil { 515 rd.Snapshot = *r.raftLog.unstable.snapshot 516 } 517 if len(r.readStates) != 0 { 518 rd.ReadStates = r.readStates 519 } 520 return rd 521} 522