1// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15/*
16This file contains tests which verify that the scenarios described
17in the raft paper (https://raft.github.io/raft.pdf) are
18handled by the raft implementation correctly. Each test focuses on
19several sentences written in the paper. This could help us to prevent
20most implementation bugs.
21
Each test is composed of three parts: init, test and check.
The init part uses a simple and understandable way to set up the initial state.
The test part uses the Step function to generate the scenario. The check part
verifies the outgoing messages and the resulting state.
26*/
27package raft
28
29import (
30	"fmt"
31	"reflect"
32	"sort"
33	"testing"
34
35	pb "go.etcd.io/etcd/raft/raftpb"
36)
37
38func TestFollowerUpdateTermFromMessage(t *testing.T) {
39	testUpdateTermFromMessage(t, StateFollower)
40}
41func TestCandidateUpdateTermFromMessage(t *testing.T) {
42	testUpdateTermFromMessage(t, StateCandidate)
43}
44func TestLeaderUpdateTermFromMessage(t *testing.T) {
45	testUpdateTermFromMessage(t, StateLeader)
46}
47
48// testUpdateTermFromMessage tests that if one server’s current term is
49// smaller than the other’s, then it updates its current term to the larger
50// value. If a candidate or leader discovers that its term is out of date,
51// it immediately reverts to follower state.
52// Reference: section 5.1
53func testUpdateTermFromMessage(t *testing.T, state StateType) {
54	r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
55	switch state {
56	case StateFollower:
57		r.becomeFollower(1, 2)
58	case StateCandidate:
59		r.becomeCandidate()
60	case StateLeader:
61		r.becomeCandidate()
62		r.becomeLeader()
63	}
64
65	r.Step(pb.Message{Type: pb.MsgApp, Term: 2})
66
67	if r.Term != 2 {
68		t.Errorf("term = %d, want %d", r.Term, 2)
69	}
70	if r.state != StateFollower {
71		t.Errorf("state = %v, want %v", r.state, StateFollower)
72	}
73}
74
75// TestRejectStaleTermMessage tests that if a server receives a request with
76// a stale term number, it rejects the request.
77// Our implementation ignores the request instead.
78// Reference: section 5.1
79func TestRejectStaleTermMessage(t *testing.T) {
80	called := false
81	fakeStep := func(r *raft, m pb.Message) error {
82		called = true
83		return nil
84	}
85	r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
86	r.step = fakeStep
87	r.loadState(pb.HardState{Term: 2})
88
89	r.Step(pb.Message{Type: pb.MsgApp, Term: r.Term - 1})
90
91	if called {
92		t.Errorf("stepFunc called = %v, want %v", called, false)
93	}
94}
95
96// TestStartAsFollower tests that when servers start up, they begin as followers.
97// Reference: section 5.2
98func TestStartAsFollower(t *testing.T) {
99	r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
100	if r.state != StateFollower {
101		t.Errorf("state = %s, want %s", r.state, StateFollower)
102	}
103}
104
105// TestLeaderBcastBeat tests that if the leader receives a heartbeat tick,
106// it will send a MsgHeartbeat with m.Index = 0, m.LogTerm=0 and empty entries
107// as heartbeat to all followers.
108// Reference: section 5.2
109func TestLeaderBcastBeat(t *testing.T) {
110	// heartbeat interval
111	hi := 1
112	r := newTestRaft(1, []uint64{1, 2, 3}, 10, hi, NewMemoryStorage())
113	r.becomeCandidate()
114	r.becomeLeader()
115	for i := 0; i < 10; i++ {
116		mustAppendEntry(r, pb.Entry{Index: uint64(i) + 1})
117	}
118
119	for i := 0; i < hi; i++ {
120		r.tick()
121	}
122
123	msgs := r.readMessages()
124	sort.Sort(messageSlice(msgs))
125	wmsgs := []pb.Message{
126		{From: 1, To: 2, Term: 1, Type: pb.MsgHeartbeat},
127		{From: 1, To: 3, Term: 1, Type: pb.MsgHeartbeat},
128	}
129	if !reflect.DeepEqual(msgs, wmsgs) {
130		t.Errorf("msgs = %v, want %v", msgs, wmsgs)
131	}
132}
133
134func TestFollowerStartElection(t *testing.T) {
135	testNonleaderStartElection(t, StateFollower)
136}
137func TestCandidateStartNewElection(t *testing.T) {
138	testNonleaderStartElection(t, StateCandidate)
139}
140
141// testNonleaderStartElection tests that if a follower receives no communication
142// over election timeout, it begins an election to choose a new leader. It
143// increments its current term and transitions to candidate state. It then
144// votes for itself and issues RequestVote RPCs in parallel to each of the
145// other servers in the cluster.
146// Reference: section 5.2
147// Also if a candidate fails to obtain a majority, it will time out and
148// start a new election by incrementing its term and initiating another
149// round of RequestVote RPCs.
150// Reference: section 5.2
151func testNonleaderStartElection(t *testing.T, state StateType) {
152	// election timeout
153	et := 10
154	r := newTestRaft(1, []uint64{1, 2, 3}, et, 1, NewMemoryStorage())
155	switch state {
156	case StateFollower:
157		r.becomeFollower(1, 2)
158	case StateCandidate:
159		r.becomeCandidate()
160	}
161
162	for i := 1; i < 2*et; i++ {
163		r.tick()
164	}
165
166	if r.Term != 2 {
167		t.Errorf("term = %d, want 2", r.Term)
168	}
169	if r.state != StateCandidate {
170		t.Errorf("state = %s, want %s", r.state, StateCandidate)
171	}
172	if !r.prs.Votes[r.id] {
173		t.Errorf("vote for self = false, want true")
174	}
175	msgs := r.readMessages()
176	sort.Sort(messageSlice(msgs))
177	wmsgs := []pb.Message{
178		{From: 1, To: 2, Term: 2, Type: pb.MsgVote},
179		{From: 1, To: 3, Term: 2, Type: pb.MsgVote},
180	}
181	if !reflect.DeepEqual(msgs, wmsgs) {
182		t.Errorf("msgs = %v, want %v", msgs, wmsgs)
183	}
184}
185
186// TestLeaderElectionInOneRoundRPC tests all cases that may happen in
187// leader election during one round of RequestVote RPC:
188// a) it wins the election
189// b) it loses the election
190// c) it is unclear about the result
191// Reference: section 5.2
192func TestLeaderElectionInOneRoundRPC(t *testing.T) {
193	tests := []struct {
194		size  int
195		votes map[uint64]bool
196		state StateType
197	}{
198		// win the election when receiving votes from a majority of the servers
199		{1, map[uint64]bool{}, StateLeader},
200		{3, map[uint64]bool{2: true, 3: true}, StateLeader},
201		{3, map[uint64]bool{2: true}, StateLeader},
202		{5, map[uint64]bool{2: true, 3: true, 4: true, 5: true}, StateLeader},
203		{5, map[uint64]bool{2: true, 3: true, 4: true}, StateLeader},
204		{5, map[uint64]bool{2: true, 3: true}, StateLeader},
205
206		// return to follower state if it receives vote denial from a majority
207		{3, map[uint64]bool{2: false, 3: false}, StateFollower},
208		{5, map[uint64]bool{2: false, 3: false, 4: false, 5: false}, StateFollower},
209		{5, map[uint64]bool{2: true, 3: false, 4: false, 5: false}, StateFollower},
210
211		// stay in candidate if it does not obtain the majority
212		{3, map[uint64]bool{}, StateCandidate},
213		{5, map[uint64]bool{2: true}, StateCandidate},
214		{5, map[uint64]bool{2: false, 3: false}, StateCandidate},
215		{5, map[uint64]bool{}, StateCandidate},
216	}
217	for i, tt := range tests {
218		r := newTestRaft(1, idsBySize(tt.size), 10, 1, NewMemoryStorage())
219
220		r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
221		for id, vote := range tt.votes {
222			r.Step(pb.Message{From: id, To: 1, Term: r.Term, Type: pb.MsgVoteResp, Reject: !vote})
223		}
224
225		if r.state != tt.state {
226			t.Errorf("#%d: state = %s, want %s", i, r.state, tt.state)
227		}
228		if g := r.Term; g != 1 {
229			t.Errorf("#%d: term = %d, want %d", i, g, 1)
230		}
231	}
232}
233
234// TestFollowerVote tests that each follower will vote for at most one
235// candidate in a given term, on a first-come-first-served basis.
236// Reference: section 5.2
237func TestFollowerVote(t *testing.T) {
238	tests := []struct {
239		vote    uint64
240		nvote   uint64
241		wreject bool
242	}{
243		{None, 1, false},
244		{None, 2, false},
245		{1, 1, false},
246		{2, 2, false},
247		{1, 2, true},
248		{2, 1, true},
249	}
250	for i, tt := range tests {
251		r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
252		r.loadState(pb.HardState{Term: 1, Vote: tt.vote})
253
254		r.Step(pb.Message{From: tt.nvote, To: 1, Term: 1, Type: pb.MsgVote})
255
256		msgs := r.readMessages()
257		wmsgs := []pb.Message{
258			{From: 1, To: tt.nvote, Term: 1, Type: pb.MsgVoteResp, Reject: tt.wreject},
259		}
260		if !reflect.DeepEqual(msgs, wmsgs) {
261			t.Errorf("#%d: msgs = %v, want %v", i, msgs, wmsgs)
262		}
263	}
264}
265
266// TestCandidateFallback tests that while waiting for votes,
267// if a candidate receives an AppendEntries RPC from another server claiming
268// to be leader whose term is at least as large as the candidate's current term,
269// it recognizes the leader as legitimate and returns to follower state.
270// Reference: section 5.2
271func TestCandidateFallback(t *testing.T) {
272	tests := []pb.Message{
273		{From: 2, To: 1, Term: 1, Type: pb.MsgApp},
274		{From: 2, To: 1, Term: 2, Type: pb.MsgApp},
275	}
276	for i, tt := range tests {
277		r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
278		r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
279		if r.state != StateCandidate {
280			t.Fatalf("unexpected state = %s, want %s", r.state, StateCandidate)
281		}
282
283		r.Step(tt)
284
285		if g := r.state; g != StateFollower {
286			t.Errorf("#%d: state = %s, want %s", i, g, StateFollower)
287		}
288		if g := r.Term; g != tt.Term {
289			t.Errorf("#%d: term = %d, want %d", i, g, tt.Term)
290		}
291	}
292}
293
294func TestFollowerElectionTimeoutRandomized(t *testing.T) {
295	SetLogger(discardLogger)
296	defer SetLogger(defaultLogger)
297	testNonleaderElectionTimeoutRandomized(t, StateFollower)
298}
299func TestCandidateElectionTimeoutRandomized(t *testing.T) {
300	SetLogger(discardLogger)
301	defer SetLogger(defaultLogger)
302	testNonleaderElectionTimeoutRandomized(t, StateCandidate)
303}
304
305// testNonleaderElectionTimeoutRandomized tests that election timeout for
306// follower or candidate is randomized.
307// Reference: section 5.2
308func testNonleaderElectionTimeoutRandomized(t *testing.T, state StateType) {
309	et := 10
310	r := newTestRaft(1, []uint64{1, 2, 3}, et, 1, NewMemoryStorage())
311	timeouts := make(map[int]bool)
312	for round := 0; round < 50*et; round++ {
313		switch state {
314		case StateFollower:
315			r.becomeFollower(r.Term+1, 2)
316		case StateCandidate:
317			r.becomeCandidate()
318		}
319
320		time := 0
321		for len(r.readMessages()) == 0 {
322			r.tick()
323			time++
324		}
325		timeouts[time] = true
326	}
327
328	for d := et + 1; d < 2*et; d++ {
329		if !timeouts[d] {
330			t.Errorf("timeout in %d ticks should happen", d)
331		}
332	}
333}
334
335func TestFollowersElectionTimeoutNonconflict(t *testing.T) {
336	SetLogger(discardLogger)
337	defer SetLogger(defaultLogger)
338	testNonleadersElectionTimeoutNonconflict(t, StateFollower)
339}
340func TestCandidatesElectionTimeoutNonconflict(t *testing.T) {
341	SetLogger(discardLogger)
342	defer SetLogger(defaultLogger)
343	testNonleadersElectionTimeoutNonconflict(t, StateCandidate)
344}
345
346// testNonleadersElectionTimeoutNonconflict tests that in most cases only a
347// single server(follower or candidate) will time out, which reduces the
348// likelihood of split vote in the new election.
349// Reference: section 5.2
350func testNonleadersElectionTimeoutNonconflict(t *testing.T, state StateType) {
351	et := 10
352	size := 5
353	rs := make([]*raft, size)
354	ids := idsBySize(size)
355	for k := range rs {
356		rs[k] = newTestRaft(ids[k], ids, et, 1, NewMemoryStorage())
357	}
358	conflicts := 0
359	for round := 0; round < 1000; round++ {
360		for _, r := range rs {
361			switch state {
362			case StateFollower:
363				r.becomeFollower(r.Term+1, None)
364			case StateCandidate:
365				r.becomeCandidate()
366			}
367		}
368
369		timeoutNum := 0
370		for timeoutNum == 0 {
371			for _, r := range rs {
372				r.tick()
373				if len(r.readMessages()) > 0 {
374					timeoutNum++
375				}
376			}
377		}
378		// several rafts time out at the same tick
379		if timeoutNum > 1 {
380			conflicts++
381		}
382	}
383
384	if g := float64(conflicts) / 1000; g > 0.3 {
385		t.Errorf("probability of conflicts = %v, want <= 0.3", g)
386	}
387}
388
389// TestLeaderStartReplication tests that when receiving client proposals,
390// the leader appends the proposal to its log as a new entry, then issues
391// AppendEntries RPCs in parallel to each of the other servers to replicate
392// the entry. Also, when sending an AppendEntries RPC, the leader includes
393// the index and term of the entry in its log that immediately precedes
394// the new entries.
395// Also, it writes the new entry into stable storage.
396// Reference: section 5.3
397func TestLeaderStartReplication(t *testing.T) {
398	s := NewMemoryStorage()
399	r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, s)
400	r.becomeCandidate()
401	r.becomeLeader()
402	commitNoopEntry(r, s)
403	li := r.raftLog.lastIndex()
404
405	ents := []pb.Entry{{Data: []byte("some data")}}
406	r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: ents})
407
408	if g := r.raftLog.lastIndex(); g != li+1 {
409		t.Errorf("lastIndex = %d, want %d", g, li+1)
410	}
411	if g := r.raftLog.committed; g != li {
412		t.Errorf("committed = %d, want %d", g, li)
413	}
414	msgs := r.readMessages()
415	sort.Sort(messageSlice(msgs))
416	wents := []pb.Entry{{Index: li + 1, Term: 1, Data: []byte("some data")}}
417	wmsgs := []pb.Message{
418		{From: 1, To: 2, Term: 1, Type: pb.MsgApp, Index: li, LogTerm: 1, Entries: wents, Commit: li},
419		{From: 1, To: 3, Term: 1, Type: pb.MsgApp, Index: li, LogTerm: 1, Entries: wents, Commit: li},
420	}
421	if !reflect.DeepEqual(msgs, wmsgs) {
422		t.Errorf("msgs = %+v, want %+v", msgs, wmsgs)
423	}
424	if g := r.raftLog.unstableEntries(); !reflect.DeepEqual(g, wents) {
425		t.Errorf("ents = %+v, want %+v", g, wents)
426	}
427}
428
429// TestLeaderCommitEntry tests that when the entry has been safely replicated,
430// the leader gives out the applied entries, which can be applied to its state
431// machine.
432// Also, the leader keeps track of the highest index it knows to be committed,
433// and it includes that index in future AppendEntries RPCs so that the other
434// servers eventually find out.
435// Reference: section 5.3
436func TestLeaderCommitEntry(t *testing.T) {
437	s := NewMemoryStorage()
438	r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, s)
439	r.becomeCandidate()
440	r.becomeLeader()
441	commitNoopEntry(r, s)
442	li := r.raftLog.lastIndex()
443	r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
444
445	for _, m := range r.readMessages() {
446		r.Step(acceptAndReply(m))
447	}
448
449	if g := r.raftLog.committed; g != li+1 {
450		t.Errorf("committed = %d, want %d", g, li+1)
451	}
452	wents := []pb.Entry{{Index: li + 1, Term: 1, Data: []byte("some data")}}
453	if g := r.raftLog.nextEnts(); !reflect.DeepEqual(g, wents) {
454		t.Errorf("nextEnts = %+v, want %+v", g, wents)
455	}
456	msgs := r.readMessages()
457	sort.Sort(messageSlice(msgs))
458	for i, m := range msgs {
459		if w := uint64(i + 2); m.To != w {
460			t.Errorf("to = %x, want %x", m.To, w)
461		}
462		if m.Type != pb.MsgApp {
463			t.Errorf("type = %v, want %v", m.Type, pb.MsgApp)
464		}
465		if m.Commit != li+1 {
466			t.Errorf("commit = %d, want %d", m.Commit, li+1)
467		}
468	}
469}
470
471// TestLeaderAcknowledgeCommit tests that a log entry is committed once the
472// leader that created the entry has replicated it on a majority of the servers.
473// Reference: section 5.3
474func TestLeaderAcknowledgeCommit(t *testing.T) {
475	tests := []struct {
476		size      int
477		acceptors map[uint64]bool
478		wack      bool
479	}{
480		{1, nil, true},
481		{3, nil, false},
482		{3, map[uint64]bool{2: true}, true},
483		{3, map[uint64]bool{2: true, 3: true}, true},
484		{5, nil, false},
485		{5, map[uint64]bool{2: true}, false},
486		{5, map[uint64]bool{2: true, 3: true}, true},
487		{5, map[uint64]bool{2: true, 3: true, 4: true}, true},
488		{5, map[uint64]bool{2: true, 3: true, 4: true, 5: true}, true},
489	}
490	for i, tt := range tests {
491		s := NewMemoryStorage()
492		r := newTestRaft(1, idsBySize(tt.size), 10, 1, s)
493		r.becomeCandidate()
494		r.becomeLeader()
495		commitNoopEntry(r, s)
496		li := r.raftLog.lastIndex()
497		r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
498
499		for _, m := range r.readMessages() {
500			if tt.acceptors[m.To] {
501				r.Step(acceptAndReply(m))
502			}
503		}
504
505		if g := r.raftLog.committed > li; g != tt.wack {
506			t.Errorf("#%d: ack commit = %v, want %v", i, g, tt.wack)
507		}
508	}
509}
510
511// TestLeaderCommitPrecedingEntries tests that when leader commits a log entry,
512// it also commits all preceding entries in the leader’s log, including
513// entries created by previous leaders.
514// Also, it applies the entry to its local state machine (in log order).
515// Reference: section 5.3
516func TestLeaderCommitPrecedingEntries(t *testing.T) {
517	tests := [][]pb.Entry{
518		{},
519		{{Term: 2, Index: 1}},
520		{{Term: 1, Index: 1}, {Term: 2, Index: 2}},
521		{{Term: 1, Index: 1}},
522	}
523	for i, tt := range tests {
524		storage := NewMemoryStorage()
525		storage.Append(tt)
526		r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, storage)
527		r.loadState(pb.HardState{Term: 2})
528		r.becomeCandidate()
529		r.becomeLeader()
530		r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
531
532		for _, m := range r.readMessages() {
533			r.Step(acceptAndReply(m))
534		}
535
536		li := uint64(len(tt))
537		wents := append(tt, pb.Entry{Term: 3, Index: li + 1}, pb.Entry{Term: 3, Index: li + 2, Data: []byte("some data")})
538		if g := r.raftLog.nextEnts(); !reflect.DeepEqual(g, wents) {
539			t.Errorf("#%d: ents = %+v, want %+v", i, g, wents)
540		}
541	}
542}
543
544// TestFollowerCommitEntry tests that once a follower learns that a log entry
545// is committed, it applies the entry to its local state machine (in log order).
546// Reference: section 5.3
547func TestFollowerCommitEntry(t *testing.T) {
548	tests := []struct {
549		ents   []pb.Entry
550		commit uint64
551	}{
552		{
553			[]pb.Entry{
554				{Term: 1, Index: 1, Data: []byte("some data")},
555			},
556			1,
557		},
558		{
559			[]pb.Entry{
560				{Term: 1, Index: 1, Data: []byte("some data")},
561				{Term: 1, Index: 2, Data: []byte("some data2")},
562			},
563			2,
564		},
565		{
566			[]pb.Entry{
567				{Term: 1, Index: 1, Data: []byte("some data2")},
568				{Term: 1, Index: 2, Data: []byte("some data")},
569			},
570			2,
571		},
572		{
573			[]pb.Entry{
574				{Term: 1, Index: 1, Data: []byte("some data")},
575				{Term: 1, Index: 2, Data: []byte("some data2")},
576			},
577			1,
578		},
579	}
580	for i, tt := range tests {
581		r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
582		r.becomeFollower(1, 2)
583
584		r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 1, Entries: tt.ents, Commit: tt.commit})
585
586		if g := r.raftLog.committed; g != tt.commit {
587			t.Errorf("#%d: committed = %d, want %d", i, g, tt.commit)
588		}
589		wents := tt.ents[:int(tt.commit)]
590		if g := r.raftLog.nextEnts(); !reflect.DeepEqual(g, wents) {
591			t.Errorf("#%d: nextEnts = %v, want %v", i, g, wents)
592		}
593	}
594}
595
596// TestFollowerCheckMsgApp tests that if the follower does not find an
597// entry in its log with the same index and term as the one in AppendEntries RPC,
598// then it refuses the new entries. Otherwise it replies that it accepts the
599// append entries.
600// Reference: section 5.3
601func TestFollowerCheckMsgApp(t *testing.T) {
602	ents := []pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}}
603	tests := []struct {
604		term        uint64
605		index       uint64
606		windex      uint64
607		wreject     bool
608		wrejectHint uint64
609	}{
610		// match with committed entries
611		{0, 0, 1, false, 0},
612		{ents[0].Term, ents[0].Index, 1, false, 0},
613		// match with uncommitted entries
614		{ents[1].Term, ents[1].Index, 2, false, 0},
615
616		// unmatch with existing entry
617		{ents[0].Term, ents[1].Index, ents[1].Index, true, 2},
618		// unexisting entry
619		{ents[1].Term + 1, ents[1].Index + 1, ents[1].Index + 1, true, 2},
620	}
621	for i, tt := range tests {
622		storage := NewMemoryStorage()
623		storage.Append(ents)
624		r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, storage)
625		r.loadState(pb.HardState{Commit: 1})
626		r.becomeFollower(2, 2)
627
628		r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 2, LogTerm: tt.term, Index: tt.index})
629
630		msgs := r.readMessages()
631		wmsgs := []pb.Message{
632			{From: 1, To: 2, Type: pb.MsgAppResp, Term: 2, Index: tt.windex, Reject: tt.wreject, RejectHint: tt.wrejectHint},
633		}
634		if !reflect.DeepEqual(msgs, wmsgs) {
635			t.Errorf("#%d: msgs = %+v, want %+v", i, msgs, wmsgs)
636		}
637	}
638}
639
640// TestFollowerAppendEntries tests that when AppendEntries RPC is valid,
641// the follower will delete the existing conflict entry and all that follow it,
642// and append any new entries not already in the log.
643// Also, it writes the new entry into stable storage.
644// Reference: section 5.3
645func TestFollowerAppendEntries(t *testing.T) {
646	tests := []struct {
647		index, term uint64
648		ents        []pb.Entry
649		wents       []pb.Entry
650		wunstable   []pb.Entry
651	}{
652		{
653			2, 2,
654			[]pb.Entry{{Term: 3, Index: 3}},
655			[]pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}, {Term: 3, Index: 3}},
656			[]pb.Entry{{Term: 3, Index: 3}},
657		},
658		{
659			1, 1,
660			[]pb.Entry{{Term: 3, Index: 2}, {Term: 4, Index: 3}},
661			[]pb.Entry{{Term: 1, Index: 1}, {Term: 3, Index: 2}, {Term: 4, Index: 3}},
662			[]pb.Entry{{Term: 3, Index: 2}, {Term: 4, Index: 3}},
663		},
664		{
665			0, 0,
666			[]pb.Entry{{Term: 1, Index: 1}},
667			[]pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}},
668			nil,
669		},
670		{
671			0, 0,
672			[]pb.Entry{{Term: 3, Index: 1}},
673			[]pb.Entry{{Term: 3, Index: 1}},
674			[]pb.Entry{{Term: 3, Index: 1}},
675		},
676	}
677	for i, tt := range tests {
678		storage := NewMemoryStorage()
679		storage.Append([]pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}})
680		r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, storage)
681		r.becomeFollower(2, 2)
682
683		r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 2, LogTerm: tt.term, Index: tt.index, Entries: tt.ents})
684
685		if g := r.raftLog.allEntries(); !reflect.DeepEqual(g, tt.wents) {
686			t.Errorf("#%d: ents = %+v, want %+v", i, g, tt.wents)
687		}
688		if g := r.raftLog.unstableEntries(); !reflect.DeepEqual(g, tt.wunstable) {
689			t.Errorf("#%d: unstableEnts = %+v, want %+v", i, g, tt.wunstable)
690		}
691	}
692}
693
694// TestLeaderSyncFollowerLog tests that the leader could bring a follower's log
695// into consistency with its own.
696// Reference: section 5.3, figure 7
697func TestLeaderSyncFollowerLog(t *testing.T) {
698	ents := []pb.Entry{
699		{},
700		{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3},
701		{Term: 4, Index: 4}, {Term: 4, Index: 5},
702		{Term: 5, Index: 6}, {Term: 5, Index: 7},
703		{Term: 6, Index: 8}, {Term: 6, Index: 9}, {Term: 6, Index: 10},
704	}
705	term := uint64(8)
706	tests := [][]pb.Entry{
707		{
708			{},
709			{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3},
710			{Term: 4, Index: 4}, {Term: 4, Index: 5},
711			{Term: 5, Index: 6}, {Term: 5, Index: 7},
712			{Term: 6, Index: 8}, {Term: 6, Index: 9},
713		},
714		{
715			{},
716			{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3},
717			{Term: 4, Index: 4},
718		},
719		{
720			{},
721			{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3},
722			{Term: 4, Index: 4}, {Term: 4, Index: 5},
723			{Term: 5, Index: 6}, {Term: 5, Index: 7},
724			{Term: 6, Index: 8}, {Term: 6, Index: 9}, {Term: 6, Index: 10}, {Term: 6, Index: 11},
725		},
726		{
727			{},
728			{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3},
729			{Term: 4, Index: 4}, {Term: 4, Index: 5},
730			{Term: 5, Index: 6}, {Term: 5, Index: 7},
731			{Term: 6, Index: 8}, {Term: 6, Index: 9}, {Term: 6, Index: 10},
732			{Term: 7, Index: 11}, {Term: 7, Index: 12},
733		},
734		{
735			{},
736			{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3},
737			{Term: 4, Index: 4}, {Term: 4, Index: 5}, {Term: 4, Index: 6}, {Term: 4, Index: 7},
738		},
739		{
740			{},
741			{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3},
742			{Term: 2, Index: 4}, {Term: 2, Index: 5}, {Term: 2, Index: 6},
743			{Term: 3, Index: 7}, {Term: 3, Index: 8}, {Term: 3, Index: 9}, {Term: 3, Index: 10}, {Term: 3, Index: 11},
744		},
745	}
746	for i, tt := range tests {
747		leadStorage := NewMemoryStorage()
748		leadStorage.Append(ents)
749		lead := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, leadStorage)
750		lead.loadState(pb.HardState{Commit: lead.raftLog.lastIndex(), Term: term})
751		followerStorage := NewMemoryStorage()
752		followerStorage.Append(tt)
753		follower := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, followerStorage)
754		follower.loadState(pb.HardState{Term: term - 1})
755		// It is necessary to have a three-node cluster.
756		// The second may have more up-to-date log than the first one, so the
757		// first node needs the vote from the third node to become the leader.
758		n := newNetwork(lead, follower, nopStepper)
759		n.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
760		// The election occurs in the term after the one we loaded with
761		// lead.loadState above.
762		n.send(pb.Message{From: 3, To: 1, Type: pb.MsgVoteResp, Term: term + 1})
763
764		n.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
765
766		if g := diffu(ltoa(lead.raftLog), ltoa(follower.raftLog)); g != "" {
767			t.Errorf("#%d: log diff:\n%s", i, g)
768		}
769	}
770}
771
772// TestVoteRequest tests that the vote request includes information about the candidate’s log
773// and are sent to all of the other nodes.
774// Reference: section 5.4.1
775func TestVoteRequest(t *testing.T) {
776	tests := []struct {
777		ents  []pb.Entry
778		wterm uint64
779	}{
780		{[]pb.Entry{{Term: 1, Index: 1}}, 2},
781		{[]pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}}, 3},
782	}
783	for j, tt := range tests {
784		r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
785		r.Step(pb.Message{
786			From: 2, To: 1, Type: pb.MsgApp, Term: tt.wterm - 1, LogTerm: 0, Index: 0, Entries: tt.ents,
787		})
788		r.readMessages()
789
790		for i := 1; i < r.electionTimeout*2; i++ {
791			r.tickElection()
792		}
793
794		msgs := r.readMessages()
795		sort.Sort(messageSlice(msgs))
796		if len(msgs) != 2 {
797			t.Fatalf("#%d: len(msg) = %d, want %d", j, len(msgs), 2)
798		}
799		for i, m := range msgs {
800			if m.Type != pb.MsgVote {
801				t.Errorf("#%d: msgType = %d, want %d", i, m.Type, pb.MsgVote)
802			}
803			if m.To != uint64(i+2) {
804				t.Errorf("#%d: to = %d, want %d", i, m.To, i+2)
805			}
806			if m.Term != tt.wterm {
807				t.Errorf("#%d: term = %d, want %d", i, m.Term, tt.wterm)
808			}
809			windex, wlogterm := tt.ents[len(tt.ents)-1].Index, tt.ents[len(tt.ents)-1].Term
810			if m.Index != windex {
811				t.Errorf("#%d: index = %d, want %d", i, m.Index, windex)
812			}
813			if m.LogTerm != wlogterm {
814				t.Errorf("#%d: logterm = %d, want %d", i, m.LogTerm, wlogterm)
815			}
816		}
817	}
818}
819
820// TestVoter tests the voter denies its vote if its own log is more up-to-date
821// than that of the candidate.
822// Reference: section 5.4.1
823func TestVoter(t *testing.T) {
824	tests := []struct {
825		ents    []pb.Entry
826		logterm uint64
827		index   uint64
828
829		wreject bool
830	}{
831		// same logterm
832		{[]pb.Entry{{Term: 1, Index: 1}}, 1, 1, false},
833		{[]pb.Entry{{Term: 1, Index: 1}}, 1, 2, false},
834		{[]pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}}, 1, 1, true},
835		// candidate higher logterm
836		{[]pb.Entry{{Term: 1, Index: 1}}, 2, 1, false},
837		{[]pb.Entry{{Term: 1, Index: 1}}, 2, 2, false},
838		{[]pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}}, 2, 1, false},
839		// voter higher logterm
840		{[]pb.Entry{{Term: 2, Index: 1}}, 1, 1, true},
841		{[]pb.Entry{{Term: 2, Index: 1}}, 1, 2, true},
842		{[]pb.Entry{{Term: 2, Index: 1}, {Term: 1, Index: 2}}, 1, 1, true},
843	}
844	for i, tt := range tests {
845		storage := NewMemoryStorage()
846		storage.Append(tt.ents)
847		r := newTestRaft(1, []uint64{1, 2}, 10, 1, storage)
848
849		r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgVote, Term: 3, LogTerm: tt.logterm, Index: tt.index})
850
851		msgs := r.readMessages()
852		if len(msgs) != 1 {
853			t.Fatalf("#%d: len(msg) = %d, want %d", i, len(msgs), 1)
854		}
855		m := msgs[0]
856		if m.Type != pb.MsgVoteResp {
857			t.Errorf("#%d: msgType = %d, want %d", i, m.Type, pb.MsgVoteResp)
858		}
859		if m.Reject != tt.wreject {
860			t.Errorf("#%d: reject = %t, want %t", i, m.Reject, tt.wreject)
861		}
862	}
863}
864
865// TestLeaderOnlyCommitsLogFromCurrentTerm tests that only log entries from the leader’s
866// current term are committed by counting replicas.
867// Reference: section 5.4.2
868func TestLeaderOnlyCommitsLogFromCurrentTerm(t *testing.T) {
869	ents := []pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}}
870	tests := []struct {
871		index   uint64
872		wcommit uint64
873	}{
874		// do not commit log entries in previous terms
875		{1, 0},
876		{2, 0},
877		// commit log in current term
878		{3, 3},
879	}
880	for i, tt := range tests {
881		storage := NewMemoryStorage()
882		storage.Append(ents)
883		r := newTestRaft(1, []uint64{1, 2}, 10, 1, storage)
884		r.loadState(pb.HardState{Term: 2})
885		// become leader at term 3
886		r.becomeCandidate()
887		r.becomeLeader()
888		r.readMessages()
889		// propose a entry to current term
890		r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
891
892		r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Term: r.Term, Index: tt.index})
893		if r.raftLog.committed != tt.wcommit {
894			t.Errorf("#%d: commit = %d, want %d", i, r.raftLog.committed, tt.wcommit)
895		}
896	}
897}
898
899type messageSlice []pb.Message
900
901func (s messageSlice) Len() int           { return len(s) }
902func (s messageSlice) Less(i, j int) bool { return fmt.Sprint(s[i]) < fmt.Sprint(s[j]) }
903func (s messageSlice) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
904
905func commitNoopEntry(r *raft, s *MemoryStorage) {
906	if r.state != StateLeader {
907		panic("it should only be used when it is the leader")
908	}
909	r.bcastAppend()
910	// simulate the response of MsgApp
911	msgs := r.readMessages()
912	for _, m := range msgs {
913		if m.Type != pb.MsgApp || len(m.Entries) != 1 || m.Entries[0].Data != nil {
914			panic("not a message to append noop entry")
915		}
916		r.Step(acceptAndReply(m))
917	}
918	// ignore further messages to refresh followers' commit index
919	r.readMessages()
920	s.Append(r.raftLog.unstableEntries())
921	r.raftLog.appliedTo(r.raftLog.committed)
922	r.raftLog.stableTo(r.raftLog.lastIndex(), r.raftLog.lastTerm())
923}
924
925func acceptAndReply(m pb.Message) pb.Message {
926	if m.Type != pb.MsgApp {
927		panic("type should be MsgApp")
928	}
929	return pb.Message{
930		From:  m.To,
931		To:    m.From,
932		Term:  m.Term,
933		Type:  pb.MsgAppResp,
934		Index: m.Index + uint64(len(m.Entries)),
935	}
936}
937