// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package raft

import (
	"bytes"
	"fmt"
	"math"
	"math/rand"
	"reflect"
	"strings"
	"testing"

	pb "go.etcd.io/etcd/raft/raftpb"
	"go.etcd.io/etcd/raft/tracker"
)

// nextEnts returns the appliable entries and updates the applied index.
//
// It appends r's unstable entries to s, marks the log stable up to its last
// index/term, collects r.raftLog.nextEnts(), and finally moves the applied
// index to r.raftLog.committed.
func nextEnts(r *raft, s *MemoryStorage) (ents []pb.Entry) {
	// Transfer all unstable entries to "stable" storage.
	// NOTE(review): the error returned by Append is discarded here.
	s.Append(r.raftLog.unstableEntries())
	r.raftLog.stableTo(r.raftLog.lastIndex(), r.raftLog.lastTerm())

	ents = r.raftLog.nextEnts()
	r.raftLog.appliedTo(r.raftLog.committed)
	return ents
}

// mustAppendEntry appends ents via r.appendEntry and panics if appendEntry
// reports that the entries were not accepted.
func mustAppendEntry(r *raft, ents ...pb.Entry) {
	if !r.appendEntry(ents...) {
		panic("entry unexpectedly dropped")
	}
}

// stateMachine is the surface the tests need from a peer: it can be stepped
// with a message and its queued outgoing messages can be drained.
type stateMachine interface {
	Step(m pb.Message) error
	readMessages() []pb.Message
}

// readMessages returns the messages currently queued in r.msgs and replaces
// the queue with a fresh empty slice.
func (r *raft) readMessages() []pb.Message {
	msgs := r.msgs
	r.msgs = make([]pb.Message, 0)

	return msgs
}

// TestProgressLeader checks the leader's own progress entry while it accepts
// five proposals: before the i-th proposal the entry must be in
// StateReplicate with Match == i+1 and Next == Match+1.
func TestProgressLeader(t *testing.T) {
	r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage())
	r.becomeCandidate()
	r.becomeLeader()
	r.prs.Progress[2].BecomeReplicate()

	// Send proposals to r1. The first 5 entries should be appended to the log.
	propMsg := pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("foo")}}}
	for i := 0; i < 5; i++ {
		// The check runs before the proposal, so Match == i+1 accounts for
		// the entries already in the log at that point.
		if pr := r.prs.Progress[r.id]; pr.State != tracker.StateReplicate || pr.Match != uint64(i+1) || pr.Next != pr.Match+1 {
			t.Errorf("unexpected progress %v", pr)
		}
		if err := r.Step(propMsg); err != nil {
			t.Fatalf("proposal resulted in error: %v", err)
		}
	}
}

// TestProgressResumeByHeartbeatResp ensures that a heartbeat response resets
// the follower's ProbeSent flag, while sending a heartbeat (MsgBeat) alone
// leaves it set.
func TestProgressResumeByHeartbeatResp(t *testing.T) {
	r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage())
	r.becomeCandidate()
	r.becomeLeader()

	r.prs.Progress[2].ProbeSent = true

	// Broadcasting a heartbeat must not clear the flag.
	r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
	if !r.prs.Progress[2].ProbeSent {
		t.Errorf("paused = %v, want true", r.prs.Progress[2].ProbeSent)
	}

	// After the follower answers the heartbeat the flag must be cleared.
	r.prs.Progress[2].BecomeReplicate()
	r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
	if r.prs.Progress[2].ProbeSent {
		t.Errorf("paused = %v, want false", r.prs.Progress[2].ProbeSent)
	}
}

// TestProgressPaused proposes three entries to a freshly elected leader of a
// two-node group and expects exactly one outgoing message to be queued.
// NOTE(review): presumably the follower is still being probed, which holds
// back the later appends — confirm against the progress tracker.
func TestProgressPaused(t *testing.T) {
	r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage())
	r.becomeCandidate()
	r.becomeLeader()
	r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
	r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
	r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})

	ms := r.readMessages()
	if len(ms) != 1 {
		t.Errorf("len(ms) = %d, want 1", len(ms))
	}
}

// TestProgressFlowControl checks how many MsgApp messages, and how many
// entries per message, a leader emits to a follower when MaxInflightMsgs is 3
// and MaxSizePerMsg is 2048 while ten 1000-byte proposals are pending.
func TestProgressFlowControl(t *testing.T) {
	cfg := newTestConfig(1, []uint64{1, 2}, 5, 1, NewMemoryStorage())
	cfg.MaxInflightMsgs = 3
	cfg.MaxSizePerMsg = 2048
	r := newRaft(cfg)
	r.becomeCandidate()
	r.becomeLeader()

	// Throw away all the messages relating to the initial election.
	r.readMessages()

	// While node 2 is in probe state, propose a bunch of entries.
	r.prs.Progress[2].BecomeProbe()
	blob := []byte(strings.Repeat("a", 1000))
	for i := 0; i < 10; i++ {
		r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: blob}}})
	}

	ms := r.readMessages()
	// First append has two entries: the empty entry to confirm the
	// election, and the first proposal (only one proposal gets sent
	// because we're in probe state).
	if len(ms) != 1 || ms[0].Type != pb.MsgApp {
		t.Fatalf("expected 1 MsgApp, got %v", ms)
	}
	if len(ms[0].Entries) != 2 {
		t.Fatalf("expected 2 entries, got %d", len(ms[0].Entries))
	}
	if len(ms[0].Entries[0].Data) != 0 || len(ms[0].Entries[1].Data) != 1000 {
		t.Fatalf("unexpected entry sizes: %v", ms[0].Entries)
	}

	// When this append is acked, we change to replicate state and can
	// send multiple messages at once.
	r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: ms[0].Entries[1].Index})
	ms = r.readMessages()
	if len(ms) != 3 {
		t.Fatalf("expected 3 messages, got %d", len(ms))
	}
	for i, m := range ms {
		if m.Type != pb.MsgApp {
			t.Errorf("%d: expected MsgApp, got %s", i, m.Type)
		}
		if len(m.Entries) != 2 {
			t.Errorf("%d: expected 2 entries, got %d", i, len(m.Entries))
		}
	}

	// Ack all three of those messages together and get the last two
	// messages (containing three entries).
	r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: ms[2].Entries[1].Index})
	ms = r.readMessages()
	if len(ms) != 2 {
		t.Fatalf("expected 2 messages, got %d", len(ms))
	}
	for i, m := range ms {
		if m.Type != pb.MsgApp {
			t.Errorf("%d: expected MsgApp, got %s", i, m.Type)
		}
	}
	if len(ms[0].Entries) != 2 {
		t.Errorf("%d: expected 2 entries, got %d", 0, len(ms[0].Entries))
	}
	if len(ms[1].Entries) != 1 {
		t.Errorf("%d: expected 1 entry, got %d", 1, len(ms[1].Entries))
	}
}

// TestUncommittedEntryLimit checks the leader's uncommitted-size accounting:
// proposals are dropped with ErrProposalDropped once
// cfg.MaxUncommittedEntriesSize is reached, a single oversized proposal is
// accepted when the tally is zero, entries without Data are always accepted,
// and reduceUncommittedSize brings the tally back to zero.
func TestUncommittedEntryLimit(t *testing.T) {
	// Use a relatively large number of entries here to prevent regression of a
	// bug which computed the size before it was fixed. This test would fail
	// with the bug, either because we'd get dropped proposals earlier than we
	// expect them, or because the final tally ends up nonzero. (At the time of
	// writing, the former).
	const maxEntries = 1024
	testEntry := pb.Entry{Data: []byte("testdata")}
	maxEntrySize := maxEntries * PayloadSize(testEntry)

	if n := PayloadSize(pb.Entry{Data: nil}); n != 0 {
		t.Fatal("entry with no Data must have zero payload size")
	}

	cfg := newTestConfig(1, []uint64{1, 2, 3}, 5, 1, NewMemoryStorage())
	cfg.MaxUncommittedEntriesSize = uint64(maxEntrySize)
	cfg.MaxInflightMsgs = 2 * 1024 // avoid interference
	r := newRaft(cfg)
	r.becomeCandidate()
	r.becomeLeader()
	if n := r.uncommittedSize; n != 0 {
		t.Fatalf("expected zero uncommitted size, got %d bytes", n)
	}

	// Set the two followers to the replicate state. Commit to tail of log.
	const numFollowers = 2
	r.prs.Progress[2].BecomeReplicate()
	r.prs.Progress[3].BecomeReplicate()
	r.uncommittedSize = 0

	// Send maxEntries proposals to r1. All of them should be accepted, which
	// brings the uncommitted size exactly up to the configured limit.
	propMsg := pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{testEntry}}
	propEnts := make([]pb.Entry, maxEntries)
	for i := 0; i < maxEntries; i++ {
		if err := r.Step(propMsg); err != nil {
			t.Fatalf("proposal resulted in error: %v", err)
		}
		propEnts[i] = testEntry
	}

	// Send one more proposal to r1. It should be rejected.
	if err := r.Step(propMsg); err != ErrProposalDropped {
		t.Fatalf("proposal not dropped: %v", err)
	}

	// Read messages and reduce the uncommitted size as if we had committed
	// these entries.
	ms := r.readMessages()
	if e := maxEntries * numFollowers; len(ms) != e {
		t.Fatalf("expected %d messages, got %d", e, len(ms))
	}
	r.reduceUncommittedSize(propEnts)
	if r.uncommittedSize != 0 {
		t.Fatalf("committed everything, but still tracking %d", r.uncommittedSize)
	}

	// Send a single large proposal to r1. Should be accepted even though it
	// pushes us above the limit because we were beneath it before the proposal.
	propEnts = make([]pb.Entry, 2*maxEntries)
	for i := range propEnts {
		propEnts[i] = testEntry
	}
	propMsgLarge := pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: propEnts}
	if err := r.Step(propMsgLarge); err != nil {
		t.Fatalf("proposal resulted in error: %v", err)
	}

	// Send one more proposal to r1. It should be rejected, again.
	if err := r.Step(propMsg); err != ErrProposalDropped {
		t.Fatalf("proposal not dropped: %v", err)
	}

	// But we can always append an entry with no Data. This is used both for the
	// leader's first empty entry and for auto-transitioning out of joint config
	// states.
	if err := r.Step(
		pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}},
	); err != nil {
		t.Fatal(err)
	}

	// Read messages and reduce the uncommitted size as if we had committed
	// these entries.
	ms = r.readMessages()
	if e := 2 * numFollowers; len(ms) != e {
		t.Fatalf("expected %d messages, got %d", e, len(ms))
	}
	r.reduceUncommittedSize(propEnts)
	if n := r.uncommittedSize; n != 0 {
		t.Fatalf("expected zero uncommitted size, got %d", n)
	}
}

// TestLeaderElection runs testLeaderElection without pre-vote.
func TestLeaderElection(t *testing.T) {
	testLeaderElection(t, false)
}

// TestLeaderElectionPreVote runs testLeaderElection with pre-vote enabled.
func TestLeaderElectionPreVote(t *testing.T) {
	testLeaderElection(t, true)
}

// testLeaderElection builds a table of networks, has node 1 campaign via
// MsgHup in each, and checks node 1's resulting state and term. When preVote
// is true the networks are built with preVoteConfig and a failed election is
// expected to leave node 1 in StatePreCandidate at term 0.
func testLeaderElection(t *testing.T, preVote bool) {
	var cfg func(*Config)
	candState := StateCandidate
	candTerm := uint64(1)
	if preVote {
		cfg = preVoteConfig
		// In pre-vote mode, an election that fails to complete
		// leaves the node in pre-candidate state without advancing
		// the term.
		candState = StatePreCandidate
		candTerm = 0
	}
	tests := []struct {
		*network
		state   StateType
		expTerm uint64
	}{
		{newNetworkWithConfig(cfg, nil, nil, nil), StateLeader, 1},
		{newNetworkWithConfig(cfg, nil, nil, nopStepper), StateLeader, 1},
		{newNetworkWithConfig(cfg, nil, nopStepper, nopStepper), candState, candTerm},
		{newNetworkWithConfig(cfg, nil, nopStepper, nopStepper, nil), candState, candTerm},
		{newNetworkWithConfig(cfg, nil, nopStepper, nopStepper, nil, nil), StateLeader, 1},

		// three logs further along than 0, but in the same term so rejections
		// are returned instead of the votes being ignored.
		{newNetworkWithConfig(cfg,
			nil, entsWithConfig(cfg, 1), entsWithConfig(cfg, 1), entsWithConfig(cfg, 1, 1), nil),
			StateFollower, 1},
	}

	for i, tt := range tests {
		tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
		sm := tt.network.peers[1].(*raft)
		if sm.state != tt.state {
			t.Errorf("#%d: state = %s, want %s", i, sm.state, tt.state)
		}
		if g := sm.Term; g != tt.expTerm {
			t.Errorf("#%d: term = %d, want %d", i, g, tt.expTerm)
		}
	}
}

// TestLearnerElectionTimeout verifies that a learner does not start an
// election even when its election timeout elapses.
func TestLearnerElectionTimeout(t *testing.T) {
	n1 := newTestLearnerRaft(1, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage())
	n2 := newTestLearnerRaft(2, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage())

	n1.becomeFollower(1, None)
	n2.becomeFollower(1, None)

	// n2 is learner. Learner should not start election even when times out.
	setRandomizedElectionTimeout(n2, n2.electionTimeout)
	for i := 0; i < n2.electionTimeout; i++ {
		n2.tick()
	}

	if n2.state != StateFollower {
		t.Errorf("peer 2 state: %s, want %s", n2.state, StateFollower)
	}
}

// TestLearnerPromotion verifies that the learner should not campaign until
// it is promoted to a normal peer.
344func TestLearnerPromotion(t *testing.T) { 345 n1 := newTestLearnerRaft(1, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage()) 346 n2 := newTestLearnerRaft(2, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage()) 347 348 n1.becomeFollower(1, None) 349 n2.becomeFollower(1, None) 350 351 nt := newNetwork(n1, n2) 352 353 if n1.state == StateLeader { 354 t.Error("peer 1 state is leader, want not", n1.state) 355 } 356 357 // n1 should become leader 358 setRandomizedElectionTimeout(n1, n1.electionTimeout) 359 for i := 0; i < n1.electionTimeout; i++ { 360 n1.tick() 361 } 362 363 if n1.state != StateLeader { 364 t.Errorf("peer 1 state: %s, want %s", n1.state, StateLeader) 365 } 366 if n2.state != StateFollower { 367 t.Errorf("peer 2 state: %s, want %s", n2.state, StateFollower) 368 } 369 370 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) 371 372 n1.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}.AsV2()) 373 n2.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}.AsV2()) 374 if n2.isLearner { 375 t.Error("peer 2 is learner, want not") 376 } 377 378 // n2 start election, should become leader 379 setRandomizedElectionTimeout(n2, n2.electionTimeout) 380 for i := 0; i < n2.electionTimeout; i++ { 381 n2.tick() 382 } 383 384 nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgBeat}) 385 386 if n1.state != StateFollower { 387 t.Errorf("peer 1 state: %s, want %s", n1.state, StateFollower) 388 } 389 if n2.state != StateLeader { 390 t.Errorf("peer 2 state: %s, want %s", n2.state, StateLeader) 391 } 392} 393 394// TestLearnerCanVote checks that a learner can vote when it receives a valid Vote request. 395// See (*raft).Step for why this is necessary and correct behavior. 
396func TestLearnerCanVote(t *testing.T) { 397 n2 := newTestLearnerRaft(2, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage()) 398 399 n2.becomeFollower(1, None) 400 401 n2.Step(pb.Message{From: 1, To: 2, Term: 2, Type: pb.MsgVote, LogTerm: 11, Index: 11}) 402 403 if len(n2.msgs) != 1 { 404 t.Fatalf("expected exactly one message, not %+v", n2.msgs) 405 } 406 msg := n2.msgs[0] 407 if msg.Type != pb.MsgVoteResp && !msg.Reject { 408 t.Fatal("expected learner to not reject vote") 409 } 410} 411 412func TestLeaderCycle(t *testing.T) { 413 testLeaderCycle(t, false) 414} 415 416func TestLeaderCyclePreVote(t *testing.T) { 417 testLeaderCycle(t, true) 418} 419 420// testLeaderCycle verifies that each node in a cluster can campaign 421// and be elected in turn. This ensures that elections (including 422// pre-vote) work when not starting from a clean slate (as they do in 423// TestLeaderElection) 424func testLeaderCycle(t *testing.T, preVote bool) { 425 var cfg func(*Config) 426 if preVote { 427 cfg = preVoteConfig 428 } 429 n := newNetworkWithConfig(cfg, nil, nil, nil) 430 for campaignerID := uint64(1); campaignerID <= 3; campaignerID++ { 431 n.send(pb.Message{From: campaignerID, To: campaignerID, Type: pb.MsgHup}) 432 433 for _, peer := range n.peers { 434 sm := peer.(*raft) 435 if sm.id == campaignerID && sm.state != StateLeader { 436 t.Errorf("preVote=%v: campaigning node %d state = %v, want StateLeader", 437 preVote, sm.id, sm.state) 438 } else if sm.id != campaignerID && sm.state != StateFollower { 439 t.Errorf("preVote=%v: after campaign of node %d, "+ 440 "node %d had state = %v, want StateFollower", 441 preVote, campaignerID, sm.id, sm.state) 442 } 443 } 444 } 445} 446 447// TestLeaderElectionOverwriteNewerLogs tests a scenario in which a 448// newly-elected leader does *not* have the newest (i.e. highest term) 449// log entries, and must overwrite higher-term log entries with 450// lower-term ones. 
451func TestLeaderElectionOverwriteNewerLogs(t *testing.T) { 452 testLeaderElectionOverwriteNewerLogs(t, false) 453} 454 455func TestLeaderElectionOverwriteNewerLogsPreVote(t *testing.T) { 456 testLeaderElectionOverwriteNewerLogs(t, true) 457} 458 459func testLeaderElectionOverwriteNewerLogs(t *testing.T, preVote bool) { 460 var cfg func(*Config) 461 if preVote { 462 cfg = preVoteConfig 463 } 464 // This network represents the results of the following sequence of 465 // events: 466 // - Node 1 won the election in term 1. 467 // - Node 1 replicated a log entry to node 2 but died before sending 468 // it to other nodes. 469 // - Node 3 won the second election in term 2. 470 // - Node 3 wrote an entry to its logs but died without sending it 471 // to any other nodes. 472 // 473 // At this point, nodes 1, 2, and 3 all have uncommitted entries in 474 // their logs and could win an election at term 3. The winner's log 475 // entry overwrites the losers'. (TestLeaderSyncFollowerLog tests 476 // the case where older log entries are overwritten, so this test 477 // focuses on the case where the newer entries are lost). 478 n := newNetworkWithConfig(cfg, 479 entsWithConfig(cfg, 1), // Node 1: Won first election 480 entsWithConfig(cfg, 1), // Node 2: Got logs from node 1 481 entsWithConfig(cfg, 2), // Node 3: Won second election 482 votedWithConfig(cfg, 3, 2), // Node 4: Voted but didn't get logs 483 votedWithConfig(cfg, 3, 2)) // Node 5: Voted but didn't get logs 484 485 // Node 1 campaigns. The election fails because a quorum of nodes 486 // know about the election that already happened at term 2. Node 1's 487 // term is pushed ahead to 2. 488 n.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 489 sm1 := n.peers[1].(*raft) 490 if sm1.state != StateFollower { 491 t.Errorf("state = %s, want StateFollower", sm1.state) 492 } 493 if sm1.Term != 2 { 494 t.Errorf("term = %d, want 2", sm1.Term) 495 } 496 497 // Node 1 campaigns again with a higher term. This time it succeeds. 
498 n.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 499 if sm1.state != StateLeader { 500 t.Errorf("state = %s, want StateLeader", sm1.state) 501 } 502 if sm1.Term != 3 { 503 t.Errorf("term = %d, want 3", sm1.Term) 504 } 505 506 // Now all nodes agree on a log entry with term 1 at index 1 (and 507 // term 3 at index 2). 508 for i := range n.peers { 509 sm := n.peers[i].(*raft) 510 entries := sm.raftLog.allEntries() 511 if len(entries) != 2 { 512 t.Fatalf("node %d: len(entries) == %d, want 2", i, len(entries)) 513 } 514 if entries[0].Term != 1 { 515 t.Errorf("node %d: term at index 1 == %d, want 1", i, entries[0].Term) 516 } 517 if entries[1].Term != 3 { 518 t.Errorf("node %d: term at index 2 == %d, want 3", i, entries[1].Term) 519 } 520 } 521} 522 523func TestVoteFromAnyState(t *testing.T) { 524 testVoteFromAnyState(t, pb.MsgVote) 525} 526 527func TestPreVoteFromAnyState(t *testing.T) { 528 testVoteFromAnyState(t, pb.MsgPreVote) 529} 530 531func testVoteFromAnyState(t *testing.T, vt pb.MessageType) { 532 for st := StateType(0); st < numStates; st++ { 533 r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 534 r.Term = 1 535 536 switch st { 537 case StateFollower: 538 r.becomeFollower(r.Term, 3) 539 case StatePreCandidate: 540 r.becomePreCandidate() 541 case StateCandidate: 542 r.becomeCandidate() 543 case StateLeader: 544 r.becomeCandidate() 545 r.becomeLeader() 546 } 547 548 // Note that setting our state above may have advanced r.Term 549 // past its initial value. 
550 origTerm := r.Term 551 newTerm := r.Term + 1 552 553 msg := pb.Message{ 554 From: 2, 555 To: 1, 556 Type: vt, 557 Term: newTerm, 558 LogTerm: newTerm, 559 Index: 42, 560 } 561 if err := r.Step(msg); err != nil { 562 t.Errorf("%s,%s: Step failed: %s", vt, st, err) 563 } 564 if len(r.msgs) != 1 { 565 t.Errorf("%s,%s: %d response messages, want 1: %+v", vt, st, len(r.msgs), r.msgs) 566 } else { 567 resp := r.msgs[0] 568 if resp.Type != voteRespMsgType(vt) { 569 t.Errorf("%s,%s: response message is %s, want %s", 570 vt, st, resp.Type, voteRespMsgType(vt)) 571 } 572 if resp.Reject { 573 t.Errorf("%s,%s: unexpected rejection", vt, st) 574 } 575 } 576 577 // If this was a real vote, we reset our state and term. 578 if vt == pb.MsgVote { 579 if r.state != StateFollower { 580 t.Errorf("%s,%s: state %s, want %s", vt, st, r.state, StateFollower) 581 } 582 if r.Term != newTerm { 583 t.Errorf("%s,%s: term %d, want %d", vt, st, r.Term, newTerm) 584 } 585 if r.Vote != 2 { 586 t.Errorf("%s,%s: vote %d, want 2", vt, st, r.Vote) 587 } 588 } else { 589 // In a prevote, nothing changes. 590 if r.state != st { 591 t.Errorf("%s,%s: state %s, want %s", vt, st, r.state, st) 592 } 593 if r.Term != origTerm { 594 t.Errorf("%s,%s: term %d, want %d", vt, st, r.Term, origTerm) 595 } 596 // if st == StateFollower or StatePreCandidate, r hasn't voted yet. 597 // In StateCandidate or StateLeader, it's voted for itself. 
598 if r.Vote != None && r.Vote != 1 { 599 t.Errorf("%s,%s: vote %d, want %d or 1", vt, st, r.Vote, None) 600 } 601 } 602 } 603} 604 605func TestLogReplication(t *testing.T) { 606 tests := []struct { 607 *network 608 msgs []pb.Message 609 wcommitted uint64 610 }{ 611 { 612 newNetwork(nil, nil, nil), 613 []pb.Message{ 614 {From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}, 615 }, 616 2, 617 }, 618 { 619 newNetwork(nil, nil, nil), 620 []pb.Message{ 621 {From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}, 622 {From: 1, To: 2, Type: pb.MsgHup}, 623 {From: 1, To: 2, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}, 624 }, 625 4, 626 }, 627 } 628 629 for i, tt := range tests { 630 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 631 632 for _, m := range tt.msgs { 633 tt.send(m) 634 } 635 636 for j, x := range tt.network.peers { 637 sm := x.(*raft) 638 639 if sm.raftLog.committed != tt.wcommitted { 640 t.Errorf("#%d.%d: committed = %d, want %d", i, j, sm.raftLog.committed, tt.wcommitted) 641 } 642 643 ents := []pb.Entry{} 644 for _, e := range nextEnts(sm, tt.network.storage[j]) { 645 if e.Data != nil { 646 ents = append(ents, e) 647 } 648 } 649 props := []pb.Message{} 650 for _, m := range tt.msgs { 651 if m.Type == pb.MsgProp { 652 props = append(props, m) 653 } 654 } 655 for k, m := range props { 656 if !bytes.Equal(ents[k].Data, m.Entries[0].Data) { 657 t.Errorf("#%d.%d: data = %d, want %d", i, j, ents[k].Data, m.Entries[0].Data) 658 } 659 } 660 } 661 } 662} 663 664// TestLearnerLogReplication tests that a learner can receive entries from the leader. 
665func TestLearnerLogReplication(t *testing.T) { 666 n1 := newTestLearnerRaft(1, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage()) 667 n2 := newTestLearnerRaft(2, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage()) 668 669 nt := newNetwork(n1, n2) 670 671 n1.becomeFollower(1, None) 672 n2.becomeFollower(1, None) 673 674 setRandomizedElectionTimeout(n1, n1.electionTimeout) 675 for i := 0; i < n1.electionTimeout; i++ { 676 n1.tick() 677 } 678 679 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) 680 681 // n1 is leader and n2 is learner 682 if n1.state != StateLeader { 683 t.Errorf("peer 1 state: %s, want %s", n1.state, StateLeader) 684 } 685 if !n2.isLearner { 686 t.Error("peer 2 state: not learner, want yes") 687 } 688 689 nextCommitted := n1.raftLog.committed + 1 690 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) 691 if n1.raftLog.committed != nextCommitted { 692 t.Errorf("peer 1 wants committed to %d, but still %d", nextCommitted, n1.raftLog.committed) 693 } 694 695 if n1.raftLog.committed != n2.raftLog.committed { 696 t.Errorf("peer 2 wants committed to %d, but still %d", n1.raftLog.committed, n2.raftLog.committed) 697 } 698 699 match := n1.prs.Progress[2].Match 700 if match != n2.raftLog.committed { 701 t.Errorf("progress 2 of leader 1 wants match %d, but got %d", n2.raftLog.committed, match) 702 } 703} 704 705func TestSingleNodeCommit(t *testing.T) { 706 tt := newNetwork(nil) 707 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 708 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}}) 709 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}}) 710 711 sm := tt.peers[1].(*raft) 712 if sm.raftLog.committed != 3 { 713 t.Errorf("committed = %d, want %d", sm.raftLog.committed, 3) 714 } 715} 716 717// TestCannotCommitWithoutNewTermEntry tests the entries cannot be committed 718// when leader changes, 
no new proposal comes in and ChangeTerm proposal is 719// filtered. 720func TestCannotCommitWithoutNewTermEntry(t *testing.T) { 721 tt := newNetwork(nil, nil, nil, nil, nil) 722 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 723 724 // 0 cannot reach 2,3,4 725 tt.cut(1, 3) 726 tt.cut(1, 4) 727 tt.cut(1, 5) 728 729 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}}) 730 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}}) 731 732 sm := tt.peers[1].(*raft) 733 if sm.raftLog.committed != 1 { 734 t.Errorf("committed = %d, want %d", sm.raftLog.committed, 1) 735 } 736 737 // network recovery 738 tt.recover() 739 // avoid committing ChangeTerm proposal 740 tt.ignore(pb.MsgApp) 741 742 // elect 2 as the new leader with term 2 743 tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup}) 744 745 // no log entries from previous term should be committed 746 sm = tt.peers[2].(*raft) 747 if sm.raftLog.committed != 1 { 748 t.Errorf("committed = %d, want %d", sm.raftLog.committed, 1) 749 } 750 751 tt.recover() 752 // send heartbeat; reset wait 753 tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgBeat}) 754 // append an entry at current term 755 tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}}) 756 // expect the committed to be advanced 757 if sm.raftLog.committed != 5 { 758 t.Errorf("committed = %d, want %d", sm.raftLog.committed, 5) 759 } 760} 761 762// TestCommitWithoutNewTermEntry tests the entries could be committed 763// when leader changes, no new proposal comes in. 
764func TestCommitWithoutNewTermEntry(t *testing.T) { 765 tt := newNetwork(nil, nil, nil, nil, nil) 766 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 767 768 // 0 cannot reach 2,3,4 769 tt.cut(1, 3) 770 tt.cut(1, 4) 771 tt.cut(1, 5) 772 773 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}}) 774 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}}) 775 776 sm := tt.peers[1].(*raft) 777 if sm.raftLog.committed != 1 { 778 t.Errorf("committed = %d, want %d", sm.raftLog.committed, 1) 779 } 780 781 // network recovery 782 tt.recover() 783 784 // elect 2 as the new leader with term 2 785 // after append a ChangeTerm entry from the current term, all entries 786 // should be committed 787 tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup}) 788 789 if sm.raftLog.committed != 4 { 790 t.Errorf("committed = %d, want %d", sm.raftLog.committed, 4) 791 } 792} 793 794func TestDuelingCandidates(t *testing.T) { 795 a := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 796 b := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 797 c := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 798 799 nt := newNetwork(a, b, c) 800 nt.cut(1, 3) 801 802 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 803 nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup}) 804 805 // 1 becomes leader since it receives votes from 1 and 2 806 sm := nt.peers[1].(*raft) 807 if sm.state != StateLeader { 808 t.Errorf("state = %s, want %s", sm.state, StateLeader) 809 } 810 811 // 3 stays as candidate since it receives a vote from 3 and a rejection from 2 812 sm = nt.peers[3].(*raft) 813 if sm.state != StateCandidate { 814 t.Errorf("state = %s, want %s", sm.state, StateCandidate) 815 } 816 817 nt.recover() 818 819 // candidate 3 now increases its term and tries to vote again 820 // we expect it to disrupt the leader 1 since it has a higher term 821 // 3 will be follower 
again since both 1 and 2 rejects its vote request since 3 does not have a long enough log 822 nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup}) 823 824 wlog := &raftLog{ 825 storage: &MemoryStorage{ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}}}, 826 committed: 1, 827 unstable: unstable{offset: 2}, 828 } 829 tests := []struct { 830 sm *raft 831 state StateType 832 term uint64 833 raftLog *raftLog 834 }{ 835 {a, StateFollower, 2, wlog}, 836 {b, StateFollower, 2, wlog}, 837 {c, StateFollower, 2, newLog(NewMemoryStorage(), raftLogger)}, 838 } 839 840 for i, tt := range tests { 841 if g := tt.sm.state; g != tt.state { 842 t.Errorf("#%d: state = %s, want %s", i, g, tt.state) 843 } 844 if g := tt.sm.Term; g != tt.term { 845 t.Errorf("#%d: term = %d, want %d", i, g, tt.term) 846 } 847 base := ltoa(tt.raftLog) 848 if sm, ok := nt.peers[1+uint64(i)].(*raft); ok { 849 l := ltoa(sm.raftLog) 850 if g := diffu(base, l); g != "" { 851 t.Errorf("#%d: diff:\n%s", i, g) 852 } 853 } else { 854 t.Logf("#%d: empty log", i) 855 } 856 } 857} 858 859func TestDuelingPreCandidates(t *testing.T) { 860 cfgA := newTestConfig(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 861 cfgB := newTestConfig(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 862 cfgC := newTestConfig(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 863 cfgA.PreVote = true 864 cfgB.PreVote = true 865 cfgC.PreVote = true 866 a := newRaft(cfgA) 867 b := newRaft(cfgB) 868 c := newRaft(cfgC) 869 870 nt := newNetwork(a, b, c) 871 nt.cut(1, 3) 872 873 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 874 nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup}) 875 876 // 1 becomes leader since it receives votes from 1 and 2 877 sm := nt.peers[1].(*raft) 878 if sm.state != StateLeader { 879 t.Errorf("state = %s, want %s", sm.state, StateLeader) 880 } 881 882 // 3 campaigns then reverts to follower when its PreVote is rejected 883 sm = nt.peers[3].(*raft) 884 if sm.state != StateFollower { 885 t.Errorf("state = %s, want 
%s", sm.state, StateFollower) 886 } 887 888 nt.recover() 889 890 // Candidate 3 now increases its term and tries to vote again. 891 // With PreVote, it does not disrupt the leader. 892 nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup}) 893 894 wlog := &raftLog{ 895 storage: &MemoryStorage{ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}}}, 896 committed: 1, 897 unstable: unstable{offset: 2}, 898 } 899 tests := []struct { 900 sm *raft 901 state StateType 902 term uint64 903 raftLog *raftLog 904 }{ 905 {a, StateLeader, 1, wlog}, 906 {b, StateFollower, 1, wlog}, 907 {c, StateFollower, 1, newLog(NewMemoryStorage(), raftLogger)}, 908 } 909 910 for i, tt := range tests { 911 if g := tt.sm.state; g != tt.state { 912 t.Errorf("#%d: state = %s, want %s", i, g, tt.state) 913 } 914 if g := tt.sm.Term; g != tt.term { 915 t.Errorf("#%d: term = %d, want %d", i, g, tt.term) 916 } 917 base := ltoa(tt.raftLog) 918 if sm, ok := nt.peers[1+uint64(i)].(*raft); ok { 919 l := ltoa(sm.raftLog) 920 if g := diffu(base, l); g != "" { 921 t.Errorf("#%d: diff:\n%s", i, g) 922 } 923 } else { 924 t.Logf("#%d: empty log", i) 925 } 926 } 927} 928 929func TestCandidateConcede(t *testing.T) { 930 tt := newNetwork(nil, nil, nil) 931 tt.isolate(1) 932 933 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 934 tt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup}) 935 936 // heal the partition 937 tt.recover() 938 // send heartbeat; reset wait 939 tt.send(pb.Message{From: 3, To: 3, Type: pb.MsgBeat}) 940 941 data := []byte("force follower") 942 // send a proposal to 3 to flush out a MsgApp to 1 943 tt.send(pb.Message{From: 3, To: 3, Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}}) 944 // send heartbeat; flush out commit 945 tt.send(pb.Message{From: 3, To: 3, Type: pb.MsgBeat}) 946 947 a := tt.peers[1].(*raft) 948 if g := a.state; g != StateFollower { 949 t.Errorf("state = %s, want %s", g, StateFollower) 950 } 951 if g := a.Term; g != 1 { 952 t.Errorf("term = %d, want %d", g, 1) 953 } 954 
wantLog := ltoa(&raftLog{ 955 storage: &MemoryStorage{ 956 ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}}, 957 }, 958 unstable: unstable{offset: 3}, 959 committed: 2, 960 }) 961 for i, p := range tt.peers { 962 if sm, ok := p.(*raft); ok { 963 l := ltoa(sm.raftLog) 964 if g := diffu(wantLog, l); g != "" { 965 t.Errorf("#%d: diff:\n%s", i, g) 966 } 967 } else { 968 t.Logf("#%d: empty log", i) 969 } 970 } 971} 972 973func TestSingleNodeCandidate(t *testing.T) { 974 tt := newNetwork(nil) 975 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 976 977 sm := tt.peers[1].(*raft) 978 if sm.state != StateLeader { 979 t.Errorf("state = %d, want %d", sm.state, StateLeader) 980 } 981} 982 983func TestSingleNodePreCandidate(t *testing.T) { 984 tt := newNetworkWithConfig(preVoteConfig, nil) 985 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 986 987 sm := tt.peers[1].(*raft) 988 if sm.state != StateLeader { 989 t.Errorf("state = %d, want %d", sm.state, StateLeader) 990 } 991} 992 993func TestOldMessages(t *testing.T) { 994 tt := newNetwork(nil, nil, nil) 995 // make 0 leader @ term 3 996 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 997 tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup}) 998 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 999 // pretend we're an old leader trying to make progress; this entry is expected to be ignored. 
1000 tt.send(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 2, Entries: []pb.Entry{{Index: 3, Term: 2}}}) 1001 // commit a new entry 1002 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) 1003 1004 ilog := &raftLog{ 1005 storage: &MemoryStorage{ 1006 ents: []pb.Entry{ 1007 {}, {Data: nil, Term: 1, Index: 1}, 1008 {Data: nil, Term: 2, Index: 2}, {Data: nil, Term: 3, Index: 3}, 1009 {Data: []byte("somedata"), Term: 3, Index: 4}, 1010 }, 1011 }, 1012 unstable: unstable{offset: 5}, 1013 committed: 4, 1014 } 1015 base := ltoa(ilog) 1016 for i, p := range tt.peers { 1017 if sm, ok := p.(*raft); ok { 1018 l := ltoa(sm.raftLog) 1019 if g := diffu(base, l); g != "" { 1020 t.Errorf("#%d: diff:\n%s", i, g) 1021 } 1022 } else { 1023 t.Logf("#%d: empty log", i) 1024 } 1025 } 1026} 1027 1028// TestOldMessagesReply - optimization - reply with new term. 1029 1030func TestProposal(t *testing.T) { 1031 tests := []struct { 1032 *network 1033 success bool 1034 }{ 1035 {newNetwork(nil, nil, nil), true}, 1036 {newNetwork(nil, nil, nopStepper), true}, 1037 {newNetwork(nil, nopStepper, nopStepper), false}, 1038 {newNetwork(nil, nopStepper, nopStepper, nil), false}, 1039 {newNetwork(nil, nopStepper, nopStepper, nil, nil), true}, 1040 } 1041 1042 for j, tt := range tests { 1043 send := func(m pb.Message) { 1044 defer func() { 1045 // only recover if we expect it to panic (success==false) 1046 if !tt.success { 1047 e := recover() 1048 if e != nil { 1049 t.Logf("#%d: err: %s", j, e) 1050 } 1051 } 1052 }() 1053 tt.send(m) 1054 } 1055 1056 data := []byte("somedata") 1057 1058 // promote 1 to become leader 1059 send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 1060 send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}}) 1061 1062 wantLog := newLog(NewMemoryStorage(), raftLogger) 1063 if tt.success { 1064 wantLog = &raftLog{ 1065 storage: &MemoryStorage{ 1066 ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, 
{Term: 1, Index: 2, Data: data}}, 1067 }, 1068 unstable: unstable{offset: 3}, 1069 committed: 2} 1070 } 1071 base := ltoa(wantLog) 1072 for i, p := range tt.peers { 1073 if sm, ok := p.(*raft); ok { 1074 l := ltoa(sm.raftLog) 1075 if g := diffu(base, l); g != "" { 1076 t.Errorf("#%d: diff:\n%s", i, g) 1077 } 1078 } else { 1079 t.Logf("#%d: empty log", i) 1080 } 1081 } 1082 sm := tt.network.peers[1].(*raft) 1083 if g := sm.Term; g != 1 { 1084 t.Errorf("#%d: term = %d, want %d", j, g, 1) 1085 } 1086 } 1087} 1088 1089func TestProposalByProxy(t *testing.T) { 1090 data := []byte("somedata") 1091 tests := []*network{ 1092 newNetwork(nil, nil, nil), 1093 newNetwork(nil, nil, nopStepper), 1094 } 1095 1096 for j, tt := range tests { 1097 // promote 0 the leader 1098 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 1099 1100 // propose via follower 1101 tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) 1102 1103 wantLog := &raftLog{ 1104 storage: &MemoryStorage{ 1105 ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Data: data, Index: 2}}, 1106 }, 1107 unstable: unstable{offset: 3}, 1108 committed: 2} 1109 base := ltoa(wantLog) 1110 for i, p := range tt.peers { 1111 if sm, ok := p.(*raft); ok { 1112 l := ltoa(sm.raftLog) 1113 if g := diffu(base, l); g != "" { 1114 t.Errorf("#%d: diff:\n%s", i, g) 1115 } 1116 } else { 1117 t.Logf("#%d: empty log", i) 1118 } 1119 } 1120 sm := tt.peers[1].(*raft) 1121 if g := sm.Term; g != 1 { 1122 t.Errorf("#%d: term = %d, want %d", j, g, 1) 1123 } 1124 } 1125} 1126 1127func TestCommit(t *testing.T) { 1128 tests := []struct { 1129 matches []uint64 1130 logs []pb.Entry 1131 smTerm uint64 1132 w uint64 1133 }{ 1134 // single 1135 {[]uint64{1}, []pb.Entry{{Index: 1, Term: 1}}, 1, 1}, 1136 {[]uint64{1}, []pb.Entry{{Index: 1, Term: 1}}, 2, 0}, 1137 {[]uint64{2}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}, 2, 2}, 1138 {[]uint64{1}, []pb.Entry{{Index: 1, Term: 2}}, 2, 1}, 
1139 1140 // odd 1141 {[]uint64{2, 1, 1}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}, 1, 1}, 1142 {[]uint64{2, 1, 1}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}}, 2, 0}, 1143 {[]uint64{2, 1, 2}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}, 2, 2}, 1144 {[]uint64{2, 1, 2}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}}, 2, 0}, 1145 1146 // even 1147 {[]uint64{2, 1, 1, 1}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}, 1, 1}, 1148 {[]uint64{2, 1, 1, 1}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}}, 2, 0}, 1149 {[]uint64{2, 1, 1, 2}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}, 1, 1}, 1150 {[]uint64{2, 1, 1, 2}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}}, 2, 0}, 1151 {[]uint64{2, 1, 2, 2}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}, 2, 2}, 1152 {[]uint64{2, 1, 2, 2}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}}, 2, 0}, 1153 } 1154 1155 for i, tt := range tests { 1156 storage := NewMemoryStorage() 1157 storage.Append(tt.logs) 1158 storage.hardState = pb.HardState{Term: tt.smTerm} 1159 1160 sm := newTestRaft(1, []uint64{1}, 10, 2, storage) 1161 for j := 0; j < len(tt.matches); j++ { 1162 id := uint64(j) + 1 1163 if id > 1 { 1164 sm.applyConfChange(pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: id}.AsV2()) 1165 } 1166 pr := sm.prs.Progress[id] 1167 pr.Match, pr.Next = tt.matches[j], tt.matches[j]+1 1168 } 1169 sm.maybeCommit() 1170 if g := sm.raftLog.committed; g != tt.w { 1171 t.Errorf("#%d: committed = %d, want %d", i, g, tt.w) 1172 } 1173 } 1174} 1175 1176func TestPastElectionTimeout(t *testing.T) { 1177 tests := []struct { 1178 elapse int 1179 wprobability float64 1180 round bool 1181 }{ 1182 {5, 0, false}, 1183 {10, 0.1, true}, 1184 {13, 0.4, true}, 1185 {15, 0.6, true}, 1186 {18, 0.9, true}, 1187 {20, 1, false}, 1188 } 1189 1190 for i, tt := range tests { 1191 sm := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) 1192 sm.electionElapsed = tt.elapse 1193 c := 0 1194 for j 
:= 0; j < 10000; j++ { 1195 sm.resetRandomizedElectionTimeout() 1196 if sm.pastElectionTimeout() { 1197 c++ 1198 } 1199 } 1200 got := float64(c) / 10000.0 1201 if tt.round { 1202 got = math.Floor(got*10+0.5) / 10.0 1203 } 1204 if got != tt.wprobability { 1205 t.Errorf("#%d: probability = %v, want %v", i, got, tt.wprobability) 1206 } 1207 } 1208} 1209 1210// ensure that the Step function ignores the message from old term and does not pass it to the 1211// actual stepX function. 1212func TestStepIgnoreOldTermMsg(t *testing.T) { 1213 called := false 1214 fakeStep := func(r *raft, m pb.Message) error { 1215 called = true 1216 return nil 1217 } 1218 sm := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) 1219 sm.step = fakeStep 1220 sm.Term = 2 1221 sm.Step(pb.Message{Type: pb.MsgApp, Term: sm.Term - 1}) 1222 if called { 1223 t.Errorf("stepFunc called = %v , want %v", called, false) 1224 } 1225} 1226 1227// TestHandleMsgApp ensures: 1228// 1. Reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm. 1229// 2. If an existing entry conflicts with a new one (same index but different terms), 1230// delete the existing entry and all that follow it; append any new entries not already in the log. 1231// 3. If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry). 
1232func TestHandleMsgApp(t *testing.T) { 1233 tests := []struct { 1234 m pb.Message 1235 wIndex uint64 1236 wCommit uint64 1237 wReject bool 1238 }{ 1239 // Ensure 1 1240 {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 3, Index: 2, Commit: 3}, 2, 0, true}, // previous log mismatch 1241 {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 3, Index: 3, Commit: 3}, 2, 0, true}, // previous log non-exist 1242 1243 // Ensure 2 1244 {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 1}, 2, 1, false}, 1245 {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 0, Index: 0, Commit: 1, Entries: []pb.Entry{{Index: 1, Term: 2}}}, 1, 1, false}, 1246 {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 3, Entries: []pb.Entry{{Index: 3, Term: 2}, {Index: 4, Term: 2}}}, 4, 3, false}, 1247 {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 4, Entries: []pb.Entry{{Index: 3, Term: 2}}}, 3, 3, false}, 1248 {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 4, Entries: []pb.Entry{{Index: 2, Term: 2}}}, 2, 2, false}, 1249 1250 // Ensure 3 1251 {pb.Message{Type: pb.MsgApp, Term: 1, LogTerm: 1, Index: 1, Commit: 3}, 2, 1, false}, // match entry 1, commit up to last new entry 1 1252 {pb.Message{Type: pb.MsgApp, Term: 1, LogTerm: 1, Index: 1, Commit: 3, Entries: []pb.Entry{{Index: 2, Term: 2}}}, 2, 2, false}, // match entry 1, commit up to last new entry 2 1253 {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 3}, 2, 2, false}, // match entry 2, commit up to last new entry 2 1254 {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 4}, 2, 2, false}, // commit up to log.last() 1255 } 1256 1257 for i, tt := range tests { 1258 storage := NewMemoryStorage() 1259 storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}) 1260 sm := newTestRaft(1, []uint64{1}, 10, 1, storage) 1261 sm.becomeFollower(2, None) 1262 1263 sm.handleAppendEntries(tt.m) 1264 if sm.raftLog.lastIndex() != tt.wIndex { 1265 
t.Errorf("#%d: lastIndex = %d, want %d", i, sm.raftLog.lastIndex(), tt.wIndex) 1266 } 1267 if sm.raftLog.committed != tt.wCommit { 1268 t.Errorf("#%d: committed = %d, want %d", i, sm.raftLog.committed, tt.wCommit) 1269 } 1270 m := sm.readMessages() 1271 if len(m) != 1 { 1272 t.Fatalf("#%d: msg = nil, want 1", i) 1273 } 1274 if m[0].Reject != tt.wReject { 1275 t.Errorf("#%d: reject = %v, want %v", i, m[0].Reject, tt.wReject) 1276 } 1277 } 1278} 1279 1280// TestHandleHeartbeat ensures that the follower commits to the commit in the message. 1281func TestHandleHeartbeat(t *testing.T) { 1282 commit := uint64(2) 1283 tests := []struct { 1284 m pb.Message 1285 wCommit uint64 1286 }{ 1287 {pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 1}, commit + 1}, 1288 {pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit - 1}, commit}, // do not decrease commit 1289 } 1290 1291 for i, tt := range tests { 1292 storage := NewMemoryStorage() 1293 storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}}) 1294 sm := newTestRaft(1, []uint64{1, 2}, 5, 1, storage) 1295 sm.becomeFollower(2, 2) 1296 sm.raftLog.commitTo(commit) 1297 sm.handleHeartbeat(tt.m) 1298 if sm.raftLog.committed != tt.wCommit { 1299 t.Errorf("#%d: committed = %d, want %d", i, sm.raftLog.committed, tt.wCommit) 1300 } 1301 m := sm.readMessages() 1302 if len(m) != 1 { 1303 t.Fatalf("#%d: msg = nil, want 1", i) 1304 } 1305 if m[0].Type != pb.MsgHeartbeatResp { 1306 t.Errorf("#%d: type = %v, want MsgHeartbeatResp", i, m[0].Type) 1307 } 1308 } 1309} 1310 1311// TestHandleHeartbeatResp ensures that we re-send log entries when we get a heartbeat response. 
1312func TestHandleHeartbeatResp(t *testing.T) { 1313 storage := NewMemoryStorage() 1314 storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}}) 1315 sm := newTestRaft(1, []uint64{1, 2}, 5, 1, storage) 1316 sm.becomeCandidate() 1317 sm.becomeLeader() 1318 sm.raftLog.commitTo(sm.raftLog.lastIndex()) 1319 1320 // A heartbeat response from a node that is behind; re-send MsgApp 1321 sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp}) 1322 msgs := sm.readMessages() 1323 if len(msgs) != 1 { 1324 t.Fatalf("len(msgs) = %d, want 1", len(msgs)) 1325 } 1326 if msgs[0].Type != pb.MsgApp { 1327 t.Errorf("type = %v, want MsgApp", msgs[0].Type) 1328 } 1329 1330 // A second heartbeat response generates another MsgApp re-send 1331 sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp}) 1332 msgs = sm.readMessages() 1333 if len(msgs) != 1 { 1334 t.Fatalf("len(msgs) = %d, want 1", len(msgs)) 1335 } 1336 if msgs[0].Type != pb.MsgApp { 1337 t.Errorf("type = %v, want MsgApp", msgs[0].Type) 1338 } 1339 1340 // Once we have an MsgAppResp, heartbeats no longer send MsgApp. 1341 sm.Step(pb.Message{ 1342 From: 2, 1343 Type: pb.MsgAppResp, 1344 Index: msgs[0].Index + uint64(len(msgs[0].Entries)), 1345 }) 1346 // Consume the message sent in response to MsgAppResp 1347 sm.readMessages() 1348 1349 sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp}) 1350 msgs = sm.readMessages() 1351 if len(msgs) != 0 { 1352 t.Fatalf("len(msgs) = %d, want 0: %+v", len(msgs), msgs) 1353 } 1354} 1355 1356// TestRaftFreesReadOnlyMem ensures raft will free read request from 1357// readOnly readIndexQueue and pendingReadIndex map. 
1358// related issue: https://github.com/etcd-io/etcd/issues/7571 1359func TestRaftFreesReadOnlyMem(t *testing.T) { 1360 sm := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage()) 1361 sm.becomeCandidate() 1362 sm.becomeLeader() 1363 sm.raftLog.commitTo(sm.raftLog.lastIndex()) 1364 1365 ctx := []byte("ctx") 1366 1367 // leader starts linearizable read request. 1368 // more info: raft dissertation 6.4, step 2. 1369 sm.Step(pb.Message{From: 2, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: ctx}}}) 1370 msgs := sm.readMessages() 1371 if len(msgs) != 1 { 1372 t.Fatalf("len(msgs) = %d, want 1", len(msgs)) 1373 } 1374 if msgs[0].Type != pb.MsgHeartbeat { 1375 t.Fatalf("type = %v, want MsgHeartbeat", msgs[0].Type) 1376 } 1377 if !bytes.Equal(msgs[0].Context, ctx) { 1378 t.Fatalf("Context = %v, want %v", msgs[0].Context, ctx) 1379 } 1380 if len(sm.readOnly.readIndexQueue) != 1 { 1381 t.Fatalf("len(readIndexQueue) = %v, want 1", len(sm.readOnly.readIndexQueue)) 1382 } 1383 if len(sm.readOnly.pendingReadIndex) != 1 { 1384 t.Fatalf("len(pendingReadIndex) = %v, want 1", len(sm.readOnly.pendingReadIndex)) 1385 } 1386 if _, ok := sm.readOnly.pendingReadIndex[string(ctx)]; !ok { 1387 t.Fatalf("can't find context %v in pendingReadIndex ", ctx) 1388 } 1389 1390 // heartbeat responses from majority of followers (1 in this case) 1391 // acknowledge the authority of the leader. 1392 // more info: raft dissertation 6.4, step 3. 
1393 sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp, Context: ctx}) 1394 if len(sm.readOnly.readIndexQueue) != 0 { 1395 t.Fatalf("len(readIndexQueue) = %v, want 0", len(sm.readOnly.readIndexQueue)) 1396 } 1397 if len(sm.readOnly.pendingReadIndex) != 0 { 1398 t.Fatalf("len(pendingReadIndex) = %v, want 0", len(sm.readOnly.pendingReadIndex)) 1399 } 1400 if _, ok := sm.readOnly.pendingReadIndex[string(ctx)]; ok { 1401 t.Fatalf("found context %v in pendingReadIndex, want none", ctx) 1402 } 1403} 1404 1405// TestMsgAppRespWaitReset verifies the resume behavior of a leader 1406// MsgAppResp. 1407func TestMsgAppRespWaitReset(t *testing.T) { 1408 sm := newTestRaft(1, []uint64{1, 2, 3}, 5, 1, NewMemoryStorage()) 1409 sm.becomeCandidate() 1410 sm.becomeLeader() 1411 1412 // The new leader has just emitted a new Term 4 entry; consume those messages 1413 // from the outgoing queue. 1414 sm.bcastAppend() 1415 sm.readMessages() 1416 1417 // Node 2 acks the first entry, making it committed. 1418 sm.Step(pb.Message{ 1419 From: 2, 1420 Type: pb.MsgAppResp, 1421 Index: 1, 1422 }) 1423 if sm.raftLog.committed != 1 { 1424 t.Fatalf("expected committed to be 1, got %d", sm.raftLog.committed) 1425 } 1426 // Also consume the MsgApp messages that update Commit on the followers. 1427 sm.readMessages() 1428 1429 // A new command is now proposed on node 1. 1430 sm.Step(pb.Message{ 1431 From: 1, 1432 Type: pb.MsgProp, 1433 Entries: []pb.Entry{{}}, 1434 }) 1435 1436 // The command is broadcast to all nodes not in the wait state. 1437 // Node 2 left the wait state due to its MsgAppResp, but node 3 is still waiting. 
1438 msgs := sm.readMessages() 1439 if len(msgs) != 1 { 1440 t.Fatalf("expected 1 message, got %d: %+v", len(msgs), msgs) 1441 } 1442 if msgs[0].Type != pb.MsgApp || msgs[0].To != 2 { 1443 t.Errorf("expected MsgApp to node 2, got %v to %d", msgs[0].Type, msgs[0].To) 1444 } 1445 if len(msgs[0].Entries) != 1 || msgs[0].Entries[0].Index != 2 { 1446 t.Errorf("expected to send entry 2, but got %v", msgs[0].Entries) 1447 } 1448 1449 // Now Node 3 acks the first entry. This releases the wait and entry 2 is sent. 1450 sm.Step(pb.Message{ 1451 From: 3, 1452 Type: pb.MsgAppResp, 1453 Index: 1, 1454 }) 1455 msgs = sm.readMessages() 1456 if len(msgs) != 1 { 1457 t.Fatalf("expected 1 message, got %d: %+v", len(msgs), msgs) 1458 } 1459 if msgs[0].Type != pb.MsgApp || msgs[0].To != 3 { 1460 t.Errorf("expected MsgApp to node 3, got %v to %d", msgs[0].Type, msgs[0].To) 1461 } 1462 if len(msgs[0].Entries) != 1 || msgs[0].Entries[0].Index != 2 { 1463 t.Errorf("expected to send entry 2, but got %v", msgs[0].Entries) 1464 } 1465} 1466 1467func TestRecvMsgVote(t *testing.T) { 1468 testRecvMsgVote(t, pb.MsgVote) 1469} 1470 1471func TestRecvMsgPreVote(t *testing.T) { 1472 testRecvMsgVote(t, pb.MsgPreVote) 1473} 1474 1475func testRecvMsgVote(t *testing.T, msgType pb.MessageType) { 1476 tests := []struct { 1477 state StateType 1478 index, logTerm uint64 1479 voteFor uint64 1480 wreject bool 1481 }{ 1482 {StateFollower, 0, 0, None, true}, 1483 {StateFollower, 0, 1, None, true}, 1484 {StateFollower, 0, 2, None, true}, 1485 {StateFollower, 0, 3, None, false}, 1486 1487 {StateFollower, 1, 0, None, true}, 1488 {StateFollower, 1, 1, None, true}, 1489 {StateFollower, 1, 2, None, true}, 1490 {StateFollower, 1, 3, None, false}, 1491 1492 {StateFollower, 2, 0, None, true}, 1493 {StateFollower, 2, 1, None, true}, 1494 {StateFollower, 2, 2, None, false}, 1495 {StateFollower, 2, 3, None, false}, 1496 1497 {StateFollower, 3, 0, None, true}, 1498 {StateFollower, 3, 1, None, true}, 1499 {StateFollower, 3, 
2, None, false}, 1500 {StateFollower, 3, 3, None, false}, 1501 1502 {StateFollower, 3, 2, 2, false}, 1503 {StateFollower, 3, 2, 1, true}, 1504 1505 {StateLeader, 3, 3, 1, true}, 1506 {StatePreCandidate, 3, 3, 1, true}, 1507 {StateCandidate, 3, 3, 1, true}, 1508 } 1509 1510 max := func(a, b uint64) uint64 { 1511 if a > b { 1512 return a 1513 } 1514 return b 1515 } 1516 1517 for i, tt := range tests { 1518 sm := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) 1519 sm.state = tt.state 1520 switch tt.state { 1521 case StateFollower: 1522 sm.step = stepFollower 1523 case StateCandidate, StatePreCandidate: 1524 sm.step = stepCandidate 1525 case StateLeader: 1526 sm.step = stepLeader 1527 } 1528 sm.Vote = tt.voteFor 1529 sm.raftLog = &raftLog{ 1530 storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 2}, {Index: 2, Term: 2}}}, 1531 unstable: unstable{offset: 3}, 1532 } 1533 1534 // raft.Term is greater than or equal to raft.raftLog.lastTerm. In this 1535 // test we're only testing MsgVote responses when the campaigning node 1536 // has a different raft log compared to the recipient node. 1537 // Additionally we're verifying behaviour when the recipient node has 1538 // already given out its vote for its current term. We're not testing 1539 // what the recipient node does when receiving a message with a 1540 // different term number, so we simply initialize both term numbers to 1541 // be the same. 
1542 term := max(sm.raftLog.lastTerm(), tt.logTerm) 1543 sm.Term = term 1544 sm.Step(pb.Message{Type: msgType, Term: term, From: 2, Index: tt.index, LogTerm: tt.logTerm}) 1545 1546 msgs := sm.readMessages() 1547 if g := len(msgs); g != 1 { 1548 t.Fatalf("#%d: len(msgs) = %d, want 1", i, g) 1549 continue 1550 } 1551 if g := msgs[0].Type; g != voteRespMsgType(msgType) { 1552 t.Errorf("#%d, m.Type = %v, want %v", i, g, voteRespMsgType(msgType)) 1553 } 1554 if g := msgs[0].Reject; g != tt.wreject { 1555 t.Errorf("#%d, m.Reject = %v, want %v", i, g, tt.wreject) 1556 } 1557 } 1558} 1559 1560func TestStateTransition(t *testing.T) { 1561 tests := []struct { 1562 from StateType 1563 to StateType 1564 wallow bool 1565 wterm uint64 1566 wlead uint64 1567 }{ 1568 {StateFollower, StateFollower, true, 1, None}, 1569 {StateFollower, StatePreCandidate, true, 0, None}, 1570 {StateFollower, StateCandidate, true, 1, None}, 1571 {StateFollower, StateLeader, false, 0, None}, 1572 1573 {StatePreCandidate, StateFollower, true, 0, None}, 1574 {StatePreCandidate, StatePreCandidate, true, 0, None}, 1575 {StatePreCandidate, StateCandidate, true, 1, None}, 1576 {StatePreCandidate, StateLeader, true, 0, 1}, 1577 1578 {StateCandidate, StateFollower, true, 0, None}, 1579 {StateCandidate, StatePreCandidate, true, 0, None}, 1580 {StateCandidate, StateCandidate, true, 1, None}, 1581 {StateCandidate, StateLeader, true, 0, 1}, 1582 1583 {StateLeader, StateFollower, true, 1, None}, 1584 {StateLeader, StatePreCandidate, false, 0, None}, 1585 {StateLeader, StateCandidate, false, 1, None}, 1586 {StateLeader, StateLeader, true, 0, 1}, 1587 } 1588 1589 for i, tt := range tests { 1590 func() { 1591 defer func() { 1592 if r := recover(); r != nil { 1593 if tt.wallow { 1594 t.Errorf("%d: allow = %v, want %v", i, false, true) 1595 } 1596 } 1597 }() 1598 1599 sm := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) 1600 sm.state = tt.from 1601 1602 switch tt.to { 1603 case StateFollower: 1604 
sm.becomeFollower(tt.wterm, tt.wlead) 1605 case StatePreCandidate: 1606 sm.becomePreCandidate() 1607 case StateCandidate: 1608 sm.becomeCandidate() 1609 case StateLeader: 1610 sm.becomeLeader() 1611 } 1612 1613 if sm.Term != tt.wterm { 1614 t.Errorf("%d: term = %d, want %d", i, sm.Term, tt.wterm) 1615 } 1616 if sm.lead != tt.wlead { 1617 t.Errorf("%d: lead = %d, want %d", i, sm.lead, tt.wlead) 1618 } 1619 }() 1620 } 1621} 1622 1623func TestAllServerStepdown(t *testing.T) { 1624 tests := []struct { 1625 state StateType 1626 1627 wstate StateType 1628 wterm uint64 1629 windex uint64 1630 }{ 1631 {StateFollower, StateFollower, 3, 0}, 1632 {StatePreCandidate, StateFollower, 3, 0}, 1633 {StateCandidate, StateFollower, 3, 0}, 1634 {StateLeader, StateFollower, 3, 1}, 1635 } 1636 1637 tmsgTypes := [...]pb.MessageType{pb.MsgVote, pb.MsgApp} 1638 tterm := uint64(3) 1639 1640 for i, tt := range tests { 1641 sm := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 1642 switch tt.state { 1643 case StateFollower: 1644 sm.becomeFollower(1, None) 1645 case StatePreCandidate: 1646 sm.becomePreCandidate() 1647 case StateCandidate: 1648 sm.becomeCandidate() 1649 case StateLeader: 1650 sm.becomeCandidate() 1651 sm.becomeLeader() 1652 } 1653 1654 for j, msgType := range tmsgTypes { 1655 sm.Step(pb.Message{From: 2, Type: msgType, Term: tterm, LogTerm: tterm}) 1656 1657 if sm.state != tt.wstate { 1658 t.Errorf("#%d.%d state = %v , want %v", i, j, sm.state, tt.wstate) 1659 } 1660 if sm.Term != tt.wterm { 1661 t.Errorf("#%d.%d term = %v , want %v", i, j, sm.Term, tt.wterm) 1662 } 1663 if sm.raftLog.lastIndex() != tt.windex { 1664 t.Errorf("#%d.%d index = %v , want %v", i, j, sm.raftLog.lastIndex(), tt.windex) 1665 } 1666 if uint64(len(sm.raftLog.allEntries())) != tt.windex { 1667 t.Errorf("#%d.%d len(ents) = %v , want %v", i, j, len(sm.raftLog.allEntries()), tt.windex) 1668 } 1669 wlead := uint64(2) 1670 if msgType == pb.MsgVote { 1671 wlead = None 1672 } 1673 if sm.lead != wlead 
{ 1674 t.Errorf("#%d, sm.lead = %d, want %d", i, sm.lead, None) 1675 } 1676 } 1677 } 1678} 1679 1680func TestCandidateResetTermMsgHeartbeat(t *testing.T) { 1681 testCandidateResetTerm(t, pb.MsgHeartbeat) 1682} 1683 1684func TestCandidateResetTermMsgApp(t *testing.T) { 1685 testCandidateResetTerm(t, pb.MsgApp) 1686} 1687 1688// testCandidateResetTerm tests when a candidate receives a 1689// MsgHeartbeat or MsgApp from leader, "Step" resets the term 1690// with leader's and reverts back to follower. 1691func testCandidateResetTerm(t *testing.T, mt pb.MessageType) { 1692 a := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 1693 b := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 1694 c := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 1695 1696 nt := newNetwork(a, b, c) 1697 1698 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 1699 if a.state != StateLeader { 1700 t.Errorf("state = %s, want %s", a.state, StateLeader) 1701 } 1702 if b.state != StateFollower { 1703 t.Errorf("state = %s, want %s", b.state, StateFollower) 1704 } 1705 if c.state != StateFollower { 1706 t.Errorf("state = %s, want %s", c.state, StateFollower) 1707 } 1708 1709 // isolate 3 and increase term in rest 1710 nt.isolate(3) 1711 1712 nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup}) 1713 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 1714 1715 if a.state != StateLeader { 1716 t.Errorf("state = %s, want %s", a.state, StateLeader) 1717 } 1718 if b.state != StateFollower { 1719 t.Errorf("state = %s, want %s", b.state, StateFollower) 1720 } 1721 1722 // trigger campaign in isolated c 1723 c.resetRandomizedElectionTimeout() 1724 for i := 0; i < c.randomizedElectionTimeout; i++ { 1725 c.tick() 1726 } 1727 1728 if c.state != StateCandidate { 1729 t.Errorf("state = %s, want %s", c.state, StateCandidate) 1730 } 1731 1732 nt.recover() 1733 1734 // leader sends to isolated candidate 1735 // and expects candidate to revert to follower 1736 
nt.send(pb.Message{From: 1, To: 3, Term: a.Term, Type: mt}) 1737 1738 if c.state != StateFollower { 1739 t.Errorf("state = %s, want %s", c.state, StateFollower) 1740 } 1741 1742 // follower c term is reset with leader's 1743 if a.Term != c.Term { 1744 t.Errorf("follower term expected same term as leader's %d, got %d", a.Term, c.Term) 1745 } 1746} 1747 1748func TestLeaderStepdownWhenQuorumActive(t *testing.T) { 1749 sm := newTestRaft(1, []uint64{1, 2, 3}, 5, 1, NewMemoryStorage()) 1750 1751 sm.checkQuorum = true 1752 1753 sm.becomeCandidate() 1754 sm.becomeLeader() 1755 1756 for i := 0; i < sm.electionTimeout+1; i++ { 1757 sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp, Term: sm.Term}) 1758 sm.tick() 1759 } 1760 1761 if sm.state != StateLeader { 1762 t.Errorf("state = %v, want %v", sm.state, StateLeader) 1763 } 1764} 1765 1766func TestLeaderStepdownWhenQuorumLost(t *testing.T) { 1767 sm := newTestRaft(1, []uint64{1, 2, 3}, 5, 1, NewMemoryStorage()) 1768 1769 sm.checkQuorum = true 1770 1771 sm.becomeCandidate() 1772 sm.becomeLeader() 1773 1774 for i := 0; i < sm.electionTimeout+1; i++ { 1775 sm.tick() 1776 } 1777 1778 if sm.state != StateFollower { 1779 t.Errorf("state = %v, want %v", sm.state, StateFollower) 1780 } 1781} 1782 1783func TestLeaderSupersedingWithCheckQuorum(t *testing.T) { 1784 a := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 1785 b := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 1786 c := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 1787 1788 a.checkQuorum = true 1789 b.checkQuorum = true 1790 c.checkQuorum = true 1791 1792 nt := newNetwork(a, b, c) 1793 setRandomizedElectionTimeout(b, b.electionTimeout+1) 1794 1795 for i := 0; i < b.electionTimeout; i++ { 1796 b.tick() 1797 } 1798 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 1799 1800 if a.state != StateLeader { 1801 t.Errorf("state = %s, want %s", a.state, StateLeader) 1802 } 1803 1804 if c.state != StateFollower { 1805 t.Errorf("state 
= %s, want %s", c.state, StateFollower) 1806 } 1807 1808 nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup}) 1809 1810 // Peer b rejected c's vote since its electionElapsed had not reached to electionTimeout 1811 if c.state != StateCandidate { 1812 t.Errorf("state = %s, want %s", c.state, StateCandidate) 1813 } 1814 1815 // Letting b's electionElapsed reach to electionTimeout 1816 for i := 0; i < b.electionTimeout; i++ { 1817 b.tick() 1818 } 1819 nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup}) 1820 1821 if c.state != StateLeader { 1822 t.Errorf("state = %s, want %s", c.state, StateLeader) 1823 } 1824} 1825 1826func TestLeaderElectionWithCheckQuorum(t *testing.T) { 1827 a := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 1828 b := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 1829 c := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 1830 1831 a.checkQuorum = true 1832 b.checkQuorum = true 1833 c.checkQuorum = true 1834 1835 nt := newNetwork(a, b, c) 1836 setRandomizedElectionTimeout(a, a.electionTimeout+1) 1837 setRandomizedElectionTimeout(b, b.electionTimeout+2) 1838 1839 // Immediately after creation, votes are cast regardless of the 1840 // election timeout. 
1841 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 1842 1843 if a.state != StateLeader { 1844 t.Errorf("state = %s, want %s", a.state, StateLeader) 1845 } 1846 1847 if c.state != StateFollower { 1848 t.Errorf("state = %s, want %s", c.state, StateFollower) 1849 } 1850 1851 // need to reset randomizedElectionTimeout larger than electionTimeout again, 1852 // because the value might be reset to electionTimeout since the last state changes 1853 setRandomizedElectionTimeout(a, a.electionTimeout+1) 1854 setRandomizedElectionTimeout(b, b.electionTimeout+2) 1855 for i := 0; i < a.electionTimeout; i++ { 1856 a.tick() 1857 } 1858 for i := 0; i < b.electionTimeout; i++ { 1859 b.tick() 1860 } 1861 nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup}) 1862 1863 if a.state != StateFollower { 1864 t.Errorf("state = %s, want %s", a.state, StateFollower) 1865 } 1866 1867 if c.state != StateLeader { 1868 t.Errorf("state = %s, want %s", c.state, StateLeader) 1869 } 1870} 1871 1872// TestFreeStuckCandidateWithCheckQuorum ensures that a candidate with a higher term 1873// can disrupt the leader even if the leader still "officially" holds the lease, The 1874// leader is expected to step down and adopt the candidate's term 1875func TestFreeStuckCandidateWithCheckQuorum(t *testing.T) { 1876 a := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 1877 b := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 1878 c := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 1879 1880 a.checkQuorum = true 1881 b.checkQuorum = true 1882 c.checkQuorum = true 1883 1884 nt := newNetwork(a, b, c) 1885 setRandomizedElectionTimeout(b, b.electionTimeout+1) 1886 1887 for i := 0; i < b.electionTimeout; i++ { 1888 b.tick() 1889 } 1890 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 1891 1892 nt.isolate(1) 1893 nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup}) 1894 1895 if b.state != StateFollower { 1896 t.Errorf("state = %s, want %s", b.state, StateFollower) 1897 } 
1898 1899 if c.state != StateCandidate { 1900 t.Errorf("state = %s, want %s", c.state, StateCandidate) 1901 } 1902 1903 if c.Term != b.Term+1 { 1904 t.Errorf("term = %d, want %d", c.Term, b.Term+1) 1905 } 1906 1907 // Vote again for safety 1908 nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup}) 1909 1910 if b.state != StateFollower { 1911 t.Errorf("state = %s, want %s", b.state, StateFollower) 1912 } 1913 1914 if c.state != StateCandidate { 1915 t.Errorf("state = %s, want %s", c.state, StateCandidate) 1916 } 1917 1918 if c.Term != b.Term+2 { 1919 t.Errorf("term = %d, want %d", c.Term, b.Term+2) 1920 } 1921 1922 nt.recover() 1923 nt.send(pb.Message{From: 1, To: 3, Type: pb.MsgHeartbeat, Term: a.Term}) 1924 1925 // Disrupt the leader so that the stuck peer is freed 1926 if a.state != StateFollower { 1927 t.Errorf("state = %s, want %s", a.state, StateFollower) 1928 } 1929 1930 if c.Term != a.Term { 1931 t.Errorf("term = %d, want %d", c.Term, a.Term) 1932 } 1933 1934 // Vote again, should become leader this time 1935 nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup}) 1936 1937 if c.state != StateLeader { 1938 t.Errorf("peer 3 state: %s, want %s", c.state, StateLeader) 1939 } 1940} 1941 1942func TestNonPromotableVoterWithCheckQuorum(t *testing.T) { 1943 a := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) 1944 b := newTestRaft(2, []uint64{1}, 10, 1, NewMemoryStorage()) 1945 1946 a.checkQuorum = true 1947 b.checkQuorum = true 1948 1949 nt := newNetwork(a, b) 1950 setRandomizedElectionTimeout(b, b.electionTimeout+1) 1951 // Need to remove 2 again to make it a non-promotable node since newNetwork overwritten some internal states 1952 b.applyConfChange(pb.ConfChange{Type: pb.ConfChangeRemoveNode, NodeID: 2}.AsV2()) 1953 1954 if b.promotable() { 1955 t.Fatalf("promotable = %v, want false", b.promotable()) 1956 } 1957 1958 for i := 0; i < b.electionTimeout; i++ { 1959 b.tick() 1960 } 1961 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 1962 1963 if a.state 
!= StateLeader { 1964 t.Errorf("state = %s, want %s", a.state, StateLeader) 1965 } 1966 1967 if b.state != StateFollower { 1968 t.Errorf("state = %s, want %s", b.state, StateFollower) 1969 } 1970 1971 if b.lead != 1 { 1972 t.Errorf("lead = %d, want 1", b.lead) 1973 } 1974} 1975 1976// TestDisruptiveFollower tests isolated follower, 1977// with slow network incoming from leader, election times out 1978// to become a candidate with an increased term. Then, the 1979// candiate's response to late leader heartbeat forces the leader 1980// to step down. 1981func TestDisruptiveFollower(t *testing.T) { 1982 n1 := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 1983 n2 := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 1984 n3 := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 1985 1986 n1.checkQuorum = true 1987 n2.checkQuorum = true 1988 n3.checkQuorum = true 1989 1990 n1.becomeFollower(1, None) 1991 n2.becomeFollower(1, None) 1992 n3.becomeFollower(1, None) 1993 1994 nt := newNetwork(n1, n2, n3) 1995 1996 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 1997 1998 // check state 1999 // n1.state == StateLeader 2000 // n2.state == StateFollower 2001 // n3.state == StateFollower 2002 if n1.state != StateLeader { 2003 t.Fatalf("node 1 state: %s, want %s", n1.state, StateLeader) 2004 } 2005 if n2.state != StateFollower { 2006 t.Fatalf("node 2 state: %s, want %s", n2.state, StateFollower) 2007 } 2008 if n3.state != StateFollower { 2009 t.Fatalf("node 3 state: %s, want %s", n3.state, StateFollower) 2010 } 2011 2012 // etcd server "advanceTicksForElection" on restart; 2013 // this is to expedite campaign trigger when given larger 2014 // election timeouts (e.g. 
multi-datacenter deploy) 2015 // Or leader messages are being delayed while ticks elapse 2016 setRandomizedElectionTimeout(n3, n3.electionTimeout+2) 2017 for i := 0; i < n3.randomizedElectionTimeout-1; i++ { 2018 n3.tick() 2019 } 2020 2021 // ideally, before last election tick elapses, 2022 // the follower n3 receives "pb.MsgApp" or "pb.MsgHeartbeat" 2023 // from leader n1, and then resets its "electionElapsed" 2024 // however, last tick may elapse before receiving any 2025 // messages from leader, thus triggering campaign 2026 n3.tick() 2027 2028 // n1 is still leader yet 2029 // while its heartbeat to candidate n3 is being delayed 2030 2031 // check state 2032 // n1.state == StateLeader 2033 // n2.state == StateFollower 2034 // n3.state == StateCandidate 2035 if n1.state != StateLeader { 2036 t.Fatalf("node 1 state: %s, want %s", n1.state, StateLeader) 2037 } 2038 if n2.state != StateFollower { 2039 t.Fatalf("node 2 state: %s, want %s", n2.state, StateFollower) 2040 } 2041 if n3.state != StateCandidate { 2042 t.Fatalf("node 3 state: %s, want %s", n3.state, StateCandidate) 2043 } 2044 // check term 2045 // n1.Term == 2 2046 // n2.Term == 2 2047 // n3.Term == 3 2048 if n1.Term != 2 { 2049 t.Fatalf("node 1 term: %d, want %d", n1.Term, 2) 2050 } 2051 if n2.Term != 2 { 2052 t.Fatalf("node 2 term: %d, want %d", n2.Term, 2) 2053 } 2054 if n3.Term != 3 { 2055 t.Fatalf("node 3 term: %d, want %d", n3.Term, 3) 2056 } 2057 2058 // while outgoing vote requests are still queued in n3, 2059 // leader heartbeat finally arrives at candidate n3 2060 // however, due to delayed network from leader, leader 2061 // heartbeat was sent with lower term than candidate's 2062 nt.send(pb.Message{From: 1, To: 3, Term: n1.Term, Type: pb.MsgHeartbeat}) 2063 2064 // then candidate n3 responds with "pb.MsgAppResp" of higher term 2065 // and leader steps down from a message with higher term 2066 // this is to disrupt the current leader, so that candidate 2067 // with higher term can be freed with 
following election 2068 2069 // check state 2070 // n1.state == StateFollower 2071 // n2.state == StateFollower 2072 // n3.state == StateCandidate 2073 if n1.state != StateFollower { 2074 t.Fatalf("node 1 state: %s, want %s", n1.state, StateFollower) 2075 } 2076 if n2.state != StateFollower { 2077 t.Fatalf("node 2 state: %s, want %s", n2.state, StateFollower) 2078 } 2079 if n3.state != StateCandidate { 2080 t.Fatalf("node 3 state: %s, want %s", n3.state, StateCandidate) 2081 } 2082 // check term 2083 // n1.Term == 3 2084 // n2.Term == 2 2085 // n3.Term == 3 2086 if n1.Term != 3 { 2087 t.Fatalf("node 1 term: %d, want %d", n1.Term, 3) 2088 } 2089 if n2.Term != 2 { 2090 t.Fatalf("node 2 term: %d, want %d", n2.Term, 2) 2091 } 2092 if n3.Term != 3 { 2093 t.Fatalf("node 3 term: %d, want %d", n3.Term, 3) 2094 } 2095} 2096 2097// TestDisruptiveFollowerPreVote tests isolated follower, 2098// with slow network incoming from leader, election times out 2099// to become a pre-candidate with less log than current leader. 2100// Then pre-vote phase prevents this isolated node from forcing 2101// current leader to step down, thus less disruptions. 
2102func TestDisruptiveFollowerPreVote(t *testing.T) { 2103 n1 := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 2104 n2 := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 2105 n3 := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 2106 2107 n1.checkQuorum = true 2108 n2.checkQuorum = true 2109 n3.checkQuorum = true 2110 2111 n1.becomeFollower(1, None) 2112 n2.becomeFollower(1, None) 2113 n3.becomeFollower(1, None) 2114 2115 nt := newNetwork(n1, n2, n3) 2116 2117 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 2118 2119 // check state 2120 // n1.state == StateLeader 2121 // n2.state == StateFollower 2122 // n3.state == StateFollower 2123 if n1.state != StateLeader { 2124 t.Fatalf("node 1 state: %s, want %s", n1.state, StateLeader) 2125 } 2126 if n2.state != StateFollower { 2127 t.Fatalf("node 2 state: %s, want %s", n2.state, StateFollower) 2128 } 2129 if n3.state != StateFollower { 2130 t.Fatalf("node 3 state: %s, want %s", n3.state, StateFollower) 2131 } 2132 2133 nt.isolate(3) 2134 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) 2135 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) 2136 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) 2137 n1.preVote = true 2138 n2.preVote = true 2139 n3.preVote = true 2140 nt.recover() 2141 nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup}) 2142 2143 // check state 2144 // n1.state == StateLeader 2145 // n2.state == StateFollower 2146 // n3.state == StatePreCandidate 2147 if n1.state != StateLeader { 2148 t.Fatalf("node 1 state: %s, want %s", n1.state, StateLeader) 2149 } 2150 if n2.state != StateFollower { 2151 t.Fatalf("node 2 state: %s, want %s", n2.state, StateFollower) 2152 } 2153 if n3.state != StatePreCandidate { 2154 t.Fatalf("node 3 state: %s, want %s", n3.state, StatePreCandidate) 2155 } 2156 // check term 2157 // 
n1.Term == 2 2158 // n2.Term == 2 2159 // n3.Term == 2 2160 if n1.Term != 2 { 2161 t.Fatalf("node 1 term: %d, want %d", n1.Term, 2) 2162 } 2163 if n2.Term != 2 { 2164 t.Fatalf("node 2 term: %d, want %d", n2.Term, 2) 2165 } 2166 if n3.Term != 2 { 2167 t.Fatalf("node 2 term: %d, want %d", n3.Term, 2) 2168 } 2169 2170 // delayed leader heartbeat does not force current leader to step down 2171 nt.send(pb.Message{From: 1, To: 3, Term: n1.Term, Type: pb.MsgHeartbeat}) 2172 if n1.state != StateLeader { 2173 t.Fatalf("node 1 state: %s, want %s", n1.state, StateLeader) 2174 } 2175} 2176 2177func TestReadOnlyOptionSafe(t *testing.T) { 2178 a := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 2179 b := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 2180 c := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 2181 2182 nt := newNetwork(a, b, c) 2183 setRandomizedElectionTimeout(b, b.electionTimeout+1) 2184 2185 for i := 0; i < b.electionTimeout; i++ { 2186 b.tick() 2187 } 2188 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 2189 2190 if a.state != StateLeader { 2191 t.Fatalf("state = %s, want %s", a.state, StateLeader) 2192 } 2193 2194 tests := []struct { 2195 sm *raft 2196 proposals int 2197 wri uint64 2198 wctx []byte 2199 }{ 2200 {a, 10, 11, []byte("ctx1")}, 2201 {b, 10, 21, []byte("ctx2")}, 2202 {c, 10, 31, []byte("ctx3")}, 2203 {a, 10, 41, []byte("ctx4")}, 2204 {b, 10, 51, []byte("ctx5")}, 2205 {c, 10, 61, []byte("ctx6")}, 2206 } 2207 2208 for i, tt := range tests { 2209 for j := 0; j < tt.proposals; j++ { 2210 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) 2211 } 2212 2213 nt.send(pb.Message{From: tt.sm.id, To: tt.sm.id, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: tt.wctx}}}) 2214 2215 r := tt.sm 2216 if len(r.readStates) == 0 { 2217 t.Errorf("#%d: len(readStates) = 0, want non-zero", i) 2218 } 2219 rs := r.readStates[0] 2220 if rs.Index != tt.wri { 2221 t.Errorf("#%d: readIndex = %d, want 
%d", i, rs.Index, tt.wri) 2222 } 2223 2224 if !bytes.Equal(rs.RequestCtx, tt.wctx) { 2225 t.Errorf("#%d: requestCtx = %v, want %v", i, rs.RequestCtx, tt.wctx) 2226 } 2227 r.readStates = nil 2228 } 2229} 2230 2231func TestReadOnlyWithLearner(t *testing.T) { 2232 a := newTestLearnerRaft(1, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage()) 2233 b := newTestLearnerRaft(2, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage()) 2234 2235 nt := newNetwork(a, b) 2236 setRandomizedElectionTimeout(b, b.electionTimeout+1) 2237 2238 for i := 0; i < b.electionTimeout; i++ { 2239 b.tick() 2240 } 2241 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 2242 2243 if a.state != StateLeader { 2244 t.Fatalf("state = %s, want %s", a.state, StateLeader) 2245 } 2246 2247 tests := []struct { 2248 sm *raft 2249 proposals int 2250 wri uint64 2251 wctx []byte 2252 }{ 2253 {a, 10, 11, []byte("ctx1")}, 2254 {b, 10, 21, []byte("ctx2")}, 2255 {a, 10, 31, []byte("ctx3")}, 2256 {b, 10, 41, []byte("ctx4")}, 2257 } 2258 2259 for i, tt := range tests { 2260 for j := 0; j < tt.proposals; j++ { 2261 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) 2262 } 2263 2264 nt.send(pb.Message{From: tt.sm.id, To: tt.sm.id, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: tt.wctx}}}) 2265 2266 r := tt.sm 2267 if len(r.readStates) == 0 { 2268 t.Fatalf("#%d: len(readStates) = 0, want non-zero", i) 2269 } 2270 rs := r.readStates[0] 2271 if rs.Index != tt.wri { 2272 t.Errorf("#%d: readIndex = %d, want %d", i, rs.Index, tt.wri) 2273 } 2274 2275 if !bytes.Equal(rs.RequestCtx, tt.wctx) { 2276 t.Errorf("#%d: requestCtx = %v, want %v", i, rs.RequestCtx, tt.wctx) 2277 } 2278 r.readStates = nil 2279 } 2280} 2281 2282func TestReadOnlyOptionLease(t *testing.T) { 2283 a := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 2284 b := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 2285 c := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 2286 a.readOnly.option 
= ReadOnlyLeaseBased 2287 b.readOnly.option = ReadOnlyLeaseBased 2288 c.readOnly.option = ReadOnlyLeaseBased 2289 a.checkQuorum = true 2290 b.checkQuorum = true 2291 c.checkQuorum = true 2292 2293 nt := newNetwork(a, b, c) 2294 setRandomizedElectionTimeout(b, b.electionTimeout+1) 2295 2296 for i := 0; i < b.electionTimeout; i++ { 2297 b.tick() 2298 } 2299 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 2300 2301 if a.state != StateLeader { 2302 t.Fatalf("state = %s, want %s", a.state, StateLeader) 2303 } 2304 2305 tests := []struct { 2306 sm *raft 2307 proposals int 2308 wri uint64 2309 wctx []byte 2310 }{ 2311 {a, 10, 11, []byte("ctx1")}, 2312 {b, 10, 21, []byte("ctx2")}, 2313 {c, 10, 31, []byte("ctx3")}, 2314 {a, 10, 41, []byte("ctx4")}, 2315 {b, 10, 51, []byte("ctx5")}, 2316 {c, 10, 61, []byte("ctx6")}, 2317 } 2318 2319 for i, tt := range tests { 2320 for j := 0; j < tt.proposals; j++ { 2321 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) 2322 } 2323 2324 nt.send(pb.Message{From: tt.sm.id, To: tt.sm.id, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: tt.wctx}}}) 2325 2326 r := tt.sm 2327 rs := r.readStates[0] 2328 if rs.Index != tt.wri { 2329 t.Errorf("#%d: readIndex = %d, want %d", i, rs.Index, tt.wri) 2330 } 2331 2332 if !bytes.Equal(rs.RequestCtx, tt.wctx) { 2333 t.Errorf("#%d: requestCtx = %v, want %v", i, rs.RequestCtx, tt.wctx) 2334 } 2335 r.readStates = nil 2336 } 2337} 2338 2339// TestReadOnlyForNewLeader ensures that a leader only accepts MsgReadIndex message 2340// when it commits at least one log entry at it term. 
2341func TestReadOnlyForNewLeader(t *testing.T) { 2342 nodeConfigs := []struct { 2343 id uint64 2344 committed uint64 2345 applied uint64 2346 compactIndex uint64 2347 }{ 2348 {1, 1, 1, 0}, 2349 {2, 2, 2, 2}, 2350 {3, 2, 2, 2}, 2351 } 2352 peers := make([]stateMachine, 0) 2353 for _, c := range nodeConfigs { 2354 storage := NewMemoryStorage() 2355 storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}}) 2356 storage.SetHardState(pb.HardState{Term: 1, Commit: c.committed}) 2357 if c.compactIndex != 0 { 2358 storage.Compact(c.compactIndex) 2359 } 2360 cfg := newTestConfig(c.id, []uint64{1, 2, 3}, 10, 1, storage) 2361 cfg.Applied = c.applied 2362 raft := newRaft(cfg) 2363 peers = append(peers, raft) 2364 } 2365 nt := newNetwork(peers...) 2366 2367 // Drop MsgApp to forbid peer a to commit any log entry at its term after it becomes leader. 2368 nt.ignore(pb.MsgApp) 2369 // Force peer a to become leader. 2370 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 2371 2372 sm := nt.peers[1].(*raft) 2373 if sm.state != StateLeader { 2374 t.Fatalf("state = %s, want %s", sm.state, StateLeader) 2375 } 2376 2377 // Ensure peer a drops read only request. 
2378 var windex uint64 = 4 2379 wctx := []byte("ctx") 2380 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: wctx}}}) 2381 if len(sm.readStates) != 0 { 2382 t.Fatalf("len(readStates) = %d, want zero", len(sm.readStates)) 2383 } 2384 2385 nt.recover() 2386 2387 // Force peer a to commit a log entry at its term 2388 for i := 0; i < sm.heartbeatTimeout; i++ { 2389 sm.tick() 2390 } 2391 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) 2392 if sm.raftLog.committed != 4 { 2393 t.Fatalf("committed = %d, want 4", sm.raftLog.committed) 2394 } 2395 lastLogTerm := sm.raftLog.zeroTermOnErrCompacted(sm.raftLog.term(sm.raftLog.committed)) 2396 if lastLogTerm != sm.Term { 2397 t.Fatalf("last log term = %d, want %d", lastLogTerm, sm.Term) 2398 } 2399 2400 // Ensure peer a accepts read only request after it commits a entry at its term. 2401 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: wctx}}}) 2402 if len(sm.readStates) != 1 { 2403 t.Fatalf("len(readStates) = %d, want 1", len(sm.readStates)) 2404 } 2405 rs := sm.readStates[0] 2406 if rs.Index != windex { 2407 t.Fatalf("readIndex = %d, want %d", rs.Index, windex) 2408 } 2409 if !bytes.Equal(rs.RequestCtx, wctx) { 2410 t.Fatalf("requestCtx = %v, want %v", rs.RequestCtx, wctx) 2411 } 2412} 2413 2414func TestLeaderAppResp(t *testing.T) { 2415 // initial progress: match = 0; next = 3 2416 tests := []struct { 2417 index uint64 2418 reject bool 2419 // progress 2420 wmatch uint64 2421 wnext uint64 2422 // message 2423 wmsgNum int 2424 windex uint64 2425 wcommitted uint64 2426 }{ 2427 {3, true, 0, 3, 0, 0, 0}, // stale resp; no replies 2428 {2, true, 0, 2, 1, 1, 0}, // denied resp; leader does not commit; decrease next and send probing msg 2429 {2, false, 2, 4, 2, 2, 2}, // accept resp; leader commits; broadcast with commit index 2430 {0, false, 0, 3, 0, 0, 0}, // ignore heartbeat replies 2431 } 2432 2433 for i, tt := range tests { 2434 
// sm term is 1 after it becomes the leader. 2435 // thus the last log term must be 1 to be committed. 2436 sm := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 2437 sm.raftLog = &raftLog{ 2438 storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 0}, {Index: 2, Term: 1}}}, 2439 unstable: unstable{offset: 3}, 2440 } 2441 sm.becomeCandidate() 2442 sm.becomeLeader() 2443 sm.readMessages() 2444 sm.Step(pb.Message{From: 2, Type: pb.MsgAppResp, Index: tt.index, Term: sm.Term, Reject: tt.reject, RejectHint: tt.index}) 2445 2446 p := sm.prs.Progress[2] 2447 if p.Match != tt.wmatch { 2448 t.Errorf("#%d match = %d, want %d", i, p.Match, tt.wmatch) 2449 } 2450 if p.Next != tt.wnext { 2451 t.Errorf("#%d next = %d, want %d", i, p.Next, tt.wnext) 2452 } 2453 2454 msgs := sm.readMessages() 2455 2456 if len(msgs) != tt.wmsgNum { 2457 t.Errorf("#%d msgNum = %d, want %d", i, len(msgs), tt.wmsgNum) 2458 } 2459 for j, msg := range msgs { 2460 if msg.Index != tt.windex { 2461 t.Errorf("#%d.%d index = %d, want %d", i, j, msg.Index, tt.windex) 2462 } 2463 if msg.Commit != tt.wcommitted { 2464 t.Errorf("#%d.%d commit = %d, want %d", i, j, msg.Commit, tt.wcommitted) 2465 } 2466 } 2467 } 2468} 2469 2470// When the leader receives a heartbeat tick, it should 2471// send a MsgHeartbeat with m.Index = 0, m.LogTerm=0 and empty entries. 
2472func TestBcastBeat(t *testing.T) { 2473 offset := uint64(1000) 2474 // make a state machine with log.offset = 1000 2475 s := pb.Snapshot{ 2476 Metadata: pb.SnapshotMetadata{ 2477 Index: offset, 2478 Term: 1, 2479 ConfState: pb.ConfState{Voters: []uint64{1, 2, 3}}, 2480 }, 2481 } 2482 storage := NewMemoryStorage() 2483 storage.ApplySnapshot(s) 2484 sm := newTestRaft(1, nil, 10, 1, storage) 2485 sm.Term = 1 2486 2487 sm.becomeCandidate() 2488 sm.becomeLeader() 2489 for i := 0; i < 10; i++ { 2490 mustAppendEntry(sm, pb.Entry{Index: uint64(i) + 1}) 2491 } 2492 // slow follower 2493 sm.prs.Progress[2].Match, sm.prs.Progress[2].Next = 5, 6 2494 // normal follower 2495 sm.prs.Progress[3].Match, sm.prs.Progress[3].Next = sm.raftLog.lastIndex(), sm.raftLog.lastIndex()+1 2496 2497 sm.Step(pb.Message{Type: pb.MsgBeat}) 2498 msgs := sm.readMessages() 2499 if len(msgs) != 2 { 2500 t.Fatalf("len(msgs) = %v, want 2", len(msgs)) 2501 } 2502 wantCommitMap := map[uint64]uint64{ 2503 2: min(sm.raftLog.committed, sm.prs.Progress[2].Match), 2504 3: min(sm.raftLog.committed, sm.prs.Progress[3].Match), 2505 } 2506 for i, m := range msgs { 2507 if m.Type != pb.MsgHeartbeat { 2508 t.Fatalf("#%d: type = %v, want = %v", i, m.Type, pb.MsgHeartbeat) 2509 } 2510 if m.Index != 0 { 2511 t.Fatalf("#%d: prevIndex = %d, want %d", i, m.Index, 0) 2512 } 2513 if m.LogTerm != 0 { 2514 t.Fatalf("#%d: prevTerm = %d, want %d", i, m.LogTerm, 0) 2515 } 2516 if wantCommitMap[m.To] == 0 { 2517 t.Fatalf("#%d: unexpected to %d", i, m.To) 2518 } else { 2519 if m.Commit != wantCommitMap[m.To] { 2520 t.Fatalf("#%d: commit = %d, want %d", i, m.Commit, wantCommitMap[m.To]) 2521 } 2522 delete(wantCommitMap, m.To) 2523 } 2524 if len(m.Entries) != 0 { 2525 t.Fatalf("#%d: len(entries) = %d, want 0", i, len(m.Entries)) 2526 } 2527 } 2528} 2529 2530// tests the output of the state machine when receiving MsgBeat 2531func TestRecvMsgBeat(t *testing.T) { 2532 tests := []struct { 2533 state StateType 2534 wMsg int 2535 }{ 
2536 {StateLeader, 2}, 2537 // candidate and follower should ignore MsgBeat 2538 {StateCandidate, 0}, 2539 {StateFollower, 0}, 2540 } 2541 2542 for i, tt := range tests { 2543 sm := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 2544 sm.raftLog = &raftLog{storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 0}, {Index: 2, Term: 1}}}} 2545 sm.Term = 1 2546 sm.state = tt.state 2547 switch tt.state { 2548 case StateFollower: 2549 sm.step = stepFollower 2550 case StateCandidate: 2551 sm.step = stepCandidate 2552 case StateLeader: 2553 sm.step = stepLeader 2554 } 2555 sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) 2556 2557 msgs := sm.readMessages() 2558 if len(msgs) != tt.wMsg { 2559 t.Errorf("%d: len(msgs) = %d, want %d", i, len(msgs), tt.wMsg) 2560 } 2561 for _, m := range msgs { 2562 if m.Type != pb.MsgHeartbeat { 2563 t.Errorf("%d: msg.type = %v, want %v", i, m.Type, pb.MsgHeartbeat) 2564 } 2565 } 2566 } 2567} 2568 2569func TestLeaderIncreaseNext(t *testing.T) { 2570 previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}} 2571 tests := []struct { 2572 // progress 2573 state tracker.StateType 2574 next uint64 2575 2576 wnext uint64 2577 }{ 2578 // state replicate, optimistically increase next 2579 // previous entries + noop entry + propose + 1 2580 {tracker.StateReplicate, 2, uint64(len(previousEnts) + 1 + 1 + 1)}, 2581 // state probe, not optimistically increase next 2582 {tracker.StateProbe, 2, 2}, 2583 } 2584 2585 for i, tt := range tests { 2586 sm := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) 2587 sm.raftLog.append(previousEnts...) 
2588 sm.becomeCandidate() 2589 sm.becomeLeader() 2590 sm.prs.Progress[2].State = tt.state 2591 sm.prs.Progress[2].Next = tt.next 2592 sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) 2593 2594 p := sm.prs.Progress[2] 2595 if p.Next != tt.wnext { 2596 t.Errorf("#%d next = %d, want %d", i, p.Next, tt.wnext) 2597 } 2598 } 2599} 2600 2601func TestSendAppendForProgressProbe(t *testing.T) { 2602 r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) 2603 r.becomeCandidate() 2604 r.becomeLeader() 2605 r.readMessages() 2606 r.prs.Progress[2].BecomeProbe() 2607 2608 // each round is a heartbeat 2609 for i := 0; i < 3; i++ { 2610 if i == 0 { 2611 // we expect that raft will only send out one msgAPP on the first 2612 // loop. After that, the follower is paused until a heartbeat response is 2613 // received. 2614 mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) 2615 r.sendAppend(2) 2616 msg := r.readMessages() 2617 if len(msg) != 1 { 2618 t.Errorf("len(msg) = %d, want %d", len(msg), 1) 2619 } 2620 if msg[0].Index != 0 { 2621 t.Errorf("index = %d, want %d", msg[0].Index, 0) 2622 } 2623 } 2624 2625 if !r.prs.Progress[2].ProbeSent { 2626 t.Errorf("paused = %v, want true", r.prs.Progress[2].ProbeSent) 2627 } 2628 for j := 0; j < 10; j++ { 2629 mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) 2630 r.sendAppend(2) 2631 if l := len(r.readMessages()); l != 0 { 2632 t.Errorf("len(msg) = %d, want %d", l, 0) 2633 } 2634 } 2635 2636 // do a heartbeat 2637 for j := 0; j < r.heartbeatTimeout; j++ { 2638 r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) 2639 } 2640 if !r.prs.Progress[2].ProbeSent { 2641 t.Errorf("paused = %v, want true", r.prs.Progress[2].ProbeSent) 2642 } 2643 2644 // consume the heartbeat 2645 msg := r.readMessages() 2646 if len(msg) != 1 { 2647 t.Errorf("len(msg) = %d, want %d", len(msg), 1) 2648 } 2649 if msg[0].Type != pb.MsgHeartbeat { 2650 t.Errorf("type = %v, want %v", msg[0].Type, 
pb.MsgHeartbeat) 2651 } 2652 } 2653 2654 // a heartbeat response will allow another message to be sent 2655 r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp}) 2656 msg := r.readMessages() 2657 if len(msg) != 1 { 2658 t.Errorf("len(msg) = %d, want %d", len(msg), 1) 2659 } 2660 if msg[0].Index != 0 { 2661 t.Errorf("index = %d, want %d", msg[0].Index, 0) 2662 } 2663 if !r.prs.Progress[2].ProbeSent { 2664 t.Errorf("paused = %v, want true", r.prs.Progress[2].ProbeSent) 2665 } 2666} 2667 2668func TestSendAppendForProgressReplicate(t *testing.T) { 2669 r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) 2670 r.becomeCandidate() 2671 r.becomeLeader() 2672 r.readMessages() 2673 r.prs.Progress[2].BecomeReplicate() 2674 2675 for i := 0; i < 10; i++ { 2676 mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) 2677 r.sendAppend(2) 2678 msgs := r.readMessages() 2679 if len(msgs) != 1 { 2680 t.Errorf("len(msg) = %d, want %d", len(msgs), 1) 2681 } 2682 } 2683} 2684 2685func TestSendAppendForProgressSnapshot(t *testing.T) { 2686 r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) 2687 r.becomeCandidate() 2688 r.becomeLeader() 2689 r.readMessages() 2690 r.prs.Progress[2].BecomeSnapshot(10) 2691 2692 for i := 0; i < 10; i++ { 2693 mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) 2694 r.sendAppend(2) 2695 msgs := r.readMessages() 2696 if len(msgs) != 0 { 2697 t.Errorf("len(msg) = %d, want %d", len(msgs), 0) 2698 } 2699 } 2700} 2701 2702func TestRecvMsgUnreachable(t *testing.T) { 2703 previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}} 2704 s := NewMemoryStorage() 2705 s.Append(previousEnts) 2706 r := newTestRaft(1, []uint64{1, 2}, 10, 1, s) 2707 r.becomeCandidate() 2708 r.becomeLeader() 2709 r.readMessages() 2710 // set node 2 to state replicate 2711 r.prs.Progress[2].Match = 3 2712 r.prs.Progress[2].BecomeReplicate() 2713 r.prs.Progress[2].OptimisticUpdate(5) 2714 2715 r.Step(pb.Message{From: 2, To: 1, 
Type: pb.MsgUnreachable}) 2716 2717 if r.prs.Progress[2].State != tracker.StateProbe { 2718 t.Errorf("state = %s, want %s", r.prs.Progress[2].State, tracker.StateProbe) 2719 } 2720 if wnext := r.prs.Progress[2].Match + 1; r.prs.Progress[2].Next != wnext { 2721 t.Errorf("next = %d, want %d", r.prs.Progress[2].Next, wnext) 2722 } 2723} 2724 2725func TestRestore(t *testing.T) { 2726 s := pb.Snapshot{ 2727 Metadata: pb.SnapshotMetadata{ 2728 Index: 11, // magic number 2729 Term: 11, // magic number 2730 ConfState: pb.ConfState{Voters: []uint64{1, 2, 3}}, 2731 }, 2732 } 2733 2734 storage := NewMemoryStorage() 2735 sm := newTestRaft(1, []uint64{1, 2}, 10, 1, storage) 2736 if ok := sm.restore(s); !ok { 2737 t.Fatal("restore fail, want succeed") 2738 } 2739 2740 if sm.raftLog.lastIndex() != s.Metadata.Index { 2741 t.Errorf("log.lastIndex = %d, want %d", sm.raftLog.lastIndex(), s.Metadata.Index) 2742 } 2743 if mustTerm(sm.raftLog.term(s.Metadata.Index)) != s.Metadata.Term { 2744 t.Errorf("log.lastTerm = %d, want %d", mustTerm(sm.raftLog.term(s.Metadata.Index)), s.Metadata.Term) 2745 } 2746 sg := sm.prs.VoterNodes() 2747 if !reflect.DeepEqual(sg, s.Metadata.ConfState.Voters) { 2748 t.Errorf("sm.Voters = %+v, want %+v", sg, s.Metadata.ConfState.Voters) 2749 } 2750 2751 if ok := sm.restore(s); ok { 2752 t.Fatal("restore succeed, want fail") 2753 } 2754} 2755 2756// TestRestoreWithLearner restores a snapshot which contains learners. 
2757func TestRestoreWithLearner(t *testing.T) { 2758 s := pb.Snapshot{ 2759 Metadata: pb.SnapshotMetadata{ 2760 Index: 11, // magic number 2761 Term: 11, // magic number 2762 ConfState: pb.ConfState{Voters: []uint64{1, 2}, Learners: []uint64{3}}, 2763 }, 2764 } 2765 2766 storage := NewMemoryStorage() 2767 sm := newTestLearnerRaft(3, []uint64{1, 2}, []uint64{3}, 8, 2, storage) 2768 if ok := sm.restore(s); !ok { 2769 t.Error("restore fail, want succeed") 2770 } 2771 2772 if sm.raftLog.lastIndex() != s.Metadata.Index { 2773 t.Errorf("log.lastIndex = %d, want %d", sm.raftLog.lastIndex(), s.Metadata.Index) 2774 } 2775 if mustTerm(sm.raftLog.term(s.Metadata.Index)) != s.Metadata.Term { 2776 t.Errorf("log.lastTerm = %d, want %d", mustTerm(sm.raftLog.term(s.Metadata.Index)), s.Metadata.Term) 2777 } 2778 sg := sm.prs.VoterNodes() 2779 if len(sg) != len(s.Metadata.ConfState.Voters) { 2780 t.Errorf("sm.Voters = %+v, length not equal with %+v", sg, s.Metadata.ConfState.Voters) 2781 } 2782 lns := sm.prs.LearnerNodes() 2783 if len(lns) != len(s.Metadata.ConfState.Learners) { 2784 t.Errorf("sm.LearnerNodes = %+v, length not equal with %+v", sg, s.Metadata.ConfState.Learners) 2785 } 2786 for _, n := range s.Metadata.ConfState.Voters { 2787 if sm.prs.Progress[n].IsLearner { 2788 t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs.Progress[n], false) 2789 } 2790 } 2791 for _, n := range s.Metadata.ConfState.Learners { 2792 if !sm.prs.Progress[n].IsLearner { 2793 t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs.Progress[n], true) 2794 } 2795 } 2796 2797 if ok := sm.restore(s); ok { 2798 t.Error("restore succeed, want fail") 2799 } 2800} 2801 2802// TestRestoreVoterToLearner verifies that a normal peer can be downgraded to a 2803// learner through a snapshot. 
At the time of writing, we don't allow 2804// configuration changes to do this directly, but note that the snapshot may 2805// compress multiple changes to the configuration into one: the voter could have 2806// been removed, then readded as a learner and the snapshot reflects both 2807// changes. In that case, a voter receives a snapshot telling it that it is now 2808// a learner. In fact, the node has to accept that snapshot, or it is 2809// permanently cut off from the Raft log. 2810func TestRestoreVoterToLearner(t *testing.T) { 2811 s := pb.Snapshot{ 2812 Metadata: pb.SnapshotMetadata{ 2813 Index: 11, // magic number 2814 Term: 11, // magic number 2815 ConfState: pb.ConfState{Voters: []uint64{1, 2}, Learners: []uint64{3}}, 2816 }, 2817 } 2818 2819 storage := NewMemoryStorage() 2820 sm := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, storage) 2821 2822 if sm.isLearner { 2823 t.Errorf("%x is learner, want not", sm.id) 2824 } 2825 if ok := sm.restore(s); !ok { 2826 t.Error("restore failed unexpectedly") 2827 } 2828} 2829 2830// TestRestoreLearnerPromotion checks that a learner can become to a follower after 2831// restoring snapshot. 
2832func TestRestoreLearnerPromotion(t *testing.T) { 2833 s := pb.Snapshot{ 2834 Metadata: pb.SnapshotMetadata{ 2835 Index: 11, // magic number 2836 Term: 11, // magic number 2837 ConfState: pb.ConfState{Voters: []uint64{1, 2, 3}}, 2838 }, 2839 } 2840 2841 storage := NewMemoryStorage() 2842 sm := newTestLearnerRaft(3, []uint64{1, 2}, []uint64{3}, 10, 1, storage) 2843 2844 if !sm.isLearner { 2845 t.Errorf("%x is not learner, want yes", sm.id) 2846 } 2847 2848 if ok := sm.restore(s); !ok { 2849 t.Error("restore fail, want succeed") 2850 } 2851 2852 if sm.isLearner { 2853 t.Errorf("%x is learner, want not", sm.id) 2854 } 2855} 2856 2857// TestLearnerReceiveSnapshot tests that a learner can receive a snpahost from leader 2858func TestLearnerReceiveSnapshot(t *testing.T) { 2859 // restore the state machine from a snapshot so it has a compacted log and a snapshot 2860 s := pb.Snapshot{ 2861 Metadata: pb.SnapshotMetadata{ 2862 Index: 11, // magic number 2863 Term: 11, // magic number 2864 ConfState: pb.ConfState{Voters: []uint64{1}, Learners: []uint64{2}}, 2865 }, 2866 } 2867 2868 n1 := newTestLearnerRaft(1, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage()) 2869 n2 := newTestLearnerRaft(2, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage()) 2870 2871 n1.restore(s) 2872 2873 // Force set n1 appplied index. 
2874 n1.raftLog.appliedTo(n1.raftLog.committed) 2875 2876 nt := newNetwork(n1, n2) 2877 2878 setRandomizedElectionTimeout(n1, n1.electionTimeout) 2879 for i := 0; i < n1.electionTimeout; i++ { 2880 n1.tick() 2881 } 2882 2883 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) 2884 2885 if n2.raftLog.committed != n1.raftLog.committed { 2886 t.Errorf("peer 2 must commit to %d, but %d", n1.raftLog.committed, n2.raftLog.committed) 2887 } 2888} 2889 2890func TestRestoreIgnoreSnapshot(t *testing.T) { 2891 previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}} 2892 commit := uint64(1) 2893 storage := NewMemoryStorage() 2894 sm := newTestRaft(1, []uint64{1, 2}, 10, 1, storage) 2895 sm.raftLog.append(previousEnts...) 2896 sm.raftLog.commitTo(commit) 2897 2898 s := pb.Snapshot{ 2899 Metadata: pb.SnapshotMetadata{ 2900 Index: commit, 2901 Term: 1, 2902 ConfState: pb.ConfState{Voters: []uint64{1, 2}}, 2903 }, 2904 } 2905 2906 // ignore snapshot 2907 if ok := sm.restore(s); ok { 2908 t.Errorf("restore = %t, want %t", ok, false) 2909 } 2910 if sm.raftLog.committed != commit { 2911 t.Errorf("commit = %d, want %d", sm.raftLog.committed, commit) 2912 } 2913 2914 // ignore snapshot and fast forward commit 2915 s.Metadata.Index = commit + 1 2916 if ok := sm.restore(s); ok { 2917 t.Errorf("restore = %t, want %t", ok, false) 2918 } 2919 if sm.raftLog.committed != commit+1 { 2920 t.Errorf("commit = %d, want %d", sm.raftLog.committed, commit+1) 2921 } 2922} 2923 2924func TestProvideSnap(t *testing.T) { 2925 // restore the state machine from a snapshot so it has a compacted log and a snapshot 2926 s := pb.Snapshot{ 2927 Metadata: pb.SnapshotMetadata{ 2928 Index: 11, // magic number 2929 Term: 11, // magic number 2930 ConfState: pb.ConfState{Voters: []uint64{1, 2}}, 2931 }, 2932 } 2933 storage := NewMemoryStorage() 2934 sm := newTestRaft(1, []uint64{1}, 10, 1, storage) 2935 sm.restore(s) 2936 2937 sm.becomeCandidate() 2938 sm.becomeLeader() 2939 2940 // 
force set the next of node 2, so that node 2 needs a snapshot 2941 sm.prs.Progress[2].Next = sm.raftLog.firstIndex() 2942 sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs.Progress[2].Next - 1, Reject: true}) 2943 2944 msgs := sm.readMessages() 2945 if len(msgs) != 1 { 2946 t.Fatalf("len(msgs) = %d, want 1", len(msgs)) 2947 } 2948 m := msgs[0] 2949 if m.Type != pb.MsgSnap { 2950 t.Errorf("m.Type = %v, want %v", m.Type, pb.MsgSnap) 2951 } 2952} 2953 2954func TestIgnoreProvidingSnap(t *testing.T) { 2955 // restore the state machine from a snapshot so it has a compacted log and a snapshot 2956 s := pb.Snapshot{ 2957 Metadata: pb.SnapshotMetadata{ 2958 Index: 11, // magic number 2959 Term: 11, // magic number 2960 ConfState: pb.ConfState{Voters: []uint64{1, 2}}, 2961 }, 2962 } 2963 storage := NewMemoryStorage() 2964 sm := newTestRaft(1, []uint64{1}, 10, 1, storage) 2965 sm.restore(s) 2966 2967 sm.becomeCandidate() 2968 sm.becomeLeader() 2969 2970 // force set the next of node 2, so that node 2 needs a snapshot 2971 // change node 2 to be inactive, expect node 1 ignore sending snapshot to 2 2972 sm.prs.Progress[2].Next = sm.raftLog.firstIndex() - 1 2973 sm.prs.Progress[2].RecentActive = false 2974 2975 sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) 2976 2977 msgs := sm.readMessages() 2978 if len(msgs) != 0 { 2979 t.Errorf("len(msgs) = %d, want 0", len(msgs)) 2980 } 2981} 2982 2983func TestRestoreFromSnapMsg(t *testing.T) { 2984 s := pb.Snapshot{ 2985 Metadata: pb.SnapshotMetadata{ 2986 Index: 11, // magic number 2987 Term: 11, // magic number 2988 ConfState: pb.ConfState{Voters: []uint64{1, 2}}, 2989 }, 2990 } 2991 m := pb.Message{Type: pb.MsgSnap, From: 1, Term: 2, Snapshot: s} 2992 2993 sm := newTestRaft(2, []uint64{1, 2}, 10, 1, NewMemoryStorage()) 2994 sm.Step(m) 2995 2996 if sm.lead != uint64(1) { 2997 t.Errorf("sm.lead = %d, want 1", sm.lead) 2998 } 2999 3000 // TODO(bdarnell): what should 
this test? 3001} 3002 3003func TestSlowNodeRestore(t *testing.T) { 3004 nt := newNetwork(nil, nil, nil) 3005 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 3006 3007 nt.isolate(3) 3008 for j := 0; j <= 100; j++ { 3009 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) 3010 } 3011 lead := nt.peers[1].(*raft) 3012 nextEnts(lead, nt.storage[1]) 3013 nt.storage[1].CreateSnapshot(lead.raftLog.applied, &pb.ConfState{Voters: lead.prs.VoterNodes()}, nil) 3014 nt.storage[1].Compact(lead.raftLog.applied) 3015 3016 nt.recover() 3017 // send heartbeats so that the leader can learn everyone is active. 3018 // node 3 will only be considered as active when node 1 receives a reply from it. 3019 for { 3020 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) 3021 if lead.prs.Progress[3].RecentActive { 3022 break 3023 } 3024 } 3025 3026 // trigger a snapshot 3027 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) 3028 3029 follower := nt.peers[3].(*raft) 3030 3031 // trigger a commit 3032 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) 3033 if follower.raftLog.committed != lead.raftLog.committed { 3034 t.Errorf("follower.committed = %d, want %d", follower.raftLog.committed, lead.raftLog.committed) 3035 } 3036} 3037 3038// TestStepConfig tests that when raft step msgProp in EntryConfChange type, 3039// it appends the entry to log and sets pendingConf to be true. 
3040func TestStepConfig(t *testing.T) { 3041 // a raft that cannot make progress 3042 r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) 3043 r.becomeCandidate() 3044 r.becomeLeader() 3045 index := r.raftLog.lastIndex() 3046 r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}}) 3047 if g := r.raftLog.lastIndex(); g != index+1 { 3048 t.Errorf("index = %d, want %d", g, index+1) 3049 } 3050 if r.pendingConfIndex != index+1 { 3051 t.Errorf("pendingConfIndex = %d, want %d", r.pendingConfIndex, index+1) 3052 } 3053} 3054 3055// TestStepIgnoreConfig tests that if raft step the second msgProp in 3056// EntryConfChange type when the first one is uncommitted, the node will set 3057// the proposal to noop and keep its original state. 3058func TestStepIgnoreConfig(t *testing.T) { 3059 // a raft that cannot make progress 3060 r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) 3061 r.becomeCandidate() 3062 r.becomeLeader() 3063 r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}}) 3064 index := r.raftLog.lastIndex() 3065 pendingConfIndex := r.pendingConfIndex 3066 r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}}) 3067 wents := []pb.Entry{{Type: pb.EntryNormal, Term: 1, Index: 3, Data: nil}} 3068 ents, err := r.raftLog.entries(index+1, noLimit) 3069 if err != nil { 3070 t.Fatalf("unexpected error %v", err) 3071 } 3072 if !reflect.DeepEqual(ents, wents) { 3073 t.Errorf("ents = %+v, want %+v", ents, wents) 3074 } 3075 if r.pendingConfIndex != pendingConfIndex { 3076 t.Errorf("pendingConfIndex = %d, want %d", r.pendingConfIndex, pendingConfIndex) 3077 } 3078} 3079 3080// TestNewLeaderPendingConfig tests that new leader sets its pendingConfigIndex 3081// based on uncommitted entries. 
3082func TestNewLeaderPendingConfig(t *testing.T) { 3083 tests := []struct { 3084 addEntry bool 3085 wpendingIndex uint64 3086 }{ 3087 {false, 0}, 3088 {true, 1}, 3089 } 3090 for i, tt := range tests { 3091 r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) 3092 if tt.addEntry { 3093 mustAppendEntry(r, pb.Entry{Type: pb.EntryNormal}) 3094 } 3095 r.becomeCandidate() 3096 r.becomeLeader() 3097 if r.pendingConfIndex != tt.wpendingIndex { 3098 t.Errorf("#%d: pendingConfIndex = %d, want %d", 3099 i, r.pendingConfIndex, tt.wpendingIndex) 3100 } 3101 } 3102} 3103 3104// TestAddNode tests that addNode could update nodes correctly. 3105func TestAddNode(t *testing.T) { 3106 r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) 3107 r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}.AsV2()) 3108 nodes := r.prs.VoterNodes() 3109 wnodes := []uint64{1, 2} 3110 if !reflect.DeepEqual(nodes, wnodes) { 3111 t.Errorf("nodes = %v, want %v", nodes, wnodes) 3112 } 3113} 3114 3115// TestAddLearner tests that addLearner could update nodes correctly. 3116func TestAddLearner(t *testing.T) { 3117 r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) 3118 // Add new learner peer. 3119 r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddLearnerNode}.AsV2()) 3120 if r.isLearner { 3121 t.Fatal("expected 1 to be voter") 3122 } 3123 nodes := r.prs.LearnerNodes() 3124 wnodes := []uint64{2} 3125 if !reflect.DeepEqual(nodes, wnodes) { 3126 t.Errorf("nodes = %v, want %v", nodes, wnodes) 3127 } 3128 if !r.prs.Progress[2].IsLearner { 3129 t.Fatal("expected 2 to be learner") 3130 } 3131 3132 // Promote peer to voter. 3133 r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}.AsV2()) 3134 if r.prs.Progress[2].IsLearner { 3135 t.Fatal("expected 2 to be voter") 3136 } 3137 3138 // Demote r. 
3139 r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeAddLearnerNode}.AsV2()) 3140 if !r.prs.Progress[1].IsLearner { 3141 t.Fatal("expected 1 to be learner") 3142 } 3143 if !r.isLearner { 3144 t.Fatal("expected 1 to be learner") 3145 } 3146 3147 // Promote r again. 3148 r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeAddNode}.AsV2()) 3149 if r.prs.Progress[1].IsLearner { 3150 t.Fatal("expected 1 to be voter") 3151 } 3152 if r.isLearner { 3153 t.Fatal("expected 1 to be voter") 3154 } 3155} 3156 3157// TestAddNodeCheckQuorum tests that addNode does not trigger a leader election 3158// immediately when checkQuorum is set. 3159func TestAddNodeCheckQuorum(t *testing.T) { 3160 r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) 3161 r.checkQuorum = true 3162 3163 r.becomeCandidate() 3164 r.becomeLeader() 3165 3166 for i := 0; i < r.electionTimeout-1; i++ { 3167 r.tick() 3168 } 3169 3170 r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}.AsV2()) 3171 3172 // This tick will reach electionTimeout, which triggers a quorum check. 3173 r.tick() 3174 3175 // Node 1 should still be the leader after a single tick. 3176 if r.state != StateLeader { 3177 t.Errorf("state = %v, want %v", r.state, StateLeader) 3178 } 3179 3180 // After another electionTimeout ticks without hearing from node 2, 3181 // node 1 should step down. 3182 for i := 0; i < r.electionTimeout; i++ { 3183 r.tick() 3184 } 3185 3186 if r.state != StateFollower { 3187 t.Errorf("state = %v, want %v", r.state, StateFollower) 3188 } 3189} 3190 3191// TestRemoveNode tests that removeNode could update nodes and 3192// and removed list correctly. 
3193func TestRemoveNode(t *testing.T) { 3194 r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) 3195 r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeRemoveNode}.AsV2()) 3196 w := []uint64{1} 3197 if g := r.prs.VoterNodes(); !reflect.DeepEqual(g, w) { 3198 t.Errorf("nodes = %v, want %v", g, w) 3199 } 3200 3201 // Removing the remaining voter will panic. 3202 defer func() { 3203 if r := recover(); r == nil { 3204 t.Error("did not panic") 3205 } 3206 }() 3207 r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeRemoveNode}.AsV2()) 3208} 3209 3210// TestRemoveLearner tests that removeNode could update nodes and 3211// and removed list correctly. 3212func TestRemoveLearner(t *testing.T) { 3213 r := newTestLearnerRaft(1, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage()) 3214 r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeRemoveNode}.AsV2()) 3215 w := []uint64{1} 3216 if g := r.prs.VoterNodes(); !reflect.DeepEqual(g, w) { 3217 t.Errorf("nodes = %v, want %v", g, w) 3218 } 3219 3220 w = nil 3221 if g := r.prs.LearnerNodes(); !reflect.DeepEqual(g, w) { 3222 t.Errorf("nodes = %v, want %v", g, w) 3223 } 3224 3225 // Removing the remaining voter will panic. 
3226 defer func() { 3227 if r := recover(); r == nil { 3228 t.Error("did not panic") 3229 } 3230 }() 3231 r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeRemoveNode}.AsV2()) 3232} 3233 3234func TestPromotable(t *testing.T) { 3235 id := uint64(1) 3236 tests := []struct { 3237 peers []uint64 3238 wp bool 3239 }{ 3240 {[]uint64{1}, true}, 3241 {[]uint64{1, 2, 3}, true}, 3242 {[]uint64{}, false}, 3243 {[]uint64{2, 3}, false}, 3244 } 3245 for i, tt := range tests { 3246 r := newTestRaft(id, tt.peers, 5, 1, NewMemoryStorage()) 3247 if g := r.promotable(); g != tt.wp { 3248 t.Errorf("#%d: promotable = %v, want %v", i, g, tt.wp) 3249 } 3250 } 3251} 3252 3253func TestRaftNodes(t *testing.T) { 3254 tests := []struct { 3255 ids []uint64 3256 wids []uint64 3257 }{ 3258 { 3259 []uint64{1, 2, 3}, 3260 []uint64{1, 2, 3}, 3261 }, 3262 { 3263 []uint64{3, 2, 1}, 3264 []uint64{1, 2, 3}, 3265 }, 3266 } 3267 for i, tt := range tests { 3268 r := newTestRaft(1, tt.ids, 10, 1, NewMemoryStorage()) 3269 if !reflect.DeepEqual(r.prs.VoterNodes(), tt.wids) { 3270 t.Errorf("#%d: nodes = %+v, want %+v", i, r.prs.VoterNodes(), tt.wids) 3271 } 3272 } 3273} 3274 3275func TestCampaignWhileLeader(t *testing.T) { 3276 testCampaignWhileLeader(t, false) 3277} 3278 3279func TestPreCampaignWhileLeader(t *testing.T) { 3280 testCampaignWhileLeader(t, true) 3281} 3282 3283func testCampaignWhileLeader(t *testing.T, preVote bool) { 3284 cfg := newTestConfig(1, []uint64{1}, 5, 1, NewMemoryStorage()) 3285 cfg.PreVote = preVote 3286 r := newRaft(cfg) 3287 if r.state != StateFollower { 3288 t.Errorf("expected new node to be follower but got %s", r.state) 3289 } 3290 // We don't call campaign() directly because it comes after the check 3291 // for our current state. 
3292 r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 3293 if r.state != StateLeader { 3294 t.Errorf("expected single-node election to become leader but got %s", r.state) 3295 } 3296 term := r.Term 3297 r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 3298 if r.state != StateLeader { 3299 t.Errorf("expected to remain leader but got %s", r.state) 3300 } 3301 if r.Term != term { 3302 t.Errorf("expected to remain in term %v but got %v", term, r.Term) 3303 } 3304} 3305 3306// TestCommitAfterRemoveNode verifies that pending commands can become 3307// committed when a config change reduces the quorum requirements. 3308func TestCommitAfterRemoveNode(t *testing.T) { 3309 // Create a cluster with two nodes. 3310 s := NewMemoryStorage() 3311 r := newTestRaft(1, []uint64{1, 2}, 5, 1, s) 3312 r.becomeCandidate() 3313 r.becomeLeader() 3314 3315 // Begin to remove the second node. 3316 cc := pb.ConfChange{ 3317 Type: pb.ConfChangeRemoveNode, 3318 NodeID: 2, 3319 } 3320 ccData, err := cc.Marshal() 3321 if err != nil { 3322 t.Fatal(err) 3323 } 3324 r.Step(pb.Message{ 3325 Type: pb.MsgProp, 3326 Entries: []pb.Entry{ 3327 {Type: pb.EntryConfChange, Data: ccData}, 3328 }, 3329 }) 3330 // Stabilize the log and make sure nothing is committed yet. 3331 if ents := nextEnts(r, s); len(ents) > 0 { 3332 t.Fatalf("unexpected committed entries: %v", ents) 3333 } 3334 ccIndex := r.raftLog.lastIndex() 3335 3336 // While the config change is pending, make another proposal. 3337 r.Step(pb.Message{ 3338 Type: pb.MsgProp, 3339 Entries: []pb.Entry{ 3340 {Type: pb.EntryNormal, Data: []byte("hello")}, 3341 }, 3342 }) 3343 3344 // Node 2 acknowledges the config change, committing it. 
3345 r.Step(pb.Message{ 3346 Type: pb.MsgAppResp, 3347 From: 2, 3348 Index: ccIndex, 3349 }) 3350 ents := nextEnts(r, s) 3351 if len(ents) != 2 { 3352 t.Fatalf("expected two committed entries, got %v", ents) 3353 } 3354 if ents[0].Type != pb.EntryNormal || ents[0].Data != nil { 3355 t.Fatalf("expected ents[0] to be empty, but got %v", ents[0]) 3356 } 3357 if ents[1].Type != pb.EntryConfChange { 3358 t.Fatalf("expected ents[1] to be EntryConfChange, got %v", ents[1]) 3359 } 3360 3361 // Apply the config change. This reduces quorum requirements so the 3362 // pending command can now commit. 3363 r.applyConfChange(cc.AsV2()) 3364 ents = nextEnts(r, s) 3365 if len(ents) != 1 || ents[0].Type != pb.EntryNormal || 3366 string(ents[0].Data) != "hello" { 3367 t.Fatalf("expected one committed EntryNormal, got %v", ents) 3368 } 3369} 3370 3371// TestLeaderTransferToUpToDateNode verifies transferring should succeed 3372// if the transferee has the most up-to-date log entries when transfer starts. 3373func TestLeaderTransferToUpToDateNode(t *testing.T) { 3374 nt := newNetwork(nil, nil, nil) 3375 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 3376 3377 lead := nt.peers[1].(*raft) 3378 3379 if lead.lead != 1 { 3380 t.Fatalf("after election leader is %x, want 1", lead.lead) 3381 } 3382 3383 // Transfer leadership to 2. 3384 nt.send(pb.Message{From: 2, To: 1, Type: pb.MsgTransferLeader}) 3385 3386 checkLeaderTransferState(t, lead, StateFollower, 2) 3387 3388 // After some log replication, transfer leadership back to 1. 3389 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) 3390 3391 nt.send(pb.Message{From: 1, To: 2, Type: pb.MsgTransferLeader}) 3392 3393 checkLeaderTransferState(t, lead, StateLeader, 1) 3394} 3395 3396// TestLeaderTransferToUpToDateNodeFromFollower verifies transferring should succeed 3397// if the transferee has the most up-to-date log entries when transfer starts. 
3398// Not like TestLeaderTransferToUpToDateNode, where the leader transfer message 3399// is sent to the leader, in this test case every leader transfer message is sent 3400// to the follower. 3401func TestLeaderTransferToUpToDateNodeFromFollower(t *testing.T) { 3402 nt := newNetwork(nil, nil, nil) 3403 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 3404 3405 lead := nt.peers[1].(*raft) 3406 3407 if lead.lead != 1 { 3408 t.Fatalf("after election leader is %x, want 1", lead.lead) 3409 } 3410 3411 // Transfer leadership to 2. 3412 nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgTransferLeader}) 3413 3414 checkLeaderTransferState(t, lead, StateFollower, 2) 3415 3416 // After some log replication, transfer leadership back to 1. 3417 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) 3418 3419 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgTransferLeader}) 3420 3421 checkLeaderTransferState(t, lead, StateLeader, 1) 3422} 3423 3424// TestLeaderTransferWithCheckQuorum ensures transferring leader still works 3425// even the current leader is still under its leader lease 3426func TestLeaderTransferWithCheckQuorum(t *testing.T) { 3427 nt := newNetwork(nil, nil, nil) 3428 for i := 1; i < 4; i++ { 3429 r := nt.peers[uint64(i)].(*raft) 3430 r.checkQuorum = true 3431 setRandomizedElectionTimeout(r, r.electionTimeout+i) 3432 } 3433 3434 // Letting peer 2 electionElapsed reach to timeout so that it can vote for peer 1 3435 f := nt.peers[2].(*raft) 3436 for i := 0; i < f.electionTimeout; i++ { 3437 f.tick() 3438 } 3439 3440 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 3441 3442 lead := nt.peers[1].(*raft) 3443 3444 if lead.lead != 1 { 3445 t.Fatalf("after election leader is %x, want 1", lead.lead) 3446 } 3447 3448 // Transfer leadership to 2. 
3449 nt.send(pb.Message{From: 2, To: 1, Type: pb.MsgTransferLeader}) 3450 3451 checkLeaderTransferState(t, lead, StateFollower, 2) 3452 3453 // After some log replication, transfer leadership back to 1. 3454 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) 3455 3456 nt.send(pb.Message{From: 1, To: 2, Type: pb.MsgTransferLeader}) 3457 3458 checkLeaderTransferState(t, lead, StateLeader, 1) 3459} 3460 3461func TestLeaderTransferToSlowFollower(t *testing.T) { 3462 defaultLogger.EnableDebug() 3463 nt := newNetwork(nil, nil, nil) 3464 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 3465 3466 nt.isolate(3) 3467 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) 3468 3469 nt.recover() 3470 lead := nt.peers[1].(*raft) 3471 if lead.prs.Progress[3].Match != 1 { 3472 t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs.Progress[3].Match, 1) 3473 } 3474 3475 // Transfer leadership to 3 when node 3 is lack of log. 3476 nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader}) 3477 3478 checkLeaderTransferState(t, lead, StateFollower, 3) 3479} 3480 3481func TestLeaderTransferAfterSnapshot(t *testing.T) { 3482 nt := newNetwork(nil, nil, nil) 3483 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 3484 3485 nt.isolate(3) 3486 3487 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) 3488 lead := nt.peers[1].(*raft) 3489 nextEnts(lead, nt.storage[1]) 3490 nt.storage[1].CreateSnapshot(lead.raftLog.applied, &pb.ConfState{Voters: lead.prs.VoterNodes()}, nil) 3491 nt.storage[1].Compact(lead.raftLog.applied) 3492 3493 nt.recover() 3494 if lead.prs.Progress[3].Match != 1 { 3495 t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs.Progress[3].Match, 1) 3496 } 3497 3498 // Transfer leadership to 3 when node 3 is lack of snapshot. 3499 nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader}) 3500 // Send pb.MsgHeartbeatResp to leader to trigger a snapshot for node 3. 
3501 nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgHeartbeatResp}) 3502 3503 checkLeaderTransferState(t, lead, StateFollower, 3) 3504} 3505 3506func TestLeaderTransferToSelf(t *testing.T) { 3507 nt := newNetwork(nil, nil, nil) 3508 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 3509 3510 lead := nt.peers[1].(*raft) 3511 3512 // Transfer leadership to self, there will be noop. 3513 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgTransferLeader}) 3514 checkLeaderTransferState(t, lead, StateLeader, 1) 3515} 3516 3517func TestLeaderTransferToNonExistingNode(t *testing.T) { 3518 nt := newNetwork(nil, nil, nil) 3519 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 3520 3521 lead := nt.peers[1].(*raft) 3522 // Transfer leadership to non-existing node, there will be noop. 3523 nt.send(pb.Message{From: 4, To: 1, Type: pb.MsgTransferLeader}) 3524 checkLeaderTransferState(t, lead, StateLeader, 1) 3525} 3526 3527func TestLeaderTransferTimeout(t *testing.T) { 3528 nt := newNetwork(nil, nil, nil) 3529 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 3530 3531 nt.isolate(3) 3532 3533 lead := nt.peers[1].(*raft) 3534 3535 // Transfer leadership to isolated node, wait for timeout. 
3536 nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader}) 3537 if lead.leadTransferee != 3 { 3538 t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3) 3539 } 3540 for i := 0; i < lead.heartbeatTimeout; i++ { 3541 lead.tick() 3542 } 3543 if lead.leadTransferee != 3 { 3544 t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3) 3545 } 3546 3547 for i := 0; i < lead.electionTimeout-lead.heartbeatTimeout; i++ { 3548 lead.tick() 3549 } 3550 3551 checkLeaderTransferState(t, lead, StateLeader, 1) 3552} 3553 3554func TestLeaderTransferIgnoreProposal(t *testing.T) { 3555 nt := newNetwork(nil, nil, nil) 3556 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 3557 3558 nt.isolate(3) 3559 3560 lead := nt.peers[1].(*raft) 3561 3562 // Transfer leadership to isolated node to let transfer pending, then send proposal. 3563 nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader}) 3564 if lead.leadTransferee != 3 { 3565 t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3) 3566 } 3567 3568 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) 3569 err := lead.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) 3570 if err != ErrProposalDropped { 3571 t.Fatalf("should return drop proposal error while transferring") 3572 } 3573 3574 if lead.prs.Progress[1].Match != 1 { 3575 t.Fatalf("node 1 has match %x, want %x", lead.prs.Progress[1].Match, 1) 3576 } 3577} 3578 3579func TestLeaderTransferReceiveHigherTermVote(t *testing.T) { 3580 nt := newNetwork(nil, nil, nil) 3581 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 3582 3583 nt.isolate(3) 3584 3585 lead := nt.peers[1].(*raft) 3586 3587 // Transfer leadership to isolated node to let transfer pending. 
3588 nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader}) 3589 if lead.leadTransferee != 3 { 3590 t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3) 3591 } 3592 3593 nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup, Index: 1, Term: 2}) 3594 3595 checkLeaderTransferState(t, lead, StateFollower, 2) 3596} 3597 3598func TestLeaderTransferRemoveNode(t *testing.T) { 3599 nt := newNetwork(nil, nil, nil) 3600 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 3601 3602 nt.ignore(pb.MsgTimeoutNow) 3603 3604 lead := nt.peers[1].(*raft) 3605 3606 // The leadTransferee is removed when leadship transferring. 3607 nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader}) 3608 if lead.leadTransferee != 3 { 3609 t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3) 3610 } 3611 3612 lead.applyConfChange(pb.ConfChange{NodeID: 3, Type: pb.ConfChangeRemoveNode}.AsV2()) 3613 3614 checkLeaderTransferState(t, lead, StateLeader, 1) 3615} 3616 3617func TestLeaderTransferDemoteNode(t *testing.T) { 3618 nt := newNetwork(nil, nil, nil) 3619 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 3620 3621 nt.ignore(pb.MsgTimeoutNow) 3622 3623 lead := nt.peers[1].(*raft) 3624 3625 // The leadTransferee is demoted when leadship transferring. 3626 nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader}) 3627 if lead.leadTransferee != 3 { 3628 t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3) 3629 } 3630 3631 lead.applyConfChange(pb.ConfChangeV2{ 3632 Changes: []pb.ConfChangeSingle{ 3633 { 3634 Type: pb.ConfChangeRemoveNode, 3635 NodeID: 3, 3636 }, 3637 { 3638 Type: pb.ConfChangeAddLearnerNode, 3639 NodeID: 3, 3640 }, 3641 }, 3642 }) 3643 3644 // Make the Raft group commit the LeaveJoint entry. 
3645 lead.applyConfChange(pb.ConfChangeV2{}) 3646 checkLeaderTransferState(t, lead, StateLeader, 1) 3647} 3648 3649// TestLeaderTransferBack verifies leadership can transfer back to self when last transfer is pending. 3650func TestLeaderTransferBack(t *testing.T) { 3651 nt := newNetwork(nil, nil, nil) 3652 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 3653 3654 nt.isolate(3) 3655 3656 lead := nt.peers[1].(*raft) 3657 3658 nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader}) 3659 if lead.leadTransferee != 3 { 3660 t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3) 3661 } 3662 3663 // Transfer leadership back to self. 3664 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgTransferLeader}) 3665 3666 checkLeaderTransferState(t, lead, StateLeader, 1) 3667} 3668 3669// TestLeaderTransferSecondTransferToAnotherNode verifies leader can transfer to another node 3670// when last transfer is pending. 3671func TestLeaderTransferSecondTransferToAnotherNode(t *testing.T) { 3672 nt := newNetwork(nil, nil, nil) 3673 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 3674 3675 nt.isolate(3) 3676 3677 lead := nt.peers[1].(*raft) 3678 3679 nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader}) 3680 if lead.leadTransferee != 3 { 3681 t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3) 3682 } 3683 3684 // Transfer leadership to another node. 3685 nt.send(pb.Message{From: 2, To: 1, Type: pb.MsgTransferLeader}) 3686 3687 checkLeaderTransferState(t, lead, StateFollower, 2) 3688} 3689 3690// TestLeaderTransferSecondTransferToSameNode verifies second transfer leader request 3691// to the same node should not extend the timeout while the first one is pending. 
3692func TestLeaderTransferSecondTransferToSameNode(t *testing.T) { 3693 nt := newNetwork(nil, nil, nil) 3694 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 3695 3696 nt.isolate(3) 3697 3698 lead := nt.peers[1].(*raft) 3699 3700 nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader}) 3701 if lead.leadTransferee != 3 { 3702 t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3) 3703 } 3704 3705 for i := 0; i < lead.heartbeatTimeout; i++ { 3706 lead.tick() 3707 } 3708 // Second transfer leadership request to the same node. 3709 nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader}) 3710 3711 for i := 0; i < lead.electionTimeout-lead.heartbeatTimeout; i++ { 3712 lead.tick() 3713 } 3714 3715 checkLeaderTransferState(t, lead, StateLeader, 1) 3716} 3717 3718func checkLeaderTransferState(t *testing.T, r *raft, state StateType, lead uint64) { 3719 if r.state != state || r.lead != lead { 3720 t.Fatalf("after transferring, node has state %v lead %v, want state %v lead %v", r.state, r.lead, state, lead) 3721 } 3722 if r.leadTransferee != None { 3723 t.Fatalf("after transferring, node has leadTransferee %v, want leadTransferee %v", r.leadTransferee, None) 3724 } 3725} 3726 3727// TestTransferNonMember verifies that when a MsgTimeoutNow arrives at 3728// a node that has been removed from the group, nothing happens. 
3729// (previously, if the node also got votes, it would panic as it 3730// transitioned to StateLeader) 3731func TestTransferNonMember(t *testing.T) { 3732 r := newTestRaft(1, []uint64{2, 3, 4}, 5, 1, NewMemoryStorage()) 3733 r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgTimeoutNow}) 3734 3735 r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgVoteResp}) 3736 r.Step(pb.Message{From: 3, To: 1, Type: pb.MsgVoteResp}) 3737 if r.state != StateFollower { 3738 t.Fatalf("state is %s, want StateFollower", r.state) 3739 } 3740} 3741 3742// TestNodeWithSmallerTermCanCompleteElection tests the scenario where a node 3743// that has been partitioned away (and fallen behind) rejoins the cluster at 3744// about the same time the leader node gets partitioned away. 3745// Previously the cluster would come to a standstill when run with PreVote 3746// enabled. 3747func TestNodeWithSmallerTermCanCompleteElection(t *testing.T) { 3748 n1 := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 3749 n2 := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 3750 n3 := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 3751 3752 n1.becomeFollower(1, None) 3753 n2.becomeFollower(1, None) 3754 n3.becomeFollower(1, None) 3755 3756 n1.preVote = true 3757 n2.preVote = true 3758 n3.preVote = true 3759 3760 // cause a network partition to isolate node 3 3761 nt := newNetwork(n1, n2, n3) 3762 nt.cut(1, 3) 3763 nt.cut(2, 3) 3764 3765 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 3766 3767 sm := nt.peers[1].(*raft) 3768 if sm.state != StateLeader { 3769 t.Errorf("peer 1 state: %s, want %s", sm.state, StateLeader) 3770 } 3771 3772 sm = nt.peers[2].(*raft) 3773 if sm.state != StateFollower { 3774 t.Errorf("peer 2 state: %s, want %s", sm.state, StateFollower) 3775 } 3776 3777 nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup}) 3778 sm = nt.peers[3].(*raft) 3779 if sm.state != StatePreCandidate { 3780 t.Errorf("peer 3 state: %s, want %s", sm.state, StatePreCandidate) 3781 } 
3782 3783 nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup}) 3784 3785 // check whether the term values are expected 3786 // a.Term == 3 3787 // b.Term == 3 3788 // c.Term == 1 3789 sm = nt.peers[1].(*raft) 3790 if sm.Term != 3 { 3791 t.Errorf("peer 1 term: %d, want %d", sm.Term, 3) 3792 } 3793 3794 sm = nt.peers[2].(*raft) 3795 if sm.Term != 3 { 3796 t.Errorf("peer 2 term: %d, want %d", sm.Term, 3) 3797 } 3798 3799 sm = nt.peers[3].(*raft) 3800 if sm.Term != 1 { 3801 t.Errorf("peer 3 term: %d, want %d", sm.Term, 1) 3802 } 3803 3804 // check state 3805 // a == follower 3806 // b == leader 3807 // c == pre-candidate 3808 sm = nt.peers[1].(*raft) 3809 if sm.state != StateFollower { 3810 t.Errorf("peer 1 state: %s, want %s", sm.state, StateFollower) 3811 } 3812 sm = nt.peers[2].(*raft) 3813 if sm.state != StateLeader { 3814 t.Errorf("peer 2 state: %s, want %s", sm.state, StateLeader) 3815 } 3816 sm = nt.peers[3].(*raft) 3817 if sm.state != StatePreCandidate { 3818 t.Errorf("peer 3 state: %s, want %s", sm.state, StatePreCandidate) 3819 } 3820 3821 sm.logger.Infof("going to bring back peer 3 and kill peer 2") 3822 // recover the network then immediately isolate b which is currently 3823 // the leader, this is to emulate the crash of b. 3824 nt.recover() 3825 nt.cut(2, 1) 3826 nt.cut(2, 3) 3827 3828 // call for election 3829 nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup}) 3830 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 3831 3832 // do we have a leader? 3833 sma := nt.peers[1].(*raft) 3834 smb := nt.peers[3].(*raft) 3835 if sma.state != StateLeader && smb.state != StateLeader { 3836 t.Errorf("no leader") 3837 } 3838} 3839 3840// TestPreVoteWithSplitVote verifies that after split vote, cluster can complete 3841// election in next round. 
3842func TestPreVoteWithSplitVote(t *testing.T) { 3843 n1 := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 3844 n2 := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 3845 n3 := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 3846 3847 n1.becomeFollower(1, None) 3848 n2.becomeFollower(1, None) 3849 n3.becomeFollower(1, None) 3850 3851 n1.preVote = true 3852 n2.preVote = true 3853 n3.preVote = true 3854 3855 nt := newNetwork(n1, n2, n3) 3856 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 3857 3858 // simulate leader down. followers start split vote. 3859 nt.isolate(1) 3860 nt.send([]pb.Message{ 3861 {From: 2, To: 2, Type: pb.MsgHup}, 3862 {From: 3, To: 3, Type: pb.MsgHup}, 3863 }...) 3864 3865 // check whether the term values are expected 3866 // n2.Term == 3 3867 // n3.Term == 3 3868 sm := nt.peers[2].(*raft) 3869 if sm.Term != 3 { 3870 t.Errorf("peer 2 term: %d, want %d", sm.Term, 3) 3871 } 3872 sm = nt.peers[3].(*raft) 3873 if sm.Term != 3 { 3874 t.Errorf("peer 3 term: %d, want %d", sm.Term, 3) 3875 } 3876 3877 // check state 3878 // n2 == candidate 3879 // n3 == candidate 3880 sm = nt.peers[2].(*raft) 3881 if sm.state != StateCandidate { 3882 t.Errorf("peer 2 state: %s, want %s", sm.state, StateCandidate) 3883 } 3884 sm = nt.peers[3].(*raft) 3885 if sm.state != StateCandidate { 3886 t.Errorf("peer 3 state: %s, want %s", sm.state, StateCandidate) 3887 } 3888 3889 // node 2 election timeout first 3890 nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup}) 3891 3892 // check whether the term values are expected 3893 // n2.Term == 4 3894 // n3.Term == 4 3895 sm = nt.peers[2].(*raft) 3896 if sm.Term != 4 { 3897 t.Errorf("peer 2 term: %d, want %d", sm.Term, 4) 3898 } 3899 sm = nt.peers[3].(*raft) 3900 if sm.Term != 4 { 3901 t.Errorf("peer 3 term: %d, want %d", sm.Term, 4) 3902 } 3903 3904 // check state 3905 // n2 == leader 3906 // n3 == follower 3907 sm = nt.peers[2].(*raft) 3908 if sm.state != StateLeader { 3909 
t.Errorf("peer 2 state: %s, want %s", sm.state, StateLeader) 3910 } 3911 sm = nt.peers[3].(*raft) 3912 if sm.state != StateFollower { 3913 t.Errorf("peer 3 state: %s, want %s", sm.state, StateFollower) 3914 } 3915} 3916 3917// TestPreVoteWithCheckQuorum ensures that after a node become pre-candidate, 3918// it will checkQuorum correctly. 3919func TestPreVoteWithCheckQuorum(t *testing.T) { 3920 n1 := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 3921 n2 := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 3922 n3 := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 3923 3924 n1.becomeFollower(1, None) 3925 n2.becomeFollower(1, None) 3926 n3.becomeFollower(1, None) 3927 3928 n1.preVote = true 3929 n2.preVote = true 3930 n3.preVote = true 3931 3932 n1.checkQuorum = true 3933 n2.checkQuorum = true 3934 n3.checkQuorum = true 3935 3936 nt := newNetwork(n1, n2, n3) 3937 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 3938 3939 // isolate node 1. node 2 and node 3 have leader info 3940 nt.isolate(1) 3941 3942 // check state 3943 sm := nt.peers[1].(*raft) 3944 if sm.state != StateLeader { 3945 t.Fatalf("peer 1 state: %s, want %s", sm.state, StateLeader) 3946 } 3947 sm = nt.peers[2].(*raft) 3948 if sm.state != StateFollower { 3949 t.Fatalf("peer 2 state: %s, want %s", sm.state, StateFollower) 3950 } 3951 sm = nt.peers[3].(*raft) 3952 if sm.state != StateFollower { 3953 t.Fatalf("peer 3 state: %s, want %s", sm.state, StateFollower) 3954 } 3955 3956 // node 2 will ignore node 3's PreVote 3957 nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup}) 3958 nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup}) 3959 3960 // Do we have a leader? 3961 if n2.state != StateLeader && n3.state != StateFollower { 3962 t.Errorf("no leader") 3963 } 3964} 3965 3966// TestLearnerCampaign verifies that a learner won't campaign even if it receives 3967// a MsgHup or MsgTimeoutNow. 
3968func TestLearnerCampaign(t *testing.T) { 3969 n1 := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) 3970 n1.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddLearnerNode}.AsV2()) 3971 n2 := newTestRaft(2, []uint64{1}, 10, 1, NewMemoryStorage()) 3972 n2.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddLearnerNode}.AsV2()) 3973 nt := newNetwork(n1, n2) 3974 nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup}) 3975 3976 if !n2.isLearner { 3977 t.Fatalf("failed to make n2 a learner") 3978 } 3979 3980 if n2.state != StateFollower { 3981 t.Fatalf("n2 campaigned despite being learner") 3982 } 3983 3984 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 3985 if n1.state != StateLeader || n1.lead != 1 { 3986 t.Fatalf("n1 did not become leader") 3987 } 3988 3989 // NB: TransferLeader already checks that the recipient is not a learner, but 3990 // the check could have happened by the time the recipient becomes a learner, 3991 // in which case it will receive MsgTimeoutNow as in this test case and we 3992 // verify that it's ignored. 3993 nt.send(pb.Message{From: 1, To: 2, Type: pb.MsgTimeoutNow}) 3994 3995 if n2.state != StateFollower { 3996 t.Fatalf("n2 accepted leadership transfer despite being learner") 3997 } 3998} 3999 4000// simulate rolling update a cluster for Pre-Vote. cluster has 3 nodes [n1, n2, n3]. 
4001// n1 is leader with term 2 4002// n2 is follower with term 2 4003// n3 is partitioned, with term 4 and less log, state is candidate 4004func newPreVoteMigrationCluster(t *testing.T) *network { 4005 n1 := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 4006 n2 := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 4007 n3 := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 4008 4009 n1.becomeFollower(1, None) 4010 n2.becomeFollower(1, None) 4011 n3.becomeFollower(1, None) 4012 4013 n1.preVote = true 4014 n2.preVote = true 4015 // We intentionally do not enable PreVote for n3, this is done so in order 4016 // to simulate a rolling restart process where it's possible to have a mixed 4017 // version cluster with replicas with PreVote enabled, and replicas without. 4018 4019 nt := newNetwork(n1, n2, n3) 4020 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 4021 4022 // Cause a network partition to isolate n3. 4023 nt.isolate(3) 4024 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}}) 4025 nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup}) 4026 nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup}) 4027 4028 // check state 4029 // n1.state == StateLeader 4030 // n2.state == StateFollower 4031 // n3.state == StateCandidate 4032 if n1.state != StateLeader { 4033 t.Fatalf("node 1 state: %s, want %s", n1.state, StateLeader) 4034 } 4035 if n2.state != StateFollower { 4036 t.Fatalf("node 2 state: %s, want %s", n2.state, StateFollower) 4037 } 4038 if n3.state != StateCandidate { 4039 t.Fatalf("node 3 state: %s, want %s", n3.state, StateCandidate) 4040 } 4041 4042 // check term 4043 // n1.Term == 2 4044 // n2.Term == 2 4045 // n3.Term == 4 4046 if n1.Term != 2 { 4047 t.Fatalf("node 1 term: %d, want %d", n1.Term, 2) 4048 } 4049 if n2.Term != 2 { 4050 t.Fatalf("node 2 term: %d, want %d", n2.Term, 2) 4051 } 4052 if n3.Term != 4 { 4053 t.Fatalf("node 3 term: %d, want %d", n3.Term, 4) 4054 
} 4055 4056 // Enable prevote on n3, then recover the network 4057 n3.preVote = true 4058 nt.recover() 4059 4060 return nt 4061} 4062 4063func TestPreVoteMigrationCanCompleteElection(t *testing.T) { 4064 nt := newPreVoteMigrationCluster(t) 4065 4066 // n1 is leader with term 2 4067 // n2 is follower with term 2 4068 // n3 is pre-candidate with term 4, and less log 4069 n2 := nt.peers[2].(*raft) 4070 n3 := nt.peers[3].(*raft) 4071 4072 // simulate leader down 4073 nt.isolate(1) 4074 4075 // Call for elections from both n2 and n3. 4076 nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup}) 4077 nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup}) 4078 4079 // check state 4080 // n2.state == Follower 4081 // n3.state == PreCandidate 4082 if n2.state != StateFollower { 4083 t.Errorf("node 2 state: %s, want %s", n2.state, StateFollower) 4084 } 4085 if n3.state != StatePreCandidate { 4086 t.Errorf("node 3 state: %s, want %s", n3.state, StatePreCandidate) 4087 } 4088 4089 nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup}) 4090 nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup}) 4091 4092 // Do we have a leader? 
4093 if n2.state != StateLeader && n3.state != StateFollower { 4094 t.Errorf("no leader") 4095 } 4096} 4097 4098func TestPreVoteMigrationWithFreeStuckPreCandidate(t *testing.T) { 4099 nt := newPreVoteMigrationCluster(t) 4100 4101 // n1 is leader with term 2 4102 // n2 is follower with term 2 4103 // n3 is pre-candidate with term 4, and less log 4104 n1 := nt.peers[1].(*raft) 4105 n2 := nt.peers[2].(*raft) 4106 n3 := nt.peers[3].(*raft) 4107 4108 nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup}) 4109 4110 if n1.state != StateLeader { 4111 t.Errorf("node 1 state: %s, want %s", n1.state, StateLeader) 4112 } 4113 if n2.state != StateFollower { 4114 t.Errorf("node 2 state: %s, want %s", n2.state, StateFollower) 4115 } 4116 if n3.state != StatePreCandidate { 4117 t.Errorf("node 3 state: %s, want %s", n3.state, StatePreCandidate) 4118 } 4119 4120 // Pre-Vote again for safety 4121 nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup}) 4122 4123 if n1.state != StateLeader { 4124 t.Errorf("node 1 state: %s, want %s", n1.state, StateLeader) 4125 } 4126 if n2.state != StateFollower { 4127 t.Errorf("node 2 state: %s, want %s", n2.state, StateFollower) 4128 } 4129 if n3.state != StatePreCandidate { 4130 t.Errorf("node 3 state: %s, want %s", n3.state, StatePreCandidate) 4131 } 4132 4133 nt.send(pb.Message{From: 1, To: 3, Type: pb.MsgHeartbeat, Term: n1.Term}) 4134 4135 // Disrupt the leader so that the stuck peer is freed 4136 if n1.state != StateFollower { 4137 t.Errorf("state = %s, want %s", n1.state, StateFollower) 4138 } 4139 if n3.Term != n1.Term { 4140 t.Errorf("term = %d, want %d", n3.Term, n1.Term) 4141 } 4142} 4143 4144func entsWithConfig(configFunc func(*Config), terms ...uint64) *raft { 4145 storage := NewMemoryStorage() 4146 for i, term := range terms { 4147 storage.Append([]pb.Entry{{Index: uint64(i + 1), Term: term}}) 4148 } 4149 cfg := newTestConfig(1, []uint64{}, 5, 1, storage) 4150 if configFunc != nil { 4151 configFunc(cfg) 4152 } 4153 sm := newRaft(cfg) 4154 
sm.reset(terms[len(terms)-1]) 4155 return sm 4156} 4157 4158// votedWithConfig creates a raft state machine with Vote and Term set 4159// to the given value but no log entries (indicating that it voted in 4160// the given term but has not received any logs). 4161func votedWithConfig(configFunc func(*Config), vote, term uint64) *raft { 4162 storage := NewMemoryStorage() 4163 storage.SetHardState(pb.HardState{Vote: vote, Term: term}) 4164 cfg := newTestConfig(1, []uint64{}, 5, 1, storage) 4165 if configFunc != nil { 4166 configFunc(cfg) 4167 } 4168 sm := newRaft(cfg) 4169 sm.reset(term) 4170 return sm 4171} 4172 4173type network struct { 4174 peers map[uint64]stateMachine 4175 storage map[uint64]*MemoryStorage 4176 dropm map[connem]float64 4177 ignorem map[pb.MessageType]bool 4178 4179 // msgHook is called for each message sent. It may inspect the 4180 // message and return true to send it or false to drop it. 4181 msgHook func(pb.Message) bool 4182} 4183 4184// newNetwork initializes a network from peers. 4185// A nil node will be replaced with a new *stateMachine. 4186// A *stateMachine will get its k, id. 4187// When using stateMachine, the address list is always [1, n]. 4188func newNetwork(peers ...stateMachine) *network { 4189 return newNetworkWithConfig(nil, peers...) 4190} 4191 4192// newNetworkWithConfig is like newNetwork but calls the given func to 4193// modify the configuration of any state machines it creates. 
4194func newNetworkWithConfig(configFunc func(*Config), peers ...stateMachine) *network { 4195 size := len(peers) 4196 peerAddrs := idsBySize(size) 4197 4198 npeers := make(map[uint64]stateMachine, size) 4199 nstorage := make(map[uint64]*MemoryStorage, size) 4200 4201 for j, p := range peers { 4202 id := peerAddrs[j] 4203 switch v := p.(type) { 4204 case nil: 4205 nstorage[id] = NewMemoryStorage() 4206 cfg := newTestConfig(id, peerAddrs, 10, 1, nstorage[id]) 4207 if configFunc != nil { 4208 configFunc(cfg) 4209 } 4210 sm := newRaft(cfg) 4211 npeers[id] = sm 4212 case *raft: 4213 // TODO(tbg): this is all pretty confused. Clean this up. 4214 learners := make(map[uint64]bool, len(v.prs.Learners)) 4215 for i := range v.prs.Learners { 4216 learners[i] = true 4217 } 4218 v.id = id 4219 v.prs = tracker.MakeProgressTracker(v.prs.MaxInflight) 4220 if len(learners) > 0 { 4221 v.prs.Learners = map[uint64]struct{}{} 4222 } 4223 for i := 0; i < size; i++ { 4224 pr := &tracker.Progress{} 4225 if _, ok := learners[peerAddrs[i]]; ok { 4226 pr.IsLearner = true 4227 v.prs.Learners[peerAddrs[i]] = struct{}{} 4228 } else { 4229 v.prs.Voters[0][peerAddrs[i]] = struct{}{} 4230 } 4231 v.prs.Progress[peerAddrs[i]] = pr 4232 } 4233 v.reset(v.Term) 4234 npeers[id] = v 4235 case *blackHole: 4236 npeers[id] = v 4237 default: 4238 panic(fmt.Sprintf("unexpected state machine type: %T", p)) 4239 } 4240 } 4241 return &network{ 4242 peers: npeers, 4243 storage: nstorage, 4244 dropm: make(map[connem]float64), 4245 ignorem: make(map[pb.MessageType]bool), 4246 } 4247} 4248 4249func preVoteConfig(c *Config) { 4250 c.PreVote = true 4251} 4252 4253func (nw *network) send(msgs ...pb.Message) { 4254 for len(msgs) > 0 { 4255 m := msgs[0] 4256 p := nw.peers[m.To] 4257 p.Step(m) 4258 msgs = append(msgs[1:], nw.filter(p.readMessages())...) 
4259 } 4260} 4261 4262func (nw *network) drop(from, to uint64, perc float64) { 4263 nw.dropm[connem{from, to}] = perc 4264} 4265 4266func (nw *network) cut(one, other uint64) { 4267 nw.drop(one, other, 2.0) // always drop 4268 nw.drop(other, one, 2.0) // always drop 4269} 4270 4271func (nw *network) isolate(id uint64) { 4272 for i := 0; i < len(nw.peers); i++ { 4273 nid := uint64(i) + 1 4274 if nid != id { 4275 nw.drop(id, nid, 1.0) // always drop 4276 nw.drop(nid, id, 1.0) // always drop 4277 } 4278 } 4279} 4280 4281func (nw *network) ignore(t pb.MessageType) { 4282 nw.ignorem[t] = true 4283} 4284 4285func (nw *network) recover() { 4286 nw.dropm = make(map[connem]float64) 4287 nw.ignorem = make(map[pb.MessageType]bool) 4288} 4289 4290func (nw *network) filter(msgs []pb.Message) []pb.Message { 4291 mm := []pb.Message{} 4292 for _, m := range msgs { 4293 if nw.ignorem[m.Type] { 4294 continue 4295 } 4296 switch m.Type { 4297 case pb.MsgHup: 4298 // hups never go over the network, so don't drop them but panic 4299 panic("unexpected msgHup") 4300 default: 4301 perc := nw.dropm[connem{m.From, m.To}] 4302 if n := rand.Float64(); n < perc { 4303 continue 4304 } 4305 } 4306 if nw.msgHook != nil { 4307 if !nw.msgHook(m) { 4308 continue 4309 } 4310 } 4311 mm = append(mm, m) 4312 } 4313 return mm 4314} 4315 4316type connem struct { 4317 from, to uint64 4318} 4319 4320type blackHole struct{} 4321 4322func (blackHole) Step(pb.Message) error { return nil } 4323func (blackHole) readMessages() []pb.Message { return nil } 4324 4325var nopStepper = &blackHole{} 4326 4327func idsBySize(size int) []uint64 { 4328 ids := make([]uint64, size) 4329 for i := 0; i < size; i++ { 4330 ids[i] = 1 + uint64(i) 4331 } 4332 return ids 4333} 4334 4335// setRandomizedElectionTimeout set up the value by caller instead of choosing 4336// by system, in some test scenario we need to fill in some expected value to 4337// ensure the certainty 4338func setRandomizedElectionTimeout(r *raft, v int) { 4339 
r.randomizedElectionTimeout = v 4340} 4341 4342func newTestConfig(id uint64, peers []uint64, election, heartbeat int, storage Storage) *Config { 4343 return &Config{ 4344 ID: id, 4345 peers: peers, 4346 ElectionTick: election, 4347 HeartbeatTick: heartbeat, 4348 Storage: storage, 4349 MaxSizePerMsg: noLimit, 4350 MaxInflightMsgs: 256, 4351 } 4352} 4353 4354func newTestRaft(id uint64, peers []uint64, election, heartbeat int, storage Storage) *raft { 4355 return newRaft(newTestConfig(id, peers, election, heartbeat, storage)) 4356} 4357 4358func newTestLearnerRaft(id uint64, peers []uint64, learners []uint64, election, heartbeat int, storage Storage) *raft { 4359 cfg := newTestConfig(id, peers, election, heartbeat, storage) 4360 cfg.learners = learners 4361 return newRaft(cfg) 4362} 4363 4364// newTestRawNode sets up a RawNode with the given peers. The configuration will 4365// not be reflected in the Storage. 4366func newTestRawNode(id uint64, peers []uint64, election, heartbeat int, storage Storage) *RawNode { 4367 cfg := newTestConfig(id, peers, election, heartbeat, storage) 4368 rn, err := NewRawNode(cfg) 4369 if err != nil { 4370 panic(err) 4371 } 4372 return rn 4373} 4374