1// Copyright 2015 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package raft 16 17import ( 18 "bytes" 19 "errors" 20 "fmt" 21 "math" 22 "math/rand" 23 "sort" 24 "strings" 25 "sync" 26 "time" 27 28 "go.etcd.io/etcd/raft/v3/confchange" 29 "go.etcd.io/etcd/raft/v3/quorum" 30 pb "go.etcd.io/etcd/raft/v3/raftpb" 31 "go.etcd.io/etcd/raft/v3/tracker" 32) 33 34// None is a placeholder node ID used when there is no leader. 35const None uint64 = 0 36const noLimit = math.MaxUint64 37 38// Possible values for StateType. 39const ( 40 StateFollower StateType = iota 41 StateCandidate 42 StateLeader 43 StatePreCandidate 44 numStates 45) 46 47type ReadOnlyOption int 48 49const ( 50 // ReadOnlySafe guarantees the linearizability of the read only request by 51 // communicating with the quorum. It is the default and suggested option. 52 ReadOnlySafe ReadOnlyOption = iota 53 // ReadOnlyLeaseBased ensures linearizability of the read only request by 54 // relying on the leader lease. It can be affected by clock drift. 55 // If the clock drift is unbounded, leader might keep the lease longer than it 56 // should (clock can move backward/pause without any bound). ReadIndex is not safe 57 // in that case. 58 ReadOnlyLeaseBased 59) 60 61// Possible values for CampaignType 62const ( 63 // campaignPreElection represents the first phase of a normal election when 64 // Config.PreVote is true. 
65 campaignPreElection CampaignType = "CampaignPreElection" 66 // campaignElection represents a normal (time-based) election (the second phase 67 // of the election when Config.PreVote is true). 68 campaignElection CampaignType = "CampaignElection" 69 // campaignTransfer represents the type of leader transfer 70 campaignTransfer CampaignType = "CampaignTransfer" 71) 72 73// ErrProposalDropped is returned when the proposal is ignored by some cases, 74// so that the proposer can be notified and fail fast. 75var ErrProposalDropped = errors.New("raft proposal dropped") 76 77// lockedRand is a small wrapper around rand.Rand to provide 78// synchronization among multiple raft groups. Only the methods needed 79// by the code are exposed (e.g. Intn). 80type lockedRand struct { 81 mu sync.Mutex 82 rand *rand.Rand 83} 84 85func (r *lockedRand) Intn(n int) int { 86 r.mu.Lock() 87 v := r.rand.Intn(n) 88 r.mu.Unlock() 89 return v 90} 91 92var globalRand = &lockedRand{ 93 rand: rand.New(rand.NewSource(time.Now().UnixNano())), 94} 95 96// CampaignType represents the type of campaigning 97// the reason we use the type of string instead of uint64 98// is because it's simpler to compare and fill in raft entries 99type CampaignType string 100 101// StateType represents the role of a node in a cluster. 102type StateType uint64 103 104var stmap = [...]string{ 105 "StateFollower", 106 "StateCandidate", 107 "StateLeader", 108 "StatePreCandidate", 109} 110 111func (st StateType) String() string { 112 return stmap[uint64(st)] 113} 114 115// Config contains the parameters to start a raft. 116type Config struct { 117 // ID is the identity of the local raft. ID cannot be 0. 118 ID uint64 119 120 // ElectionTick is the number of Node.Tick invocations that must pass between 121 // elections. That is, if a follower does not receive any message from the 122 // leader of current term before ElectionTick has elapsed, it will become 123 // candidate and start an election. 
ElectionTick must be greater than 124 // HeartbeatTick. We suggest ElectionTick = 10 * HeartbeatTick to avoid 125 // unnecessary leader switching. 126 ElectionTick int 127 // HeartbeatTick is the number of Node.Tick invocations that must pass between 128 // heartbeats. That is, a leader sends heartbeat messages to maintain its 129 // leadership every HeartbeatTick ticks. 130 HeartbeatTick int 131 132 // Storage is the storage for raft. raft generates entries and states to be 133 // stored in storage. raft reads the persisted entries and states out of 134 // Storage when it needs. raft reads out the previous state and configuration 135 // out of storage when restarting. 136 Storage Storage 137 // Applied is the last applied index. It should only be set when restarting 138 // raft. raft will not return entries to the application smaller or equal to 139 // Applied. If Applied is unset when restarting, raft might return previous 140 // applied entries. This is a very application dependent configuration. 141 Applied uint64 142 143 // MaxSizePerMsg limits the max byte size of each append message. Smaller 144 // value lowers the raft recovery cost(initial probing and message lost 145 // during normal operation). On the other side, it might affect the 146 // throughput during normal replication. Note: math.MaxUint64 for unlimited, 147 // 0 for at most one entry per message. 148 MaxSizePerMsg uint64 149 // MaxCommittedSizePerReady limits the size of the committed entries which 150 // can be applied. 151 MaxCommittedSizePerReady uint64 152 // MaxUncommittedEntriesSize limits the aggregate byte size of the 153 // uncommitted entries that may be appended to a leader's log. Once this 154 // limit is exceeded, proposals will begin to return ErrProposalDropped 155 // errors. Note: 0 for no limit. 156 MaxUncommittedEntriesSize uint64 157 // MaxInflightMsgs limits the max number of in-flight append messages during 158 // optimistic replication phase. 
The application transportation layer usually 159 // has its own sending buffer over TCP/UDP. Setting MaxInflightMsgs to avoid 160 // overflowing that sending buffer. TODO (xiangli): feedback to application to 161 // limit the proposal rate? 162 MaxInflightMsgs int 163 164 // CheckQuorum specifies if the leader should check quorum activity. Leader 165 // steps down when quorum is not active for an electionTimeout. 166 CheckQuorum bool 167 168 // PreVote enables the Pre-Vote algorithm described in raft thesis section 169 // 9.6. This prevents disruption when a node that has been partitioned away 170 // rejoins the cluster. 171 PreVote bool 172 173 // ReadOnlyOption specifies how the read only request is processed. 174 // 175 // ReadOnlySafe guarantees the linearizability of the read only request by 176 // communicating with the quorum. It is the default and suggested option. 177 // 178 // ReadOnlyLeaseBased ensures linearizability of the read only request by 179 // relying on the leader lease. It can be affected by clock drift. 180 // If the clock drift is unbounded, leader might keep the lease longer than it 181 // should (clock can move backward/pause without any bound). ReadIndex is not safe 182 // in that case. 183 // CheckQuorum MUST be enabled if ReadOnlyOption is ReadOnlyLeaseBased. 184 ReadOnlyOption ReadOnlyOption 185 186 // Logger is the logger used for raft log. For multinode which can host 187 // multiple raft group, each raft group can have its own logger 188 Logger Logger 189 190 // DisableProposalForwarding set to true means that followers will drop 191 // proposals, rather than forwarding them to the leader. One use case for 192 // this feature would be in a situation where the Raft leader is used to 193 // compute the data of a proposal, for example, adding a timestamp from a 194 // hybrid logical clock to data in a monotonically increasing way. 
Forwarding 195 // should be disabled to prevent a follower with an inaccurate hybrid 196 // logical clock from assigning the timestamp and then forwarding the data 197 // to the leader. 198 DisableProposalForwarding bool 199} 200 201func (c *Config) validate() error { 202 if c.ID == None { 203 return errors.New("cannot use none as id") 204 } 205 206 if c.HeartbeatTick <= 0 { 207 return errors.New("heartbeat tick must be greater than 0") 208 } 209 210 if c.ElectionTick <= c.HeartbeatTick { 211 return errors.New("election tick must be greater than heartbeat tick") 212 } 213 214 if c.Storage == nil { 215 return errors.New("storage cannot be nil") 216 } 217 218 if c.MaxUncommittedEntriesSize == 0 { 219 c.MaxUncommittedEntriesSize = noLimit 220 } 221 222 // default MaxCommittedSizePerReady to MaxSizePerMsg because they were 223 // previously the same parameter. 224 if c.MaxCommittedSizePerReady == 0 { 225 c.MaxCommittedSizePerReady = c.MaxSizePerMsg 226 } 227 228 if c.MaxInflightMsgs <= 0 { 229 return errors.New("max inflight messages must be greater than 0") 230 } 231 232 if c.Logger == nil { 233 c.Logger = raftLogger 234 } 235 236 if c.ReadOnlyOption == ReadOnlyLeaseBased && !c.CheckQuorum { 237 return errors.New("CheckQuorum must be enabled when ReadOnlyOption is ReadOnlyLeaseBased") 238 } 239 240 return nil 241} 242 243type raft struct { 244 id uint64 245 246 Term uint64 247 Vote uint64 248 249 readStates []ReadState 250 251 // the log 252 raftLog *raftLog 253 254 maxMsgSize uint64 255 maxUncommittedSize uint64 256 // TODO(tbg): rename to trk. 257 prs tracker.ProgressTracker 258 259 state StateType 260 261 // isLearner is true if the local raft node is a learner. 262 isLearner bool 263 264 msgs []pb.Message 265 266 // the leader id 267 lead uint64 268 // leadTransferee is id of the leader transfer target when its value is not zero. 269 // Follow the procedure defined in raft thesis 3.10. 
270 leadTransferee uint64 271 // Only one conf change may be pending (in the log, but not yet 272 // applied) at a time. This is enforced via pendingConfIndex, which 273 // is set to a value >= the log index of the latest pending 274 // configuration change (if any). Config changes are only allowed to 275 // be proposed if the leader's applied index is greater than this 276 // value. 277 pendingConfIndex uint64 278 // an estimate of the size of the uncommitted tail of the Raft log. Used to 279 // prevent unbounded log growth. Only maintained by the leader. Reset on 280 // term changes. 281 uncommittedSize uint64 282 283 readOnly *readOnly 284 285 // number of ticks since it reached last electionTimeout when it is leader 286 // or candidate. 287 // number of ticks since it reached last electionTimeout or received a 288 // valid message from current leader when it is a follower. 289 electionElapsed int 290 291 // number of ticks since it reached last heartbeatTimeout. 292 // only leader keeps heartbeatElapsed. 293 heartbeatElapsed int 294 295 checkQuorum bool 296 preVote bool 297 298 heartbeatTimeout int 299 electionTimeout int 300 // randomizedElectionTimeout is a random number between 301 // [electiontimeout, 2 * electiontimeout - 1]. It gets reset 302 // when raft changes its state to follower or candidate. 
303 randomizedElectionTimeout int 304 disableProposalForwarding bool 305 306 tick func() 307 step stepFunc 308 309 logger Logger 310} 311 312func newRaft(c *Config) *raft { 313 if err := c.validate(); err != nil { 314 panic(err.Error()) 315 } 316 raftlog := newLogWithSize(c.Storage, c.Logger, c.MaxCommittedSizePerReady) 317 hs, cs, err := c.Storage.InitialState() 318 if err != nil { 319 panic(err) // TODO(bdarnell) 320 } 321 322 r := &raft{ 323 id: c.ID, 324 lead: None, 325 isLearner: false, 326 raftLog: raftlog, 327 maxMsgSize: c.MaxSizePerMsg, 328 maxUncommittedSize: c.MaxUncommittedEntriesSize, 329 prs: tracker.MakeProgressTracker(c.MaxInflightMsgs), 330 electionTimeout: c.ElectionTick, 331 heartbeatTimeout: c.HeartbeatTick, 332 logger: c.Logger, 333 checkQuorum: c.CheckQuorum, 334 preVote: c.PreVote, 335 readOnly: newReadOnly(c.ReadOnlyOption), 336 disableProposalForwarding: c.DisableProposalForwarding, 337 } 338 339 cfg, prs, err := confchange.Restore(confchange.Changer{ 340 Tracker: r.prs, 341 LastIndex: raftlog.lastIndex(), 342 }, cs) 343 if err != nil { 344 panic(err) 345 } 346 assertConfStatesEquivalent(r.logger, cs, r.switchToConfig(cfg, prs)) 347 348 if !IsEmptyHardState(hs) { 349 r.loadState(hs) 350 } 351 if c.Applied > 0 { 352 raftlog.appliedTo(c.Applied) 353 } 354 r.becomeFollower(r.Term, None) 355 356 var nodesStrs []string 357 for _, n := range r.prs.VoterNodes() { 358 nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n)) 359 } 360 361 r.logger.Infof("newRaft %x [peers: [%s], term: %d, commit: %d, applied: %d, lastindex: %d, lastterm: %d]", 362 r.id, strings.Join(nodesStrs, ","), r.Term, r.raftLog.committed, r.raftLog.applied, r.raftLog.lastIndex(), r.raftLog.lastTerm()) 363 return r 364} 365 366func (r *raft) hasLeader() bool { return r.lead != None } 367 368func (r *raft) softState() *SoftState { return &SoftState{Lead: r.lead, RaftState: r.state} } 369 370func (r *raft) hardState() pb.HardState { 371 return pb.HardState{ 372 Term: r.Term, 373 Vote: 
r.Vote, 374 Commit: r.raftLog.committed, 375 } 376} 377 378// send schedules persisting state to a stable storage and AFTER that 379// sending the message (as part of next Ready message processing). 380func (r *raft) send(m pb.Message) { 381 if m.From == None { 382 m.From = r.id 383 } 384 if m.Type == pb.MsgVote || m.Type == pb.MsgVoteResp || m.Type == pb.MsgPreVote || m.Type == pb.MsgPreVoteResp { 385 if m.Term == 0 { 386 // All {pre-,}campaign messages need to have the term set when 387 // sending. 388 // - MsgVote: m.Term is the term the node is campaigning for, 389 // non-zero as we increment the term when campaigning. 390 // - MsgVoteResp: m.Term is the new r.Term if the MsgVote was 391 // granted, non-zero for the same reason MsgVote is 392 // - MsgPreVote: m.Term is the term the node will campaign, 393 // non-zero as we use m.Term to indicate the next term we'll be 394 // campaigning for 395 // - MsgPreVoteResp: m.Term is the term received in the original 396 // MsgPreVote if the pre-vote was granted, non-zero for the 397 // same reasons MsgPreVote is 398 panic(fmt.Sprintf("term should be set when sending %s", m.Type)) 399 } 400 } else { 401 if m.Term != 0 { 402 panic(fmt.Sprintf("term should not be set when sending %s (was %d)", m.Type, m.Term)) 403 } 404 // do not attach term to MsgProp, MsgReadIndex 405 // proposals are a way to forward to the leader and 406 // should be treated as local message. 407 // MsgReadIndex is also forwarded to leader. 408 if m.Type != pb.MsgProp && m.Type != pb.MsgReadIndex { 409 m.Term = r.Term 410 } 411 } 412 r.msgs = append(r.msgs, m) 413} 414 415// sendAppend sends an append RPC with new entries (if any) and the 416// current commit index to the given peer. 417func (r *raft) sendAppend(to uint64) { 418 r.maybeSendAppend(to, true) 419} 420 421// maybeSendAppend sends an append RPC with new entries to the given peer, 422// if necessary. Returns true if a message was sent. 
// The sendIfEmpty argument controls whether messages with no entries will be
// sent ("empty" messages are useful to convey updated Commit indexes, but
// are undesirable when we're sending multiple messages in a batch).
func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
	pr := r.prs.Progress[to]
	if pr.IsPaused() {
		return false
	}
	m := pb.Message{}
	m.To = to

	// Look up the term of the entry preceding pr.Next and the entries starting
	// at pr.Next; a failure of either lookup selects the snapshot path below.
	term, errt := r.raftLog.term(pr.Next - 1)
	ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
	if len(ents) == 0 && !sendIfEmpty {
		return false
	}

	if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
		if !pr.RecentActive {
			r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
			return false
		}

		m.Type = pb.MsgSnap
		snapshot, err := r.raftLog.snapshot()
		if err != nil {
			if err == ErrSnapshotTemporarilyUnavailable {
				r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to)
				return false
			}
			panic(err) // TODO(bdarnell)
		}
		if IsEmptySnap(snapshot) {
			panic("need non-empty snapshot")
		}
		m.Snapshot = snapshot
		sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
		r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
			r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
		pr.BecomeSnapshot(sindex)
		r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)
	} else {
		m.Type = pb.MsgApp
		m.Index = pr.Next - 1
		m.LogTerm = term
		m.Entries = ents
		m.Commit = r.raftLog.committed
		if n := len(m.Entries); n != 0 {
			switch pr.State {
			// optimistically increase the next when in StateReplicate
			case tracker.StateReplicate:
				last := m.Entries[n-1].Index
				pr.OptimisticUpdate(last)
				pr.Inflights.Add(last)
			case tracker.StateProbe:
				pr.ProbeSent = true
			default:
				r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
			}
		}
	}
	r.send(m)
	return true
}

// sendHeartbeat sends a heartbeat RPC to the given peer.
func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
	// Attach the commit as min(to.matched, r.committed).
	// When the leader sends out heartbeat message,
	// the receiver(follower) might not be matched with the leader
	// or it might not have all the committed entries.
	// The leader MUST NOT forward the follower's commit to
	// an unmatched index.
	commit := min(r.prs.Progress[to].Match, r.raftLog.committed)
	m := pb.Message{
		To:      to,
		Type:    pb.MsgHeartbeat,
		Commit:  commit,
		Context: ctx,
	}

	r.send(m)
}

// bcastAppend sends RPC, with entries to all peers that are not up-to-date
// according to the progress recorded in r.prs.
func (r *raft) bcastAppend() {
	r.prs.Visit(func(id uint64, _ *tracker.Progress) {
		if id == r.id {
			return
		}
		r.sendAppend(id)
	})
}

// bcastHeartbeat sends RPC, without entries to all the peers.
func (r *raft) bcastHeartbeat() {
	// Attach the context of the last pending read-only request, if there is
	// one; otherwise the heartbeats carry a nil context.
	lastCtx := r.readOnly.lastPendingRequestCtx()
	if len(lastCtx) == 0 {
		r.bcastHeartbeatWithCtx(nil)
	} else {
		r.bcastHeartbeatWithCtx([]byte(lastCtx))
	}
}

// bcastHeartbeatWithCtx sends a heartbeat carrying ctx to every peer visited
// by r.prs.Visit, skipping the local node.
func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {
	r.prs.Visit(func(id uint64, _ *tracker.Progress) {
		if id == r.id {
			return
		}
		r.sendHeartbeat(id, ctx)
	})
}

// advance updates the raft after the given Ready has been handled: it reduces
// the uncommitted-size estimate by rd.CommittedEntries, moves the applied
// cursor forward, and marks the entries and snapshot contained in rd as
// stable in the raft log.
func (r *raft) advance(rd Ready) {
	r.reduceUncommittedSize(rd.CommittedEntries)

	// If entries were applied (or a snapshot), update our cursor for
	// the next Ready. Note that if the current HardState contains a
	// new Commit index, this does not mean that we're also applying
	// all of the new entries due to commit pagination by size.
	if newApplied := rd.appliedCursor(); newApplied > 0 {
		oldApplied := r.raftLog.applied
		r.raftLog.appliedTo(newApplied)

		if r.prs.Config.AutoLeave && oldApplied <= r.pendingConfIndex && newApplied >= r.pendingConfIndex && r.state == StateLeader {
			// If the current (and most recent, at least for this leader's term)
			// configuration should be auto-left, initiate that now. We use a
			// nil Data which unmarshals into an empty ConfChangeV2 and has the
			// benefit that appendEntry can never refuse it based on its size
			// (which registers as zero).
			ent := pb.Entry{
				Type: pb.EntryConfChangeV2,
				Data: nil,
			}
			// There's no way in which this proposal should be able to be rejected.
			if !r.appendEntry(ent) {
				panic("refused un-refusable auto-leaving ConfChangeV2")
			}
			r.pendingConfIndex = r.raftLog.lastIndex()
			r.logger.Infof("initiating automatic transition out of joint configuration %s", r.prs.Config)
		}
	}

	if len(rd.Entries) > 0 {
		e := rd.Entries[len(rd.Entries)-1]
		r.raftLog.stableTo(e.Index, e.Term)
	}
	if !IsEmptySnap(rd.Snapshot) {
		r.raftLog.stableSnapTo(rd.Snapshot.Metadata.Index)
	}
}

// maybeCommit attempts to advance the commit index. Returns true if
// the commit index changed (in which case the caller should call
// r.bcastAppend).
579func (r *raft) maybeCommit() bool { 580 mci := r.prs.Committed() 581 return r.raftLog.maybeCommit(mci, r.Term) 582} 583 584func (r *raft) reset(term uint64) { 585 if r.Term != term { 586 r.Term = term 587 r.Vote = None 588 } 589 r.lead = None 590 591 r.electionElapsed = 0 592 r.heartbeatElapsed = 0 593 r.resetRandomizedElectionTimeout() 594 595 r.abortLeaderTransfer() 596 597 r.prs.ResetVotes() 598 r.prs.Visit(func(id uint64, pr *tracker.Progress) { 599 *pr = tracker.Progress{ 600 Match: 0, 601 Next: r.raftLog.lastIndex() + 1, 602 Inflights: tracker.NewInflights(r.prs.MaxInflight), 603 IsLearner: pr.IsLearner, 604 } 605 if id == r.id { 606 pr.Match = r.raftLog.lastIndex() 607 } 608 }) 609 610 r.pendingConfIndex = 0 611 r.uncommittedSize = 0 612 r.readOnly = newReadOnly(r.readOnly.option) 613} 614 615func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) { 616 li := r.raftLog.lastIndex() 617 for i := range es { 618 es[i].Term = r.Term 619 es[i].Index = li + 1 + uint64(i) 620 } 621 // Track the size of this uncommitted proposal. 622 if !r.increaseUncommittedSize(es) { 623 r.logger.Debugf( 624 "%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal", 625 r.id, 626 ) 627 // Drop the proposal. 628 return false 629 } 630 // use latest "last" index after truncate/append 631 li = r.raftLog.append(es...) 632 r.prs.Progress[r.id].MaybeUpdate(li) 633 // Regardless of maybeCommit's return, our caller will call bcastAppend. 634 r.maybeCommit() 635 return true 636} 637 638// tickElection is run by followers and candidates after r.electionTimeout. 639func (r *raft) tickElection() { 640 r.electionElapsed++ 641 642 if r.promotable() && r.pastElectionTimeout() { 643 r.electionElapsed = 0 644 r.Step(pb.Message{From: r.id, Type: pb.MsgHup}) 645 } 646} 647 648// tickHeartbeat is run by leaders to send a MsgBeat after r.heartbeatTimeout. 
649func (r *raft) tickHeartbeat() { 650 r.heartbeatElapsed++ 651 r.electionElapsed++ 652 653 if r.electionElapsed >= r.electionTimeout { 654 r.electionElapsed = 0 655 if r.checkQuorum { 656 r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum}) 657 } 658 // If current leader cannot transfer leadership in electionTimeout, it becomes leader again. 659 if r.state == StateLeader && r.leadTransferee != None { 660 r.abortLeaderTransfer() 661 } 662 } 663 664 if r.state != StateLeader { 665 return 666 } 667 668 if r.heartbeatElapsed >= r.heartbeatTimeout { 669 r.heartbeatElapsed = 0 670 r.Step(pb.Message{From: r.id, Type: pb.MsgBeat}) 671 } 672} 673 674func (r *raft) becomeFollower(term uint64, lead uint64) { 675 r.step = stepFollower 676 r.reset(term) 677 r.tick = r.tickElection 678 r.lead = lead 679 r.state = StateFollower 680 r.logger.Infof("%x became follower at term %d", r.id, r.Term) 681} 682 683func (r *raft) becomeCandidate() { 684 // TODO(xiangli) remove the panic when the raft implementation is stable 685 if r.state == StateLeader { 686 panic("invalid transition [leader -> candidate]") 687 } 688 r.step = stepCandidate 689 r.reset(r.Term + 1) 690 r.tick = r.tickElection 691 r.Vote = r.id 692 r.state = StateCandidate 693 r.logger.Infof("%x became candidate at term %d", r.id, r.Term) 694} 695 696func (r *raft) becomePreCandidate() { 697 // TODO(xiangli) remove the panic when the raft implementation is stable 698 if r.state == StateLeader { 699 panic("invalid transition [leader -> pre-candidate]") 700 } 701 // Becoming a pre-candidate changes our step functions and state, 702 // but doesn't change anything else. In particular it does not increase 703 // r.Term or change r.Vote. 
704 r.step = stepCandidate 705 r.prs.ResetVotes() 706 r.tick = r.tickElection 707 r.lead = None 708 r.state = StatePreCandidate 709 r.logger.Infof("%x became pre-candidate at term %d", r.id, r.Term) 710} 711 712func (r *raft) becomeLeader() { 713 // TODO(xiangli) remove the panic when the raft implementation is stable 714 if r.state == StateFollower { 715 panic("invalid transition [follower -> leader]") 716 } 717 r.step = stepLeader 718 r.reset(r.Term) 719 r.tick = r.tickHeartbeat 720 r.lead = r.id 721 r.state = StateLeader 722 // Followers enter replicate mode when they've been successfully probed 723 // (perhaps after having received a snapshot as a result). The leader is 724 // trivially in this state. Note that r.reset() has initialized this 725 // progress with the last index already. 726 r.prs.Progress[r.id].BecomeReplicate() 727 728 // Conservatively set the pendingConfIndex to the last index in the 729 // log. There may or may not be a pending config change, but it's 730 // safe to delay any future proposals until we commit all our 731 // pending log entries, and scanning the entire tail of the log 732 // could be expensive. 733 r.pendingConfIndex = r.raftLog.lastIndex() 734 735 emptyEnt := pb.Entry{Data: nil} 736 if !r.appendEntry(emptyEnt) { 737 // This won't happen because we just called reset() above. 738 r.logger.Panic("empty entry was dropped") 739 } 740 // As a special case, don't count the initial empty entry towards the 741 // uncommitted log quota. This is because we want to preserve the 742 // behavior of allowing one entry larger than quota if the current 743 // usage is zero. 
744 r.reduceUncommittedSize([]pb.Entry{emptyEnt}) 745 r.logger.Infof("%x became leader at term %d", r.id, r.Term) 746} 747 748func (r *raft) hup(t CampaignType) { 749 if r.state == StateLeader { 750 r.logger.Debugf("%x ignoring MsgHup because already leader", r.id) 751 return 752 } 753 754 if !r.promotable() { 755 r.logger.Warningf("%x is unpromotable and can not campaign", r.id) 756 return 757 } 758 ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit) 759 if err != nil { 760 r.logger.Panicf("unexpected error getting unapplied entries (%v)", err) 761 } 762 if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied { 763 r.logger.Warningf("%x cannot campaign at term %d since there are still %d pending configuration changes to apply", r.id, r.Term, n) 764 return 765 } 766 767 r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term) 768 r.campaign(t) 769} 770 771// campaign transitions the raft instance to candidate state. This must only be 772// called after verifying that this is a legitimate transition. 773func (r *raft) campaign(t CampaignType) { 774 if !r.promotable() { 775 // This path should not be hit (callers are supposed to check), but 776 // better safe than sorry. 777 r.logger.Warningf("%x is unpromotable; campaign() should have been called", r.id) 778 } 779 var term uint64 780 var voteMsg pb.MessageType 781 if t == campaignPreElection { 782 r.becomePreCandidate() 783 voteMsg = pb.MsgPreVote 784 // PreVote RPCs are sent for the next term before we've incremented r.Term. 785 term = r.Term + 1 786 } else { 787 r.becomeCandidate() 788 voteMsg = pb.MsgVote 789 term = r.Term 790 } 791 if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon { 792 // We won the election after voting for ourselves (which must mean that 793 // this is a single-node cluster). Advance to the next state. 
794 if t == campaignPreElection { 795 r.campaign(campaignElection) 796 } else { 797 r.becomeLeader() 798 } 799 return 800 } 801 var ids []uint64 802 { 803 idMap := r.prs.Voters.IDs() 804 ids = make([]uint64, 0, len(idMap)) 805 for id := range idMap { 806 ids = append(ids, id) 807 } 808 sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] }) 809 } 810 for _, id := range ids { 811 if id == r.id { 812 continue 813 } 814 r.logger.Infof("%x [logterm: %d, index: %d] sent %s request to %x at term %d", 815 r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), voteMsg, id, r.Term) 816 817 var ctx []byte 818 if t == campaignTransfer { 819 ctx = []byte(t) 820 } 821 r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx}) 822 } 823} 824 825func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int, rejected int, result quorum.VoteResult) { 826 if v { 827 r.logger.Infof("%x received %s from %x at term %d", r.id, t, id, r.Term) 828 } else { 829 r.logger.Infof("%x received %s rejection from %x at term %d", r.id, t, id, r.Term) 830 } 831 r.prs.RecordVote(id, v) 832 return r.prs.TallyVotes() 833} 834 835func (r *raft) Step(m pb.Message) error { 836 // Handle the message term, which may result in our stepping down to a follower. 
837 switch { 838 case m.Term == 0: 839 // local message 840 case m.Term > r.Term: 841 if m.Type == pb.MsgVote || m.Type == pb.MsgPreVote { 842 force := bytes.Equal(m.Context, []byte(campaignTransfer)) 843 inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeout 844 if !force && inLease { 845 // If a server receives a RequestVote request within the minimum election timeout 846 // of hearing from a current leader, it does not update its term or grant its vote 847 r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: lease is not expired (remaining ticks: %d)", 848 r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term, r.electionTimeout-r.electionElapsed) 849 return nil 850 } 851 } 852 switch { 853 case m.Type == pb.MsgPreVote: 854 // Never change our term in response to a PreVote 855 case m.Type == pb.MsgPreVoteResp && !m.Reject: 856 // We send pre-vote requests with a term in our future. If the 857 // pre-vote is granted, we will increment our term when we get a 858 // quorum. If it is not, the term comes from the node that 859 // rejected our vote so we should become a follower at the new 860 // term. 861 default: 862 r.logger.Infof("%x [term: %d] received a %s message with higher term from %x [term: %d]", 863 r.id, r.Term, m.Type, m.From, m.Term) 864 if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap { 865 r.becomeFollower(m.Term, m.From) 866 } else { 867 r.becomeFollower(m.Term, None) 868 } 869 } 870 871 case m.Term < r.Term: 872 if (r.checkQuorum || r.preVote) && (m.Type == pb.MsgHeartbeat || m.Type == pb.MsgApp) { 873 // We have received messages from a leader at a lower term. 
It is possible 874 // that these messages were simply delayed in the network, but this could 875 // also mean that this node has advanced its term number during a network 876 // partition, and it is now unable to either win an election or to rejoin 877 // the majority on the old term. If checkQuorum is false, this will be 878 // handled by incrementing term numbers in response to MsgVote with a 879 // higher term, but if checkQuorum is true we may not advance the term on 880 // MsgVote and must generate other messages to advance the term. The net 881 // result of these two features is to minimize the disruption caused by 882 // nodes that have been removed from the cluster's configuration: a 883 // removed node will send MsgVotes (or MsgPreVotes) which will be ignored, 884 // but it will not receive MsgApp or MsgHeartbeat, so it will not create 885 // disruptive term increases, by notifying leader of this node's activeness. 886 // The above comments also true for Pre-Vote 887 // 888 // When follower gets isolated, it soon starts an election ending 889 // up with a higher term than leader, although it won't receive enough 890 // votes to win the election. When it regains connectivity, this response 891 // with "pb.MsgAppResp" of higher term would force leader to step down. 892 // However, this disruption is inevitable to free this stuck node with 893 // fresh election. This can be prevented with Pre-Vote phase. 894 r.send(pb.Message{To: m.From, Type: pb.MsgAppResp}) 895 } else if m.Type == pb.MsgPreVote { 896 // Before Pre-Vote enable, there may have candidate with higher term, 897 // but less log. After update to Pre-Vote, the cluster may deadlock if 898 // we drop messages with a lower term. 
899 r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d", 900 r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term) 901 r.send(pb.Message{To: m.From, Term: r.Term, Type: pb.MsgPreVoteResp, Reject: true}) 902 } else { 903 // ignore other cases 904 r.logger.Infof("%x [term: %d] ignored a %s message with lower term from %x [term: %d]", 905 r.id, r.Term, m.Type, m.From, m.Term) 906 } 907 return nil 908 } 909 910 switch m.Type { 911 case pb.MsgHup: 912 if r.preVote { 913 r.hup(campaignPreElection) 914 } else { 915 r.hup(campaignElection) 916 } 917 918 case pb.MsgVote, pb.MsgPreVote: 919 // We can vote if this is a repeat of a vote we've already cast... 920 canVote := r.Vote == m.From || 921 // ...we haven't voted and we don't think there's a leader yet in this term... 922 (r.Vote == None && r.lead == None) || 923 // ...or this is a PreVote for a future term... 924 (m.Type == pb.MsgPreVote && m.Term > r.Term) 925 // ...and we believe the candidate is up to date. 926 if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) { 927 // Note: it turns out that that learners must be allowed to cast votes. 928 // This seems counter- intuitive but is necessary in the situation in which 929 // a learner has been promoted (i.e. is now a voter) but has not learned 930 // about this yet. 931 // For example, consider a group in which id=1 is a learner and id=2 and 932 // id=3 are voters. A configuration change promoting 1 can be committed on 933 // the quorum `{2,3}` without the config change being appended to the 934 // learner's log. If the leader (say 2) fails, there are de facto two 935 // voters remaining. Only 3 can win an election (due to its log containing 936 // all committed entries), but to do so it will need 1 to vote. 
But 1 937 // considers itself a learner and will continue to do so until 3 has 938 // stepped up as leader, replicates the conf change to 1, and 1 applies it. 939 // Ultimately, by receiving a request to vote, the learner realizes that 940 // the candidate believes it to be a voter, and that it should act 941 // accordingly. The candidate's config may be stale, too; but in that case 942 // it won't win the election, at least in the absence of the bug discussed 943 // in: 944 // https://github.com/etcd-io/etcd/issues/7625#issuecomment-488798263. 945 r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] cast %s for %x [logterm: %d, index: %d] at term %d", 946 r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term) 947 // When responding to Msg{Pre,}Vote messages we include the term 948 // from the message, not the local term. To see why, consider the 949 // case where a single node was previously partitioned away and 950 // it's local term is now out of date. If we include the local term 951 // (recall that for pre-votes we don't update the local term), the 952 // (pre-)campaigning node on the other end will proceed to ignore 953 // the message (it ignores all out of date messages). 954 // The term in the original message and current local term are the 955 // same in the case of regular votes, but different for pre-votes. 956 r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)}) 957 if m.Type == pb.MsgVote { 958 // Only record real votes. 
959 r.electionElapsed = 0 960 r.Vote = m.From 961 } 962 } else { 963 r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d", 964 r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term) 965 r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true}) 966 } 967 968 default: 969 err := r.step(r, m) 970 if err != nil { 971 return err 972 } 973 } 974 return nil 975} 976 977type stepFunc func(r *raft, m pb.Message) error 978 979func stepLeader(r *raft, m pb.Message) error { 980 // These message types do not require any progress for m.From. 981 switch m.Type { 982 case pb.MsgBeat: 983 r.bcastHeartbeat() 984 return nil 985 case pb.MsgCheckQuorum: 986 // The leader should always see itself as active. As a precaution, handle 987 // the case in which the leader isn't in the configuration any more (for 988 // example if it just removed itself). 989 // 990 // TODO(tbg): I added a TODO in removeNode, it doesn't seem that the 991 // leader steps down when removing itself. I might be missing something. 992 if pr := r.prs.Progress[r.id]; pr != nil { 993 pr.RecentActive = true 994 } 995 if !r.prs.QuorumActive() { 996 r.logger.Warningf("%x stepped down to follower since quorum is not active", r.id) 997 r.becomeFollower(r.Term, None) 998 } 999 // Mark everyone (but ourselves) as inactive in preparation for the next 1000 // CheckQuorum. 1001 r.prs.Visit(func(id uint64, pr *tracker.Progress) { 1002 if id != r.id { 1003 pr.RecentActive = false 1004 } 1005 }) 1006 return nil 1007 case pb.MsgProp: 1008 if len(m.Entries) == 0 { 1009 r.logger.Panicf("%x stepped empty MsgProp", r.id) 1010 } 1011 if r.prs.Progress[r.id] == nil { 1012 // If we are not currently a member of the range (i.e. this node 1013 // was removed from the configuration while serving as leader), 1014 // drop any new proposals. 
1015 return ErrProposalDropped 1016 } 1017 if r.leadTransferee != None { 1018 r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee) 1019 return ErrProposalDropped 1020 } 1021 1022 for i := range m.Entries { 1023 e := &m.Entries[i] 1024 var cc pb.ConfChangeI 1025 if e.Type == pb.EntryConfChange { 1026 var ccc pb.ConfChange 1027 if err := ccc.Unmarshal(e.Data); err != nil { 1028 panic(err) 1029 } 1030 cc = ccc 1031 } else if e.Type == pb.EntryConfChangeV2 { 1032 var ccc pb.ConfChangeV2 1033 if err := ccc.Unmarshal(e.Data); err != nil { 1034 panic(err) 1035 } 1036 cc = ccc 1037 } 1038 if cc != nil { 1039 alreadyPending := r.pendingConfIndex > r.raftLog.applied 1040 alreadyJoint := len(r.prs.Config.Voters[1]) > 0 1041 wantsLeaveJoint := len(cc.AsV2().Changes) == 0 1042 1043 var refused string 1044 if alreadyPending { 1045 refused = fmt.Sprintf("possible unapplied conf change at index %d (applied to %d)", r.pendingConfIndex, r.raftLog.applied) 1046 } else if alreadyJoint && !wantsLeaveJoint { 1047 refused = "must transition out of joint config first" 1048 } else if !alreadyJoint && wantsLeaveJoint { 1049 refused = "not in joint state; refusing empty conf change" 1050 } 1051 1052 if refused != "" { 1053 r.logger.Infof("%x ignoring conf change %v at config %s: %s", r.id, cc, r.prs.Config, refused) 1054 m.Entries[i] = pb.Entry{Type: pb.EntryNormal} 1055 } else { 1056 r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1 1057 } 1058 } 1059 } 1060 1061 if !r.appendEntry(m.Entries...) { 1062 return ErrProposalDropped 1063 } 1064 r.bcastAppend() 1065 return nil 1066 case pb.MsgReadIndex: 1067 // only one voting member (the leader) in the cluster 1068 if r.prs.IsSingleton() { 1069 if resp := r.responseToReadIndexReq(m, r.raftLog.committed); resp.To != None { 1070 r.send(resp) 1071 } 1072 return nil 1073 } 1074 1075 // Reject read only request when this leader has not committed any log entry at its term. 
1076 if !r.committedEntryInCurrentTerm() { 1077 return nil 1078 } 1079 1080 // thinking: use an interally defined context instead of the user given context. 1081 // We can express this in terms of the term and index instead of a user-supplied value. 1082 // This would allow multiple reads to piggyback on the same message. 1083 switch r.readOnly.option { 1084 // If more than the local vote is needed, go through a full broadcast. 1085 case ReadOnlySafe: 1086 r.readOnly.addRequest(r.raftLog.committed, m) 1087 // The local node automatically acks the request. 1088 r.readOnly.recvAck(r.id, m.Entries[0].Data) 1089 r.bcastHeartbeatWithCtx(m.Entries[0].Data) 1090 case ReadOnlyLeaseBased: 1091 if resp := r.responseToReadIndexReq(m, r.raftLog.committed); resp.To != None { 1092 r.send(resp) 1093 } 1094 } 1095 return nil 1096 } 1097 1098 // All other message types require a progress for m.From (pr). 1099 pr := r.prs.Progress[m.From] 1100 if pr == nil { 1101 r.logger.Debugf("%x no progress available for %x", r.id, m.From) 1102 return nil 1103 } 1104 switch m.Type { 1105 case pb.MsgAppResp: 1106 pr.RecentActive = true 1107 1108 if m.Reject { 1109 // RejectHint is the suggested next base entry for appending (i.e. 1110 // we try to append entry RejectHint+1 next), and LogTerm is the 1111 // term that the follower has at index RejectHint. Older versions 1112 // of this library did not populate LogTerm for rejections and it 1113 // is zero for followers with an empty log. 1114 // 1115 // Under normal circumstances, the leader's log is longer than the 1116 // follower's and the follower's log is a prefix of the leader's 1117 // (i.e. there is no divergent uncommitted suffix of the log on the 1118 // follower). In that case, the first probe reveals where the 1119 // follower's log ends (RejectHint=follower's last index) and the 1120 // subsequent probe succeeds. 1121 // 1122 // However, when networks are partitioned or systems overloaded, 1123 // large divergent log tails can occur. 
The naive attempt, probing 1124 // entry by entry in decreasing order, will be the product of the 1125 // length of the diverging tails and the network round-trip latency, 1126 // which can easily result in hours of time spent probing and can 1127 // even cause outright outages. The probes are thus optimized as 1128 // described below. 1129 r.logger.Debugf("%x received MsgAppResp(rejected, hint: (index %d, term %d)) from %x for index %d", 1130 r.id, m.RejectHint, m.LogTerm, m.From, m.Index) 1131 nextProbeIdx := m.RejectHint 1132 if m.LogTerm > 0 { 1133 // If the follower has an uncommitted log tail, we would end up 1134 // probing one by one until we hit the common prefix. 1135 // 1136 // For example, if the leader has: 1137 // 1138 // idx 1 2 3 4 5 6 7 8 9 1139 // ----------------- 1140 // term (L) 1 3 3 3 5 5 5 5 5 1141 // term (F) 1 1 1 1 2 2 1142 // 1143 // Then, after sending an append anchored at (idx=9,term=5) we 1144 // would receive a RejectHint of 6 and LogTerm of 2. Without the 1145 // code below, we would try an append at index 6, which would 1146 // fail again. 1147 // 1148 // However, looking only at what the leader knows about its own 1149 // log and the rejection hint, it is clear that a probe at index 1150 // 6, 5, 4, 3, and 2 must fail as well: 1151 // 1152 // For all of these indexes, the leader's log term is larger than 1153 // the rejection's log term. If a probe at one of these indexes 1154 // succeeded, its log term at that index would match the leader's, 1155 // i.e. 3 or 5 in this example. But the follower already told the 1156 // leader that it is still at term 2 at index 9, and since the 1157 // log term only ever goes up (within a log), this is a contradiction. 1158 // 1159 // At index 1, however, the leader can draw no such conclusion, 1160 // as its term 1 is not larger than the term 2 from the 1161 // follower's rejection. We thus probe at 1, which will succeed 1162 // in this example. 
In general, with this approach we probe at 1163 // most once per term found in the leader's log. 1164 // 1165 // There is a similar mechanism on the follower (implemented in 1166 // handleAppendEntries via a call to findConflictByTerm) that is 1167 // useful if the follower has a large divergent uncommitted log 1168 // tail[1], as in this example: 1169 // 1170 // idx 1 2 3 4 5 6 7 8 9 1171 // ----------------- 1172 // term (L) 1 3 3 3 3 3 3 3 7 1173 // term (F) 1 3 3 4 4 5 5 5 6 1174 // 1175 // Naively, the leader would probe at idx=9, receive a rejection 1176 // revealing the log term of 6 at the follower. Since the leader's 1177 // term at the previous index is already smaller than 6, the leader- 1178 // side optimization discussed above is ineffective. The leader thus 1179 // probes at index 8 and, naively, receives a rejection for the same 1180 // index and log term 5. Again, the leader optimization does not improve 1181 // over linear probing as term 5 is above the leader's term 3 for that 1182 // and many preceding indexes; the leader would have to probe linearly 1183 // until it would finally hit index 3, where the probe would succeed. 1184 // 1185 // Instead, we apply a similar optimization on the follower. When the 1186 // follower receives the probe at index 8 (log term 3), it concludes 1187 // that all of the leader's log preceding that index has log terms of 1188 // 3 or below. The largest index in the follower's log with a log term 1189 // of 3 or below is index 3. The follower will thus return a rejection 1190 // for index=3, log term=3 instead. The leader's next probe will then 1191 // succeed at that index. 1192 // 1193 // [1]: more precisely, if the log terms in the large uncommitted 1194 // tail on the follower are larger than the leader's. At first, 1195 // it may seem unintuitive that a follower could even have such 1196 // a large tail, but it can happen: 1197 // 1198 // 1. Leader appends (but does not commit) entries 2 and 3, crashes. 
1199 // idx 1 2 3 4 5 6 7 8 9 1200 // ----------------- 1201 // term (L) 1 2 2 [crashes] 1202 // term (F) 1 1203 // term (F) 1 1204 // 1205 // 2. a follower becomes leader and appends entries at term 3. 1206 // ----------------- 1207 // term (x) 1 2 2 [down] 1208 // term (F) 1 3 3 3 3 1209 // term (F) 1 1210 // 1211 // 3. term 3 leader goes down, term 2 leader returns as term 4 1212 // leader. It commits the log & entries at term 4. 1213 // 1214 // ----------------- 1215 // term (L) 1 2 2 2 1216 // term (x) 1 3 3 3 3 [down] 1217 // term (F) 1 1218 // ----------------- 1219 // term (L) 1 2 2 2 4 4 4 1220 // term (F) 1 3 3 3 3 [gets probed] 1221 // term (F) 1 2 2 2 4 4 4 1222 // 1223 // 4. the leader will now probe the returning follower at index 1224 // 7, the rejection points it at the end of the follower's log 1225 // which is at a higher log term than the actually committed 1226 // log. 1227 nextProbeIdx = r.raftLog.findConflictByTerm(m.RejectHint, m.LogTerm) 1228 } 1229 if pr.MaybeDecrTo(m.Index, nextProbeIdx) { 1230 r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr) 1231 if pr.State == tracker.StateReplicate { 1232 pr.BecomeProbe() 1233 } 1234 r.sendAppend(m.From) 1235 } 1236 } else { 1237 oldPaused := pr.IsPaused() 1238 if pr.MaybeUpdate(m.Index) { 1239 switch { 1240 case pr.State == tracker.StateProbe: 1241 pr.BecomeReplicate() 1242 case pr.State == tracker.StateSnapshot && pr.Match >= pr.PendingSnapshot: 1243 // TODO(tbg): we should also enter this branch if a snapshot is 1244 // received that is below pr.PendingSnapshot but which makes it 1245 // possible to use the log again. 1246 r.logger.Debugf("%x recovered from needing snapshot, resumed sending replication messages to %x [%s]", r.id, m.From, pr) 1247 // Transition back to replicating state via probing state 1248 // (which takes the snapshot into account). 
If we didn't 1249 // move to replicating state, that would only happen with 1250 // the next round of appends (but there may not be a next 1251 // round for a while, exposing an inconsistent RaftStatus). 1252 pr.BecomeProbe() 1253 pr.BecomeReplicate() 1254 case pr.State == tracker.StateReplicate: 1255 pr.Inflights.FreeLE(m.Index) 1256 } 1257 1258 if r.maybeCommit() { 1259 r.bcastAppend() 1260 } else if oldPaused { 1261 // If we were paused before, this node may be missing the 1262 // latest commit index, so send it. 1263 r.sendAppend(m.From) 1264 } 1265 // We've updated flow control information above, which may 1266 // allow us to send multiple (size-limited) in-flight messages 1267 // at once (such as when transitioning from probe to 1268 // replicate, or when freeTo() covers multiple messages). If 1269 // we have more entries to send, send as many messages as we 1270 // can (without sending empty messages for the commit index) 1271 for r.maybeSendAppend(m.From, false) { 1272 } 1273 // Transfer leadership is in progress. 1274 if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() { 1275 r.logger.Infof("%x sent MsgTimeoutNow to %x after received MsgAppResp", r.id, m.From) 1276 r.sendTimeoutNow(m.From) 1277 } 1278 } 1279 } 1280 case pb.MsgHeartbeatResp: 1281 pr.RecentActive = true 1282 pr.ProbeSent = false 1283 1284 // free one slot for the full inflights window to allow progress. 
1285 if pr.State == tracker.StateReplicate && pr.Inflights.Full() { 1286 pr.Inflights.FreeFirstOne() 1287 } 1288 if pr.Match < r.raftLog.lastIndex() { 1289 r.sendAppend(m.From) 1290 } 1291 1292 if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 { 1293 return nil 1294 } 1295 1296 if r.prs.Voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon { 1297 return nil 1298 } 1299 1300 rss := r.readOnly.advance(m) 1301 for _, rs := range rss { 1302 if resp := r.responseToReadIndexReq(rs.req, rs.index); resp.To != None { 1303 r.send(resp) 1304 } 1305 } 1306 case pb.MsgSnapStatus: 1307 if pr.State != tracker.StateSnapshot { 1308 return nil 1309 } 1310 // TODO(tbg): this code is very similar to the snapshot handling in 1311 // MsgAppResp above. In fact, the code there is more correct than the 1312 // code here and should likely be updated to match (or even better, the 1313 // logic pulled into a newly created Progress state machine handler). 1314 if !m.Reject { 1315 pr.BecomeProbe() 1316 r.logger.Debugf("%x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr) 1317 } else { 1318 // NB: the order here matters or we'll be probing erroneously from 1319 // the snapshot index, but the snapshot never applied. 1320 pr.PendingSnapshot = 0 1321 pr.BecomeProbe() 1322 r.logger.Debugf("%x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr) 1323 } 1324 // If snapshot finish, wait for the MsgAppResp from the remote node before sending 1325 // out the next MsgApp. 1326 // If snapshot failure, wait for a heartbeat interval before next try 1327 pr.ProbeSent = true 1328 case pb.MsgUnreachable: 1329 // During optimistic replication, if the remote becomes unreachable, 1330 // there is huge probability that a MsgApp is lost. 
1331 if pr.State == tracker.StateReplicate { 1332 pr.BecomeProbe() 1333 } 1334 r.logger.Debugf("%x failed to send message to %x because it is unreachable [%s]", r.id, m.From, pr) 1335 case pb.MsgTransferLeader: 1336 if pr.IsLearner { 1337 r.logger.Debugf("%x is learner. Ignored transferring leadership", r.id) 1338 return nil 1339 } 1340 leadTransferee := m.From 1341 lastLeadTransferee := r.leadTransferee 1342 if lastLeadTransferee != None { 1343 if lastLeadTransferee == leadTransferee { 1344 r.logger.Infof("%x [term %d] transfer leadership to %x is in progress, ignores request to same node %x", 1345 r.id, r.Term, leadTransferee, leadTransferee) 1346 return nil 1347 } 1348 r.abortLeaderTransfer() 1349 r.logger.Infof("%x [term %d] abort previous transferring leadership to %x", r.id, r.Term, lastLeadTransferee) 1350 } 1351 if leadTransferee == r.id { 1352 r.logger.Debugf("%x is already leader. Ignored transferring leadership to self", r.id) 1353 return nil 1354 } 1355 // Transfer leadership to third party. 1356 r.logger.Infof("%x [term %d] starts to transfer leadership to %x", r.id, r.Term, leadTransferee) 1357 // Transfer leadership should be finished in one electionTimeout, so reset r.electionElapsed. 1358 r.electionElapsed = 0 1359 r.leadTransferee = leadTransferee 1360 if pr.Match == r.raftLog.lastIndex() { 1361 r.sendTimeoutNow(leadTransferee) 1362 r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee) 1363 } else { 1364 r.sendAppend(leadTransferee) 1365 } 1366 } 1367 return nil 1368} 1369 1370// stepCandidate is shared by StateCandidate and StatePreCandidate; the difference is 1371// whether they respond to MsgVoteResp or MsgPreVoteResp. 1372func stepCandidate(r *raft, m pb.Message) error { 1373 // Only handle vote responses corresponding to our candidacy (while in 1374 // StateCandidate, we may get stale MsgPreVoteResp messages in this term from 1375 // our pre-candidate state). 
1376 var myVoteRespType pb.MessageType 1377 if r.state == StatePreCandidate { 1378 myVoteRespType = pb.MsgPreVoteResp 1379 } else { 1380 myVoteRespType = pb.MsgVoteResp 1381 } 1382 switch m.Type { 1383 case pb.MsgProp: 1384 r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term) 1385 return ErrProposalDropped 1386 case pb.MsgApp: 1387 r.becomeFollower(m.Term, m.From) // always m.Term == r.Term 1388 r.handleAppendEntries(m) 1389 case pb.MsgHeartbeat: 1390 r.becomeFollower(m.Term, m.From) // always m.Term == r.Term 1391 r.handleHeartbeat(m) 1392 case pb.MsgSnap: 1393 r.becomeFollower(m.Term, m.From) // always m.Term == r.Term 1394 r.handleSnapshot(m) 1395 case myVoteRespType: 1396 gr, rj, res := r.poll(m.From, m.Type, !m.Reject) 1397 r.logger.Infof("%x has received %d %s votes and %d vote rejections", r.id, gr, m.Type, rj) 1398 switch res { 1399 case quorum.VoteWon: 1400 if r.state == StatePreCandidate { 1401 r.campaign(campaignElection) 1402 } else { 1403 r.becomeLeader() 1404 r.bcastAppend() 1405 } 1406 case quorum.VoteLost: 1407 // pb.MsgPreVoteResp contains future term of pre-candidate 1408 // m.Term > r.Term; reuse r.Term 1409 r.becomeFollower(r.Term, None) 1410 } 1411 case pb.MsgTimeoutNow: 1412 r.logger.Debugf("%x [term %d state %v] ignored MsgTimeoutNow from %x", r.id, r.Term, r.state, m.From) 1413 } 1414 return nil 1415} 1416 1417func stepFollower(r *raft, m pb.Message) error { 1418 switch m.Type { 1419 case pb.MsgProp: 1420 if r.lead == None { 1421 r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term) 1422 return ErrProposalDropped 1423 } else if r.disableProposalForwarding { 1424 r.logger.Infof("%x not forwarding to leader %x at term %d; dropping proposal", r.id, r.lead, r.Term) 1425 return ErrProposalDropped 1426 } 1427 m.To = r.lead 1428 r.send(m) 1429 case pb.MsgApp: 1430 r.electionElapsed = 0 1431 r.lead = m.From 1432 r.handleAppendEntries(m) 1433 case pb.MsgHeartbeat: 1434 r.electionElapsed = 0 1435 r.lead = 
m.From 1436 r.handleHeartbeat(m) 1437 case pb.MsgSnap: 1438 r.electionElapsed = 0 1439 r.lead = m.From 1440 r.handleSnapshot(m) 1441 case pb.MsgTransferLeader: 1442 if r.lead == None { 1443 r.logger.Infof("%x no leader at term %d; dropping leader transfer msg", r.id, r.Term) 1444 return nil 1445 } 1446 m.To = r.lead 1447 r.send(m) 1448 case pb.MsgTimeoutNow: 1449 r.logger.Infof("%x [term %d] received MsgTimeoutNow from %x and starts an election to get leadership.", r.id, r.Term, m.From) 1450 // Leadership transfers never use pre-vote even if r.preVote is true; we 1451 // know we are not recovering from a partition so there is no need for the 1452 // extra round trip. 1453 r.hup(campaignTransfer) 1454 case pb.MsgReadIndex: 1455 if r.lead == None { 1456 r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term) 1457 return nil 1458 } 1459 m.To = r.lead 1460 r.send(m) 1461 case pb.MsgReadIndexResp: 1462 if len(m.Entries) != 1 { 1463 r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries)) 1464 return nil 1465 } 1466 r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data}) 1467 } 1468 return nil 1469} 1470 1471func (r *raft) handleAppendEntries(m pb.Message) { 1472 if m.Index < r.raftLog.committed { 1473 r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed}) 1474 return 1475 } 1476 1477 if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok { 1478 r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex}) 1479 } else { 1480 r.logger.Debugf("%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x", 1481 r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From) 1482 1483 // Return a hint to the leader about the maximum index and term that the 1484 // two logs could be divergent at. 
Do this by searching through the 1485 // follower's log for the maximum (index, term) pair with a term <= the 1486 // MsgApp's LogTerm and an index <= the MsgApp's Index. This can help 1487 // skip all indexes in the follower's uncommitted tail with terms 1488 // greater than the MsgApp's LogTerm. 1489 // 1490 // See the other caller for findConflictByTerm (in stepLeader) for a much 1491 // more detailed explanation of this mechanism. 1492 hintIndex := min(m.Index, r.raftLog.lastIndex()) 1493 hintIndex = r.raftLog.findConflictByTerm(hintIndex, m.LogTerm) 1494 hintTerm, err := r.raftLog.term(hintIndex) 1495 if err != nil { 1496 panic(fmt.Sprintf("term(%d) must be valid, but got %v", hintIndex, err)) 1497 } 1498 r.send(pb.Message{ 1499 To: m.From, 1500 Type: pb.MsgAppResp, 1501 Index: m.Index, 1502 Reject: true, 1503 RejectHint: hintIndex, 1504 LogTerm: hintTerm, 1505 }) 1506 } 1507} 1508 1509func (r *raft) handleHeartbeat(m pb.Message) { 1510 r.raftLog.commitTo(m.Commit) 1511 r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context}) 1512} 1513 1514func (r *raft) handleSnapshot(m pb.Message) { 1515 sindex, sterm := m.Snapshot.Metadata.Index, m.Snapshot.Metadata.Term 1516 if r.restore(m.Snapshot) { 1517 r.logger.Infof("%x [commit: %d] restored snapshot [index: %d, term: %d]", 1518 r.id, r.raftLog.committed, sindex, sterm) 1519 r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()}) 1520 } else { 1521 r.logger.Infof("%x [commit: %d] ignored snapshot [index: %d, term: %d]", 1522 r.id, r.raftLog.committed, sindex, sterm) 1523 r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed}) 1524 } 1525} 1526 1527// restore recovers the state machine from a snapshot. It restores the log and the 1528// configuration of state machine. If this method returns false, the snapshot was 1529// ignored, either because it was obsolete or because of an error. 
1530func (r *raft) restore(s pb.Snapshot) bool { 1531 if s.Metadata.Index <= r.raftLog.committed { 1532 return false 1533 } 1534 if r.state != StateFollower { 1535 // This is defense-in-depth: if the leader somehow ended up applying a 1536 // snapshot, it could move into a new term without moving into a 1537 // follower state. This should never fire, but if it did, we'd have 1538 // prevented damage by returning early, so log only a loud warning. 1539 // 1540 // At the time of writing, the instance is guaranteed to be in follower 1541 // state when this method is called. 1542 r.logger.Warningf("%x attempted to restore snapshot as leader; should never happen", r.id) 1543 r.becomeFollower(r.Term+1, None) 1544 return false 1545 } 1546 1547 // More defense-in-depth: throw away snapshot if recipient is not in the 1548 // config. This shouldn't ever happen (at the time of writing) but lots of 1549 // code here and there assumes that r.id is in the progress tracker. 1550 found := false 1551 cs := s.Metadata.ConfState 1552 1553 for _, set := range [][]uint64{ 1554 cs.Voters, 1555 cs.Learners, 1556 cs.VotersOutgoing, 1557 // `LearnersNext` doesn't need to be checked. According to the rules, if a peer in 1558 // `LearnersNext`, it has to be in `VotersOutgoing`. 1559 } { 1560 for _, id := range set { 1561 if id == r.id { 1562 found = true 1563 break 1564 } 1565 } 1566 if found { 1567 break 1568 } 1569 } 1570 if !found { 1571 r.logger.Warningf( 1572 "%x attempted to restore snapshot but it is not in the ConfState %v; should never happen", 1573 r.id, cs, 1574 ) 1575 return false 1576 } 1577 1578 // Now go ahead and actually restore. 
1579 1580 if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) { 1581 r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]", 1582 r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term) 1583 r.raftLog.commitTo(s.Metadata.Index) 1584 return false 1585 } 1586 1587 r.raftLog.restore(s) 1588 1589 // Reset the configuration and add the (potentially updated) peers in anew. 1590 r.prs = tracker.MakeProgressTracker(r.prs.MaxInflight) 1591 cfg, prs, err := confchange.Restore(confchange.Changer{ 1592 Tracker: r.prs, 1593 LastIndex: r.raftLog.lastIndex(), 1594 }, cs) 1595 1596 if err != nil { 1597 // This should never happen. Either there's a bug in our config change 1598 // handling or the client corrupted the conf change. 1599 panic(fmt.Sprintf("unable to restore config %+v: %s", cs, err)) 1600 } 1601 1602 assertConfStatesEquivalent(r.logger, cs, r.switchToConfig(cfg, prs)) 1603 1604 pr := r.prs.Progress[r.id] 1605 pr.MaybeUpdate(pr.Next - 1) // TODO(tbg): this is untested and likely unneeded 1606 1607 r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] restored snapshot [index: %d, term: %d]", 1608 r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term) 1609 return true 1610} 1611 1612// promotable indicates whether state machine can be promoted to leader, 1613// which is true when its own id is in progress list. 
1614func (r *raft) promotable() bool { 1615 pr := r.prs.Progress[r.id] 1616 return pr != nil && !pr.IsLearner && !r.raftLog.hasPendingSnapshot() 1617} 1618 1619func (r *raft) applyConfChange(cc pb.ConfChangeV2) pb.ConfState { 1620 cfg, prs, err := func() (tracker.Config, tracker.ProgressMap, error) { 1621 changer := confchange.Changer{ 1622 Tracker: r.prs, 1623 LastIndex: r.raftLog.lastIndex(), 1624 } 1625 if cc.LeaveJoint() { 1626 return changer.LeaveJoint() 1627 } else if autoLeave, ok := cc.EnterJoint(); ok { 1628 return changer.EnterJoint(autoLeave, cc.Changes...) 1629 } 1630 return changer.Simple(cc.Changes...) 1631 }() 1632 1633 if err != nil { 1634 // TODO(tbg): return the error to the caller. 1635 panic(err) 1636 } 1637 1638 return r.switchToConfig(cfg, prs) 1639} 1640 1641// switchToConfig reconfigures this node to use the provided configuration. It 1642// updates the in-memory state and, when necessary, carries out additional 1643// actions such as reacting to the removal of nodes or changed quorum 1644// requirements. 1645// 1646// The inputs usually result from restoring a ConfState or applying a ConfChange. 1647func (r *raft) switchToConfig(cfg tracker.Config, prs tracker.ProgressMap) pb.ConfState { 1648 r.prs.Config = cfg 1649 r.prs.Progress = prs 1650 1651 r.logger.Infof("%x switched to configuration %s", r.id, r.prs.Config) 1652 cs := r.prs.ConfState() 1653 pr, ok := r.prs.Progress[r.id] 1654 1655 // Update whether the node itself is a learner, resetting to false when the 1656 // node is removed. 1657 r.isLearner = ok && pr.IsLearner 1658 1659 if (!ok || r.isLearner) && r.state == StateLeader { 1660 // This node is leader and was removed or demoted. We prevent demotions 1661 // at the time writing but hypothetically we handle them the same way as 1662 // removing the leader: stepping down into the next Term. 1663 // 1664 // TODO(tbg): step down (for sanity) and ask follower with largest Match 1665 // to TimeoutNow (to avoid interruption). 
This might still drop some 1666 // proposals but it's better than nothing. 1667 // 1668 // TODO(tbg): test this branch. It is untested at the time of writing. 1669 return cs 1670 } 1671 1672 // The remaining steps only make sense if this node is the leader and there 1673 // are other nodes. 1674 if r.state != StateLeader || len(cs.Voters) == 0 { 1675 return cs 1676 } 1677 1678 if r.maybeCommit() { 1679 // If the configuration change means that more entries are committed now, 1680 // broadcast/append to everyone in the updated config. 1681 r.bcastAppend() 1682 } else { 1683 // Otherwise, still probe the newly added replicas; there's no reason to 1684 // let them wait out a heartbeat interval (or the next incoming 1685 // proposal). 1686 r.prs.Visit(func(id uint64, pr *tracker.Progress) { 1687 r.maybeSendAppend(id, false /* sendIfEmpty */) 1688 }) 1689 } 1690 // If the the leadTransferee was removed or demoted, abort the leadership transfer. 1691 if _, tOK := r.prs.Config.Voters.IDs()[r.leadTransferee]; !tOK && r.leadTransferee != 0 { 1692 r.abortLeaderTransfer() 1693 } 1694 1695 return cs 1696} 1697 1698func (r *raft) loadState(state pb.HardState) { 1699 if state.Commit < r.raftLog.committed || state.Commit > r.raftLog.lastIndex() { 1700 r.logger.Panicf("%x state.commit %d is out of range [%d, %d]", r.id, state.Commit, r.raftLog.committed, r.raftLog.lastIndex()) 1701 } 1702 r.raftLog.committed = state.Commit 1703 r.Term = state.Term 1704 r.Vote = state.Vote 1705} 1706 1707// pastElectionTimeout returns true iff r.electionElapsed is greater 1708// than or equal to the randomized election timeout in 1709// [electiontimeout, 2 * electiontimeout - 1]. 
1710func (r *raft) pastElectionTimeout() bool { 1711 return r.electionElapsed >= r.randomizedElectionTimeout 1712} 1713 1714func (r *raft) resetRandomizedElectionTimeout() { 1715 r.randomizedElectionTimeout = r.electionTimeout + globalRand.Intn(r.electionTimeout) 1716} 1717 1718func (r *raft) sendTimeoutNow(to uint64) { 1719 r.send(pb.Message{To: to, Type: pb.MsgTimeoutNow}) 1720} 1721 1722func (r *raft) abortLeaderTransfer() { 1723 r.leadTransferee = None 1724} 1725 1726// committedEntryInCurrentTerm return true if the peer has committed an entry in its term. 1727func (r *raft) committedEntryInCurrentTerm() bool { 1728 return r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) == r.Term 1729} 1730 1731// responseToReadIndexReq constructs a response for `req`. If `req` comes from the peer 1732// itself, a blank value will be returned. 1733func (r *raft) responseToReadIndexReq(req pb.Message, readIndex uint64) pb.Message { 1734 if req.From == None || req.From == r.id { 1735 r.readStates = append(r.readStates, ReadState{ 1736 Index: readIndex, 1737 RequestCtx: req.Entries[0].Data, 1738 }) 1739 return pb.Message{} 1740 } 1741 return pb.Message{ 1742 Type: pb.MsgReadIndexResp, 1743 To: req.From, 1744 Index: readIndex, 1745 Entries: req.Entries, 1746 } 1747} 1748 1749// increaseUncommittedSize computes the size of the proposed entries and 1750// determines whether they would push leader over its maxUncommittedSize limit. 1751// If the new entries would exceed the limit, the method returns false. If not, 1752// the increase in uncommitted entry size is recorded and the method returns 1753// true. 1754// 1755// Empty payloads are never refused. This is used both for appending an empty 1756// entry at a new leader's term, as well as leaving a joint configuration. 
1757func (r *raft) increaseUncommittedSize(ents []pb.Entry) bool { 1758 var s uint64 1759 for _, e := range ents { 1760 s += uint64(PayloadSize(e)) 1761 } 1762 1763 if r.uncommittedSize > 0 && s > 0 && r.uncommittedSize+s > r.maxUncommittedSize { 1764 // If the uncommitted tail of the Raft log is empty, allow any size 1765 // proposal. Otherwise, limit the size of the uncommitted tail of the 1766 // log and drop any proposal that would push the size over the limit. 1767 // Note the added requirement s>0 which is used to make sure that 1768 // appending single empty entries to the log always succeeds, used both 1769 // for replicating a new leader's initial empty entry, and for 1770 // auto-leaving joint configurations. 1771 return false 1772 } 1773 r.uncommittedSize += s 1774 return true 1775} 1776 1777// reduceUncommittedSize accounts for the newly committed entries by decreasing 1778// the uncommitted entry size limit. 1779func (r *raft) reduceUncommittedSize(ents []pb.Entry) { 1780 if r.uncommittedSize == 0 { 1781 // Fast-path for followers, who do not track or enforce the limit. 1782 return 1783 } 1784 1785 var s uint64 1786 for _, e := range ents { 1787 s += uint64(PayloadSize(e)) 1788 } 1789 if s > r.uncommittedSize { 1790 // uncommittedSize may underestimate the size of the uncommitted Raft 1791 // log tail but will never overestimate it. Saturate at 0 instead of 1792 // allowing overflow. 1793 r.uncommittedSize = 0 1794 } else { 1795 r.uncommittedSize -= s 1796 } 1797} 1798 1799func numOfPendingConf(ents []pb.Entry) int { 1800 n := 0 1801 for i := range ents { 1802 if ents[i].Type == pb.EntryConfChange || ents[i].Type == pb.EntryConfChangeV2 { 1803 n++ 1804 } 1805 } 1806 return n 1807} 1808