// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package raft

import (
	"bytes"
	"errors"
	"fmt"
	"math"
	"math/rand"
	"sort"
	"strings"
	"sync"
	"time"

	"go.etcd.io/etcd/raft/confchange"
	"go.etcd.io/etcd/raft/quorum"
	pb "go.etcd.io/etcd/raft/raftpb"
	"go.etcd.io/etcd/raft/tracker"
)

// None is a placeholder node ID used when there is no leader.
const None uint64 = 0

// noLimit is the sentinel for "unbounded" byte sizes. Config.validate
// substitutes it for an unset MaxUncommittedEntriesSize.
const noLimit = math.MaxUint64

// Possible values for StateType.
const (
	StateFollower StateType = iota
	StateCandidate
	StateLeader
	StatePreCandidate
	// numStates is the number of states declared above. It has no entry in
	// stmap, so it must not be used as a state value.
	numStates
)

// ReadOnlyOption selects how read only requests are served; see the
// constants below for the available strategies.
type ReadOnlyOption int

const (
	// ReadOnlySafe guarantees the linearizability of the read only request by
	// communicating with the quorum. It is the default and suggested option.
	ReadOnlySafe ReadOnlyOption = iota
	// ReadOnlyLeaseBased ensures linearizability of the read only request by
	// relying on the leader lease. It can be affected by clock drift.
	// If the clock drift is unbounded, leader might keep the lease longer than it
	// should (clock can move backward/pause without any bound). ReadIndex is not safe
	// in that case.
	ReadOnlyLeaseBased
)

// Possible values for CampaignType.
const (
	// campaignPreElection represents the first phase of a normal election when
	// Config.PreVote is true.
	campaignPreElection CampaignType = "CampaignPreElection"
	// campaignElection represents a normal (time-based) election (the second phase
	// of the election when Config.PreVote is true).
	campaignElection CampaignType = "CampaignElection"
	// campaignTransfer represents the type of leader transfer.
	campaignTransfer CampaignType = "CampaignTransfer"
)

// ErrProposalDropped is returned when the proposal is ignored by some cases,
// so that the proposer can be notified and fail fast.
var ErrProposalDropped = errors.New("raft proposal dropped")

// lockedRand is a small wrapper around rand.Rand to provide
// synchronization among multiple raft groups. Only the methods needed
// by the code are exposed (e.g. Intn).
type lockedRand struct {
	mu   sync.Mutex
	rand *rand.Rand
}

// Intn returns a pseudo-random int in [0, n) from the wrapped generator while
// holding r.mu.
//
// NOTE(review): the unlock is not deferred, so if rand.Intn panics (it does
// for n <= 0) the mutex stays locked.
func (r *lockedRand) Intn(n int) int {
	r.mu.Lock()
	v := r.rand.Intn(n)
	r.mu.Unlock()
	return v
}

// globalRand is the process-wide generator shared by all raft instances in
// this package; it is seeded once from the wall clock at package
// initialization.
var globalRand = &lockedRand{
	rand: rand.New(rand.NewSource(time.Now().UnixNano())),
}

// CampaignType represents the type of campaigning.
// The reason we use the type of string instead of uint64
// is because it's simpler to compare and fill in raft entries.
type CampaignType string

// StateType represents the role of a node in a cluster.
type StateType uint64

// stmap holds the printable name of each StateType, indexed by its numeric
// value.
var stmap = [...]string{
	"StateFollower",
	"StateCandidate",
	"StateLeader",
	"StatePreCandidate",
}

// String returns the name of st. It indexes stmap directly, so a value
// outside the declared states (including numStates) panics with an
// index-out-of-range error.
func (st StateType) String() string {
	return stmap[uint64(st)]
}

// Config contains the parameters to start a raft.
type Config struct {
	// ID is the identity of the local raft. ID cannot be 0.
	ID uint64

	// peers contains the IDs of all nodes (including self) in the raft cluster. It
	// should only be set when starting a new raft cluster. Restarting raft from
	// previous configuration will panic if peers is set. peers is private and only
	// used for testing right now.
	peers []uint64

	// learners contains the IDs of all learner nodes (including self if the
	// local node is a learner) in the raft cluster. Learners only receive
	// entries from the leader node. They do not vote or promote themselves.
	learners []uint64

	// ElectionTick is the number of Node.Tick invocations that must pass between
	// elections. That is, if a follower does not receive any message from the
	// leader of current term before ElectionTick has elapsed, it will become
	// candidate and start an election. ElectionTick must be greater than
	// HeartbeatTick. We suggest ElectionTick = 10 * HeartbeatTick to avoid
	// unnecessary leader switching.
	ElectionTick int
	// HeartbeatTick is the number of Node.Tick invocations that must pass between
	// heartbeats. That is, a leader sends heartbeat messages to maintain its
	// leadership every HeartbeatTick ticks.
	HeartbeatTick int

	// Storage is the storage for raft. raft generates entries and states to be
	// stored in storage. raft reads the persisted entries and states out of
	// Storage when it needs. raft reads out the previous state and configuration
	// out of storage when restarting.
	Storage Storage
	// Applied is the last applied index. It should only be set when restarting
	// raft. raft will not return entries to the application smaller or equal to
	// Applied. If Applied is unset when restarting, raft might return previous
	// applied entries. This is a very application dependent configuration.
	Applied uint64

	// MaxSizePerMsg limits the max byte size of each append message. Smaller
	// value lowers the raft recovery cost(initial probing and message lost
	// during normal operation). On the other side, it might affect the
	// throughput during normal replication. Note: math.MaxUint64 for unlimited,
	// 0 for at most one entry per message.
	MaxSizePerMsg uint64
	// MaxCommittedSizePerReady limits the size of the committed entries which
	// can be applied. When left at 0, validate sets it to MaxSizePerMsg.
	MaxCommittedSizePerReady uint64
	// MaxUncommittedEntriesSize limits the aggregate byte size of the
	// uncommitted entries that may be appended to a leader's log. Once this
	// limit is exceeded, proposals will begin to return ErrProposalDropped
	// errors. Note: 0 for no limit.
	MaxUncommittedEntriesSize uint64
	// MaxInflightMsgs limits the max number of in-flight append messages during
	// optimistic replication phase. The application transportation layer usually
	// has its own sending buffer over TCP/UDP. Setting MaxInflightMsgs to avoid
	// overflowing that sending buffer. TODO (xiangli): feedback to application to
	// limit the proposal rate?
	MaxInflightMsgs int

	// CheckQuorum specifies if the leader should check quorum activity. Leader
	// steps down when quorum is not active for an electionTimeout.
	CheckQuorum bool

	// PreVote enables the Pre-Vote algorithm described in raft thesis section
	// 9.6. This prevents disruption when a node that has been partitioned away
	// rejoins the cluster.
	PreVote bool

	// ReadOnlyOption specifies how the read only request is processed.
	//
	// ReadOnlySafe guarantees the linearizability of the read only request by
	// communicating with the quorum. It is the default and suggested option.
	//
	// ReadOnlyLeaseBased ensures linearizability of the read only request by
	// relying on the leader lease. It can be affected by clock drift.
	// If the clock drift is unbounded, leader might keep the lease longer than it
	// should (clock can move backward/pause without any bound). ReadIndex is not safe
	// in that case.
	// CheckQuorum MUST be enabled if ReadOnlyOption is ReadOnlyLeaseBased.
	ReadOnlyOption ReadOnlyOption

	// Logger is the logger used for raft log. For multinode which can host
	// multiple raft group, each raft group can have its own logger.
	// When nil, validate installs the package-level raftLogger.
	Logger Logger

	// DisableProposalForwarding set to true means that followers will drop
	// proposals, rather than forwarding them to the leader. One use case for
	// this feature would be in a situation where the Raft leader is used to
	// compute the data of a proposal, for example, adding a timestamp from a
	// hybrid logical clock to data in a monotonically increasing way. Forwarding
	// should be disabled to prevent a follower with an inaccurate hybrid
	// logical clock from assigning the timestamp and then forwarding the data
	// to the leader.
	DisableProposalForwarding bool
}

// validate checks c for the invariants documented on Config and fills in
// defaults for unset optional fields. It mutates c: defaults are applied in
// the order written below, so when a later check fails the earlier defaults
// have already been stored. It returns a descriptive error for the first
// violated invariant, or nil.
func (c *Config) validate() error {
	if c.ID == None {
		return errors.New("cannot use none as id")
	}

	if c.HeartbeatTick <= 0 {
		return errors.New("heartbeat tick must be greater than 0")
	}

	if c.ElectionTick <= c.HeartbeatTick {
		return errors.New("election tick must be greater than heartbeat tick")
	}

	if c.Storage == nil {
		return errors.New("storage cannot be nil")
	}

	// 0 means "no limit" for this field (see the Config documentation).
	if c.MaxUncommittedEntriesSize == 0 {
		c.MaxUncommittedEntriesSize = noLimit
	}

	// default MaxCommittedSizePerReady to MaxSizePerMsg because they were
	// previously the same parameter.
	if c.MaxCommittedSizePerReady == 0 {
		c.MaxCommittedSizePerReady = c.MaxSizePerMsg
	}

	if c.MaxInflightMsgs <= 0 {
		return errors.New("max inflight messages must be greater than 0")
	}

	if c.Logger == nil {
		c.Logger = raftLogger
	}

	// Lease-based reads are only allowed together with CheckQuorum.
	if c.ReadOnlyOption == ReadOnlyLeaseBased && !c.CheckQuorum {
		return errors.New("CheckQuorum must be enabled when ReadOnlyOption is ReadOnlyLeaseBased")
	}

	return nil
}

// raft holds the state of a single raft participant: its identity, term and
// vote, log, peer progress, role, pending outbound messages and timers.
type raft struct {
	id uint64

	Term uint64
	Vote uint64

	readStates []ReadState

	// the log
	raftLog *raftLog

	maxMsgSize         uint64
	maxUncommittedSize uint64
	// TODO(tbg): rename to trk.
	prs tracker.ProgressTracker

	state StateType

	// isLearner is true if the local raft node is a learner.
	isLearner bool

	// msgs collects outbound messages queued by send.
	msgs []pb.Message

	// the leader id
	lead uint64
	// leadTransferee is id of the leader transfer target when its value is not zero.
	// Follow the procedure defined in raft thesis 3.10.
	leadTransferee uint64
	// Only one conf change may be pending (in the log, but not yet
	// applied) at a time. This is enforced via pendingConfIndex, which
	// is set to a value >= the log index of the latest pending
	// configuration change (if any). Config changes are only allowed to
	// be proposed if the leader's applied index is greater than this
	// value.
	pendingConfIndex uint64
	// an estimate of the size of the uncommitted tail of the Raft log. Used to
	// prevent unbounded log growth. Only maintained by the leader. Reset on
	// term changes.
	uncommittedSize uint64

	readOnly *readOnly

	// number of ticks since it reached last electionTimeout when it is leader
	// or candidate.
	// number of ticks since it reached last electionTimeout or received a
	// valid message from current leader when it is a follower.
	electionElapsed int

	// number of ticks since it reached last heartbeatTimeout.
	// only leader keeps heartbeatElapsed.
	heartbeatElapsed int

	checkQuorum bool
	preVote     bool

	heartbeatTimeout int
	electionTimeout  int
	// randomizedElectionTimeout is a random number between
	// [electiontimeout, 2 * electiontimeout - 1]. It gets reset
	// when raft changes its state to follower or candidate.
	randomizedElectionTimeout int
	disableProposalForwarding bool

	// tick and step are swapped out on every role change (see the become*
	// methods).
	tick func()
	step stepFunc

	logger Logger
}

// newRaft builds a raft instance from c. The node always starts out as a
// follower at the term loaded from storage, with no known leader.
//
// It panics if c fails validation, if Storage.InitialState returns an error,
// if both the test-only peers/learners and a stored ConfState are non-empty,
// or if the configuration cannot be restored from the ConfState.
func newRaft(c *Config) *raft {
	if err := c.validate(); err != nil {
		panic(err.Error())
	}
	raftlog := newLogWithSize(c.Storage, c.Logger, c.MaxCommittedSizePerReady)
	hs, cs, err := c.Storage.InitialState()
	if err != nil {
		panic(err) // TODO(bdarnell)
	}

	if len(c.peers) > 0 || len(c.learners) > 0 {
		if len(cs.Voters) > 0 || len(cs.Learners) > 0 {
			// TODO(bdarnell): the peers argument is always nil except in
			// tests; the argument should be removed and these tests should be
			// updated to specify their nodes through a snapshot.
			panic("cannot specify both newRaft(peers, learners) and ConfState.(Voters, Learners)")
		}
		cs.Voters = c.peers
		cs.Learners = c.learners
	}

	r := &raft{
		id:                        c.ID,
		lead:                      None,
		isLearner:                 false,
		raftLog:                   raftlog,
		maxMsgSize:                c.MaxSizePerMsg,
		maxUncommittedSize:        c.MaxUncommittedEntriesSize,
		prs:                       tracker.MakeProgressTracker(c.MaxInflightMsgs),
		electionTimeout:           c.ElectionTick,
		heartbeatTimeout:          c.HeartbeatTick,
		logger:                    c.Logger,
		checkQuorum:               c.CheckQuorum,
		preVote:                   c.PreVote,
		readOnly:                  newReadOnly(c.ReadOnlyOption),
		disableProposalForwarding: c.DisableProposalForwarding,
	}

	// Rebuild the tracker configuration and per-peer progress from the
	// ConfState, then install it and verify that the result round-trips.
	cfg, prs, err := confchange.Restore(confchange.Changer{
		Tracker:   r.prs,
		LastIndex: raftlog.lastIndex(),
	}, cs)
	if err != nil {
		panic(err)
	}
	assertConfStatesEquivalent(r.logger, cs, r.switchToConfig(cfg, prs))

	if !IsEmptyHardState(hs) {
		r.loadState(hs)
	}
	if c.Applied > 0 {
		raftlog.appliedTo(c.Applied)
	}
	r.becomeFollower(r.Term, None)

	// Render the voter IDs in hex for the startup log line.
	var nodesStrs []string
	for _, n := range r.prs.VoterNodes() {
		nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n))
	}

	r.logger.Infof("newRaft %x [peers: [%s], term: %d, commit: %d, applied: %d, lastindex: %d, lastterm: %d]",
		r.id, strings.Join(nodesStrs, ","), r.Term, r.raftLog.committed, r.raftLog.applied, r.raftLog.lastIndex(), r.raftLog.lastTerm())
	return r
}

// hasLeader reports whether this node currently knows of a leader.
func (r *raft) hasLeader() bool { return r.lead != None }

// softState returns the current leader ID and role wrapped in a new SoftState.
func (r *raft) softState() *SoftState { return &SoftState{Lead: r.lead, RaftState: r.state} }

// hardState returns the current term, vote and commit index as a pb.HardState.
func (r *raft) hardState() pb.HardState {
	return pb.HardState{
		Term:   r.Term,
		Vote:   r.Vote,
		Commit: r.raftLog.committed,
	}
}

// send fills in m.From (when unset) and, for message types whose term is
// implied, m.Term, then queues m in r.msgs. Nothing is persisted or
// transmitted here.
// NOTE(review): presumably the queued messages are handed to the application
// after state has been persisted — confirm against the Ready handling.
401func (r *raft) send(m pb.Message) { 402 if m.From == None { 403 m.From = r.id 404 } 405 if m.Type == pb.MsgVote || m.Type == pb.MsgVoteResp || m.Type == pb.MsgPreVote || m.Type == pb.MsgPreVoteResp { 406 if m.Term == 0 { 407 // All {pre-,}campaign messages need to have the term set when 408 // sending. 409 // - MsgVote: m.Term is the term the node is campaigning for, 410 // non-zero as we increment the term when campaigning. 411 // - MsgVoteResp: m.Term is the new r.Term if the MsgVote was 412 // granted, non-zero for the same reason MsgVote is 413 // - MsgPreVote: m.Term is the term the node will campaign, 414 // non-zero as we use m.Term to indicate the next term we'll be 415 // campaigning for 416 // - MsgPreVoteResp: m.Term is the term received in the original 417 // MsgPreVote if the pre-vote was granted, non-zero for the 418 // same reasons MsgPreVote is 419 panic(fmt.Sprintf("term should be set when sending %s", m.Type)) 420 } 421 } else { 422 if m.Term != 0 { 423 panic(fmt.Sprintf("term should not be set when sending %s (was %d)", m.Type, m.Term)) 424 } 425 // do not attach term to MsgProp, MsgReadIndex 426 // proposals are a way to forward to the leader and 427 // should be treated as local message. 428 // MsgReadIndex is also forwarded to leader. 429 if m.Type != pb.MsgProp && m.Type != pb.MsgReadIndex { 430 m.Term = r.Term 431 } 432 } 433 r.msgs = append(r.msgs, m) 434} 435 436// sendAppend sends an append RPC with new entries (if any) and the 437// current commit index to the given peer. 438func (r *raft) sendAppend(to uint64) { 439 r.maybeSendAppend(to, true) 440} 441 442// maybeSendAppend sends an append RPC with new entries to the given peer, 443// if necessary. Returns true if a message was sent. The sendIfEmpty 444// argument controls whether messages with no entries will be sent 445// ("empty" messages are useful to convey updated Commit indexes, but 446// are undesirable when we're sending multiple messages in a batch). 
447func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { 448 pr := r.prs.Progress[to] 449 if pr.IsPaused() { 450 return false 451 } 452 m := pb.Message{} 453 m.To = to 454 455 term, errt := r.raftLog.term(pr.Next - 1) 456 ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize) 457 if len(ents) == 0 && !sendIfEmpty { 458 return false 459 } 460 461 if errt != nil || erre != nil { // send snapshot if we failed to get term or entries 462 if !pr.RecentActive { 463 r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to) 464 return false 465 } 466 467 m.Type = pb.MsgSnap 468 snapshot, err := r.raftLog.snapshot() 469 if err != nil { 470 if err == ErrSnapshotTemporarilyUnavailable { 471 r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to) 472 return false 473 } 474 panic(err) // TODO(bdarnell) 475 } 476 if IsEmptySnap(snapshot) { 477 panic("need non-empty snapshot") 478 } 479 m.Snapshot = snapshot 480 sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term 481 r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]", 482 r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr) 483 pr.BecomeSnapshot(sindex) 484 r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr) 485 } else { 486 m.Type = pb.MsgApp 487 m.Index = pr.Next - 1 488 m.LogTerm = term 489 m.Entries = ents 490 m.Commit = r.raftLog.committed 491 if n := len(m.Entries); n != 0 { 492 switch pr.State { 493 // optimistically increase the next when in StateReplicate 494 case tracker.StateReplicate: 495 last := m.Entries[n-1].Index 496 pr.OptimisticUpdate(last) 497 pr.Inflights.Add(last) 498 case tracker.StateProbe: 499 pr.ProbeSent = true 500 default: 501 r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State) 502 } 503 } 504 } 505 r.send(m) 506 return true 507} 508 509// sendHeartbeat sends a heartbeat RPC to the 
given peer. 510func (r *raft) sendHeartbeat(to uint64, ctx []byte) { 511 // Attach the commit as min(to.matched, r.committed). 512 // When the leader sends out heartbeat message, 513 // the receiver(follower) might not be matched with the leader 514 // or it might not have all the committed entries. 515 // The leader MUST NOT forward the follower's commit to 516 // an unmatched index. 517 commit := min(r.prs.Progress[to].Match, r.raftLog.committed) 518 m := pb.Message{ 519 To: to, 520 Type: pb.MsgHeartbeat, 521 Commit: commit, 522 Context: ctx, 523 } 524 525 r.send(m) 526} 527 528// bcastAppend sends RPC, with entries to all peers that are not up-to-date 529// according to the progress recorded in r.prs. 530func (r *raft) bcastAppend() { 531 r.prs.Visit(func(id uint64, _ *tracker.Progress) { 532 if id == r.id { 533 return 534 } 535 r.sendAppend(id) 536 }) 537} 538 539// bcastHeartbeat sends RPC, without entries to all the peers. 540func (r *raft) bcastHeartbeat() { 541 lastCtx := r.readOnly.lastPendingRequestCtx() 542 if len(lastCtx) == 0 { 543 r.bcastHeartbeatWithCtx(nil) 544 } else { 545 r.bcastHeartbeatWithCtx([]byte(lastCtx)) 546 } 547} 548 549func (r *raft) bcastHeartbeatWithCtx(ctx []byte) { 550 r.prs.Visit(func(id uint64, _ *tracker.Progress) { 551 if id == r.id { 552 return 553 } 554 r.sendHeartbeat(id, ctx) 555 }) 556} 557 558func (r *raft) advance(rd Ready) { 559 r.reduceUncommittedSize(rd.CommittedEntries) 560 561 // If entries were applied (or a snapshot), update our cursor for 562 // the next Ready. Note that if the current HardState contains a 563 // new Commit index, this does not mean that we're also applying 564 // all of the new entries due to commit pagination by size. 
565 if newApplied := rd.appliedCursor(); newApplied > 0 { 566 oldApplied := r.raftLog.applied 567 r.raftLog.appliedTo(newApplied) 568 569 if r.prs.Config.AutoLeave && oldApplied < r.pendingConfIndex && newApplied >= r.pendingConfIndex && r.state == StateLeader { 570 // If the current (and most recent, at least for this leader's term) 571 // configuration should be auto-left, initiate that now. We use a 572 // nil Data which unmarshals into an empty ConfChangeV2 and has the 573 // benefit that appendEntry can never refuse it based on its size 574 // (which registers as zero). 575 ent := pb.Entry{ 576 Type: pb.EntryConfChangeV2, 577 Data: nil, 578 } 579 // There's no way in which this proposal should be able to be rejected. 580 if !r.appendEntry(ent) { 581 panic("refused un-refusable auto-leaving ConfChangeV2") 582 } 583 r.pendingConfIndex = r.raftLog.lastIndex() 584 r.logger.Infof("initiating automatic transition out of joint configuration %s", r.prs.Config) 585 } 586 } 587 588 if len(rd.Entries) > 0 { 589 e := rd.Entries[len(rd.Entries)-1] 590 r.raftLog.stableTo(e.Index, e.Term) 591 } 592 if !IsEmptySnap(rd.Snapshot) { 593 r.raftLog.stableSnapTo(rd.Snapshot.Metadata.Index) 594 } 595} 596 597// maybeCommit attempts to advance the commit index. Returns true if 598// the commit index changed (in which case the caller should call 599// r.bcastAppend). 
600func (r *raft) maybeCommit() bool { 601 mci := r.prs.Committed() 602 return r.raftLog.maybeCommit(mci, r.Term) 603} 604 605func (r *raft) reset(term uint64) { 606 if r.Term != term { 607 r.Term = term 608 r.Vote = None 609 } 610 r.lead = None 611 612 r.electionElapsed = 0 613 r.heartbeatElapsed = 0 614 r.resetRandomizedElectionTimeout() 615 616 r.abortLeaderTransfer() 617 618 r.prs.ResetVotes() 619 r.prs.Visit(func(id uint64, pr *tracker.Progress) { 620 *pr = tracker.Progress{ 621 Match: 0, 622 Next: r.raftLog.lastIndex() + 1, 623 Inflights: tracker.NewInflights(r.prs.MaxInflight), 624 IsLearner: pr.IsLearner, 625 } 626 if id == r.id { 627 pr.Match = r.raftLog.lastIndex() 628 } 629 }) 630 631 r.pendingConfIndex = 0 632 r.uncommittedSize = 0 633 r.readOnly = newReadOnly(r.readOnly.option) 634} 635 636func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) { 637 li := r.raftLog.lastIndex() 638 for i := range es { 639 es[i].Term = r.Term 640 es[i].Index = li + 1 + uint64(i) 641 } 642 // Track the size of this uncommitted proposal. 643 if !r.increaseUncommittedSize(es) { 644 r.logger.Debugf( 645 "%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal", 646 r.id, 647 ) 648 // Drop the proposal. 649 return false 650 } 651 // use latest "last" index after truncate/append 652 li = r.raftLog.append(es...) 653 r.prs.Progress[r.id].MaybeUpdate(li) 654 // Regardless of maybeCommit's return, our caller will call bcastAppend. 655 r.maybeCommit() 656 return true 657} 658 659// tickElection is run by followers and candidates after r.electionTimeout. 660func (r *raft) tickElection() { 661 r.electionElapsed++ 662 663 if r.promotable() && r.pastElectionTimeout() { 664 r.electionElapsed = 0 665 r.Step(pb.Message{From: r.id, Type: pb.MsgHup}) 666 } 667} 668 669// tickHeartbeat is run by leaders to send a MsgBeat after r.heartbeatTimeout. 
func (r *raft) tickHeartbeat() {
	r.heartbeatElapsed++
	r.electionElapsed++

	// Once per election timeout: optionally check quorum activity and give up
	// on a leadership transfer that has not completed.
	if r.electionElapsed >= r.electionTimeout {
		r.electionElapsed = 0
		if r.checkQuorum {
			r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum})
		}
		// If current leader cannot transfer leadership in electionTimeout, it becomes leader again.
		if r.state == StateLeader && r.leadTransferee != None {
			r.abortLeaderTransfer()
		}
	}

	// The state is re-read here because the MsgCheckQuorum step above runs
	// first; a node that is no longer leader sends no heartbeat.
	if r.state != StateLeader {
		return
	}

	if r.heartbeatElapsed >= r.heartbeatTimeout {
		r.heartbeatElapsed = 0
		r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
	}
}

// becomeFollower switches the node to follower state at the given term and
// records lead as the known leader (None when unknown). lead is assigned
// after reset because reset clears r.lead.
func (r *raft) becomeFollower(term uint64, lead uint64) {
	r.step = stepFollower
	r.reset(term)
	r.tick = r.tickElection
	r.lead = lead
	r.state = StateFollower
	r.logger.Infof("%x became follower at term %d", r.id, r.Term)
}

// becomeCandidate switches the node to candidate state: it moves to the next
// term and votes for itself. The vote is assigned after reset because reset
// clears r.Vote when the term changes. It panics when called on a leader.
func (r *raft) becomeCandidate() {
	// TODO(xiangli) remove the panic when the raft implementation is stable
	if r.state == StateLeader {
		panic("invalid transition [leader -> candidate]")
	}
	r.step = stepCandidate
	r.reset(r.Term + 1)
	r.tick = r.tickElection
	r.Vote = r.id
	r.state = StateCandidate
	r.logger.Infof("%x became candidate at term %d", r.id, r.Term)
}

// becomePreCandidate switches the node to pre-candidate state. Unlike
// becomeCandidate it does not call reset: only the vote tally and the known
// leader are cleared. It panics when called on a leader.
func (r *raft) becomePreCandidate() {
	// TODO(xiangli) remove the panic when the raft implementation is stable
	if r.state == StateLeader {
		panic("invalid transition [leader -> pre-candidate]")
	}
	// Becoming a pre-candidate changes our step functions and state,
	// but doesn't change anything else. In particular it does not increase
	// r.Term or change r.Vote.
	r.step = stepCandidate
	r.prs.ResetVotes()
	r.tick = r.tickElection
	r.lead = None
	r.state = StatePreCandidate
	r.logger.Infof("%x became pre-candidate at term %d", r.id, r.Term)
}

// becomeLeader switches the node to leader state in the current term and
// appends an empty entry to the log. It panics when called on a follower, or
// if the empty entry is refused.
func (r *raft) becomeLeader() {
	// TODO(xiangli) remove the panic when the raft implementation is stable
	if r.state == StateFollower {
		panic("invalid transition [follower -> leader]")
	}
	r.step = stepLeader
	r.reset(r.Term)
	r.tick = r.tickHeartbeat
	r.lead = r.id
	r.state = StateLeader
	// Followers enter replicate mode when they've been successfully probed
	// (perhaps after having received a snapshot as a result). The leader is
	// trivially in this state. Note that r.reset() has initialized this
	// progress with the last index already.
	r.prs.Progress[r.id].BecomeReplicate()

	// Conservatively set the pendingConfIndex to the last index in the
	// log. There may or may not be a pending config change, but it's
	// safe to delay any future proposals until we commit all our
	// pending log entries, and scanning the entire tail of the log
	// could be expensive.
	r.pendingConfIndex = r.raftLog.lastIndex()

	emptyEnt := pb.Entry{Data: nil}
	if !r.appendEntry(emptyEnt) {
		// This won't happen because we just called reset() above.
		r.logger.Panic("empty entry was dropped")
	}
	// As a special case, don't count the initial empty entry towards the
	// uncommitted log quota. This is because we want to preserve the
	// behavior of allowing one entry larger than quota if the current
	// usage is zero.
	r.reduceUncommittedSize([]pb.Entry{emptyEnt})
	r.logger.Infof("%x became leader at term %d", r.id, r.Term)
}

// campaign transitions the raft instance to candidate state. This must only be
// called after verifying that this is a legitimate transition.
771func (r *raft) campaign(t CampaignType) { 772 if !r.promotable() { 773 // This path should not be hit (callers are supposed to check), but 774 // better safe than sorry. 775 r.logger.Warningf("%x is unpromotable; campaign() should have been called", r.id) 776 } 777 var term uint64 778 var voteMsg pb.MessageType 779 if t == campaignPreElection { 780 r.becomePreCandidate() 781 voteMsg = pb.MsgPreVote 782 // PreVote RPCs are sent for the next term before we've incremented r.Term. 783 term = r.Term + 1 784 } else { 785 r.becomeCandidate() 786 voteMsg = pb.MsgVote 787 term = r.Term 788 } 789 if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon { 790 // We won the election after voting for ourselves (which must mean that 791 // this is a single-node cluster). Advance to the next state. 792 if t == campaignPreElection { 793 r.campaign(campaignElection) 794 } else { 795 r.becomeLeader() 796 } 797 return 798 } 799 var ids []uint64 800 { 801 idMap := r.prs.Voters.IDs() 802 ids = make([]uint64, 0, len(idMap)) 803 for id := range idMap { 804 ids = append(ids, id) 805 } 806 sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] }) 807 } 808 for _, id := range ids { 809 if id == r.id { 810 continue 811 } 812 r.logger.Infof("%x [logterm: %d, index: %d] sent %s request to %x at term %d", 813 r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), voteMsg, id, r.Term) 814 815 var ctx []byte 816 if t == campaignTransfer { 817 ctx = []byte(t) 818 } 819 r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx}) 820 } 821} 822 823func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int, rejected int, result quorum.VoteResult) { 824 if v { 825 r.logger.Infof("%x received %s from %x at term %d", r.id, t, id, r.Term) 826 } else { 827 r.logger.Infof("%x received %s rejection from %x at term %d", r.id, t, id, r.Term) 828 } 829 r.prs.RecordVote(id, v) 830 return 
r.prs.TallyVotes()
}

// Step processes a single message m. It first reconciles m.Term with the
// local term (possibly ignoring the message or stepping down to follower),
// then handles MsgHup and (pre-)vote requests itself and delegates every other
// message type to the role-specific step function r.step. The only errors
// returned are those produced by r.step.
func (r *raft) Step(m pb.Message) error {
	// Handle the message term, which may result in our stepping down to a follower.
	switch {
	case m.Term == 0:
		// local message
	case m.Term > r.Term:
		if m.Type == pb.MsgVote || m.Type == pb.MsgPreVote {
			force := bytes.Equal(m.Context, []byte(campaignTransfer))
			inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeout
			if !force && inLease {
				// If a server receives a RequestVote request within the minimum election timeout
				// of hearing from a current leader, it does not update its term or grant its vote
				r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: lease is not expired (remaining ticks: %d)",
					r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term, r.electionTimeout-r.electionElapsed)
				return nil
			}
		}
		switch {
		case m.Type == pb.MsgPreVote:
			// Never change our term in response to a PreVote
		case m.Type == pb.MsgPreVoteResp && !m.Reject:
			// We send pre-vote requests with a term in our future. If the
			// pre-vote is granted, we will increment our term when we get a
			// quorum. If it is not, the term comes from the node that
			// rejected our vote so we should become a follower at the new
			// term.
		default:
			r.logger.Infof("%x [term: %d] received a %s message with higher term from %x [term: %d]",
				r.id, r.Term, m.Type, m.From, m.Term)
			// Only messages that can only originate from a leader tell us who
			// the leader of the new term is.
			if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
				r.becomeFollower(m.Term, m.From)
			} else {
				r.becomeFollower(m.Term, None)
			}
		}

	case m.Term < r.Term:
		if (r.checkQuorum || r.preVote) && (m.Type == pb.MsgHeartbeat || m.Type == pb.MsgApp) {
			// We have received messages from a leader at a lower term. It is possible
			// that these messages were simply delayed in the network, but this could
			// also mean that this node has advanced its term number during a network
			// partition, and it is now unable to either win an election or to rejoin
			// the majority on the old term. If checkQuorum is false, this will be
			// handled by incrementing term numbers in response to MsgVote with a
			// higher term, but if checkQuorum is true we may not advance the term on
			// MsgVote and must generate other messages to advance the term. The net
			// result of these two features is to minimize the disruption caused by
			// nodes that have been removed from the cluster's configuration: a
			// removed node will send MsgVotes (or MsgPreVotes) which will be ignored,
			// but it will not receive MsgApp or MsgHeartbeat, so it will not create
			// disruptive term increases, by notifying leader of this node's activeness.
			// The above comments are also true for Pre-Vote.
			//
			// When a follower gets isolated, it soon starts an election ending
			// up with a higher term than the leader, although it won't receive enough
			// votes to win the election. When it regains connectivity, this response
			// with "pb.MsgAppResp" of higher term would force the leader to step down.
			// However, this disruption is inevitable to free this stuck node with
			// a fresh election. This can be prevented with the Pre-Vote phase.
			r.send(pb.Message{To: m.From, Type: pb.MsgAppResp})
		} else if m.Type == pb.MsgPreVote {
			// Before Pre-Vote was enabled, there may have been a candidate with a
			// higher term but a shorter log. After the update to Pre-Vote, the
			// cluster may deadlock if we drop messages with a lower term.
			r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
				r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
			r.send(pb.Message{To: m.From, Term: r.Term, Type: pb.MsgPreVoteResp, Reject: true})
		} else {
			// ignore other cases
			r.logger.Infof("%x [term: %d] ignored a %s message with lower term from %x [term: %d]",
				r.id, r.Term, m.Type, m.From, m.Term)
		}
		return nil
	}

	switch m.Type {
	case pb.MsgHup:
		if r.state != StateLeader {
			if !r.promotable() {
				r.logger.Warningf("%x is unpromotable and can not campaign; ignoring MsgHup", r.id)
				return nil
			}
			// Refuse to campaign while committed-but-unapplied entries contain
			// configuration changes.
			ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
			if err != nil {
				r.logger.Panicf("unexpected error getting unapplied entries (%v)", err)
			}
			if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied {
				r.logger.Warningf("%x cannot campaign at term %d since there are still %d pending configuration changes to apply", r.id, r.Term, n)
				return nil
			}

			r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
			if r.preVote {
				r.campaign(campaignPreElection)
			} else {
				r.campaign(campaignElection)
			}
		} else {
			r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
		}

	case pb.MsgVote, pb.MsgPreVote:
		// We can vote if this is a repeat of a vote we've already cast...
		canVote := r.Vote == m.From ||
			// ...we haven't voted and we don't think there's a leader yet in this term...
			(r.Vote == None && r.lead == None) ||
			// ...or this is a PreVote for a future term...
			(m.Type == pb.MsgPreVote && m.Term > r.Term)
		// ...and we believe the candidate is up to date.
		if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
			// Note: it turns out that learners must be allowed to cast votes.
			// This seems counter-intuitive but is necessary in the situation in which
			// a learner has been promoted (i.e. is now a voter) but has not learned
			// about this yet.
			// For example, consider a group in which id=1 is a learner and id=2 and
			// id=3 are voters. A configuration change promoting 1 can be committed on
			// the quorum `{2,3}` without the config change being appended to the
			// learner's log. If the leader (say 2) fails, there are de facto two
			// voters remaining. Only 3 can win an election (due to its log containing
			// all committed entries), but to do so it will need 1 to vote. But 1
			// considers itself a learner and will continue to do so until 3 has
			// stepped up as leader, replicates the conf change to 1, and 1 applies it.
			// Ultimately, by receiving a request to vote, the learner realizes that
			// the candidate believes it to be a voter, and that it should act
			// accordingly. The candidate's config may be stale, too; but in that case
			// it won't win the election, at least in the absence of the bug discussed
			// in:
			// https://github.com/etcd-io/etcd/issues/7625#issuecomment-488798263.
			r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] cast %s for %x [logterm: %d, index: %d] at term %d",
				r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
			// When responding to Msg{Pre,}Vote messages we include the term
			// from the message, not the local term. To see why, consider the
			// case where a single node was previously partitioned away and
			// its local term is now out of date. If we include the local term
			// (recall that for pre-votes we don't update the local term), the
			// (pre-)campaigning node on the other end will proceed to ignore
			// the message (it ignores all out of date messages).
			// The term in the original message and current local term are the
			// same in the case of regular votes, but different for pre-votes.
			r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
			if m.Type == pb.MsgVote {
				// Only record real votes.
				r.electionElapsed = 0
				r.Vote = m.From
			}
		} else {
			r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
				r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
			r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true})
		}

	default:
		err := r.step(r, m)
		if err != nil {
			return err
		}
	}
	return nil
}

// stepFunc is the signature shared by the role-specific message handlers
// stepLeader, stepCandidate and stepFollower.
type stepFunc func(r *raft, m pb.Message) error

// stepLeader is the step function used while the node is leader. Message
// types that do not need the sender's progress (MsgBeat, MsgCheckQuorum,
// MsgProp, MsgReadIndex) are handled first; all remaining types are dropped
// unless a progress entry exists for m.From. It returns ErrProposalDropped
// when a proposal cannot be accepted and nil otherwise.
func stepLeader(r *raft, m pb.Message) error {
	// These message types do not require any progress for m.From.
	switch m.Type {
	case pb.MsgBeat:
		r.bcastHeartbeat()
		return nil
	case pb.MsgCheckQuorum:
		// The leader should always see itself as active. As a precaution, handle
		// the case in which the leader isn't in the configuration any more (for
		// example if it just removed itself).
		//
		// TODO(tbg): I added a TODO in removeNode, it doesn't seem that the
		// leader steps down when removing itself. I might be missing something.
		if pr := r.prs.Progress[r.id]; pr != nil {
			pr.RecentActive = true
		}
		if !r.prs.QuorumActive() {
			r.logger.Warningf("%x stepped down to follower since quorum is not active", r.id)
			r.becomeFollower(r.Term, None)
		}
		// Mark everyone (but ourselves) as inactive in preparation for the next
		// CheckQuorum.
		r.prs.Visit(func(id uint64, pr *tracker.Progress) {
			if id != r.id {
				pr.RecentActive = false
			}
		})
		return nil
	case pb.MsgProp:
		if len(m.Entries) == 0 {
			r.logger.Panicf("%x stepped empty MsgProp", r.id)
		}
		if r.prs.Progress[r.id] == nil {
			// If we are not currently a member of the range (i.e. this node
			// was removed from the configuration while serving as leader),
			// drop any new proposals.
			return ErrProposalDropped
		}
		if r.leadTransferee != None {
			r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)
			return ErrProposalDropped
		}

		// Inspect every proposed entry for configuration changes. A refused
		// conf change is replaced in place by an empty normal entry so that
		// the remaining entries keep their positions.
		for i := range m.Entries {
			e := &m.Entries[i]
			var cc pb.ConfChangeI
			if e.Type == pb.EntryConfChange {
				var ccc pb.ConfChange
				if err := ccc.Unmarshal(e.Data); err != nil {
					panic(err)
				}
				cc = ccc
			} else if e.Type == pb.EntryConfChangeV2 {
				var ccc pb.ConfChangeV2
				if err := ccc.Unmarshal(e.Data); err != nil {
					panic(err)
				}
				cc = ccc
			}
			if cc != nil {
				alreadyPending := r.pendingConfIndex > r.raftLog.applied
				alreadyJoint := len(r.prs.Config.Voters[1]) > 0
				wantsLeaveJoint := len(cc.AsV2().Changes) == 0

				var refused string
				if alreadyPending {
					refused = fmt.Sprintf("possible unapplied conf change at index %d (applied to %d)", r.pendingConfIndex, r.raftLog.applied)
				} else if alreadyJoint && !wantsLeaveJoint {
					refused = "must transition out of joint config first"
				} else if !alreadyJoint && wantsLeaveJoint {
					refused = "not in joint state; refusing empty conf change"
				}

				if refused != "" {
					r.logger.Infof("%x ignoring conf change %v at config %s: %s", r.id, cc, r.prs.Config, refused)
					m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
				} else {
					// Index this entry will occupy once appended.
					r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1
				}
			}
		}

		if !r.appendEntry(m.Entries...) {
			return ErrProposalDropped
		}
		r.bcastAppend()
		return nil
	case pb.MsgReadIndex:
		// only one voting member (the leader) in the cluster
		if r.prs.IsSingleton() {
			if resp := r.responseToReadIndexReq(m, r.raftLog.committed); resp.To != None {
				r.send(resp)
			}
			return nil
		}

		// Reject read only request when this leader has not committed any log entry at its term.
		if !r.committedEntryInCurrentTerm() {
			return nil
		}

		// thinking: use an internally defined context instead of the user given context.
		// We can express this in terms of the term and index instead of a user-supplied value.
		// This would allow multiple reads to piggyback on the same message.
		switch r.readOnly.option {
		// If more than the local vote is needed, go through a full broadcast.
		case ReadOnlySafe:
			r.readOnly.addRequest(r.raftLog.committed, m)
			// The local node automatically acks the request.
			r.readOnly.recvAck(r.id, m.Entries[0].Data)
			r.bcastHeartbeatWithCtx(m.Entries[0].Data)
		case ReadOnlyLeaseBased:
			if resp := r.responseToReadIndexReq(m, r.raftLog.committed); resp.To != None {
				r.send(resp)
			}
		}
		return nil
	}

	// All other message types require a progress for m.From (pr).
	pr := r.prs.Progress[m.From]
	if pr == nil {
		r.logger.Debugf("%x no progress available for %x", r.id, m.From)
		return nil
	}
	switch m.Type {
	case pb.MsgAppResp:
		pr.RecentActive = true

		if m.Reject {
			r.logger.Debugf("%x received MsgAppResp(MsgApp was rejected, lastindex: %d) from %x for index %d",
				r.id, m.RejectHint, m.From, m.Index)
			if pr.MaybeDecrTo(m.Index, m.RejectHint) {
				r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
				if pr.State == tracker.StateReplicate {
					pr.BecomeProbe()
				}
				r.sendAppend(m.From)
			}
		} else {
			oldPaused := pr.IsPaused()
			if pr.MaybeUpdate(m.Index) {
				switch {
				case pr.State == tracker.StateProbe:
					pr.BecomeReplicate()
				case pr.State == tracker.StateSnapshot && pr.Match >= pr.PendingSnapshot:
					// TODO(tbg): we should also enter this branch if a snapshot is
					// received that is below pr.PendingSnapshot but which makes it
					// possible to use the log again.
					r.logger.Debugf("%x recovered from needing snapshot, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
					// Transition back to replicating state via probing state
					// (which takes the snapshot into account). If we didn't
					// move to replicating state, that would only happen with
					// the next round of appends (but there may not be a next
					// round for a while, exposing an inconsistent RaftStatus).
					pr.BecomeProbe()
					pr.BecomeReplicate()
				case pr.State == tracker.StateReplicate:
					pr.Inflights.FreeLE(m.Index)
				}

				if r.maybeCommit() {
					r.bcastAppend()
				} else if oldPaused {
					// If we were paused before, this node may be missing the
					// latest commit index, so send it.
					r.sendAppend(m.From)
				}
				// We've updated flow control information above, which may
				// allow us to send multiple (size-limited) in-flight messages
				// at once (such as when transitioning from probe to
				// replicate, or when freeTo() covers multiple messages). If
				// we have more entries to send, send as many messages as we
				// can (without sending empty messages for the commit index)
				for r.maybeSendAppend(m.From, false) {
				}
				// Transfer leadership is in progress.
				if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() {
					r.logger.Infof("%x sent MsgTimeoutNow to %x after received MsgAppResp", r.id, m.From)
					r.sendTimeoutNow(m.From)
				}
			}
		}
	case pb.MsgHeartbeatResp:
		pr.RecentActive = true
		pr.ProbeSent = false

		// free one slot for the full inflights window to allow progress.
		if pr.State == tracker.StateReplicate && pr.Inflights.Full() {
			pr.Inflights.FreeFirstOne()
		}
		if pr.Match < r.raftLog.lastIndex() {
			r.sendAppend(m.From)
		}

		// Only ReadOnlySafe requests carry a context that needs quorum acks.
		if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
			return nil
		}

		if r.prs.Voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon {
			return nil
		}

		rss := r.readOnly.advance(m)
		for _, rs := range rss {
			if resp := r.responseToReadIndexReq(rs.req, rs.index); resp.To != None {
				r.send(resp)
			}
		}
	case pb.MsgSnapStatus:
		if pr.State != tracker.StateSnapshot {
			return nil
		}
		// TODO(tbg): this code is very similar to the snapshot handling in
		// MsgAppResp above. In fact, the code there is more correct than the
		// code here and should likely be updated to match (or even better, the
		// logic pulled into a newly created Progress state machine handler).
		if !m.Reject {
			pr.BecomeProbe()
			r.logger.Debugf("%x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
		} else {
			// NB: the order here matters or we'll be probing erroneously from
			// the snapshot index, but the snapshot never applied.
			pr.PendingSnapshot = 0
			pr.BecomeProbe()
			r.logger.Debugf("%x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
		}
		// If the snapshot finished, wait for the MsgAppResp from the remote node
		// before sending out the next MsgApp.
		// If the snapshot failed, wait for a heartbeat interval before the next try.
		pr.ProbeSent = true
	case pb.MsgUnreachable:
		// During optimistic replication, if the remote becomes unreachable,
		// there is huge probability that a MsgApp is lost.
		if pr.State == tracker.StateReplicate {
			pr.BecomeProbe()
		}
		r.logger.Debugf("%x failed to send message to %x because it is unreachable [%s]", r.id, m.From, pr)
	case pb.MsgTransferLeader:
		if pr.IsLearner {
			r.logger.Debugf("%x is learner. Ignored transferring leadership", r.id)
			return nil
		}
		leadTransferee := m.From
		lastLeadTransferee := r.leadTransferee
		if lastLeadTransferee != None {
			if lastLeadTransferee == leadTransferee {
				r.logger.Infof("%x [term %d] transfer leadership to %x is in progress, ignores request to same node %x",
					r.id, r.Term, leadTransferee, leadTransferee)
				return nil
			}
			r.abortLeaderTransfer()
			r.logger.Infof("%x [term %d] abort previous transferring leadership to %x", r.id, r.Term, lastLeadTransferee)
		}
		if leadTransferee == r.id {
			r.logger.Debugf("%x is already leader. Ignored transferring leadership to self", r.id)
			return nil
		}
		// Transfer leadership to third party.
		r.logger.Infof("%x [term %d] starts to transfer leadership to %x", r.id, r.Term, leadTransferee)
		// Transfer leadership should be finished in one electionTimeout, so reset r.electionElapsed.
		r.electionElapsed = 0
		r.leadTransferee = leadTransferee
		if pr.Match == r.raftLog.lastIndex() {
			r.sendTimeoutNow(leadTransferee)
			r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee)
		} else {
			r.sendAppend(leadTransferee)
		}
	}
	return nil
}

// stepCandidate is shared by StateCandidate and StatePreCandidate; the difference is
// whether they respond to MsgVoteResp or MsgPreVoteResp.
func stepCandidate(r *raft, m pb.Message) error {
	// Only handle vote responses corresponding to our candidacy (while in
	// StateCandidate, we may get stale MsgPreVoteResp messages in this term from
	// our pre-candidate state).
	var myVoteRespType pb.MessageType
	if r.state == StatePreCandidate {
		myVoteRespType = pb.MsgPreVoteResp
	} else {
		myVoteRespType = pb.MsgVoteResp
	}
	switch m.Type {
	case pb.MsgProp:
		r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
		return ErrProposalDropped
	case pb.MsgApp:
		r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
		r.handleAppendEntries(m)
	case pb.MsgHeartbeat:
		r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
		r.handleHeartbeat(m)
	case pb.MsgSnap:
		r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
		r.handleSnapshot(m)
	case myVoteRespType:
		gr, rj, res := r.poll(m.From, m.Type, !m.Reject)
		r.logger.Infof("%x has received %d %s votes and %d vote rejections", r.id, gr, m.Type, rj)
		switch res {
		case quorum.VoteWon:
			if r.state == StatePreCandidate {
				r.campaign(campaignElection)
			} else {
				r.becomeLeader()
				r.bcastAppend()
			}
		case quorum.VoteLost:
			// pb.MsgPreVoteResp contains future term of pre-candidate
			// m.Term > r.Term; reuse r.Term
			r.becomeFollower(r.Term, None)
		}
	case pb.MsgTimeoutNow:
		r.logger.Debugf("%x [term %d state %v] ignored MsgTimeoutNow from %x", r.id, r.Term, r.state, m.From)
	}
	return nil
}

// stepFollower is the step function used while the node is a follower.
// Proposals, leader-transfer requests and read-index requests are forwarded
// to the known leader (or dropped when r.lead is None); MsgApp, MsgHeartbeat
// and MsgSnap reset the election timer and record the sender as leader before
// being handled. It returns ErrProposalDropped for proposals that cannot be
// forwarded and nil otherwise.
func stepFollower(r *raft, m pb.Message) error {
	switch m.Type {
	case pb.MsgProp:
		if r.lead == None {
			r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
			return ErrProposalDropped
		} else if r.disableProposalForwarding {
			r.logger.Infof("%x not forwarding to leader %x at term %d; dropping proposal", r.id, r.lead, r.Term)
			return ErrProposalDropped
		}
		m.To = r.lead
		r.send(m)
	case pb.MsgApp:
		r.electionElapsed = 0
		r.lead = m.From
		r.handleAppendEntries(m)
	case pb.MsgHeartbeat:
		r.electionElapsed = 0
		r.lead = m.From
		r.handleHeartbeat(m)
	case pb.MsgSnap:
		r.electionElapsed = 0
		r.lead = m.From
		r.handleSnapshot(m)
	case pb.MsgTransferLeader:
		if r.lead == None {
			r.logger.Infof("%x no leader at term %d; dropping leader transfer msg", r.id, r.Term)
			return nil
		}
		m.To = r.lead
		r.send(m)
	case pb.MsgTimeoutNow:
		if r.promotable() {
			r.logger.Infof("%x [term %d] received MsgTimeoutNow from %x and starts an election to get leadership.", r.id, r.Term, m.From)
			// Leadership transfers never use pre-vote even if r.preVote is true; we
			// know we are not recovering from a partition so there is no need for the
			// extra round trip.
			r.campaign(campaignTransfer)
		} else {
			r.logger.Infof("%x received MsgTimeoutNow from %x but is not promotable", r.id, m.From)
		}
	case pb.MsgReadIndex:
		if r.lead == None {
			r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term)
			return nil
		}
		m.To = r.lead
		r.send(m)
	case pb.MsgReadIndexResp:
		if len(m.Entries) != 1 {
			r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries))
			return nil
		}
		r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data})
	}
	return nil
}

// handleAppendEntries processes a MsgApp. If m.Index is already below the
// local commit index it replies with the commit index. Otherwise it tries to
// append the entries and replies with either the resulting last index or a
// rejection that carries the local last index as RejectHint.
func (r *raft) handleAppendEntries(m pb.Message) {
	if m.Index < r.raftLog.committed {
		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
		return
	}

	if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
	} else {
		r.logger.Debugf("%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x",
			r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)
		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
	}
}

// handleHeartbeat processes a MsgHeartbeat: it passes m.Commit to
// raftLog.commitTo and answers with a MsgHeartbeatResp that echoes m.Context.
func (r *raft) handleHeartbeat(m pb.Message) {
	r.raftLog.commitTo(m.Commit)
	r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})
}

// handleSnapshot processes a MsgSnap. It attempts to restore from the
// snapshot and replies with a MsgAppResp carrying the last index when the
// snapshot was restored, or the commit index when it was ignored.
func (r *raft) handleSnapshot(m pb.Message) {
	sindex, sterm := m.Snapshot.Metadata.Index, m.Snapshot.Metadata.Term
	if r.restore(m.Snapshot) {
		r.logger.Infof("%x [commit: %d] restored snapshot [index: %d, term: %d]",
			r.id, r.raftLog.committed, sindex, sterm)
		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
	} else {
		r.logger.Infof("%x [commit: %d] ignored snapshot [index: %d, term: %d]",
			r.id, r.raftLog.committed, sindex, sterm)
		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
	}
}

// restore recovers the state machine from a snapshot. It restores the log and the
// configuration of state machine. If this method returns false, the snapshot was
// ignored, either because it was obsolete or because of an error.
func (r *raft) restore(s pb.Snapshot) bool {
	if s.Metadata.Index <= r.raftLog.committed {
		return false
	}
	if r.state != StateFollower {
		// This is defense-in-depth: if the leader somehow ended up applying a
		// snapshot, it could move into a new term without moving into a
		// follower state. This should never fire, but if it did, we'd have
		// prevented damage by returning early, so log only a loud warning.
		//
		// At the time of writing, the instance is guaranteed to be in follower
		// state when this method is called.
		r.logger.Warningf("%x attempted to restore snapshot as leader; should never happen", r.id)
		r.becomeFollower(r.Term+1, None)
		return false
	}

	// More defense-in-depth: throw away snapshot if recipient is not in the
	// config. This shouldn't ever happen (at the time of writing) but lots of
	// code here and there assumes that r.id is in the progress tracker.
	found := false
	cs := s.Metadata.ConfState
	for _, set := range [][]uint64{
		cs.Voters,
		cs.Learners,
		// A node that is only an outgoing voter of a joint configuration is
		// still part of the config and must accept the snapshot.
		// LearnersNext does not need to be checked separately; presumably any
		// peer listed there is also in VotersOutgoing.
		cs.VotersOutgoing,
	} {
		for _, id := range set {
			if id == r.id {
				found = true
				break
			}
		}
	}
	if !found {
		r.logger.Warningf(
			"%x attempted to restore snapshot but it is not in the ConfState %v; should never happen",
			r.id, cs,
		)
		return false
	}

	// Now go ahead and actually restore.

	if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) {
		// The log already contains the snapshot's last entry, so only the
		// commit index needs to move; the snapshot itself is not applied.
		r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]",
			r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
		r.raftLog.commitTo(s.Metadata.Index)
		return false
	}

	r.raftLog.restore(s)

	// Reset the configuration and add the (potentially updated) peers in anew.
	r.prs = tracker.MakeProgressTracker(r.prs.MaxInflight)
	cfg, prs, err := confchange.Restore(confchange.Changer{
		Tracker:   r.prs,
		LastIndex: r.raftLog.lastIndex(),
	}, cs)

	if err != nil {
		// This should never happen. Either there's a bug in our config change
		// handling or the client corrupted the conf change.
		panic(fmt.Sprintf("unable to restore config %+v: %s", cs, err))
	}

	assertConfStatesEquivalent(r.logger, cs, r.switchToConfig(cfg, prs))

	pr := r.prs.Progress[r.id]
	pr.MaybeUpdate(pr.Next - 1) // TODO(tbg): this is untested and likely unneeded

	r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] restored snapshot [index: %d, term: %d]",
		r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
	return true
}

// promotable indicates whether the state machine can be promoted to leader,
// which is true when its own id is in the progress list and it is not a
// learner.
1486func (r *raft) promotable() bool { 1487 pr := r.prs.Progress[r.id] 1488 return pr != nil && !pr.IsLearner 1489} 1490 1491func (r *raft) applyConfChange(cc pb.ConfChangeV2) pb.ConfState { 1492 cfg, prs, err := func() (tracker.Config, tracker.ProgressMap, error) { 1493 changer := confchange.Changer{ 1494 Tracker: r.prs, 1495 LastIndex: r.raftLog.lastIndex(), 1496 } 1497 if cc.LeaveJoint() { 1498 return changer.LeaveJoint() 1499 } else if autoLeave, ok := cc.EnterJoint(); ok { 1500 return changer.EnterJoint(autoLeave, cc.Changes...) 1501 } 1502 return changer.Simple(cc.Changes...) 1503 }() 1504 1505 if err != nil { 1506 // TODO(tbg): return the error to the caller. 1507 panic(err) 1508 } 1509 1510 return r.switchToConfig(cfg, prs) 1511} 1512 1513// switchToConfig reconfigures this node to use the provided configuration. It 1514// updates the in-memory state and, when necessary, carries out additional 1515// actions such as reacting to the removal of nodes or changed quorum 1516// requirements. 1517// 1518// The inputs usually result from restoring a ConfState or applying a ConfChange. 1519func (r *raft) switchToConfig(cfg tracker.Config, prs tracker.ProgressMap) pb.ConfState { 1520 r.prs.Config = cfg 1521 r.prs.Progress = prs 1522 1523 r.logger.Infof("%x switched to configuration %s", r.id, r.prs.Config) 1524 cs := r.prs.ConfState() 1525 pr, ok := r.prs.Progress[r.id] 1526 1527 // Update whether the node itself is a learner, resetting to false when the 1528 // node is removed. 1529 r.isLearner = ok && pr.IsLearner 1530 1531 if (!ok || r.isLearner) && r.state == StateLeader { 1532 // This node is leader and was removed or demoted. We prevent demotions 1533 // at the time writing but hypothetically we handle them the same way as 1534 // removing the leader: stepping down into the next Term. 1535 // 1536 // TODO(tbg): step down (for sanity) and ask follower with largest Match 1537 // to TimeoutNow (to avoid interruption). 
This might still drop some 1538 // proposals but it's better than nothing. 1539 // 1540 // TODO(tbg): test this branch. It is untested at the time of writing. 1541 return cs 1542 } 1543 1544 // The remaining steps only make sense if this node is the leader and there 1545 // are other nodes. 1546 if r.state != StateLeader || len(cs.Voters) == 0 { 1547 return cs 1548 } 1549 1550 if r.maybeCommit() { 1551 // If the configuration change means that more entries are committed now, 1552 // broadcast/append to everyone in the updated config. 1553 r.bcastAppend() 1554 } else { 1555 // Otherwise, still probe the newly added replicas; there's no reason to 1556 // let them wait out a heartbeat interval (or the next incoming 1557 // proposal). 1558 r.prs.Visit(func(id uint64, pr *tracker.Progress) { 1559 r.maybeSendAppend(id, false /* sendIfEmpty */) 1560 }) 1561 } 1562 // If the the leadTransferee was removed or demoted, abort the leadership transfer. 1563 if _, tOK := r.prs.Config.Voters.IDs()[r.leadTransferee]; !tOK && r.leadTransferee != 0 { 1564 r.abortLeaderTransfer() 1565 } 1566 1567 return cs 1568} 1569 1570func (r *raft) loadState(state pb.HardState) { 1571 if state.Commit < r.raftLog.committed || state.Commit > r.raftLog.lastIndex() { 1572 r.logger.Panicf("%x state.commit %d is out of range [%d, %d]", r.id, state.Commit, r.raftLog.committed, r.raftLog.lastIndex()) 1573 } 1574 r.raftLog.committed = state.Commit 1575 r.Term = state.Term 1576 r.Vote = state.Vote 1577} 1578 1579// pastElectionTimeout returns true iff r.electionElapsed is greater 1580// than or equal to the randomized election timeout in 1581// [electiontimeout, 2 * electiontimeout - 1]. 
1582func (r *raft) pastElectionTimeout() bool { 1583 return r.electionElapsed >= r.randomizedElectionTimeout 1584} 1585 1586func (r *raft) resetRandomizedElectionTimeout() { 1587 r.randomizedElectionTimeout = r.electionTimeout + globalRand.Intn(r.electionTimeout) 1588} 1589 1590func (r *raft) sendTimeoutNow(to uint64) { 1591 r.send(pb.Message{To: to, Type: pb.MsgTimeoutNow}) 1592} 1593 1594func (r *raft) abortLeaderTransfer() { 1595 r.leadTransferee = None 1596} 1597 1598// committedEntryInCurrentTerm return true if the peer has committed an entry in its term. 1599func (r *raft) committedEntryInCurrentTerm() bool { 1600 return r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) == r.Term 1601} 1602 1603// responseToReadIndexReq constructs a response for `req`. If `req` comes from the peer 1604// itself, a blank value will be returned. 1605func (r *raft) responseToReadIndexReq(req pb.Message, readIndex uint64) pb.Message { 1606 if req.From == None || req.From == r.id { 1607 r.readStates = append(r.readStates, ReadState{ 1608 Index: readIndex, 1609 RequestCtx: req.Entries[0].Data, 1610 }) 1611 return pb.Message{} 1612 } 1613 return pb.Message{ 1614 Type: pb.MsgReadIndexResp, 1615 To: req.From, 1616 Index: readIndex, 1617 Entries: req.Entries, 1618 } 1619} 1620 1621// increaseUncommittedSize computes the size of the proposed entries and 1622// determines whether they would push leader over its maxUncommittedSize limit. 1623// If the new entries would exceed the limit, the method returns false. If not, 1624// the increase in uncommitted entry size is recorded and the method returns 1625// true. 1626// 1627// Empty payloads are never refused. This is used both for appending an empty 1628// entry at a new leader's term, as well as leaving a joint configuration. 
1629func (r *raft) increaseUncommittedSize(ents []pb.Entry) bool { 1630 var s uint64 1631 for _, e := range ents { 1632 s += uint64(PayloadSize(e)) 1633 } 1634 1635 if r.uncommittedSize > 0 && s > 0 && r.uncommittedSize+s > r.maxUncommittedSize { 1636 // If the uncommitted tail of the Raft log is empty, allow any size 1637 // proposal. Otherwise, limit the size of the uncommitted tail of the 1638 // log and drop any proposal that would push the size over the limit. 1639 // Note the added requirement s>0 which is used to make sure that 1640 // appending single empty entries to the log always succeeds, used both 1641 // for replicating a new leader's initial empty entry, and for 1642 // auto-leaving joint configurations. 1643 return false 1644 } 1645 r.uncommittedSize += s 1646 return true 1647} 1648 1649// reduceUncommittedSize accounts for the newly committed entries by decreasing 1650// the uncommitted entry size limit. 1651func (r *raft) reduceUncommittedSize(ents []pb.Entry) { 1652 if r.uncommittedSize == 0 { 1653 // Fast-path for followers, who do not track or enforce the limit. 1654 return 1655 } 1656 1657 var s uint64 1658 for _, e := range ents { 1659 s += uint64(PayloadSize(e)) 1660 } 1661 if s > r.uncommittedSize { 1662 // uncommittedSize may underestimate the size of the uncommitted Raft 1663 // log tail but will never overestimate it. Saturate at 0 instead of 1664 // allowing overflow. 1665 r.uncommittedSize = 0 1666 } else { 1667 r.uncommittedSize -= s 1668 } 1669} 1670 1671func numOfPendingConf(ents []pb.Entry) int { 1672 n := 0 1673 for i := range ents { 1674 if ents[i].Type == pb.EntryConfChange { 1675 n++ 1676 } 1677 } 1678 return n 1679} 1680