1// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package raft
16
17import (
18	"bytes"
19	"errors"
20	"fmt"
21	"math"
22	"math/rand"
23	"sort"
24	"strings"
25	"sync"
26	"time"
27
28	"go.etcd.io/etcd/raft/v3/confchange"
29	"go.etcd.io/etcd/raft/v3/quorum"
30	pb "go.etcd.io/etcd/raft/v3/raftpb"
31	"go.etcd.io/etcd/raft/v3/tracker"
32)
33
34// None is a placeholder node ID used when there is no leader.
35const None uint64 = 0
36const noLimit = math.MaxUint64
37
38// Possible values for StateType.
39const (
40	StateFollower StateType = iota
41	StateCandidate
42	StateLeader
43	StatePreCandidate
44	numStates
45)
46
47type ReadOnlyOption int
48
49const (
50	// ReadOnlySafe guarantees the linearizability of the read only request by
51	// communicating with the quorum. It is the default and suggested option.
52	ReadOnlySafe ReadOnlyOption = iota
53	// ReadOnlyLeaseBased ensures linearizability of the read only request by
54	// relying on the leader lease. It can be affected by clock drift.
55	// If the clock drift is unbounded, leader might keep the lease longer than it
56	// should (clock can move backward/pause without any bound). ReadIndex is not safe
57	// in that case.
58	ReadOnlyLeaseBased
59)
60
61// Possible values for CampaignType
62const (
63	// campaignPreElection represents the first phase of a normal election when
64	// Config.PreVote is true.
65	campaignPreElection CampaignType = "CampaignPreElection"
66	// campaignElection represents a normal (time-based) election (the second phase
67	// of the election when Config.PreVote is true).
68	campaignElection CampaignType = "CampaignElection"
69	// campaignTransfer represents the type of leader transfer
70	campaignTransfer CampaignType = "CampaignTransfer"
71)
72
// ErrProposalDropped is returned when a proposal is ignored in certain cases,
// so that the proposer can be notified and fail fast.
75var ErrProposalDropped = errors.New("raft proposal dropped")
76
77// lockedRand is a small wrapper around rand.Rand to provide
78// synchronization among multiple raft groups. Only the methods needed
79// by the code are exposed (e.g. Intn).
80type lockedRand struct {
81	mu   sync.Mutex
82	rand *rand.Rand
83}
84
85func (r *lockedRand) Intn(n int) int {
86	r.mu.Lock()
87	v := r.rand.Intn(n)
88	r.mu.Unlock()
89	return v
90}
91
92var globalRand = &lockedRand{
93	rand: rand.New(rand.NewSource(time.Now().UnixNano())),
94}
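
// randomizedTimeoutSketch is an illustrative sketch, not part of the original
// file: the name is invented here purely to show how globalRand is typically
// used, namely to derive a per-node election timeout in the documented range
// [electionTimeout, 2*electionTimeout-1] (see resetRandomizedElectionTimeout
// further down in this file for the real derivation).
func randomizedTimeoutSketch(electionTimeout int) int {
	// Intn(n) returns a value in [0, n), so the sum lies in
	// [electionTimeout, 2*electionTimeout-1].
	return electionTimeout + globalRand.Intn(electionTimeout)
}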
95
// CampaignType represents the type of campaigning.
// We use a string rather than a uint64 because it is simpler to
// compare and to fill into raft entries.
99type CampaignType string
100
101// StateType represents the role of a node in a cluster.
102type StateType uint64
103
104var stmap = [...]string{
105	"StateFollower",
106	"StateCandidate",
107	"StateLeader",
108	"StatePreCandidate",
109}
110
111func (st StateType) String() string {
112	return stmap[uint64(st)]
113}
114
115// Config contains the parameters to start a raft.
116type Config struct {
117	// ID is the identity of the local raft. ID cannot be 0.
118	ID uint64
119
120	// ElectionTick is the number of Node.Tick invocations that must pass between
	// elections. That is, if a follower does not receive any message from the
	// leader of the current term before ElectionTick has elapsed, it will become
	// a candidate and start an election. ElectionTick must be greater than
124	// HeartbeatTick. We suggest ElectionTick = 10 * HeartbeatTick to avoid
125	// unnecessary leader switching.
126	ElectionTick int
127	// HeartbeatTick is the number of Node.Tick invocations that must pass between
128	// heartbeats. That is, a leader sends heartbeat messages to maintain its
129	// leadership every HeartbeatTick ticks.
130	HeartbeatTick int
131
	// Storage is the storage for raft. raft generates entries and states to be
	// stored in storage. raft reads the persisted entries and states out of
	// Storage when it needs them. raft also reads the previous state and
	// configuration out of Storage when restarting.
136	Storage Storage
	// Applied is the last applied index. It should only be set when restarting
	// raft. raft will not return entries smaller than or equal to Applied to the
	// application. If Applied is unset when restarting, raft might return
	// previously applied entries. This is a very application-dependent
	// configuration.
141	Applied uint64
142
	// MaxSizePerMsg limits the max byte size of each append message. A smaller
	// value lowers the raft recovery cost (initial probing and message loss
	// during normal operation). On the other hand, it might affect the
	// throughput during normal replication. Note: math.MaxUint64 for unlimited,
	// 0 for at most one entry per message.
148	MaxSizePerMsg uint64
	// MaxCommittedSizePerReady limits the size of the committed entries which
	// can be applied per Ready.
151	MaxCommittedSizePerReady uint64
152	// MaxUncommittedEntriesSize limits the aggregate byte size of the
153	// uncommitted entries that may be appended to a leader's log. Once this
154	// limit is exceeded, proposals will begin to return ErrProposalDropped
155	// errors. Note: 0 for no limit.
156	MaxUncommittedEntriesSize uint64
	// MaxInflightMsgs limits the max number of in-flight append messages during
	// optimistic replication phase. The application transportation layer usually
	// has its own sending buffer over TCP/UDP. Set MaxInflightMsgs to avoid
	// overflowing that sending buffer. TODO (xiangli): feedback to application to
	// limit the proposal rate?
162	MaxInflightMsgs int
163
164	// CheckQuorum specifies if the leader should check quorum activity. Leader
165	// steps down when quorum is not active for an electionTimeout.
166	CheckQuorum bool
167
168	// PreVote enables the Pre-Vote algorithm described in raft thesis section
169	// 9.6. This prevents disruption when a node that has been partitioned away
170	// rejoins the cluster.
171	PreVote bool
172
173	// ReadOnlyOption specifies how the read only request is processed.
174	//
175	// ReadOnlySafe guarantees the linearizability of the read only request by
176	// communicating with the quorum. It is the default and suggested option.
177	//
178	// ReadOnlyLeaseBased ensures linearizability of the read only request by
179	// relying on the leader lease. It can be affected by clock drift.
180	// If the clock drift is unbounded, leader might keep the lease longer than it
181	// should (clock can move backward/pause without any bound). ReadIndex is not safe
182	// in that case.
183	// CheckQuorum MUST be enabled if ReadOnlyOption is ReadOnlyLeaseBased.
184	ReadOnlyOption ReadOnlyOption
185
	// Logger is the logger used for raft logging. For a multinode setup that can
	// host multiple raft groups, each raft group can have its own logger.
188	Logger Logger
189
190	// DisableProposalForwarding set to true means that followers will drop
191	// proposals, rather than forwarding them to the leader. One use case for
192	// this feature would be in a situation where the Raft leader is used to
193	// compute the data of a proposal, for example, adding a timestamp from a
194	// hybrid logical clock to data in a monotonically increasing way. Forwarding
195	// should be disabled to prevent a follower with an inaccurate hybrid
196	// logical clock from assigning the timestamp and then forwarding the data
197	// to the leader.
198	DisableProposalForwarding bool
199}
200
201func (c *Config) validate() error {
202	if c.ID == None {
203		return errors.New("cannot use none as id")
204	}
205
206	if c.HeartbeatTick <= 0 {
207		return errors.New("heartbeat tick must be greater than 0")
208	}
209
210	if c.ElectionTick <= c.HeartbeatTick {
211		return errors.New("election tick must be greater than heartbeat tick")
212	}
213
214	if c.Storage == nil {
215		return errors.New("storage cannot be nil")
216	}
217
218	if c.MaxUncommittedEntriesSize == 0 {
219		c.MaxUncommittedEntriesSize = noLimit
220	}
221
222	// default MaxCommittedSizePerReady to MaxSizePerMsg because they were
223	// previously the same parameter.
224	if c.MaxCommittedSizePerReady == 0 {
225		c.MaxCommittedSizePerReady = c.MaxSizePerMsg
226	}
227
228	if c.MaxInflightMsgs <= 0 {
229		return errors.New("max inflight messages must be greater than 0")
230	}
231
232	if c.Logger == nil {
233		c.Logger = raftLogger
234	}
235
236	if c.ReadOnlyOption == ReadOnlyLeaseBased && !c.CheckQuorum {
237		return errors.New("CheckQuorum must be enabled when ReadOnlyOption is ReadOnlyLeaseBased")
238	}
239
240	return nil
241}
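
// exampleValidatedConfig is an illustrative sketch, not part of the original
// file: the function name and the concrete values are invented to show a
// minimal Config that passes validate(), and how validate() fills in defaults
// (a zero MaxUncommittedEntriesSize becomes noLimit and a zero
// MaxCommittedSizePerReady inherits MaxSizePerMsg).
func exampleValidatedConfig() (*Config, error) {
	c := &Config{
		ID:              0x01,
		ElectionTick:    10, // suggested ElectionTick = 10 * HeartbeatTick
		HeartbeatTick:   1,
		Storage:         NewMemoryStorage(),
		MaxSizePerMsg:   1024 * 1024,
		MaxInflightMsgs: 256,
	}
	if err := c.validate(); err != nil {
		return nil, err
	}
	// At this point c.MaxUncommittedEntriesSize == noLimit and
	// c.MaxCommittedSizePerReady == c.MaxSizePerMsg.
	return c, nil
}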
242
243type raft struct {
244	id uint64
245
246	Term uint64
247	Vote uint64
248
249	readStates []ReadState
250
251	// the log
252	raftLog *raftLog
253
254	maxMsgSize         uint64
255	maxUncommittedSize uint64
256	// TODO(tbg): rename to trk.
257	prs tracker.ProgressTracker
258
259	state StateType
260
261	// isLearner is true if the local raft node is a learner.
262	isLearner bool
263
264	msgs []pb.Message
265
266	// the leader id
267	lead uint64
	// leadTransferee is the id of the leader transfer target when its value is not zero.
269	// Follow the procedure defined in raft thesis 3.10.
270	leadTransferee uint64
271	// Only one conf change may be pending (in the log, but not yet
272	// applied) at a time. This is enforced via pendingConfIndex, which
273	// is set to a value >= the log index of the latest pending
274	// configuration change (if any). Config changes are only allowed to
275	// be proposed if the leader's applied index is greater than this
276	// value.
277	pendingConfIndex uint64
278	// an estimate of the size of the uncommitted tail of the Raft log. Used to
279	// prevent unbounded log growth. Only maintained by the leader. Reset on
280	// term changes.
281	uncommittedSize uint64
282
283	readOnly *readOnly
284
285	// number of ticks since it reached last electionTimeout when it is leader
286	// or candidate.
287	// number of ticks since it reached last electionTimeout or received a
288	// valid message from current leader when it is a follower.
289	electionElapsed int
290
291	// number of ticks since it reached last heartbeatTimeout.
292	// only leader keeps heartbeatElapsed.
293	heartbeatElapsed int
294
295	checkQuorum bool
296	preVote     bool
297
298	heartbeatTimeout int
299	electionTimeout  int
300	// randomizedElectionTimeout is a random number between
301	// [electiontimeout, 2 * electiontimeout - 1]. It gets reset
302	// when raft changes its state to follower or candidate.
303	randomizedElectionTimeout int
304	disableProposalForwarding bool
305
306	tick func()
307	step stepFunc
308
309	logger Logger
310}
311
312func newRaft(c *Config) *raft {
313	if err := c.validate(); err != nil {
314		panic(err.Error())
315	}
316	raftlog := newLogWithSize(c.Storage, c.Logger, c.MaxCommittedSizePerReady)
317	hs, cs, err := c.Storage.InitialState()
318	if err != nil {
319		panic(err) // TODO(bdarnell)
320	}
321
322	r := &raft{
323		id:                        c.ID,
324		lead:                      None,
325		isLearner:                 false,
326		raftLog:                   raftlog,
327		maxMsgSize:                c.MaxSizePerMsg,
328		maxUncommittedSize:        c.MaxUncommittedEntriesSize,
329		prs:                       tracker.MakeProgressTracker(c.MaxInflightMsgs),
330		electionTimeout:           c.ElectionTick,
331		heartbeatTimeout:          c.HeartbeatTick,
332		logger:                    c.Logger,
333		checkQuorum:               c.CheckQuorum,
334		preVote:                   c.PreVote,
335		readOnly:                  newReadOnly(c.ReadOnlyOption),
336		disableProposalForwarding: c.DisableProposalForwarding,
337	}
338
339	cfg, prs, err := confchange.Restore(confchange.Changer{
340		Tracker:   r.prs,
341		LastIndex: raftlog.lastIndex(),
342	}, cs)
343	if err != nil {
344		panic(err)
345	}
346	assertConfStatesEquivalent(r.logger, cs, r.switchToConfig(cfg, prs))
347
348	if !IsEmptyHardState(hs) {
349		r.loadState(hs)
350	}
351	if c.Applied > 0 {
352		raftlog.appliedTo(c.Applied)
353	}
354	r.becomeFollower(r.Term, None)
355
356	var nodesStrs []string
357	for _, n := range r.prs.VoterNodes() {
358		nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n))
359	}
360
361	r.logger.Infof("newRaft %x [peers: [%s], term: %d, commit: %d, applied: %d, lastindex: %d, lastterm: %d]",
362		r.id, strings.Join(nodesStrs, ","), r.Term, r.raftLog.committed, r.raftLog.applied, r.raftLog.lastIndex(), r.raftLog.lastTerm())
363	return r
364}
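
// exampleStartSingleNode is an illustrative sketch, not part of the original
// file: the function name is invented to show how applications normally reach
// newRaft, namely indirectly through StartNode (elsewhere in this package)
// with a validated Config and an initial peer list.
func exampleStartSingleNode() Node {
	c := &Config{
		ID:              0x01,
		ElectionTick:    10,
		HeartbeatTick:   1,
		Storage:         NewMemoryStorage(),
		MaxSizePerMsg:   1024 * 1024,
		MaxInflightMsgs: 256,
	}
	// StartNode bootstraps the initial membership and, internally, ends up
	// constructing the raft state machine via newRaft.
	return StartNode(c, []Peer{{ID: 0x01}})
}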
365
366func (r *raft) hasLeader() bool { return r.lead != None }
367
368func (r *raft) softState() *SoftState { return &SoftState{Lead: r.lead, RaftState: r.state} }
369
370func (r *raft) hardState() pb.HardState {
371	return pb.HardState{
372		Term:   r.Term,
373		Vote:   r.Vote,
374		Commit: r.raftLog.committed,
375	}
376}
377
378// send schedules persisting state to a stable storage and AFTER that
379// sending the message (as part of next Ready message processing).
380func (r *raft) send(m pb.Message) {
381	if m.From == None {
382		m.From = r.id
383	}
384	if m.Type == pb.MsgVote || m.Type == pb.MsgVoteResp || m.Type == pb.MsgPreVote || m.Type == pb.MsgPreVoteResp {
385		if m.Term == 0 {
386			// All {pre-,}campaign messages need to have the term set when
387			// sending.
388			// - MsgVote: m.Term is the term the node is campaigning for,
389			//   non-zero as we increment the term when campaigning.
390			// - MsgVoteResp: m.Term is the new r.Term if the MsgVote was
391			//   granted, non-zero for the same reason MsgVote is
392			// - MsgPreVote: m.Term is the term the node will campaign,
393			//   non-zero as we use m.Term to indicate the next term we'll be
394			//   campaigning for
395			// - MsgPreVoteResp: m.Term is the term received in the original
396			//   MsgPreVote if the pre-vote was granted, non-zero for the
397			//   same reasons MsgPreVote is
398			panic(fmt.Sprintf("term should be set when sending %s", m.Type))
399		}
400	} else {
401		if m.Term != 0 {
402			panic(fmt.Sprintf("term should not be set when sending %s (was %d)", m.Type, m.Term))
403		}
		// do not attach term to MsgProp and MsgReadIndex:
		// proposals are a way to forward to the leader and
		// should be treated as local messages.
		// MsgReadIndex is also forwarded to the leader.
408		if m.Type != pb.MsgProp && m.Type != pb.MsgReadIndex {
409			m.Term = r.Term
410		}
411	}
412	r.msgs = append(r.msgs, m)
413}
414
415// sendAppend sends an append RPC with new entries (if any) and the
416// current commit index to the given peer.
417func (r *raft) sendAppend(to uint64) {
418	r.maybeSendAppend(to, true)
419}
420
421// maybeSendAppend sends an append RPC with new entries to the given peer,
422// if necessary. Returns true if a message was sent. The sendIfEmpty
423// argument controls whether messages with no entries will be sent
424// ("empty" messages are useful to convey updated Commit indexes, but
425// are undesirable when we're sending multiple messages in a batch).
426func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
427	pr := r.prs.Progress[to]
428	if pr.IsPaused() {
429		return false
430	}
431	m := pb.Message{}
432	m.To = to
433
434	term, errt := r.raftLog.term(pr.Next - 1)
435	ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
436	if len(ents) == 0 && !sendIfEmpty {
437		return false
438	}
439
440	if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
441		if !pr.RecentActive {
442			r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
443			return false
444		}
445
446		m.Type = pb.MsgSnap
447		snapshot, err := r.raftLog.snapshot()
448		if err != nil {
449			if err == ErrSnapshotTemporarilyUnavailable {
450				r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to)
451				return false
452			}
453			panic(err) // TODO(bdarnell)
454		}
455		if IsEmptySnap(snapshot) {
456			panic("need non-empty snapshot")
457		}
458		m.Snapshot = snapshot
459		sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
460		r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
461			r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
462		pr.BecomeSnapshot(sindex)
463		r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)
464	} else {
465		m.Type = pb.MsgApp
466		m.Index = pr.Next - 1
467		m.LogTerm = term
468		m.Entries = ents
469		m.Commit = r.raftLog.committed
470		if n := len(m.Entries); n != 0 {
471			switch pr.State {
472			// optimistically increase the next when in StateReplicate
473			case tracker.StateReplicate:
474				last := m.Entries[n-1].Index
475				pr.OptimisticUpdate(last)
476				pr.Inflights.Add(last)
477			case tracker.StateProbe:
478				pr.ProbeSent = true
479			default:
480				r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
481			}
482		}
483	}
484	r.send(m)
485	return true
486}
487
488// sendHeartbeat sends a heartbeat RPC to the given peer.
489func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
490	// Attach the commit as min(to.matched, r.committed).
491	// When the leader sends out heartbeat message,
	// the receiver (follower) might not be matched with the leader,
493	// or it might not have all the committed entries.
494	// The leader MUST NOT forward the follower's commit to
495	// an unmatched index.
496	commit := min(r.prs.Progress[to].Match, r.raftLog.committed)
497	m := pb.Message{
498		To:      to,
499		Type:    pb.MsgHeartbeat,
500		Commit:  commit,
501		Context: ctx,
502	}
503
504	r.send(m)
505}
506
// bcastAppend sends append RPCs with entries to all peers that are not
// up-to-date according to the progress recorded in r.prs.
509func (r *raft) bcastAppend() {
510	r.prs.Visit(func(id uint64, _ *tracker.Progress) {
511		if id == r.id {
512			return
513		}
514		r.sendAppend(id)
515	})
516}
517
// bcastHeartbeat sends heartbeat RPCs, without entries, to all peers.
519func (r *raft) bcastHeartbeat() {
520	lastCtx := r.readOnly.lastPendingRequestCtx()
521	if len(lastCtx) == 0 {
522		r.bcastHeartbeatWithCtx(nil)
523	} else {
524		r.bcastHeartbeatWithCtx([]byte(lastCtx))
525	}
526}
527
528func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {
529	r.prs.Visit(func(id uint64, _ *tracker.Progress) {
530		if id == r.id {
531			return
532		}
533		r.sendHeartbeat(id, ctx)
534	})
535}
536
537func (r *raft) advance(rd Ready) {
538	r.reduceUncommittedSize(rd.CommittedEntries)
539
540	// If entries were applied (or a snapshot), update our cursor for
541	// the next Ready. Note that if the current HardState contains a
542	// new Commit index, this does not mean that we're also applying
543	// all of the new entries due to commit pagination by size.
544	if newApplied := rd.appliedCursor(); newApplied > 0 {
545		oldApplied := r.raftLog.applied
546		r.raftLog.appliedTo(newApplied)
547
548		if r.prs.Config.AutoLeave && oldApplied <= r.pendingConfIndex && newApplied >= r.pendingConfIndex && r.state == StateLeader {
549			// If the current (and most recent, at least for this leader's term)
550			// configuration should be auto-left, initiate that now. We use a
551			// nil Data which unmarshals into an empty ConfChangeV2 and has the
552			// benefit that appendEntry can never refuse it based on its size
553			// (which registers as zero).
554			ent := pb.Entry{
555				Type: pb.EntryConfChangeV2,
556				Data: nil,
557			}
558			// There's no way in which this proposal should be able to be rejected.
559			if !r.appendEntry(ent) {
560				panic("refused un-refusable auto-leaving ConfChangeV2")
561			}
562			r.pendingConfIndex = r.raftLog.lastIndex()
563			r.logger.Infof("initiating automatic transition out of joint configuration %s", r.prs.Config)
564		}
565	}
566
567	if len(rd.Entries) > 0 {
568		e := rd.Entries[len(rd.Entries)-1]
569		r.raftLog.stableTo(e.Index, e.Term)
570	}
571	if !IsEmptySnap(rd.Snapshot) {
572		r.raftLog.stableSnapTo(rd.Snapshot.Metadata.Index)
573	}
574}
575
576// maybeCommit attempts to advance the commit index. Returns true if
577// the commit index changed (in which case the caller should call
578// r.bcastAppend).
579func (r *raft) maybeCommit() bool {
580	mci := r.prs.Committed()
581	return r.raftLog.maybeCommit(mci, r.Term)
582}
583
584func (r *raft) reset(term uint64) {
585	if r.Term != term {
586		r.Term = term
587		r.Vote = None
588	}
589	r.lead = None
590
591	r.electionElapsed = 0
592	r.heartbeatElapsed = 0
593	r.resetRandomizedElectionTimeout()
594
595	r.abortLeaderTransfer()
596
597	r.prs.ResetVotes()
598	r.prs.Visit(func(id uint64, pr *tracker.Progress) {
599		*pr = tracker.Progress{
600			Match:     0,
601			Next:      r.raftLog.lastIndex() + 1,
602			Inflights: tracker.NewInflights(r.prs.MaxInflight),
603			IsLearner: pr.IsLearner,
604		}
605		if id == r.id {
606			pr.Match = r.raftLog.lastIndex()
607		}
608	})
609
610	r.pendingConfIndex = 0
611	r.uncommittedSize = 0
612	r.readOnly = newReadOnly(r.readOnly.option)
613}
614
615func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {
616	li := r.raftLog.lastIndex()
617	for i := range es {
618		es[i].Term = r.Term
619		es[i].Index = li + 1 + uint64(i)
620	}
621	// Track the size of this uncommitted proposal.
622	if !r.increaseUncommittedSize(es) {
623		r.logger.Debugf(
624			"%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal",
625			r.id,
626		)
627		// Drop the proposal.
628		return false
629	}
630	// use latest "last" index after truncate/append
631	li = r.raftLog.append(es...)
632	r.prs.Progress[r.id].MaybeUpdate(li)
633	// Regardless of maybeCommit's return, our caller will call bcastAppend.
634	r.maybeCommit()
635	return true
636}
637
638// tickElection is run by followers and candidates after r.electionTimeout.
639func (r *raft) tickElection() {
640	r.electionElapsed++
641
642	if r.promotable() && r.pastElectionTimeout() {
643		r.electionElapsed = 0
644		r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
645	}
646}
647
648// tickHeartbeat is run by leaders to send a MsgBeat after r.heartbeatTimeout.
649func (r *raft) tickHeartbeat() {
650	r.heartbeatElapsed++
651	r.electionElapsed++
652
653	if r.electionElapsed >= r.electionTimeout {
654		r.electionElapsed = 0
655		if r.checkQuorum {
656			r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum})
657		}
		// If the current leader cannot finish transferring leadership within one
		// electionTimeout, abort the transfer and resume acting as a normal leader.
659		if r.state == StateLeader && r.leadTransferee != None {
660			r.abortLeaderTransfer()
661		}
662	}
663
664	if r.state != StateLeader {
665		return
666	}
667
668	if r.heartbeatElapsed >= r.heartbeatTimeout {
669		r.heartbeatElapsed = 0
670		r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
671	}
672}
673
674func (r *raft) becomeFollower(term uint64, lead uint64) {
675	r.step = stepFollower
676	r.reset(term)
677	r.tick = r.tickElection
678	r.lead = lead
679	r.state = StateFollower
680	r.logger.Infof("%x became follower at term %d", r.id, r.Term)
681}
682
683func (r *raft) becomeCandidate() {
684	// TODO(xiangli) remove the panic when the raft implementation is stable
685	if r.state == StateLeader {
686		panic("invalid transition [leader -> candidate]")
687	}
688	r.step = stepCandidate
689	r.reset(r.Term + 1)
690	r.tick = r.tickElection
691	r.Vote = r.id
692	r.state = StateCandidate
693	r.logger.Infof("%x became candidate at term %d", r.id, r.Term)
694}
695
696func (r *raft) becomePreCandidate() {
697	// TODO(xiangli) remove the panic when the raft implementation is stable
698	if r.state == StateLeader {
699		panic("invalid transition [leader -> pre-candidate]")
700	}
701	// Becoming a pre-candidate changes our step functions and state,
702	// but doesn't change anything else. In particular it does not increase
703	// r.Term or change r.Vote.
704	r.step = stepCandidate
705	r.prs.ResetVotes()
706	r.tick = r.tickElection
707	r.lead = None
708	r.state = StatePreCandidate
709	r.logger.Infof("%x became pre-candidate at term %d", r.id, r.Term)
710}
711
712func (r *raft) becomeLeader() {
713	// TODO(xiangli) remove the panic when the raft implementation is stable
714	if r.state == StateFollower {
715		panic("invalid transition [follower -> leader]")
716	}
717	r.step = stepLeader
718	r.reset(r.Term)
719	r.tick = r.tickHeartbeat
720	r.lead = r.id
721	r.state = StateLeader
722	// Followers enter replicate mode when they've been successfully probed
723	// (perhaps after having received a snapshot as a result). The leader is
724	// trivially in this state. Note that r.reset() has initialized this
725	// progress with the last index already.
726	r.prs.Progress[r.id].BecomeReplicate()
727
728	// Conservatively set the pendingConfIndex to the last index in the
729	// log. There may or may not be a pending config change, but it's
730	// safe to delay any future proposals until we commit all our
731	// pending log entries, and scanning the entire tail of the log
732	// could be expensive.
733	r.pendingConfIndex = r.raftLog.lastIndex()
734
735	emptyEnt := pb.Entry{Data: nil}
736	if !r.appendEntry(emptyEnt) {
737		// This won't happen because we just called reset() above.
738		r.logger.Panic("empty entry was dropped")
739	}
740	// As a special case, don't count the initial empty entry towards the
741	// uncommitted log quota. This is because we want to preserve the
742	// behavior of allowing one entry larger than quota if the current
743	// usage is zero.
744	r.reduceUncommittedSize([]pb.Entry{emptyEnt})
745	r.logger.Infof("%x became leader at term %d", r.id, r.Term)
746}
747
748func (r *raft) hup(t CampaignType) {
749	if r.state == StateLeader {
750		r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
751		return
752	}
753
754	if !r.promotable() {
755		r.logger.Warningf("%x is unpromotable and can not campaign", r.id)
756		return
757	}
758	ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
759	if err != nil {
760		r.logger.Panicf("unexpected error getting unapplied entries (%v)", err)
761	}
762	if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied {
763		r.logger.Warningf("%x cannot campaign at term %d since there are still %d pending configuration changes to apply", r.id, r.Term, n)
764		return
765	}
766
767	r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
768	r.campaign(t)
769}
770
771// campaign transitions the raft instance to candidate state. This must only be
772// called after verifying that this is a legitimate transition.
773func (r *raft) campaign(t CampaignType) {
774	if !r.promotable() {
775		// This path should not be hit (callers are supposed to check), but
776		// better safe than sorry.
		r.logger.Warningf("%x is unpromotable; campaign() should not have been called", r.id)
778	}
779	var term uint64
780	var voteMsg pb.MessageType
781	if t == campaignPreElection {
782		r.becomePreCandidate()
783		voteMsg = pb.MsgPreVote
784		// PreVote RPCs are sent for the next term before we've incremented r.Term.
785		term = r.Term + 1
786	} else {
787		r.becomeCandidate()
788		voteMsg = pb.MsgVote
789		term = r.Term
790	}
791	if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon {
792		// We won the election after voting for ourselves (which must mean that
793		// this is a single-node cluster). Advance to the next state.
794		if t == campaignPreElection {
795			r.campaign(campaignElection)
796		} else {
797			r.becomeLeader()
798		}
799		return
800	}
801	var ids []uint64
802	{
803		idMap := r.prs.Voters.IDs()
804		ids = make([]uint64, 0, len(idMap))
805		for id := range idMap {
806			ids = append(ids, id)
807		}
808		sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
809	}
810	for _, id := range ids {
811		if id == r.id {
812			continue
813		}
814		r.logger.Infof("%x [logterm: %d, index: %d] sent %s request to %x at term %d",
815			r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), voteMsg, id, r.Term)
816
817		var ctx []byte
818		if t == campaignTransfer {
819			ctx = []byte(t)
820		}
821		r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
822	}
823}
824
825func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int, rejected int, result quorum.VoteResult) {
826	if v {
827		r.logger.Infof("%x received %s from %x at term %d", r.id, t, id, r.Term)
828	} else {
829		r.logger.Infof("%x received %s rejection from %x at term %d", r.id, t, id, r.Term)
830	}
831	r.prs.RecordVote(id, v)
832	return r.prs.TallyVotes()
833}
834
835func (r *raft) Step(m pb.Message) error {
836	// Handle the message term, which may result in our stepping down to a follower.
837	switch {
838	case m.Term == 0:
839		// local message
840	case m.Term > r.Term:
841		if m.Type == pb.MsgVote || m.Type == pb.MsgPreVote {
842			force := bytes.Equal(m.Context, []byte(campaignTransfer))
843			inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeout
844			if !force && inLease {
845				// If a server receives a RequestVote request within the minimum election timeout
846				// of hearing from a current leader, it does not update its term or grant its vote
847				r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: lease is not expired (remaining ticks: %d)",
848					r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term, r.electionTimeout-r.electionElapsed)
849				return nil
850			}
851		}
852		switch {
853		case m.Type == pb.MsgPreVote:
854			// Never change our term in response to a PreVote
855		case m.Type == pb.MsgPreVoteResp && !m.Reject:
856			// We send pre-vote requests with a term in our future. If the
857			// pre-vote is granted, we will increment our term when we get a
858			// quorum. If it is not, the term comes from the node that
859			// rejected our vote so we should become a follower at the new
860			// term.
861		default:
862			r.logger.Infof("%x [term: %d] received a %s message with higher term from %x [term: %d]",
863				r.id, r.Term, m.Type, m.From, m.Term)
864			if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
865				r.becomeFollower(m.Term, m.From)
866			} else {
867				r.becomeFollower(m.Term, None)
868			}
869		}
870
871	case m.Term < r.Term:
872		if (r.checkQuorum || r.preVote) && (m.Type == pb.MsgHeartbeat || m.Type == pb.MsgApp) {
873			// We have received messages from a leader at a lower term. It is possible
874			// that these messages were simply delayed in the network, but this could
875			// also mean that this node has advanced its term number during a network
876			// partition, and it is now unable to either win an election or to rejoin
877			// the majority on the old term. If checkQuorum is false, this will be
878			// handled by incrementing term numbers in response to MsgVote with a
879			// higher term, but if checkQuorum is true we may not advance the term on
880			// MsgVote and must generate other messages to advance the term. The net
881			// result of these two features is to minimize the disruption caused by
882			// nodes that have been removed from the cluster's configuration: a
883			// removed node will send MsgVotes (or MsgPreVotes) which will be ignored,
884			// but it will not receive MsgApp or MsgHeartbeat, so it will not create
			// disruptive term increases by notifying the leader of this node's activeness.
			// The above comments also apply to Pre-Vote.
			//
			// When a follower gets isolated, it soon starts an election, ending
			// up with a higher term than the leader, although it won't receive enough
			// votes to win the election. When it regains connectivity, this response
			// with "pb.MsgAppResp" of a higher term would force the leader to step down.
			// However, this disruption is inevitable to free this stuck node with a
			// fresh election. This can be prevented with the Pre-Vote phase.
894			r.send(pb.Message{To: m.From, Type: pb.MsgAppResp})
895		} else if m.Type == pb.MsgPreVote {
			// Before Pre-Vote was enabled, there may have been a candidate with a higher
			// term but a shorter log. After updating to Pre-Vote, the cluster may
			// deadlock if we drop messages with a lower term.
899			r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
900				r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
901			r.send(pb.Message{To: m.From, Term: r.Term, Type: pb.MsgPreVoteResp, Reject: true})
902		} else {
903			// ignore other cases
904			r.logger.Infof("%x [term: %d] ignored a %s message with lower term from %x [term: %d]",
905				r.id, r.Term, m.Type, m.From, m.Term)
906		}
907		return nil
908	}
909
910	switch m.Type {
911	case pb.MsgHup:
912		if r.preVote {
913			r.hup(campaignPreElection)
914		} else {
915			r.hup(campaignElection)
916		}
917
918	case pb.MsgVote, pb.MsgPreVote:
919		// We can vote if this is a repeat of a vote we've already cast...
920		canVote := r.Vote == m.From ||
921			// ...we haven't voted and we don't think there's a leader yet in this term...
922			(r.Vote == None && r.lead == None) ||
923			// ...or this is a PreVote for a future term...
924			(m.Type == pb.MsgPreVote && m.Term > r.Term)
925		// ...and we believe the candidate is up to date.
926		if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
			// Note: it turns out that learners must be allowed to cast votes.
			// This seems counter-intuitive but is necessary in the situation in which
929			// a learner has been promoted (i.e. is now a voter) but has not learned
930			// about this yet.
931			// For example, consider a group in which id=1 is a learner and id=2 and
932			// id=3 are voters. A configuration change promoting 1 can be committed on
933			// the quorum `{2,3}` without the config change being appended to the
934			// learner's log. If the leader (say 2) fails, there are de facto two
935			// voters remaining. Only 3 can win an election (due to its log containing
936			// all committed entries), but to do so it will need 1 to vote. But 1
937			// considers itself a learner and will continue to do so until 3 has
938			// stepped up as leader, replicates the conf change to 1, and 1 applies it.
939			// Ultimately, by receiving a request to vote, the learner realizes that
940			// the candidate believes it to be a voter, and that it should act
941			// accordingly. The candidate's config may be stale, too; but in that case
942			// it won't win the election, at least in the absence of the bug discussed
943			// in:
944			// https://github.com/etcd-io/etcd/issues/7625#issuecomment-488798263.
945			r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] cast %s for %x [logterm: %d, index: %d] at term %d",
946				r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
947			// When responding to Msg{Pre,}Vote messages we include the term
948			// from the message, not the local term. To see why, consider the
949			// case where a single node was previously partitioned away and
			// its local term is now out of date. If we include the local term
951			// (recall that for pre-votes we don't update the local term), the
952			// (pre-)campaigning node on the other end will proceed to ignore
953			// the message (it ignores all out of date messages).
954			// The term in the original message and current local term are the
955			// same in the case of regular votes, but different for pre-votes.
956			r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
957			if m.Type == pb.MsgVote {
958				// Only record real votes.
959				r.electionElapsed = 0
960				r.Vote = m.From
961			}
962		} else {
963			r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
964				r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
965			r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true})
966		}
967
968	default:
969		err := r.step(r, m)
970		if err != nil {
971			return err
972		}
973	}
974	return nil
975}
976
977type stepFunc func(r *raft, m pb.Message) error
978
979func stepLeader(r *raft, m pb.Message) error {
980	// These message types do not require any progress for m.From.
981	switch m.Type {
982	case pb.MsgBeat:
983		r.bcastHeartbeat()
984		return nil
985	case pb.MsgCheckQuorum:
986		// The leader should always see itself as active. As a precaution, handle
987		// the case in which the leader isn't in the configuration any more (for
988		// example if it just removed itself).
989		//
990		// TODO(tbg): I added a TODO in removeNode, it doesn't seem that the
991		// leader steps down when removing itself. I might be missing something.
992		if pr := r.prs.Progress[r.id]; pr != nil {
993			pr.RecentActive = true
994		}
995		if !r.prs.QuorumActive() {
996			r.logger.Warningf("%x stepped down to follower since quorum is not active", r.id)
997			r.becomeFollower(r.Term, None)
998		}
999		// Mark everyone (but ourselves) as inactive in preparation for the next
1000		// CheckQuorum.
1001		r.prs.Visit(func(id uint64, pr *tracker.Progress) {
1002			if id != r.id {
1003				pr.RecentActive = false
1004			}
1005		})
1006		return nil
1007	case pb.MsgProp:
1008		if len(m.Entries) == 0 {
1009			r.logger.Panicf("%x stepped empty MsgProp", r.id)
1010		}
1011		if r.prs.Progress[r.id] == nil {
			// If we are not currently a member of the configuration (i.e. this
			// node was removed from the configuration while serving as leader),
			// drop any new proposals.
1015			return ErrProposalDropped
1016		}
1017		if r.leadTransferee != None {
1018			r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)
1019			return ErrProposalDropped
1020		}
1021
1022		for i := range m.Entries {
1023			e := &m.Entries[i]
1024			var cc pb.ConfChangeI
1025			if e.Type == pb.EntryConfChange {
1026				var ccc pb.ConfChange
1027				if err := ccc.Unmarshal(e.Data); err != nil {
1028					panic(err)
1029				}
1030				cc = ccc
1031			} else if e.Type == pb.EntryConfChangeV2 {
1032				var ccc pb.ConfChangeV2
1033				if err := ccc.Unmarshal(e.Data); err != nil {
1034					panic(err)
1035				}
1036				cc = ccc
1037			}
1038			if cc != nil {
1039				alreadyPending := r.pendingConfIndex > r.raftLog.applied
1040				alreadyJoint := len(r.prs.Config.Voters[1]) > 0
1041				wantsLeaveJoint := len(cc.AsV2().Changes) == 0
1042
1043				var refused string
1044				if alreadyPending {
1045					refused = fmt.Sprintf("possible unapplied conf change at index %d (applied to %d)", r.pendingConfIndex, r.raftLog.applied)
1046				} else if alreadyJoint && !wantsLeaveJoint {
1047					refused = "must transition out of joint config first"
1048				} else if !alreadyJoint && wantsLeaveJoint {
1049					refused = "not in joint state; refusing empty conf change"
1050				}
1051
1052				if refused != "" {
1053					r.logger.Infof("%x ignoring conf change %v at config %s: %s", r.id, cc, r.prs.Config, refused)
1054					m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
1055				} else {
1056					r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1
1057				}
1058			}
1059		}
1060
1061		if !r.appendEntry(m.Entries...) {
1062			return ErrProposalDropped
1063		}
1064		r.bcastAppend()
1065		return nil
1066	case pb.MsgReadIndex:
1067		// only one voting member (the leader) in the cluster
1068		if r.prs.IsSingleton() {
1069			if resp := r.responseToReadIndexReq(m, r.raftLog.committed); resp.To != None {
1070				r.send(resp)
1071			}
1072			return nil
1073		}
1074
1075		// Reject read only request when this leader has not committed any log entry at its term.
1076		if !r.committedEntryInCurrentTerm() {
1077			return nil
1078		}
1079
		// thinking: use an internally defined context instead of the user-given context.
1081		// We can express this in terms of the term and index instead of a user-supplied value.
1082		// This would allow multiple reads to piggyback on the same message.
1083		switch r.readOnly.option {
1084		// If more than the local vote is needed, go through a full broadcast.
1085		case ReadOnlySafe:
1086			r.readOnly.addRequest(r.raftLog.committed, m)
1087			// The local node automatically acks the request.
1088			r.readOnly.recvAck(r.id, m.Entries[0].Data)
1089			r.bcastHeartbeatWithCtx(m.Entries[0].Data)
1090		case ReadOnlyLeaseBased:
1091			if resp := r.responseToReadIndexReq(m, r.raftLog.committed); resp.To != None {
1092				r.send(resp)
1093			}
1094		}
1095		return nil
1096	}
1097
1098	// All other message types require a progress for m.From (pr).
1099	pr := r.prs.Progress[m.From]
1100	if pr == nil {
1101		r.logger.Debugf("%x no progress available for %x", r.id, m.From)
1102		return nil
1103	}
1104	switch m.Type {
1105	case pb.MsgAppResp:
1106		pr.RecentActive = true
1107
1108		if m.Reject {
1109			// RejectHint is the suggested next base entry for appending (i.e.
1110			// we try to append entry RejectHint+1 next), and LogTerm is the
1111			// term that the follower has at index RejectHint. Older versions
1112			// of this library did not populate LogTerm for rejections and it
1113			// is zero for followers with an empty log.
1114			//
1115			// Under normal circumstances, the leader's log is longer than the
1116			// follower's and the follower's log is a prefix of the leader's
1117			// (i.e. there is no divergent uncommitted suffix of the log on the
1118			// follower). In that case, the first probe reveals where the
1119			// follower's log ends (RejectHint=follower's last index) and the
1120			// subsequent probe succeeds.
1121			//
1122			// However, when networks are partitioned or systems overloaded,
1123			// large divergent log tails can occur. The naive attempt, probing
1124			// entry by entry in decreasing order, will be the product of the
1125			// length of the diverging tails and the network round-trip latency,
1126			// which can easily result in hours of time spent probing and can
1127			// even cause outright outages. The probes are thus optimized as
1128			// described below.
1129			r.logger.Debugf("%x received MsgAppResp(rejected, hint: (index %d, term %d)) from %x for index %d",
1130				r.id, m.RejectHint, m.LogTerm, m.From, m.Index)
1131			nextProbeIdx := m.RejectHint
1132			if m.LogTerm > 0 {
1133				// If the follower has an uncommitted log tail, we would end up
1134				// probing one by one until we hit the common prefix.
1135				//
1136				// For example, if the leader has:
1137				//
1138				//   idx        1 2 3 4 5 6 7 8 9
1139				//              -----------------
1140				//   term (L)   1 3 3 3 5 5 5 5 5
1141				//   term (F)   1 1 1 1 2 2
1142				//
1143				// Then, after sending an append anchored at (idx=9,term=5) we
1144				// would receive a RejectHint of 6 and LogTerm of 2. Without the
1145				// code below, we would try an append at index 6, which would
1146				// fail again.
1147				//
1148				// However, looking only at what the leader knows about its own
1149				// log and the rejection hint, it is clear that a probe at index
1150				// 6, 5, 4, 3, and 2 must fail as well:
1151				//
1152				// For all of these indexes, the leader's log term is larger than
1153				// the rejection's log term. If a probe at one of these indexes
1154				// succeeded, its log term at that index would match the leader's,
1155				// i.e. 3 or 5 in this example. But the follower already told the
1156				// leader that it is still at term 2 at index 9, and since the
1157				// log term only ever goes up (within a log), this is a contradiction.
1158				//
1159				// At index 1, however, the leader can draw no such conclusion,
1160				// as its term 1 is not larger than the term 2 from the
1161				// follower's rejection. We thus probe at 1, which will succeed
1162				// in this example. In general, with this approach we probe at
1163				// most once per term found in the leader's log.
1164				//
1165				// There is a similar mechanism on the follower (implemented in
1166				// handleAppendEntries via a call to findConflictByTerm) that is
1167				// useful if the follower has a large divergent uncommitted log
1168				// tail[1], as in this example:
1169				//
1170				//   idx        1 2 3 4 5 6 7 8 9
1171				//              -----------------
1172				//   term (L)   1 3 3 3 3 3 3 3 7
1173				//   term (F)   1 3 3 4 4 5 5 5 6
1174				//
1175				// Naively, the leader would probe at idx=9, receive a rejection
1176				// revealing the log term of 6 at the follower. Since the leader's
1177				// term at the previous index is already smaller than 6, the leader-
1178				// side optimization discussed above is ineffective. The leader thus
1179				// probes at index 8 and, naively, receives a rejection for the same
1180				// index and log term 5. Again, the leader optimization does not improve
1181				// over linear probing as term 5 is above the leader's term 3 for that
1182				// and many preceding indexes; the leader would have to probe linearly
1183				// until it would finally hit index 3, where the probe would succeed.
1184				//
1185				// Instead, we apply a similar optimization on the follower. When the
1186				// follower receives the probe at index 8 (log term 3), it concludes
1187				// that all of the leader's log preceding that index has log terms of
1188				// 3 or below. The largest index in the follower's log with a log term
1189				// of 3 or below is index 3. The follower will thus return a rejection
1190				// for index=3, log term=3 instead. The leader's next probe will then
1191				// succeed at that index.
1192				//
1193				// [1]: more precisely, if the log terms in the large uncommitted
1194				// tail on the follower are larger than the leader's. At first,
1195				// it may seem unintuitive that a follower could even have such
1196				// a large tail, but it can happen:
1197				//
1198				// 1. Leader appends (but does not commit) entries 2 and 3, crashes.
1199				//   idx        1 2 3 4 5 6 7 8 9
1200				//              -----------------
1201				//   term (L)   1 2 2     [crashes]
1202				//   term (F)   1
1203				//   term (F)   1
1204				//
1205				// 2. a follower becomes leader and appends entries at term 3.
1206				//              -----------------
1207				//   term (x)   1 2 2     [down]
1208				//   term (F)   1 3 3 3 3
1209				//   term (F)   1
1210				//
1211				// 3. term 3 leader goes down, term 2 leader returns as term 4
1212				//    leader. It commits the log & entries at term 4.
1213				//
1214				//              -----------------
1215				//   term (L)   1 2 2 2
1216				//   term (x)   1 3 3 3 3 [down]
1217				//   term (F)   1
1218				//              -----------------
1219				//   term (L)   1 2 2 2 4 4 4
1220				//   term (F)   1 3 3 3 3 [gets probed]
1221				//   term (F)   1 2 2 2 4 4 4
1222				//
1223				// 4. the leader will now probe the returning follower at index
1224				//    7, the rejection points it at the end of the follower's log
1225				//    which is at a higher log term than the actually committed
1226				//    log.
1227				nextProbeIdx = r.raftLog.findConflictByTerm(m.RejectHint, m.LogTerm)
1228			}
1229			if pr.MaybeDecrTo(m.Index, nextProbeIdx) {
1230				r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
1231				if pr.State == tracker.StateReplicate {
1232					pr.BecomeProbe()
1233				}
1234				r.sendAppend(m.From)
1235			}
1236		} else {
1237			oldPaused := pr.IsPaused()
1238			if pr.MaybeUpdate(m.Index) {
1239				switch {
1240				case pr.State == tracker.StateProbe:
1241					pr.BecomeReplicate()
1242				case pr.State == tracker.StateSnapshot && pr.Match >= pr.PendingSnapshot:
1243					// TODO(tbg): we should also enter this branch if a snapshot is
1244					// received that is below pr.PendingSnapshot but which makes it
1245					// possible to use the log again.
1246					r.logger.Debugf("%x recovered from needing snapshot, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
1247					// Transition back to replicating state via probing state
1248					// (which takes the snapshot into account). If we didn't
1249					// move to replicating state, that would only happen with
1250					// the next round of appends (but there may not be a next
1251					// round for a while, exposing an inconsistent RaftStatus).
1252					pr.BecomeProbe()
1253					pr.BecomeReplicate()
1254				case pr.State == tracker.StateReplicate:
1255					pr.Inflights.FreeLE(m.Index)
1256				}
1257
1258				if r.maybeCommit() {
1259					r.bcastAppend()
1260				} else if oldPaused {
1261					// If we were paused before, this node may be missing the
1262					// latest commit index, so send it.
1263					r.sendAppend(m.From)
1264				}
1265				// We've updated flow control information above, which may
1266				// allow us to send multiple (size-limited) in-flight messages
1267				// at once (such as when transitioning from probe to
1268				// replicate, or when freeTo() covers multiple messages). If
1269				// we have more entries to send, send as many messages as we
1270				// can (without sending empty messages for the commit index)
1271				for r.maybeSendAppend(m.From, false) {
1272				}
1273				// Transfer leadership is in progress.
1274				if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() {
1275					r.logger.Infof("%x sent MsgTimeoutNow to %x after received MsgAppResp", r.id, m.From)
1276					r.sendTimeoutNow(m.From)
1277				}
1278			}
1279		}
1280	case pb.MsgHeartbeatResp:
1281		pr.RecentActive = true
1282		pr.ProbeSent = false
1283
1284		// free one slot for the full inflights window to allow progress.
1285		if pr.State == tracker.StateReplicate && pr.Inflights.Full() {
1286			pr.Inflights.FreeFirstOne()
1287		}
1288		if pr.Match < r.raftLog.lastIndex() {
1289			r.sendAppend(m.From)
1290		}
1291
1292		if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
1293			return nil
1294		}
1295
1296		if r.prs.Voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon {
1297			return nil
1298		}
1299
1300		rss := r.readOnly.advance(m)
1301		for _, rs := range rss {
1302			if resp := r.responseToReadIndexReq(rs.req, rs.index); resp.To != None {
1303				r.send(resp)
1304			}
1305		}
1306	case pb.MsgSnapStatus:
1307		if pr.State != tracker.StateSnapshot {
1308			return nil
1309		}
1310		// TODO(tbg): this code is very similar to the snapshot handling in
1311		// MsgAppResp above. In fact, the code there is more correct than the
1312		// code here and should likely be updated to match (or even better, the
1313		// logic pulled into a newly created Progress state machine handler).
1314		if !m.Reject {
1315			pr.BecomeProbe()
1316			r.logger.Debugf("%x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
1317		} else {
			// NB: the order here matters or we'll be probing erroneously from
			// the snapshot index, even though the snapshot was never applied.
1320			pr.PendingSnapshot = 0
1321			pr.BecomeProbe()
1322			r.logger.Debugf("%x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
1323		}
		// If the snapshot finished, wait for the MsgAppResp from the remote node
		// before sending out the next MsgApp.
		// If the snapshot failed, wait for a heartbeat interval before the next attempt.
1327		pr.ProbeSent = true
1328	case pb.MsgUnreachable:
		// During optimistic replication, if the remote becomes unreachable,
		// there is a high probability that a MsgApp was lost.
1331		if pr.State == tracker.StateReplicate {
1332			pr.BecomeProbe()
1333		}
1334		r.logger.Debugf("%x failed to send message to %x because it is unreachable [%s]", r.id, m.From, pr)
1335	case pb.MsgTransferLeader:
1336		if pr.IsLearner {
1337			r.logger.Debugf("%x is learner. Ignored transferring leadership", r.id)
1338			return nil
1339		}
1340		leadTransferee := m.From
1341		lastLeadTransferee := r.leadTransferee
1342		if lastLeadTransferee != None {
1343			if lastLeadTransferee == leadTransferee {
1344				r.logger.Infof("%x [term %d] transfer leadership to %x is in progress, ignores request to same node %x",
1345					r.id, r.Term, leadTransferee, leadTransferee)
1346				return nil
1347			}
1348			r.abortLeaderTransfer()
1349			r.logger.Infof("%x [term %d] abort previous transferring leadership to %x", r.id, r.Term, lastLeadTransferee)
1350		}
1351		if leadTransferee == r.id {
1352			r.logger.Debugf("%x is already leader. Ignored transferring leadership to self", r.id)
1353			return nil
1354		}
1355		// Transfer leadership to third party.
1356		r.logger.Infof("%x [term %d] starts to transfer leadership to %x", r.id, r.Term, leadTransferee)
1357		// Transfer leadership should be finished in one electionTimeout, so reset r.electionElapsed.
1358		r.electionElapsed = 0
1359		r.leadTransferee = leadTransferee
1360		if pr.Match == r.raftLog.lastIndex() {
1361			r.sendTimeoutNow(leadTransferee)
1362			r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee)
1363		} else {
1364			r.sendAppend(leadTransferee)
1365		}
1366	}
1367	return nil
1368}
1369
1370// stepCandidate is shared by StateCandidate and StatePreCandidate; the difference is
1371// whether they respond to MsgVoteResp or MsgPreVoteResp.
1372func stepCandidate(r *raft, m pb.Message) error {
1373	// Only handle vote responses corresponding to our candidacy (while in
1374	// StateCandidate, we may get stale MsgPreVoteResp messages in this term from
1375	// our pre-candidate state).
1376	var myVoteRespType pb.MessageType
1377	if r.state == StatePreCandidate {
1378		myVoteRespType = pb.MsgPreVoteResp
1379	} else {
1380		myVoteRespType = pb.MsgVoteResp
1381	}
1382	switch m.Type {
1383	case pb.MsgProp:
1384		r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
1385		return ErrProposalDropped
1386	case pb.MsgApp:
1387		r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
1388		r.handleAppendEntries(m)
1389	case pb.MsgHeartbeat:
1390		r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
1391		r.handleHeartbeat(m)
1392	case pb.MsgSnap:
1393		r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
1394		r.handleSnapshot(m)
1395	case myVoteRespType:
1396		gr, rj, res := r.poll(m.From, m.Type, !m.Reject)
1397		r.logger.Infof("%x has received %d %s votes and %d vote rejections", r.id, gr, m.Type, rj)
1398		switch res {
1399		case quorum.VoteWon:
1400			if r.state == StatePreCandidate {
1401				r.campaign(campaignElection)
1402			} else {
1403				r.becomeLeader()
1404				r.bcastAppend()
1405			}
1406		case quorum.VoteLost:
1407			// pb.MsgPreVoteResp contains future term of pre-candidate
1408			// m.Term > r.Term; reuse r.Term
1409			r.becomeFollower(r.Term, None)
1410		}
1411	case pb.MsgTimeoutNow:
1412		r.logger.Debugf("%x [term %d state %v] ignored MsgTimeoutNow from %x", r.id, r.Term, r.state, m.From)
1413	}
1414	return nil
1415}
1416
func stepFollower(r *raft, m pb.Message) error {
	switch m.Type {
	case pb.MsgProp:
		if r.lead == None {
			r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
			return ErrProposalDropped
		} else if r.disableProposalForwarding {
			r.logger.Infof("%x not forwarding to leader %x at term %d; dropping proposal", r.id, r.lead, r.Term)
			return ErrProposalDropped
		}
		m.To = r.lead
		r.send(m)
	case pb.MsgApp:
		r.electionElapsed = 0
		r.lead = m.From
		r.handleAppendEntries(m)
	case pb.MsgHeartbeat:
		r.electionElapsed = 0
		r.lead = m.From
		r.handleHeartbeat(m)
	case pb.MsgSnap:
		r.electionElapsed = 0
		r.lead = m.From
		r.handleSnapshot(m)
	case pb.MsgTransferLeader:
		if r.lead == None {
			r.logger.Infof("%x no leader at term %d; dropping leader transfer msg", r.id, r.Term)
			return nil
		}
		m.To = r.lead
		r.send(m)
	case pb.MsgTimeoutNow:
		r.logger.Infof("%x [term %d] received MsgTimeoutNow from %x and starts an election to get leadership.", r.id, r.Term, m.From)
		// Leadership transfers never use pre-vote even if r.preVote is true; we
		// know we are not recovering from a partition so there is no need for the
		// extra round trip.
		r.hup(campaignTransfer)
	case pb.MsgReadIndex:
		if r.lead == None {
			r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term)
			return nil
		}
		m.To = r.lead
		r.send(m)
	case pb.MsgReadIndexResp:
		if len(m.Entries) != 1 {
			r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries))
			return nil
		}
		r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data})
	}
	return nil
}

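// handleAppendEntries appends the entries carried by a MsgApp to the local
// log. Messages whose index precedes the commit index are acknowledged with
// the commit index instead of being re-appended; on a log mismatch the
// rejection carries a (RejectHint, LogTerm) pair that helps the leader skip
// past the diverging tail (see the comment below).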
func (r *raft) handleAppendEntries(m pb.Message) {
	if m.Index < r.raftLog.committed {
		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
		return
	}

	if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
	} else {
		r.logger.Debugf("%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x",
			r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)

		// Return a hint to the leader about the maximum index and term that the
		// two logs could be divergent at. Do this by searching through the
		// follower's log for the maximum (index, term) pair with a term <= the
		// MsgApp's LogTerm and an index <= the MsgApp's Index. This can help
		// skip all indexes in the follower's uncommitted tail with terms
		// greater than the MsgApp's LogTerm.
		//
		// See the other caller for findConflictByTerm (in stepLeader) for a much
		// more detailed explanation of this mechanism.
		hintIndex := min(m.Index, r.raftLog.lastIndex())
		hintIndex = r.raftLog.findConflictByTerm(hintIndex, m.LogTerm)
		hintTerm, err := r.raftLog.term(hintIndex)
		if err != nil {
			panic(fmt.Sprintf("term(%d) must be valid, but got %v", hintIndex, err))
		}
		r.send(pb.Message{
			To:         m.From,
			Type:       pb.MsgAppResp,
			Index:      m.Index,
			Reject:     true,
			RejectHint: hintIndex,
			LogTerm:    hintTerm,
		})
	}
}

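// handleHeartbeat advances the local commit index to the one carried by the
// heartbeat (the leader is expected to only send a commit index this follower
// already has the entries for) and echoes the heartbeat context so that
// pending read-only requests can be matched up on the leader.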
func (r *raft) handleHeartbeat(m pb.Message) {
	r.raftLog.commitTo(m.Commit)
	r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})
}

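// handleSnapshot attempts to restore the node's state from the snapshot in the
// message. The MsgAppResp reply carries the new last index when the snapshot
// was applied, or the unchanged commit index when it was ignored, so the
// leader learns this follower's progress either way.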
func (r *raft) handleSnapshot(m pb.Message) {
	sindex, sterm := m.Snapshot.Metadata.Index, m.Snapshot.Metadata.Term
	if r.restore(m.Snapshot) {
		r.logger.Infof("%x [commit: %d] restored snapshot [index: %d, term: %d]",
			r.id, r.raftLog.committed, sindex, sterm)
		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
	} else {
		r.logger.Infof("%x [commit: %d] ignored snapshot [index: %d, term: %d]",
			r.id, r.raftLog.committed, sindex, sterm)
		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
	}
}

// restore recovers the state machine from a snapshot. It restores the log and the
// configuration of the state machine. If this method returns false, the snapshot was
// ignored, either because it was obsolete or because of an error.
func (r *raft) restore(s pb.Snapshot) bool {
	if s.Metadata.Index <= r.raftLog.committed {
		return false
	}
	if r.state != StateFollower {
		// This is defense-in-depth: if the leader somehow ended up applying a
		// snapshot, it could move into a new term without moving into a
		// follower state. This should never fire, but if it did, we'd have
		// prevented damage by returning early, so log only a loud warning.
		//
		// At the time of writing, the instance is guaranteed to be in follower
		// state when this method is called.
		r.logger.Warningf("%x attempted to restore snapshot as leader; should never happen", r.id)
		r.becomeFollower(r.Term+1, None)
		return false
	}

	// More defense-in-depth: throw away snapshot if recipient is not in the
	// config. This shouldn't ever happen (at the time of writing) but lots of
	// code here and there assumes that r.id is in the progress tracker.
	found := false
	cs := s.Metadata.ConfState

	for _, set := range [][]uint64{
		cs.Voters,
		cs.Learners,
		cs.VotersOutgoing,
		// `LearnersNext` doesn't need to be checked. According to the rules, if a peer is
		// in `LearnersNext`, it has to be in `VotersOutgoing`.
	} {
		for _, id := range set {
			if id == r.id {
				found = true
				break
			}
		}
		if found {
			break
		}
	}
	if !found {
		r.logger.Warningf(
			"%x attempted to restore snapshot but it is not in the ConfState %v; should never happen",
			r.id, cs,
		)
		return false
	}

	// Now go ahead and actually restore.

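	// If the log already contains the entry at the snapshot's last index and
	// term, the snapshot carries no new information; fast-forward the commit
	// index and report the snapshot as ignored.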
	if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) {
		r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]",
			r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
		r.raftLog.commitTo(s.Metadata.Index)
		return false
	}

	r.raftLog.restore(s)

	// Reset the configuration and add back the (potentially updated) peers.
	r.prs = tracker.MakeProgressTracker(r.prs.MaxInflight)
	cfg, prs, err := confchange.Restore(confchange.Changer{
		Tracker:   r.prs,
		LastIndex: r.raftLog.lastIndex(),
	}, cs)

	if err != nil {
		// This should never happen. Either there's a bug in our config change
		// handling or the client corrupted the conf change.
		panic(fmt.Sprintf("unable to restore config %+v: %s", cs, err))
	}

	assertConfStatesEquivalent(r.logger, cs, r.switchToConfig(cfg, prs))

	pr := r.prs.Progress[r.id]
	pr.MaybeUpdate(pr.Next - 1) // TODO(tbg): this is untested and likely unneeded

	r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] restored snapshot [index: %d, term: %d]",
		r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
	return true
}

// promotable indicates whether the state machine can be promoted to leader,
// which is true when its own id is in the progress list, it is not a learner,
// and there is no snapshot pending application.
func (r *raft) promotable() bool {
	pr := r.prs.Progress[r.id]
	return pr != nil && !pr.IsLearner && !r.raftLog.hasPendingSnapshot()
}

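// applyConfChange applies a (possibly joint) configuration change to the
// progress tracker, either leaving a joint configuration, entering one, or
// applying a simple change, and returns the resulting ConfState.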
func (r *raft) applyConfChange(cc pb.ConfChangeV2) pb.ConfState {
	cfg, prs, err := func() (tracker.Config, tracker.ProgressMap, error) {
		changer := confchange.Changer{
			Tracker:   r.prs,
			LastIndex: r.raftLog.lastIndex(),
		}
		if cc.LeaveJoint() {
			return changer.LeaveJoint()
		} else if autoLeave, ok := cc.EnterJoint(); ok {
			return changer.EnterJoint(autoLeave, cc.Changes...)
		}
		return changer.Simple(cc.Changes...)
	}()

	if err != nil {
		// TODO(tbg): return the error to the caller.
		panic(err)
	}

	return r.switchToConfig(cfg, prs)
}

// switchToConfig reconfigures this node to use the provided configuration. It
// updates the in-memory state and, when necessary, carries out additional
// actions such as reacting to the removal of nodes or changed quorum
// requirements.
//
// The inputs usually result from restoring a ConfState or applying a ConfChange.
func (r *raft) switchToConfig(cfg tracker.Config, prs tracker.ProgressMap) pb.ConfState {
	r.prs.Config = cfg
	r.prs.Progress = prs

	r.logger.Infof("%x switched to configuration %s", r.id, r.prs.Config)
	cs := r.prs.ConfState()
	pr, ok := r.prs.Progress[r.id]

	// Update whether the node itself is a learner, resetting to false when the
	// node is removed.
	r.isLearner = ok && pr.IsLearner

	if (!ok || r.isLearner) && r.state == StateLeader {
		// This node is leader and was removed or demoted. We prevent demotions
		// at the time of writing but hypothetically we handle them the same way
		// as removing the leader: stepping down into the next Term.
		//
		// TODO(tbg): step down (for sanity) and ask follower with largest Match
		// to TimeoutNow (to avoid interruption). This might still drop some
		// proposals but it's better than nothing.
		//
		// TODO(tbg): test this branch. It is untested at the time of writing.
		return cs
	}

	// The remaining steps only make sense if this node is the leader and there
	// are other nodes.
	if r.state != StateLeader || len(cs.Voters) == 0 {
		return cs
	}

	if r.maybeCommit() {
		// If the configuration change means that more entries are committed now,
		// broadcast/append to everyone in the updated config.
		r.bcastAppend()
	} else {
		// Otherwise, still probe the newly added replicas; there's no reason to
		// let them wait out a heartbeat interval (or the next incoming
		// proposal).
		r.prs.Visit(func(id uint64, pr *tracker.Progress) {
			r.maybeSendAppend(id, false /* sendIfEmpty */)
		})
	}
	// If the leadTransferee was removed or demoted, abort the leadership transfer.
	if _, tOK := r.prs.Config.Voters.IDs()[r.leadTransferee]; !tOK && r.leadTransferee != 0 {
		r.abortLeaderTransfer()
	}

	return cs
}

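// loadState initializes the term, vote and commit index from a persisted
// HardState, panicking if the commit index lies outside the bounds of the
// local log.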
func (r *raft) loadState(state pb.HardState) {
	if state.Commit < r.raftLog.committed || state.Commit > r.raftLog.lastIndex() {
		r.logger.Panicf("%x state.commit %d is out of range [%d, %d]", r.id, state.Commit, r.raftLog.committed, r.raftLog.lastIndex())
	}
	r.raftLog.committed = state.Commit
	r.Term = state.Term
	r.Vote = state.Vote
}

// pastElectionTimeout returns true iff r.electionElapsed is greater
// than or equal to the randomized election timeout in
// [electiontimeout, 2 * electiontimeout - 1].
func (r *raft) pastElectionTimeout() bool {
	return r.electionElapsed >= r.randomizedElectionTimeout
}

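// resetRandomizedElectionTimeout draws a fresh randomized election timeout in
// [electiontimeout, 2 * electiontimeout - 1]. For example, with an
// electionTimeout of 10 ticks the result is uniform over [10, 19]; the
// randomization makes it unlikely that several followers campaign at once.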
func (r *raft) resetRandomizedElectionTimeout() {
	r.randomizedElectionTimeout = r.electionTimeout + globalRand.Intn(r.electionTimeout)
}

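// sendTimeoutNow tells the leadership-transfer target to campaign immediately
// instead of waiting out its election timeout.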
func (r *raft) sendTimeoutNow(to uint64) {
	r.send(pb.Message{To: to, Type: pb.MsgTimeoutNow})
}

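// abortLeaderTransfer clears any in-progress leadership transfer.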
func (r *raft) abortLeaderTransfer() {
	r.leadTransferee = None
}

// committedEntryInCurrentTerm returns true if the peer has committed an entry in its current term.
func (r *raft) committedEntryInCurrentTerm() bool {
	return r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) == r.Term
}

// responseToReadIndexReq constructs a response for `req`. If `req` comes from the peer
// itself, a blank value will be returned.
func (r *raft) responseToReadIndexReq(req pb.Message, readIndex uint64) pb.Message {
	if req.From == None || req.From == r.id {
		r.readStates = append(r.readStates, ReadState{
			Index:      readIndex,
			RequestCtx: req.Entries[0].Data,
		})
		return pb.Message{}
	}
	return pb.Message{
		Type:    pb.MsgReadIndexResp,
		To:      req.From,
		Index:   readIndex,
		Entries: req.Entries,
	}
}

// increaseUncommittedSize computes the size of the proposed entries and
// determines whether they would push the leader over its maxUncommittedSize
// limit. If the new entries would exceed the limit, the method returns false.
// If not, the increase in uncommitted entry size is recorded and the method
// returns true.
//
// Empty payloads are never refused. This is used both for appending an empty
// entry at a new leader's term and for leaving a joint configuration.
func (r *raft) increaseUncommittedSize(ents []pb.Entry) bool {
	var s uint64
	for _, e := range ents {
		s += uint64(PayloadSize(e))
	}

	if r.uncommittedSize > 0 && s > 0 && r.uncommittedSize+s > r.maxUncommittedSize {
		// If the uncommitted tail of the Raft log is empty, allow any size
		// proposal. Otherwise, limit the size of the uncommitted tail of the
		// log and drop any proposal that would push the size over the limit.
		// Note the added requirement s>0 which is used to make sure that
		// appending single empty entries to the log always succeeds, used both
		// for replicating a new leader's initial empty entry, and for
		// auto-leaving joint configurations.
		return false
	}
	r.uncommittedSize += s
	return true
}

// reduceUncommittedSize accounts for the newly committed entries by decreasing
// the tracked uncommitted entry size.
func (r *raft) reduceUncommittedSize(ents []pb.Entry) {
	if r.uncommittedSize == 0 {
		// Fast-path for followers, who do not track or enforce the limit.
		return
	}

	var s uint64
	for _, e := range ents {
		s += uint64(PayloadSize(e))
	}
	if s > r.uncommittedSize {
		// uncommittedSize may underestimate the size of the uncommitted Raft
		// log tail but will never overestimate it. Saturate at 0 instead of
		// allowing overflow.
		r.uncommittedSize = 0
	} else {
		r.uncommittedSize -= s
	}
}

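// numOfPendingConf returns the number of config change entries (v1 or v2) in
// ents.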
func numOfPendingConf(ents []pb.Entry) int {
	n := 0
	for i := range ents {
		if ents[i].Type == pb.EntryConfChange || ents[i].Type == pb.EntryConfChangeV2 {
			n++
		}
	}
	return n
}
