1// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package raft
16
17import (
18	"bytes"
19	"errors"
20	"fmt"
21	"math"
22	"math/rand"
23	"sort"
24	"strings"
25	"sync"
26	"time"
27
28	"go.etcd.io/etcd/raft/confchange"
29	"go.etcd.io/etcd/raft/quorum"
30	pb "go.etcd.io/etcd/raft/raftpb"
31	"go.etcd.io/etcd/raft/tracker"
32)
33
34// None is a placeholder node ID used when there is no leader.
35const None uint64 = 0
36const noLimit = math.MaxUint64
37
38// Possible values for StateType.
39const (
40	StateFollower StateType = iota
41	StateCandidate
42	StateLeader
43	StatePreCandidate
44	numStates
45)
46
47type ReadOnlyOption int
48
49const (
50	// ReadOnlySafe guarantees the linearizability of the read only request by
51	// communicating with the quorum. It is the default and suggested option.
52	ReadOnlySafe ReadOnlyOption = iota
53	// ReadOnlyLeaseBased ensures linearizability of the read only request by
54	// relying on the leader lease. It can be affected by clock drift.
55	// If the clock drift is unbounded, leader might keep the lease longer than it
56	// should (clock can move backward/pause without any bound). ReadIndex is not safe
57	// in that case.
58	ReadOnlyLeaseBased
59)
60
61// Possible values for CampaignType
62const (
63	// campaignPreElection represents the first phase of a normal election when
64	// Config.PreVote is true.
65	campaignPreElection CampaignType = "CampaignPreElection"
66	// campaignElection represents a normal (time-based) election (the second phase
67	// of the election when Config.PreVote is true).
68	campaignElection CampaignType = "CampaignElection"
69	// campaignTransfer represents the type of leader transfer
70	campaignTransfer CampaignType = "CampaignTransfer"
71)
72
73// ErrProposalDropped is returned when the proposal is ignored by some cases,
74// so that the proposer can be notified and fail fast.
75var ErrProposalDropped = errors.New("raft proposal dropped")
76
77// lockedRand is a small wrapper around rand.Rand to provide
78// synchronization among multiple raft groups. Only the methods needed
79// by the code are exposed (e.g. Intn).
80type lockedRand struct {
81	mu   sync.Mutex
82	rand *rand.Rand
83}
84
85func (r *lockedRand) Intn(n int) int {
86	r.mu.Lock()
87	v := r.rand.Intn(n)
88	r.mu.Unlock()
89	return v
90}
91
92var globalRand = &lockedRand{
93	rand: rand.New(rand.NewSource(time.Now().UnixNano())),
94}
95
96// CampaignType represents the type of campaigning
97// the reason we use the type of string instead of uint64
98// is because it's simpler to compare and fill in raft entries
99type CampaignType string
100
101// StateType represents the role of a node in a cluster.
102type StateType uint64
103
104var stmap = [...]string{
105	"StateFollower",
106	"StateCandidate",
107	"StateLeader",
108	"StatePreCandidate",
109}
110
111func (st StateType) String() string {
112	return stmap[uint64(st)]
113}
114
115// Config contains the parameters to start a raft.
116type Config struct {
117	// ID is the identity of the local raft. ID cannot be 0.
118	ID uint64
119
120	// peers contains the IDs of all nodes (including self) in the raft cluster. It
121	// should only be set when starting a new raft cluster. Restarting raft from
122	// previous configuration will panic if peers is set. peer is private and only
123	// used for testing right now.
124	peers []uint64
125
126	// learners contains the IDs of all learner nodes (including self if the
127	// local node is a learner) in the raft cluster. learners only receives
128	// entries from the leader node. It does not vote or promote itself.
129	learners []uint64
130
131	// ElectionTick is the number of Node.Tick invocations that must pass between
132	// elections. That is, if a follower does not receive any message from the
133	// leader of current term before ElectionTick has elapsed, it will become
134	// candidate and start an election. ElectionTick must be greater than
135	// HeartbeatTick. We suggest ElectionTick = 10 * HeartbeatTick to avoid
136	// unnecessary leader switching.
137	ElectionTick int
138	// HeartbeatTick is the number of Node.Tick invocations that must pass between
139	// heartbeats. That is, a leader sends heartbeat messages to maintain its
140	// leadership every HeartbeatTick ticks.
141	HeartbeatTick int
142
143	// Storage is the storage for raft. raft generates entries and states to be
144	// stored in storage. raft reads the persisted entries and states out of
145	// Storage when it needs. raft reads out the previous state and configuration
146	// out of storage when restarting.
147	Storage Storage
148	// Applied is the last applied index. It should only be set when restarting
149	// raft. raft will not return entries to the application smaller or equal to
150	// Applied. If Applied is unset when restarting, raft might return previous
151	// applied entries. This is a very application dependent configuration.
152	Applied uint64
153
154	// MaxSizePerMsg limits the max byte size of each append message. Smaller
155	// value lowers the raft recovery cost(initial probing and message lost
156	// during normal operation). On the other side, it might affect the
157	// throughput during normal replication. Note: math.MaxUint64 for unlimited,
158	// 0 for at most one entry per message.
159	MaxSizePerMsg uint64
160	// MaxCommittedSizePerReady limits the size of the committed entries which
161	// can be applied.
162	MaxCommittedSizePerReady uint64
163	// MaxUncommittedEntriesSize limits the aggregate byte size of the
164	// uncommitted entries that may be appended to a leader's log. Once this
165	// limit is exceeded, proposals will begin to return ErrProposalDropped
166	// errors. Note: 0 for no limit.
167	MaxUncommittedEntriesSize uint64
168	// MaxInflightMsgs limits the max number of in-flight append messages during
169	// optimistic replication phase. The application transportation layer usually
170	// has its own sending buffer over TCP/UDP. Setting MaxInflightMsgs to avoid
171	// overflowing that sending buffer. TODO (xiangli): feedback to application to
172	// limit the proposal rate?
173	MaxInflightMsgs int
174
175	// CheckQuorum specifies if the leader should check quorum activity. Leader
176	// steps down when quorum is not active for an electionTimeout.
177	CheckQuorum bool
178
179	// PreVote enables the Pre-Vote algorithm described in raft thesis section
180	// 9.6. This prevents disruption when a node that has been partitioned away
181	// rejoins the cluster.
182	PreVote bool
183
184	// ReadOnlyOption specifies how the read only request is processed.
185	//
186	// ReadOnlySafe guarantees the linearizability of the read only request by
187	// communicating with the quorum. It is the default and suggested option.
188	//
189	// ReadOnlyLeaseBased ensures linearizability of the read only request by
190	// relying on the leader lease. It can be affected by clock drift.
191	// If the clock drift is unbounded, leader might keep the lease longer than it
192	// should (clock can move backward/pause without any bound). ReadIndex is not safe
193	// in that case.
194	// CheckQuorum MUST be enabled if ReadOnlyOption is ReadOnlyLeaseBased.
195	ReadOnlyOption ReadOnlyOption
196
197	// Logger is the logger used for raft log. For multinode which can host
198	// multiple raft group, each raft group can have its own logger
199	Logger Logger
200
201	// DisableProposalForwarding set to true means that followers will drop
202	// proposals, rather than forwarding them to the leader. One use case for
203	// this feature would be in a situation where the Raft leader is used to
204	// compute the data of a proposal, for example, adding a timestamp from a
205	// hybrid logical clock to data in a monotonically increasing way. Forwarding
206	// should be disabled to prevent a follower with an inaccurate hybrid
207	// logical clock from assigning the timestamp and then forwarding the data
208	// to the leader.
209	DisableProposalForwarding bool
210}
211
212func (c *Config) validate() error {
213	if c.ID == None {
214		return errors.New("cannot use none as id")
215	}
216
217	if c.HeartbeatTick <= 0 {
218		return errors.New("heartbeat tick must be greater than 0")
219	}
220
221	if c.ElectionTick <= c.HeartbeatTick {
222		return errors.New("election tick must be greater than heartbeat tick")
223	}
224
225	if c.Storage == nil {
226		return errors.New("storage cannot be nil")
227	}
228
229	if c.MaxUncommittedEntriesSize == 0 {
230		c.MaxUncommittedEntriesSize = noLimit
231	}
232
233	// default MaxCommittedSizePerReady to MaxSizePerMsg because they were
234	// previously the same parameter.
235	if c.MaxCommittedSizePerReady == 0 {
236		c.MaxCommittedSizePerReady = c.MaxSizePerMsg
237	}
238
239	if c.MaxInflightMsgs <= 0 {
240		return errors.New("max inflight messages must be greater than 0")
241	}
242
243	if c.Logger == nil {
244		c.Logger = raftLogger
245	}
246
247	if c.ReadOnlyOption == ReadOnlyLeaseBased && !c.CheckQuorum {
248		return errors.New("CheckQuorum must be enabled when ReadOnlyOption is ReadOnlyLeaseBased")
249	}
250
251	return nil
252}
253
254type raft struct {
255	id uint64
256
257	Term uint64
258	Vote uint64
259
260	readStates []ReadState
261
262	// the log
263	raftLog *raftLog
264
265	maxMsgSize         uint64
266	maxUncommittedSize uint64
267	// TODO(tbg): rename to trk.
268	prs tracker.ProgressTracker
269
270	state StateType
271
272	// isLearner is true if the local raft node is a learner.
273	isLearner bool
274
275	msgs []pb.Message
276
277	// the leader id
278	lead uint64
279	// leadTransferee is id of the leader transfer target when its value is not zero.
280	// Follow the procedure defined in raft thesis 3.10.
281	leadTransferee uint64
282	// Only one conf change may be pending (in the log, but not yet
283	// applied) at a time. This is enforced via pendingConfIndex, which
284	// is set to a value >= the log index of the latest pending
285	// configuration change (if any). Config changes are only allowed to
286	// be proposed if the leader's applied index is greater than this
287	// value.
288	pendingConfIndex uint64
289	// an estimate of the size of the uncommitted tail of the Raft log. Used to
290	// prevent unbounded log growth. Only maintained by the leader. Reset on
291	// term changes.
292	uncommittedSize uint64
293
294	readOnly *readOnly
295
296	// number of ticks since it reached last electionTimeout when it is leader
297	// or candidate.
298	// number of ticks since it reached last electionTimeout or received a
299	// valid message from current leader when it is a follower.
300	electionElapsed int
301
302	// number of ticks since it reached last heartbeatTimeout.
303	// only leader keeps heartbeatElapsed.
304	heartbeatElapsed int
305
306	checkQuorum bool
307	preVote     bool
308
309	heartbeatTimeout int
310	electionTimeout  int
311	// randomizedElectionTimeout is a random number between
312	// [electiontimeout, 2 * electiontimeout - 1]. It gets reset
313	// when raft changes its state to follower or candidate.
314	randomizedElectionTimeout int
315	disableProposalForwarding bool
316
317	tick func()
318	step stepFunc
319
320	logger Logger
321}
322
323func newRaft(c *Config) *raft {
324	if err := c.validate(); err != nil {
325		panic(err.Error())
326	}
327	raftlog := newLogWithSize(c.Storage, c.Logger, c.MaxCommittedSizePerReady)
328	hs, cs, err := c.Storage.InitialState()
329	if err != nil {
330		panic(err) // TODO(bdarnell)
331	}
332
333	if len(c.peers) > 0 || len(c.learners) > 0 {
334		if len(cs.Voters) > 0 || len(cs.Learners) > 0 {
335			// TODO(bdarnell): the peers argument is always nil except in
336			// tests; the argument should be removed and these tests should be
337			// updated to specify their nodes through a snapshot.
338			panic("cannot specify both newRaft(peers, learners) and ConfState.(Voters, Learners)")
339		}
340		cs.Voters = c.peers
341		cs.Learners = c.learners
342	}
343
344	r := &raft{
345		id:                        c.ID,
346		lead:                      None,
347		isLearner:                 false,
348		raftLog:                   raftlog,
349		maxMsgSize:                c.MaxSizePerMsg,
350		maxUncommittedSize:        c.MaxUncommittedEntriesSize,
351		prs:                       tracker.MakeProgressTracker(c.MaxInflightMsgs),
352		electionTimeout:           c.ElectionTick,
353		heartbeatTimeout:          c.HeartbeatTick,
354		logger:                    c.Logger,
355		checkQuorum:               c.CheckQuorum,
356		preVote:                   c.PreVote,
357		readOnly:                  newReadOnly(c.ReadOnlyOption),
358		disableProposalForwarding: c.DisableProposalForwarding,
359	}
360
361	cfg, prs, err := confchange.Restore(confchange.Changer{
362		Tracker:   r.prs,
363		LastIndex: raftlog.lastIndex(),
364	}, cs)
365	if err != nil {
366		panic(err)
367	}
368	assertConfStatesEquivalent(r.logger, cs, r.switchToConfig(cfg, prs))
369
370	if !IsEmptyHardState(hs) {
371		r.loadState(hs)
372	}
373	if c.Applied > 0 {
374		raftlog.appliedTo(c.Applied)
375	}
376	r.becomeFollower(r.Term, None)
377
378	var nodesStrs []string
379	for _, n := range r.prs.VoterNodes() {
380		nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n))
381	}
382
383	r.logger.Infof("newRaft %x [peers: [%s], term: %d, commit: %d, applied: %d, lastindex: %d, lastterm: %d]",
384		r.id, strings.Join(nodesStrs, ","), r.Term, r.raftLog.committed, r.raftLog.applied, r.raftLog.lastIndex(), r.raftLog.lastTerm())
385	return r
386}
387
388func (r *raft) hasLeader() bool { return r.lead != None }
389
390func (r *raft) softState() *SoftState { return &SoftState{Lead: r.lead, RaftState: r.state} }
391
392func (r *raft) hardState() pb.HardState {
393	return pb.HardState{
394		Term:   r.Term,
395		Vote:   r.Vote,
396		Commit: r.raftLog.committed,
397	}
398}
399
400// send persists state to stable storage and then sends to its mailbox.
401func (r *raft) send(m pb.Message) {
402	if m.From == None {
403		m.From = r.id
404	}
405	if m.Type == pb.MsgVote || m.Type == pb.MsgVoteResp || m.Type == pb.MsgPreVote || m.Type == pb.MsgPreVoteResp {
406		if m.Term == 0 {
407			// All {pre-,}campaign messages need to have the term set when
408			// sending.
409			// - MsgVote: m.Term is the term the node is campaigning for,
410			//   non-zero as we increment the term when campaigning.
411			// - MsgVoteResp: m.Term is the new r.Term if the MsgVote was
412			//   granted, non-zero for the same reason MsgVote is
413			// - MsgPreVote: m.Term is the term the node will campaign,
414			//   non-zero as we use m.Term to indicate the next term we'll be
415			//   campaigning for
416			// - MsgPreVoteResp: m.Term is the term received in the original
417			//   MsgPreVote if the pre-vote was granted, non-zero for the
418			//   same reasons MsgPreVote is
419			panic(fmt.Sprintf("term should be set when sending %s", m.Type))
420		}
421	} else {
422		if m.Term != 0 {
423			panic(fmt.Sprintf("term should not be set when sending %s (was %d)", m.Type, m.Term))
424		}
425		// do not attach term to MsgProp, MsgReadIndex
426		// proposals are a way to forward to the leader and
427		// should be treated as local message.
428		// MsgReadIndex is also forwarded to leader.
429		if m.Type != pb.MsgProp && m.Type != pb.MsgReadIndex {
430			m.Term = r.Term
431		}
432	}
433	r.msgs = append(r.msgs, m)
434}
435
436// sendAppend sends an append RPC with new entries (if any) and the
437// current commit index to the given peer.
438func (r *raft) sendAppend(to uint64) {
439	r.maybeSendAppend(to, true)
440}
441
442// maybeSendAppend sends an append RPC with new entries to the given peer,
443// if necessary. Returns true if a message was sent. The sendIfEmpty
444// argument controls whether messages with no entries will be sent
445// ("empty" messages are useful to convey updated Commit indexes, but
446// are undesirable when we're sending multiple messages in a batch).
447func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
448	pr := r.prs.Progress[to]
449	if pr.IsPaused() {
450		return false
451	}
452	m := pb.Message{}
453	m.To = to
454
455	term, errt := r.raftLog.term(pr.Next - 1)
456	ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
457	if len(ents) == 0 && !sendIfEmpty {
458		return false
459	}
460
461	if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
462		if !pr.RecentActive {
463			r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
464			return false
465		}
466
467		m.Type = pb.MsgSnap
468		snapshot, err := r.raftLog.snapshot()
469		if err != nil {
470			if err == ErrSnapshotTemporarilyUnavailable {
471				r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to)
472				return false
473			}
474			panic(err) // TODO(bdarnell)
475		}
476		if IsEmptySnap(snapshot) {
477			panic("need non-empty snapshot")
478		}
479		m.Snapshot = snapshot
480		sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
481		r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
482			r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
483		pr.BecomeSnapshot(sindex)
484		r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)
485	} else {
486		m.Type = pb.MsgApp
487		m.Index = pr.Next - 1
488		m.LogTerm = term
489		m.Entries = ents
490		m.Commit = r.raftLog.committed
491		if n := len(m.Entries); n != 0 {
492			switch pr.State {
493			// optimistically increase the next when in StateReplicate
494			case tracker.StateReplicate:
495				last := m.Entries[n-1].Index
496				pr.OptimisticUpdate(last)
497				pr.Inflights.Add(last)
498			case tracker.StateProbe:
499				pr.ProbeSent = true
500			default:
501				r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
502			}
503		}
504	}
505	r.send(m)
506	return true
507}
508
509// sendHeartbeat sends a heartbeat RPC to the given peer.
510func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
511	// Attach the commit as min(to.matched, r.committed).
512	// When the leader sends out heartbeat message,
513	// the receiver(follower) might not be matched with the leader
514	// or it might not have all the committed entries.
515	// The leader MUST NOT forward the follower's commit to
516	// an unmatched index.
517	commit := min(r.prs.Progress[to].Match, r.raftLog.committed)
518	m := pb.Message{
519		To:      to,
520		Type:    pb.MsgHeartbeat,
521		Commit:  commit,
522		Context: ctx,
523	}
524
525	r.send(m)
526}
527
528// bcastAppend sends RPC, with entries to all peers that are not up-to-date
529// according to the progress recorded in r.prs.
530func (r *raft) bcastAppend() {
531	r.prs.Visit(func(id uint64, _ *tracker.Progress) {
532		if id == r.id {
533			return
534		}
535		r.sendAppend(id)
536	})
537}
538
539// bcastHeartbeat sends RPC, without entries to all the peers.
540func (r *raft) bcastHeartbeat() {
541	lastCtx := r.readOnly.lastPendingRequestCtx()
542	if len(lastCtx) == 0 {
543		r.bcastHeartbeatWithCtx(nil)
544	} else {
545		r.bcastHeartbeatWithCtx([]byte(lastCtx))
546	}
547}
548
549func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {
550	r.prs.Visit(func(id uint64, _ *tracker.Progress) {
551		if id == r.id {
552			return
553		}
554		r.sendHeartbeat(id, ctx)
555	})
556}
557
558func (r *raft) advance(rd Ready) {
559	r.reduceUncommittedSize(rd.CommittedEntries)
560
561	// If entries were applied (or a snapshot), update our cursor for
562	// the next Ready. Note that if the current HardState contains a
563	// new Commit index, this does not mean that we're also applying
564	// all of the new entries due to commit pagination by size.
565	if newApplied := rd.appliedCursor(); newApplied > 0 {
566		oldApplied := r.raftLog.applied
567		r.raftLog.appliedTo(newApplied)
568
569		if r.prs.Config.AutoLeave && oldApplied < r.pendingConfIndex && newApplied >= r.pendingConfIndex && r.state == StateLeader {
570			// If the current (and most recent, at least for this leader's term)
571			// configuration should be auto-left, initiate that now. We use a
572			// nil Data which unmarshals into an empty ConfChangeV2 and has the
573			// benefit that appendEntry can never refuse it based on its size
574			// (which registers as zero).
575			ent := pb.Entry{
576				Type: pb.EntryConfChangeV2,
577				Data: nil,
578			}
579			// There's no way in which this proposal should be able to be rejected.
580			if !r.appendEntry(ent) {
581				panic("refused un-refusable auto-leaving ConfChangeV2")
582			}
583			r.pendingConfIndex = r.raftLog.lastIndex()
584			r.logger.Infof("initiating automatic transition out of joint configuration %s", r.prs.Config)
585		}
586	}
587
588	if len(rd.Entries) > 0 {
589		e := rd.Entries[len(rd.Entries)-1]
590		r.raftLog.stableTo(e.Index, e.Term)
591	}
592	if !IsEmptySnap(rd.Snapshot) {
593		r.raftLog.stableSnapTo(rd.Snapshot.Metadata.Index)
594	}
595}
596
597// maybeCommit attempts to advance the commit index. Returns true if
598// the commit index changed (in which case the caller should call
599// r.bcastAppend).
600func (r *raft) maybeCommit() bool {
601	mci := r.prs.Committed()
602	return r.raftLog.maybeCommit(mci, r.Term)
603}
604
605func (r *raft) reset(term uint64) {
606	if r.Term != term {
607		r.Term = term
608		r.Vote = None
609	}
610	r.lead = None
611
612	r.electionElapsed = 0
613	r.heartbeatElapsed = 0
614	r.resetRandomizedElectionTimeout()
615
616	r.abortLeaderTransfer()
617
618	r.prs.ResetVotes()
619	r.prs.Visit(func(id uint64, pr *tracker.Progress) {
620		*pr = tracker.Progress{
621			Match:     0,
622			Next:      r.raftLog.lastIndex() + 1,
623			Inflights: tracker.NewInflights(r.prs.MaxInflight),
624			IsLearner: pr.IsLearner,
625		}
626		if id == r.id {
627			pr.Match = r.raftLog.lastIndex()
628		}
629	})
630
631	r.pendingConfIndex = 0
632	r.uncommittedSize = 0
633	r.readOnly = newReadOnly(r.readOnly.option)
634}
635
636func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {
637	li := r.raftLog.lastIndex()
638	for i := range es {
639		es[i].Term = r.Term
640		es[i].Index = li + 1 + uint64(i)
641	}
642	// Track the size of this uncommitted proposal.
643	if !r.increaseUncommittedSize(es) {
644		r.logger.Debugf(
645			"%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal",
646			r.id,
647		)
648		// Drop the proposal.
649		return false
650	}
651	// use latest "last" index after truncate/append
652	li = r.raftLog.append(es...)
653	r.prs.Progress[r.id].MaybeUpdate(li)
654	// Regardless of maybeCommit's return, our caller will call bcastAppend.
655	r.maybeCommit()
656	return true
657}
658
659// tickElection is run by followers and candidates after r.electionTimeout.
660func (r *raft) tickElection() {
661	r.electionElapsed++
662
663	if r.promotable() && r.pastElectionTimeout() {
664		r.electionElapsed = 0
665		r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
666	}
667}
668
669// tickHeartbeat is run by leaders to send a MsgBeat after r.heartbeatTimeout.
670func (r *raft) tickHeartbeat() {
671	r.heartbeatElapsed++
672	r.electionElapsed++
673
674	if r.electionElapsed >= r.electionTimeout {
675		r.electionElapsed = 0
676		if r.checkQuorum {
677			r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum})
678		}
679		// If current leader cannot transfer leadership in electionTimeout, it becomes leader again.
680		if r.state == StateLeader && r.leadTransferee != None {
681			r.abortLeaderTransfer()
682		}
683	}
684
685	if r.state != StateLeader {
686		return
687	}
688
689	if r.heartbeatElapsed >= r.heartbeatTimeout {
690		r.heartbeatElapsed = 0
691		r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
692	}
693}
694
695func (r *raft) becomeFollower(term uint64, lead uint64) {
696	r.step = stepFollower
697	r.reset(term)
698	r.tick = r.tickElection
699	r.lead = lead
700	r.state = StateFollower
701	r.logger.Infof("%x became follower at term %d", r.id, r.Term)
702}
703
704func (r *raft) becomeCandidate() {
705	// TODO(xiangli) remove the panic when the raft implementation is stable
706	if r.state == StateLeader {
707		panic("invalid transition [leader -> candidate]")
708	}
709	r.step = stepCandidate
710	r.reset(r.Term + 1)
711	r.tick = r.tickElection
712	r.Vote = r.id
713	r.state = StateCandidate
714	r.logger.Infof("%x became candidate at term %d", r.id, r.Term)
715}
716
717func (r *raft) becomePreCandidate() {
718	// TODO(xiangli) remove the panic when the raft implementation is stable
719	if r.state == StateLeader {
720		panic("invalid transition [leader -> pre-candidate]")
721	}
722	// Becoming a pre-candidate changes our step functions and state,
723	// but doesn't change anything else. In particular it does not increase
724	// r.Term or change r.Vote.
725	r.step = stepCandidate
726	r.prs.ResetVotes()
727	r.tick = r.tickElection
728	r.lead = None
729	r.state = StatePreCandidate
730	r.logger.Infof("%x became pre-candidate at term %d", r.id, r.Term)
731}
732
733func (r *raft) becomeLeader() {
734	// TODO(xiangli) remove the panic when the raft implementation is stable
735	if r.state == StateFollower {
736		panic("invalid transition [follower -> leader]")
737	}
738	r.step = stepLeader
739	r.reset(r.Term)
740	r.tick = r.tickHeartbeat
741	r.lead = r.id
742	r.state = StateLeader
743	// Followers enter replicate mode when they've been successfully probed
744	// (perhaps after having received a snapshot as a result). The leader is
745	// trivially in this state. Note that r.reset() has initialized this
746	// progress with the last index already.
747	r.prs.Progress[r.id].BecomeReplicate()
748
749	// Conservatively set the pendingConfIndex to the last index in the
750	// log. There may or may not be a pending config change, but it's
751	// safe to delay any future proposals until we commit all our
752	// pending log entries, and scanning the entire tail of the log
753	// could be expensive.
754	r.pendingConfIndex = r.raftLog.lastIndex()
755
756	emptyEnt := pb.Entry{Data: nil}
757	if !r.appendEntry(emptyEnt) {
758		// This won't happen because we just called reset() above.
759		r.logger.Panic("empty entry was dropped")
760	}
761	// As a special case, don't count the initial empty entry towards the
762	// uncommitted log quota. This is because we want to preserve the
763	// behavior of allowing one entry larger than quota if the current
764	// usage is zero.
765	r.reduceUncommittedSize([]pb.Entry{emptyEnt})
766	r.logger.Infof("%x became leader at term %d", r.id, r.Term)
767}
768
769// campaign transitions the raft instance to candidate state. This must only be
770// called after verifying that this is a legitimate transition.
771func (r *raft) campaign(t CampaignType) {
772	if !r.promotable() {
773		// This path should not be hit (callers are supposed to check), but
774		// better safe than sorry.
775		r.logger.Warningf("%x is unpromotable; campaign() should have been called", r.id)
776	}
777	var term uint64
778	var voteMsg pb.MessageType
779	if t == campaignPreElection {
780		r.becomePreCandidate()
781		voteMsg = pb.MsgPreVote
782		// PreVote RPCs are sent for the next term before we've incremented r.Term.
783		term = r.Term + 1
784	} else {
785		r.becomeCandidate()
786		voteMsg = pb.MsgVote
787		term = r.Term
788	}
789	if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon {
790		// We won the election after voting for ourselves (which must mean that
791		// this is a single-node cluster). Advance to the next state.
792		if t == campaignPreElection {
793			r.campaign(campaignElection)
794		} else {
795			r.becomeLeader()
796		}
797		return
798	}
799	var ids []uint64
800	{
801		idMap := r.prs.Voters.IDs()
802		ids = make([]uint64, 0, len(idMap))
803		for id := range idMap {
804			ids = append(ids, id)
805		}
806		sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
807	}
808	for _, id := range ids {
809		if id == r.id {
810			continue
811		}
812		r.logger.Infof("%x [logterm: %d, index: %d] sent %s request to %x at term %d",
813			r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), voteMsg, id, r.Term)
814
815		var ctx []byte
816		if t == campaignTransfer {
817			ctx = []byte(t)
818		}
819		r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
820	}
821}
822
823func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int, rejected int, result quorum.VoteResult) {
824	if v {
825		r.logger.Infof("%x received %s from %x at term %d", r.id, t, id, r.Term)
826	} else {
827		r.logger.Infof("%x received %s rejection from %x at term %d", r.id, t, id, r.Term)
828	}
829	r.prs.RecordVote(id, v)
830	return r.prs.TallyVotes()
831}
832
833func (r *raft) Step(m pb.Message) error {
834	// Handle the message term, which may result in our stepping down to a follower.
835	switch {
836	case m.Term == 0:
837		// local message
838	case m.Term > r.Term:
839		if m.Type == pb.MsgVote || m.Type == pb.MsgPreVote {
840			force := bytes.Equal(m.Context, []byte(campaignTransfer))
841			inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeout
842			if !force && inLease {
843				// If a server receives a RequestVote request within the minimum election timeout
844				// of hearing from a current leader, it does not update its term or grant its vote
845				r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: lease is not expired (remaining ticks: %d)",
846					r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term, r.electionTimeout-r.electionElapsed)
847				return nil
848			}
849		}
850		switch {
851		case m.Type == pb.MsgPreVote:
852			// Never change our term in response to a PreVote
853		case m.Type == pb.MsgPreVoteResp && !m.Reject:
854			// We send pre-vote requests with a term in our future. If the
855			// pre-vote is granted, we will increment our term when we get a
856			// quorum. If it is not, the term comes from the node that
857			// rejected our vote so we should become a follower at the new
858			// term.
859		default:
860			r.logger.Infof("%x [term: %d] received a %s message with higher term from %x [term: %d]",
861				r.id, r.Term, m.Type, m.From, m.Term)
862			if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
863				r.becomeFollower(m.Term, m.From)
864			} else {
865				r.becomeFollower(m.Term, None)
866			}
867		}
868
869	case m.Term < r.Term:
870		if (r.checkQuorum || r.preVote) && (m.Type == pb.MsgHeartbeat || m.Type == pb.MsgApp) {
871			// We have received messages from a leader at a lower term. It is possible
872			// that these messages were simply delayed in the network, but this could
873			// also mean that this node has advanced its term number during a network
874			// partition, and it is now unable to either win an election or to rejoin
875			// the majority on the old term. If checkQuorum is false, this will be
876			// handled by incrementing term numbers in response to MsgVote with a
877			// higher term, but if checkQuorum is true we may not advance the term on
878			// MsgVote and must generate other messages to advance the term. The net
879			// result of these two features is to minimize the disruption caused by
880			// nodes that have been removed from the cluster's configuration: a
881			// removed node will send MsgVotes (or MsgPreVotes) which will be ignored,
882			// but it will not receive MsgApp or MsgHeartbeat, so it will not create
883			// disruptive term increases, by notifying leader of this node's activeness.
884			// The above comments also true for Pre-Vote
885			//
886			// When follower gets isolated, it soon starts an election ending
887			// up with a higher term than leader, although it won't receive enough
888			// votes to win the election. When it regains connectivity, this response
889			// with "pb.MsgAppResp" of higher term would force leader to step down.
890			// However, this disruption is inevitable to free this stuck node with
891			// fresh election. This can be prevented with Pre-Vote phase.
892			r.send(pb.Message{To: m.From, Type: pb.MsgAppResp})
893		} else if m.Type == pb.MsgPreVote {
894			// Before Pre-Vote enable, there may have candidate with higher term,
895			// but less log. After update to Pre-Vote, the cluster may deadlock if
896			// we drop messages with a lower term.
897			r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
898				r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
899			r.send(pb.Message{To: m.From, Term: r.Term, Type: pb.MsgPreVoteResp, Reject: true})
900		} else {
901			// ignore other cases
902			r.logger.Infof("%x [term: %d] ignored a %s message with lower term from %x [term: %d]",
903				r.id, r.Term, m.Type, m.From, m.Term)
904		}
905		return nil
906	}
907
908	switch m.Type {
909	case pb.MsgHup:
910		if r.state != StateLeader {
911			if !r.promotable() {
912				r.logger.Warningf("%x is unpromotable and can not campaign; ignoring MsgHup", r.id)
913				return nil
914			}
915			ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
916			if err != nil {
917				r.logger.Panicf("unexpected error getting unapplied entries (%v)", err)
918			}
919			if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied {
920				r.logger.Warningf("%x cannot campaign at term %d since there are still %d pending configuration changes to apply", r.id, r.Term, n)
921				return nil
922			}
923
924			r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
925			if r.preVote {
926				r.campaign(campaignPreElection)
927			} else {
928				r.campaign(campaignElection)
929			}
930		} else {
931			r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
932		}
933
934	case pb.MsgVote, pb.MsgPreVote:
935		// We can vote if this is a repeat of a vote we've already cast...
936		canVote := r.Vote == m.From ||
937			// ...we haven't voted and we don't think there's a leader yet in this term...
938			(r.Vote == None && r.lead == None) ||
939			// ...or this is a PreVote for a future term...
940			(m.Type == pb.MsgPreVote && m.Term > r.Term)
941		// ...and we believe the candidate is up to date.
942		if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
943			// Note: it turns out that that learners must be allowed to cast votes.
944			// This seems counter- intuitive but is necessary in the situation in which
945			// a learner has been promoted (i.e. is now a voter) but has not learned
946			// about this yet.
947			// For example, consider a group in which id=1 is a learner and id=2 and
948			// id=3 are voters. A configuration change promoting 1 can be committed on
949			// the quorum `{2,3}` without the config change being appended to the
950			// learner's log. If the leader (say 2) fails, there are de facto two
951			// voters remaining. Only 3 can win an election (due to its log containing
952			// all committed entries), but to do so it will need 1 to vote. But 1
953			// considers itself a learner and will continue to do so until 3 has
954			// stepped up as leader, replicates the conf change to 1, and 1 applies it.
955			// Ultimately, by receiving a request to vote, the learner realizes that
956			// the candidate believes it to be a voter, and that it should act
957			// accordingly. The candidate's config may be stale, too; but in that case
958			// it won't win the election, at least in the absence of the bug discussed
959			// in:
960			// https://github.com/etcd-io/etcd/issues/7625#issuecomment-488798263.
961			r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] cast %s for %x [logterm: %d, index: %d] at term %d",
962				r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
963			// When responding to Msg{Pre,}Vote messages we include the term
964			// from the message, not the local term. To see why, consider the
965			// case where a single node was previously partitioned away and
966			// it's local term is now out of date. If we include the local term
967			// (recall that for pre-votes we don't update the local term), the
968			// (pre-)campaigning node on the other end will proceed to ignore
969			// the message (it ignores all out of date messages).
970			// The term in the original message and current local term are the
971			// same in the case of regular votes, but different for pre-votes.
972			r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
973			if m.Type == pb.MsgVote {
974				// Only record real votes.
975				r.electionElapsed = 0
976				r.Vote = m.From
977			}
978		} else {
979			r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
980				r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
981			r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true})
982		}
983
984	default:
985		err := r.step(r, m)
986		if err != nil {
987			return err
988		}
989	}
990	return nil
991}
992
993type stepFunc func(r *raft, m pb.Message) error
994
995func stepLeader(r *raft, m pb.Message) error {
996	// These message types do not require any progress for m.From.
997	switch m.Type {
998	case pb.MsgBeat:
999		r.bcastHeartbeat()
1000		return nil
1001	case pb.MsgCheckQuorum:
1002		// The leader should always see itself as active. As a precaution, handle
1003		// the case in which the leader isn't in the configuration any more (for
1004		// example if it just removed itself).
1005		//
1006		// TODO(tbg): I added a TODO in removeNode, it doesn't seem that the
1007		// leader steps down when removing itself. I might be missing something.
1008		if pr := r.prs.Progress[r.id]; pr != nil {
1009			pr.RecentActive = true
1010		}
1011		if !r.prs.QuorumActive() {
1012			r.logger.Warningf("%x stepped down to follower since quorum is not active", r.id)
1013			r.becomeFollower(r.Term, None)
1014		}
1015		// Mark everyone (but ourselves) as inactive in preparation for the next
1016		// CheckQuorum.
1017		r.prs.Visit(func(id uint64, pr *tracker.Progress) {
1018			if id != r.id {
1019				pr.RecentActive = false
1020			}
1021		})
1022		return nil
1023	case pb.MsgProp:
1024		if len(m.Entries) == 0 {
1025			r.logger.Panicf("%x stepped empty MsgProp", r.id)
1026		}
1027		if r.prs.Progress[r.id] == nil {
1028			// If we are not currently a member of the range (i.e. this node
1029			// was removed from the configuration while serving as leader),
1030			// drop any new proposals.
1031			return ErrProposalDropped
1032		}
1033		if r.leadTransferee != None {
1034			r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)
1035			return ErrProposalDropped
1036		}
1037
1038		for i := range m.Entries {
1039			e := &m.Entries[i]
1040			var cc pb.ConfChangeI
1041			if e.Type == pb.EntryConfChange {
1042				var ccc pb.ConfChange
1043				if err := ccc.Unmarshal(e.Data); err != nil {
1044					panic(err)
1045				}
1046				cc = ccc
1047			} else if e.Type == pb.EntryConfChangeV2 {
1048				var ccc pb.ConfChangeV2
1049				if err := ccc.Unmarshal(e.Data); err != nil {
1050					panic(err)
1051				}
1052				cc = ccc
1053			}
1054			if cc != nil {
1055				alreadyPending := r.pendingConfIndex > r.raftLog.applied
1056				alreadyJoint := len(r.prs.Config.Voters[1]) > 0
1057				wantsLeaveJoint := len(cc.AsV2().Changes) == 0
1058
1059				var refused string
1060				if alreadyPending {
1061					refused = fmt.Sprintf("possible unapplied conf change at index %d (applied to %d)", r.pendingConfIndex, r.raftLog.applied)
1062				} else if alreadyJoint && !wantsLeaveJoint {
1063					refused = "must transition out of joint config first"
1064				} else if !alreadyJoint && wantsLeaveJoint {
1065					refused = "not in joint state; refusing empty conf change"
1066				}
1067
1068				if refused != "" {
1069					r.logger.Infof("%x ignoring conf change %v at config %s: %s", r.id, cc, r.prs.Config, refused)
1070					m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
1071				} else {
1072					r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1
1073				}
1074			}
1075		}
1076
1077		if !r.appendEntry(m.Entries...) {
1078			return ErrProposalDropped
1079		}
1080		r.bcastAppend()
1081		return nil
1082	case pb.MsgReadIndex:
1083		// only one voting member (the leader) in the cluster
1084		if r.prs.IsSingleton() {
1085			if resp := r.responseToReadIndexReq(m, r.raftLog.committed); resp.To != None {
1086				r.send(resp)
1087			}
1088			return nil
1089		}
1090
1091		// Reject read only request when this leader has not committed any log entry at its term.
1092		if !r.committedEntryInCurrentTerm() {
1093			return nil
1094		}
1095
1096		// thinking: use an interally defined context instead of the user given context.
1097		// We can express this in terms of the term and index instead of a user-supplied value.
1098		// This would allow multiple reads to piggyback on the same message.
1099		switch r.readOnly.option {
1100		// If more than the local vote is needed, go through a full broadcast.
1101		case ReadOnlySafe:
1102			r.readOnly.addRequest(r.raftLog.committed, m)
1103			// The local node automatically acks the request.
1104			r.readOnly.recvAck(r.id, m.Entries[0].Data)
1105			r.bcastHeartbeatWithCtx(m.Entries[0].Data)
1106		case ReadOnlyLeaseBased:
1107			if resp := r.responseToReadIndexReq(m, r.raftLog.committed); resp.To != None {
1108				r.send(resp)
1109			}
1110		}
1111		return nil
1112	}
1113
1114	// All other message types require a progress for m.From (pr).
1115	pr := r.prs.Progress[m.From]
1116	if pr == nil {
1117		r.logger.Debugf("%x no progress available for %x", r.id, m.From)
1118		return nil
1119	}
1120	switch m.Type {
1121	case pb.MsgAppResp:
1122		pr.RecentActive = true
1123
1124		if m.Reject {
1125			r.logger.Debugf("%x received MsgAppResp(MsgApp was rejected, lastindex: %d) from %x for index %d",
1126				r.id, m.RejectHint, m.From, m.Index)
1127			if pr.MaybeDecrTo(m.Index, m.RejectHint) {
1128				r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
1129				if pr.State == tracker.StateReplicate {
1130					pr.BecomeProbe()
1131				}
1132				r.sendAppend(m.From)
1133			}
1134		} else {
1135			oldPaused := pr.IsPaused()
1136			if pr.MaybeUpdate(m.Index) {
1137				switch {
1138				case pr.State == tracker.StateProbe:
1139					pr.BecomeReplicate()
1140				case pr.State == tracker.StateSnapshot && pr.Match >= pr.PendingSnapshot:
1141					// TODO(tbg): we should also enter this branch if a snapshot is
1142					// received that is below pr.PendingSnapshot but which makes it
1143					// possible to use the log again.
1144					r.logger.Debugf("%x recovered from needing snapshot, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
1145					// Transition back to replicating state via probing state
1146					// (which takes the snapshot into account). If we didn't
1147					// move to replicating state, that would only happen with
1148					// the next round of appends (but there may not be a next
1149					// round for a while, exposing an inconsistent RaftStatus).
1150					pr.BecomeProbe()
1151					pr.BecomeReplicate()
1152				case pr.State == tracker.StateReplicate:
1153					pr.Inflights.FreeLE(m.Index)
1154				}
1155
1156				if r.maybeCommit() {
1157					r.bcastAppend()
1158				} else if oldPaused {
1159					// If we were paused before, this node may be missing the
1160					// latest commit index, so send it.
1161					r.sendAppend(m.From)
1162				}
1163				// We've updated flow control information above, which may
1164				// allow us to send multiple (size-limited) in-flight messages
1165				// at once (such as when transitioning from probe to
1166				// replicate, or when freeTo() covers multiple messages). If
1167				// we have more entries to send, send as many messages as we
1168				// can (without sending empty messages for the commit index)
1169				for r.maybeSendAppend(m.From, false) {
1170				}
1171				// Transfer leadership is in progress.
1172				if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() {
1173					r.logger.Infof("%x sent MsgTimeoutNow to %x after received MsgAppResp", r.id, m.From)
1174					r.sendTimeoutNow(m.From)
1175				}
1176			}
1177		}
1178	case pb.MsgHeartbeatResp:
1179		pr.RecentActive = true
1180		pr.ProbeSent = false
1181
1182		// free one slot for the full inflights window to allow progress.
1183		if pr.State == tracker.StateReplicate && pr.Inflights.Full() {
1184			pr.Inflights.FreeFirstOne()
1185		}
1186		if pr.Match < r.raftLog.lastIndex() {
1187			r.sendAppend(m.From)
1188		}
1189
1190		if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
1191			return nil
1192		}
1193
1194		if r.prs.Voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon {
1195			return nil
1196		}
1197
1198		rss := r.readOnly.advance(m)
1199		for _, rs := range rss {
1200			if resp := r.responseToReadIndexReq(rs.req, rs.index); resp.To != None {
1201				r.send(resp)
1202			}
1203		}
1204	case pb.MsgSnapStatus:
1205		if pr.State != tracker.StateSnapshot {
1206			return nil
1207		}
1208		// TODO(tbg): this code is very similar to the snapshot handling in
1209		// MsgAppResp above. In fact, the code there is more correct than the
1210		// code here and should likely be updated to match (or even better, the
1211		// logic pulled into a newly created Progress state machine handler).
1212		if !m.Reject {
1213			pr.BecomeProbe()
1214			r.logger.Debugf("%x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
1215		} else {
1216			// NB: the order here matters or we'll be probing erroneously from
1217			// the snapshot index, but the snapshot never applied.
1218			pr.PendingSnapshot = 0
1219			pr.BecomeProbe()
1220			r.logger.Debugf("%x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
1221		}
1222		// If snapshot finish, wait for the MsgAppResp from the remote node before sending
1223		// out the next MsgApp.
1224		// If snapshot failure, wait for a heartbeat interval before next try
1225		pr.ProbeSent = true
1226	case pb.MsgUnreachable:
1227		// During optimistic replication, if the remote becomes unreachable,
1228		// there is huge probability that a MsgApp is lost.
1229		if pr.State == tracker.StateReplicate {
1230			pr.BecomeProbe()
1231		}
1232		r.logger.Debugf("%x failed to send message to %x because it is unreachable [%s]", r.id, m.From, pr)
1233	case pb.MsgTransferLeader:
1234		if pr.IsLearner {
1235			r.logger.Debugf("%x is learner. Ignored transferring leadership", r.id)
1236			return nil
1237		}
1238		leadTransferee := m.From
1239		lastLeadTransferee := r.leadTransferee
1240		if lastLeadTransferee != None {
1241			if lastLeadTransferee == leadTransferee {
1242				r.logger.Infof("%x [term %d] transfer leadership to %x is in progress, ignores request to same node %x",
1243					r.id, r.Term, leadTransferee, leadTransferee)
1244				return nil
1245			}
1246			r.abortLeaderTransfer()
1247			r.logger.Infof("%x [term %d] abort previous transferring leadership to %x", r.id, r.Term, lastLeadTransferee)
1248		}
1249		if leadTransferee == r.id {
1250			r.logger.Debugf("%x is already leader. Ignored transferring leadership to self", r.id)
1251			return nil
1252		}
1253		// Transfer leadership to third party.
1254		r.logger.Infof("%x [term %d] starts to transfer leadership to %x", r.id, r.Term, leadTransferee)
1255		// Transfer leadership should be finished in one electionTimeout, so reset r.electionElapsed.
1256		r.electionElapsed = 0
1257		r.leadTransferee = leadTransferee
1258		if pr.Match == r.raftLog.lastIndex() {
1259			r.sendTimeoutNow(leadTransferee)
1260			r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee)
1261		} else {
1262			r.sendAppend(leadTransferee)
1263		}
1264	}
1265	return nil
1266}
1267
1268// stepCandidate is shared by StateCandidate and StatePreCandidate; the difference is
1269// whether they respond to MsgVoteResp or MsgPreVoteResp.
1270func stepCandidate(r *raft, m pb.Message) error {
1271	// Only handle vote responses corresponding to our candidacy (while in
1272	// StateCandidate, we may get stale MsgPreVoteResp messages in this term from
1273	// our pre-candidate state).
1274	var myVoteRespType pb.MessageType
1275	if r.state == StatePreCandidate {
1276		myVoteRespType = pb.MsgPreVoteResp
1277	} else {
1278		myVoteRespType = pb.MsgVoteResp
1279	}
1280	switch m.Type {
1281	case pb.MsgProp:
1282		r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
1283		return ErrProposalDropped
1284	case pb.MsgApp:
1285		r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
1286		r.handleAppendEntries(m)
1287	case pb.MsgHeartbeat:
1288		r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
1289		r.handleHeartbeat(m)
1290	case pb.MsgSnap:
1291		r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
1292		r.handleSnapshot(m)
1293	case myVoteRespType:
1294		gr, rj, res := r.poll(m.From, m.Type, !m.Reject)
1295		r.logger.Infof("%x has received %d %s votes and %d vote rejections", r.id, gr, m.Type, rj)
1296		switch res {
1297		case quorum.VoteWon:
1298			if r.state == StatePreCandidate {
1299				r.campaign(campaignElection)
1300			} else {
1301				r.becomeLeader()
1302				r.bcastAppend()
1303			}
1304		case quorum.VoteLost:
1305			// pb.MsgPreVoteResp contains future term of pre-candidate
1306			// m.Term > r.Term; reuse r.Term
1307			r.becomeFollower(r.Term, None)
1308		}
1309	case pb.MsgTimeoutNow:
1310		r.logger.Debugf("%x [term %d state %v] ignored MsgTimeoutNow from %x", r.id, r.Term, r.state, m.From)
1311	}
1312	return nil
1313}
1314
1315func stepFollower(r *raft, m pb.Message) error {
1316	switch m.Type {
1317	case pb.MsgProp:
1318		if r.lead == None {
1319			r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
1320			return ErrProposalDropped
1321		} else if r.disableProposalForwarding {
1322			r.logger.Infof("%x not forwarding to leader %x at term %d; dropping proposal", r.id, r.lead, r.Term)
1323			return ErrProposalDropped
1324		}
1325		m.To = r.lead
1326		r.send(m)
1327	case pb.MsgApp:
1328		r.electionElapsed = 0
1329		r.lead = m.From
1330		r.handleAppendEntries(m)
1331	case pb.MsgHeartbeat:
1332		r.electionElapsed = 0
1333		r.lead = m.From
1334		r.handleHeartbeat(m)
1335	case pb.MsgSnap:
1336		r.electionElapsed = 0
1337		r.lead = m.From
1338		r.handleSnapshot(m)
1339	case pb.MsgTransferLeader:
1340		if r.lead == None {
1341			r.logger.Infof("%x no leader at term %d; dropping leader transfer msg", r.id, r.Term)
1342			return nil
1343		}
1344		m.To = r.lead
1345		r.send(m)
1346	case pb.MsgTimeoutNow:
1347		if r.promotable() {
1348			r.logger.Infof("%x [term %d] received MsgTimeoutNow from %x and starts an election to get leadership.", r.id, r.Term, m.From)
1349			// Leadership transfers never use pre-vote even if r.preVote is true; we
1350			// know we are not recovering from a partition so there is no need for the
1351			// extra round trip.
1352			r.campaign(campaignTransfer)
1353		} else {
1354			r.logger.Infof("%x received MsgTimeoutNow from %x but is not promotable", r.id, m.From)
1355		}
1356	case pb.MsgReadIndex:
1357		if r.lead == None {
1358			r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term)
1359			return nil
1360		}
1361		m.To = r.lead
1362		r.send(m)
1363	case pb.MsgReadIndexResp:
1364		if len(m.Entries) != 1 {
1365			r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries))
1366			return nil
1367		}
1368		r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data})
1369	}
1370	return nil
1371}
1372
1373func (r *raft) handleAppendEntries(m pb.Message) {
1374	if m.Index < r.raftLog.committed {
1375		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
1376		return
1377	}
1378
1379	if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
1380		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
1381	} else {
1382		r.logger.Debugf("%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x",
1383			r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)
1384		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
1385	}
1386}
1387
1388func (r *raft) handleHeartbeat(m pb.Message) {
1389	r.raftLog.commitTo(m.Commit)
1390	r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})
1391}
1392
1393func (r *raft) handleSnapshot(m pb.Message) {
1394	sindex, sterm := m.Snapshot.Metadata.Index, m.Snapshot.Metadata.Term
1395	if r.restore(m.Snapshot) {
1396		r.logger.Infof("%x [commit: %d] restored snapshot [index: %d, term: %d]",
1397			r.id, r.raftLog.committed, sindex, sterm)
1398		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
1399	} else {
1400		r.logger.Infof("%x [commit: %d] ignored snapshot [index: %d, term: %d]",
1401			r.id, r.raftLog.committed, sindex, sterm)
1402		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
1403	}
1404}
1405
1406// restore recovers the state machine from a snapshot. It restores the log and the
1407// configuration of state machine. If this method returns false, the snapshot was
1408// ignored, either because it was obsolete or because of an error.
1409func (r *raft) restore(s pb.Snapshot) bool {
1410	if s.Metadata.Index <= r.raftLog.committed {
1411		return false
1412	}
1413	if r.state != StateFollower {
1414		// This is defense-in-depth: if the leader somehow ended up applying a
1415		// snapshot, it could move into a new term without moving into a
1416		// follower state. This should never fire, but if it did, we'd have
1417		// prevented damage by returning early, so log only a loud warning.
1418		//
1419		// At the time of writing, the instance is guaranteed to be in follower
1420		// state when this method is called.
1421		r.logger.Warningf("%x attempted to restore snapshot as leader; should never happen", r.id)
1422		r.becomeFollower(r.Term+1, None)
1423		return false
1424	}
1425
1426	// More defense-in-depth: throw away snapshot if recipient is not in the
1427	// config. This shouldn't ever happen (at the time of writing) but lots of
1428	// code here and there assumes that r.id is in the progress tracker.
1429	found := false
1430	cs := s.Metadata.ConfState
1431	for _, set := range [][]uint64{
1432		cs.Voters,
1433		cs.Learners,
1434	} {
1435		for _, id := range set {
1436			if id == r.id {
1437				found = true
1438				break
1439			}
1440		}
1441	}
1442	if !found {
1443		r.logger.Warningf(
1444			"%x attempted to restore snapshot but it is not in the ConfState %v; should never happen",
1445			r.id, cs,
1446		)
1447		return false
1448	}
1449
1450	// Now go ahead and actually restore.
1451
1452	if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) {
1453		r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]",
1454			r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
1455		r.raftLog.commitTo(s.Metadata.Index)
1456		return false
1457	}
1458
1459	r.raftLog.restore(s)
1460
1461	// Reset the configuration and add the (potentially updated) peers in anew.
1462	r.prs = tracker.MakeProgressTracker(r.prs.MaxInflight)
1463	cfg, prs, err := confchange.Restore(confchange.Changer{
1464		Tracker:   r.prs,
1465		LastIndex: r.raftLog.lastIndex(),
1466	}, cs)
1467
1468	if err != nil {
1469		// This should never happen. Either there's a bug in our config change
1470		// handling or the client corrupted the conf change.
1471		panic(fmt.Sprintf("unable to restore config %+v: %s", cs, err))
1472	}
1473
1474	assertConfStatesEquivalent(r.logger, cs, r.switchToConfig(cfg, prs))
1475
1476	pr := r.prs.Progress[r.id]
1477	pr.MaybeUpdate(pr.Next - 1) // TODO(tbg): this is untested and likely unneeded
1478
1479	r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] restored snapshot [index: %d, term: %d]",
1480		r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
1481	return true
1482}
1483
1484// promotable indicates whether state machine can be promoted to leader,
1485// which is true when its own id is in progress list.
1486func (r *raft) promotable() bool {
1487	pr := r.prs.Progress[r.id]
1488	return pr != nil && !pr.IsLearner
1489}
1490
1491func (r *raft) applyConfChange(cc pb.ConfChangeV2) pb.ConfState {
1492	cfg, prs, err := func() (tracker.Config, tracker.ProgressMap, error) {
1493		changer := confchange.Changer{
1494			Tracker:   r.prs,
1495			LastIndex: r.raftLog.lastIndex(),
1496		}
1497		if cc.LeaveJoint() {
1498			return changer.LeaveJoint()
1499		} else if autoLeave, ok := cc.EnterJoint(); ok {
1500			return changer.EnterJoint(autoLeave, cc.Changes...)
1501		}
1502		return changer.Simple(cc.Changes...)
1503	}()
1504
1505	if err != nil {
1506		// TODO(tbg): return the error to the caller.
1507		panic(err)
1508	}
1509
1510	return r.switchToConfig(cfg, prs)
1511}
1512
1513// switchToConfig reconfigures this node to use the provided configuration. It
1514// updates the in-memory state and, when necessary, carries out additional
1515// actions such as reacting to the removal of nodes or changed quorum
1516// requirements.
1517//
1518// The inputs usually result from restoring a ConfState or applying a ConfChange.
1519func (r *raft) switchToConfig(cfg tracker.Config, prs tracker.ProgressMap) pb.ConfState {
1520	r.prs.Config = cfg
1521	r.prs.Progress = prs
1522
1523	r.logger.Infof("%x switched to configuration %s", r.id, r.prs.Config)
1524	cs := r.prs.ConfState()
1525	pr, ok := r.prs.Progress[r.id]
1526
1527	// Update whether the node itself is a learner, resetting to false when the
1528	// node is removed.
1529	r.isLearner = ok && pr.IsLearner
1530
1531	if (!ok || r.isLearner) && r.state == StateLeader {
1532		// This node is leader and was removed or demoted. We prevent demotions
1533		// at the time writing but hypothetically we handle them the same way as
1534		// removing the leader: stepping down into the next Term.
1535		//
1536		// TODO(tbg): step down (for sanity) and ask follower with largest Match
1537		// to TimeoutNow (to avoid interruption). This might still drop some
1538		// proposals but it's better than nothing.
1539		//
1540		// TODO(tbg): test this branch. It is untested at the time of writing.
1541		return cs
1542	}
1543
1544	// The remaining steps only make sense if this node is the leader and there
1545	// are other nodes.
1546	if r.state != StateLeader || len(cs.Voters) == 0 {
1547		return cs
1548	}
1549
1550	if r.maybeCommit() {
1551		// If the configuration change means that more entries are committed now,
1552		// broadcast/append to everyone in the updated config.
1553		r.bcastAppend()
1554	} else {
1555		// Otherwise, still probe the newly added replicas; there's no reason to
1556		// let them wait out a heartbeat interval (or the next incoming
1557		// proposal).
1558		r.prs.Visit(func(id uint64, pr *tracker.Progress) {
1559			r.maybeSendAppend(id, false /* sendIfEmpty */)
1560		})
1561	}
1562	// If the the leadTransferee was removed or demoted, abort the leadership transfer.
1563	if _, tOK := r.prs.Config.Voters.IDs()[r.leadTransferee]; !tOK && r.leadTransferee != 0 {
1564		r.abortLeaderTransfer()
1565	}
1566
1567	return cs
1568}
1569
1570func (r *raft) loadState(state pb.HardState) {
1571	if state.Commit < r.raftLog.committed || state.Commit > r.raftLog.lastIndex() {
1572		r.logger.Panicf("%x state.commit %d is out of range [%d, %d]", r.id, state.Commit, r.raftLog.committed, r.raftLog.lastIndex())
1573	}
1574	r.raftLog.committed = state.Commit
1575	r.Term = state.Term
1576	r.Vote = state.Vote
1577}
1578
1579// pastElectionTimeout returns true iff r.electionElapsed is greater
1580// than or equal to the randomized election timeout in
1581// [electiontimeout, 2 * electiontimeout - 1].
1582func (r *raft) pastElectionTimeout() bool {
1583	return r.electionElapsed >= r.randomizedElectionTimeout
1584}
1585
1586func (r *raft) resetRandomizedElectionTimeout() {
1587	r.randomizedElectionTimeout = r.electionTimeout + globalRand.Intn(r.electionTimeout)
1588}
1589
1590func (r *raft) sendTimeoutNow(to uint64) {
1591	r.send(pb.Message{To: to, Type: pb.MsgTimeoutNow})
1592}
1593
1594func (r *raft) abortLeaderTransfer() {
1595	r.leadTransferee = None
1596}
1597
1598// committedEntryInCurrentTerm return true if the peer has committed an entry in its term.
1599func (r *raft) committedEntryInCurrentTerm() bool {
1600	return r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) == r.Term
1601}
1602
1603// responseToReadIndexReq constructs a response for `req`. If `req` comes from the peer
1604// itself, a blank value will be returned.
1605func (r *raft) responseToReadIndexReq(req pb.Message, readIndex uint64) pb.Message {
1606	if req.From == None || req.From == r.id {
1607		r.readStates = append(r.readStates, ReadState{
1608			Index:      readIndex,
1609			RequestCtx: req.Entries[0].Data,
1610		})
1611		return pb.Message{}
1612	}
1613	return pb.Message{
1614		Type:    pb.MsgReadIndexResp,
1615		To:      req.From,
1616		Index:   readIndex,
1617		Entries: req.Entries,
1618	}
1619}
1620
1621// increaseUncommittedSize computes the size of the proposed entries and
1622// determines whether they would push leader over its maxUncommittedSize limit.
1623// If the new entries would exceed the limit, the method returns false. If not,
1624// the increase in uncommitted entry size is recorded and the method returns
1625// true.
1626//
1627// Empty payloads are never refused. This is used both for appending an empty
1628// entry at a new leader's term, as well as leaving a joint configuration.
1629func (r *raft) increaseUncommittedSize(ents []pb.Entry) bool {
1630	var s uint64
1631	for _, e := range ents {
1632		s += uint64(PayloadSize(e))
1633	}
1634
1635	if r.uncommittedSize > 0 && s > 0 && r.uncommittedSize+s > r.maxUncommittedSize {
1636		// If the uncommitted tail of the Raft log is empty, allow any size
1637		// proposal. Otherwise, limit the size of the uncommitted tail of the
1638		// log and drop any proposal that would push the size over the limit.
1639		// Note the added requirement s>0 which is used to make sure that
1640		// appending single empty entries to the log always succeeds, used both
1641		// for replicating a new leader's initial empty entry, and for
1642		// auto-leaving joint configurations.
1643		return false
1644	}
1645	r.uncommittedSize += s
1646	return true
1647}
1648
1649// reduceUncommittedSize accounts for the newly committed entries by decreasing
1650// the uncommitted entry size limit.
1651func (r *raft) reduceUncommittedSize(ents []pb.Entry) {
1652	if r.uncommittedSize == 0 {
1653		// Fast-path for followers, who do not track or enforce the limit.
1654		return
1655	}
1656
1657	var s uint64
1658	for _, e := range ents {
1659		s += uint64(PayloadSize(e))
1660	}
1661	if s > r.uncommittedSize {
1662		// uncommittedSize may underestimate the size of the uncommitted Raft
1663		// log tail but will never overestimate it. Saturate at 0 instead of
1664		// allowing overflow.
1665		r.uncommittedSize = 0
1666	} else {
1667		r.uncommittedSize -= s
1668	}
1669}
1670
1671func numOfPendingConf(ents []pb.Entry) int {
1672	n := 0
1673	for i := range ents {
1674		if ents[i].Type == pb.EntryConfChange {
1675			n++
1676		}
1677	}
1678	return n
1679}
1680