1// Copyright 2015 CoreOS, Inc. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package raft 16 17import ( 18 "bytes" 19 "fmt" 20 "math" 21 "math/rand" 22 "reflect" 23 "testing" 24 25 pb "github.com/coreos/etcd/raft/raftpb" 26) 27 28// nextEnts returns the appliable entries and updates the applied index 29func nextEnts(r *raft, s *MemoryStorage) (ents []pb.Entry) { 30 // Transfer all unstable entries to "stable" storage. 31 s.Append(r.raftLog.unstableEntries()) 32 r.raftLog.stableTo(r.raftLog.lastIndex(), r.raftLog.lastTerm()) 33 34 ents = r.raftLog.nextEnts() 35 r.raftLog.appliedTo(r.raftLog.committed) 36 return ents 37} 38 39type stateMachine interface { 40 Step(m pb.Message) error 41 readMessages() []pb.Message 42} 43 44func (r *raft) readMessages() []pb.Message { 45 msgs := r.msgs 46 r.msgs = make([]pb.Message, 0) 47 48 return msgs 49} 50 51func TestProgressBecomeProbe(t *testing.T) { 52 match := uint64(1) 53 tests := []struct { 54 p *Progress 55 wnext uint64 56 }{ 57 { 58 &Progress{State: ProgressStateReplicate, Match: match, Next: 5, ins: newInflights(256)}, 59 2, 60 }, 61 { 62 // snapshot finish 63 &Progress{State: ProgressStateSnapshot, Match: match, Next: 5, PendingSnapshot: 10, ins: newInflights(256)}, 64 11, 65 }, 66 { 67 // snapshot failure 68 &Progress{State: ProgressStateSnapshot, Match: match, Next: 5, PendingSnapshot: 0, ins: newInflights(256)}, 69 2, 70 }, 71 } 72 for i, tt := range tests { 73 tt.p.becomeProbe() 74 if tt.p.State != ProgressStateProbe { 75 t.Errorf("#%d: state = %s, want %s", i, tt.p.State, ProgressStateProbe) 76 } 77 if tt.p.Match != match { 78 t.Errorf("#%d: match = %d, want %d", i, tt.p.Match, match) 79 } 80 if tt.p.Next != tt.wnext { 81 t.Errorf("#%d: next = %d, want %d", i, tt.p.Next, tt.wnext) 82 } 83 } 84} 85 86func TestProgressBecomeReplicate(t *testing.T) { 87 p := &Progress{State: ProgressStateProbe, Match: 1, Next: 5, ins: newInflights(256)} 88 p.becomeReplicate() 89 90 if p.State != ProgressStateReplicate { 91 t.Errorf("state = %s, want %s", p.State, ProgressStateReplicate) 92 } 93 if p.Match != 1 { 94 t.Errorf("match = %d, want 1", p.Match) 95 } 96 if w := p.Match + 1; p.Next != w { 97 t.Errorf("next = %d, want %d", p.Next, w) 98 } 99} 100 101func TestProgressBecomeSnapshot(t *testing.T) { 102 p := &Progress{State: ProgressStateProbe, Match: 1, Next: 5, ins: newInflights(256)} 103 p.becomeSnapshot(10) 104 105 if p.State != ProgressStateSnapshot { 106 t.Errorf("state = %s, want %s", p.State, ProgressStateSnapshot) 107 } 108 if p.Match != 1 { 109 t.Errorf("match = %d, want 1", p.Match) 110 } 111 if p.PendingSnapshot != 10 { 112 t.Errorf("pendingSnapshot = %d, want 10", p.PendingSnapshot) 113 } 114} 115 116func TestProgressUpdate(t *testing.T) { 117 prevM, prevN := uint64(3), uint64(5) 118 tests := []struct { 119 update uint64 120 121 wm uint64 122 wn uint64 123 wok bool 124 }{ 125 {prevM - 1, prevM, prevN, false}, // do not decrease match, next 126 {prevM, prevM, prevN, false}, // do not decrease next 127 {prevM + 1, prevM + 1, prevN, true}, // increase match, do not decrease next 128 {prevM + 2, prevM + 2, prevN + 1, true}, // increase match, next 129 } 130 for i, tt := range tests { 131 p := &Progress{ 132 Match: prevM, 133 Next: prevN, 134 } 135 ok := p.maybeUpdate(tt.update) 136 if ok != tt.wok { 137 t.Errorf("#%d: ok= %v, want %v", i, ok, tt.wok) 138 } 139 if p.Match != tt.wm { 140 t.Errorf("#%d: match= %d, want %d", i, p.Match, tt.wm) 141 } 142 if p.Next != tt.wn { 143 t.Errorf("#%d: next= %d, want %d", i, p.Next, tt.wn) 144 } 145 } 146} 147 148func TestProgressMaybeDecr(t *testing.T) { 149 tests := []struct { 150 state ProgressStateType 151 m uint64 152 n uint64 153 rejected uint64 154 last uint64 155 156 w bool 157 wn uint64 158 }{ 159 { 160 // state replicate and rejected is not greater than match 161 ProgressStateReplicate, 5, 10, 5, 5, false, 10, 162 }, 163 { 164 // state replicate and rejected is not greater than match 165 ProgressStateReplicate, 5, 10, 4, 4, false, 10, 166 }, 167 { 168 // state replicate and rejected is greater than match 169 // directly decrease to match+1 170 ProgressStateReplicate, 5, 10, 9, 9, true, 6, 171 }, 172 { 173 // next-1 != rejected is always false 174 ProgressStateProbe, 0, 0, 0, 0, false, 0, 175 }, 176 { 177 // next-1 != rejected is always false 178 ProgressStateProbe, 0, 10, 5, 5, false, 10, 179 }, 180 { 181 // next>1 = decremented by 1 182 ProgressStateProbe, 0, 10, 9, 9, true, 9, 183 }, 184 { 185 // next>1 = decremented by 1 186 ProgressStateProbe, 0, 2, 1, 1, true, 1, 187 }, 188 { 189 // next<=1 = reset to 1 190 ProgressStateProbe, 0, 1, 0, 0, true, 1, 191 }, 192 { 193 // decrease to min(rejected, last+1) 194 ProgressStateProbe, 0, 10, 9, 2, true, 3, 195 }, 196 { 197 // rejected < 1, reset to 1 198 ProgressStateProbe, 0, 10, 9, 0, true, 1, 199 }, 200 } 201 for i, tt := range tests { 202 p := &Progress{ 203 State: tt.state, 204 Match: tt.m, 205 Next: tt.n, 206 } 207 if g := p.maybeDecrTo(tt.rejected, tt.last); g != tt.w { 208 t.Errorf("#%d: maybeDecrTo= %t, want %t", i, g, tt.w) 209 } 210 if gm := p.Match; gm != tt.m { 211 t.Errorf("#%d: match= %d, want %d", i, gm, tt.m) 212 } 213 if gn := p.Next; gn != tt.wn { 214 t.Errorf("#%d: next= %d, want %d", i, gn, tt.wn) 215 } 216 } 217} 218 219func TestProgressIsPaused(t *testing.T) { 220 tests := []struct { 221 state ProgressStateType 222 paused bool 223 224 w bool 225 }{ 226 {ProgressStateProbe, false, false}, 227 {ProgressStateProbe, true, true}, 228 {ProgressStateReplicate, false, false}, 229 {ProgressStateReplicate, true, false}, 230 {ProgressStateSnapshot, false, true}, 231 {ProgressStateSnapshot, true, true}, 232 } 233 for i, tt := range tests { 234 p := &Progress{ 235 State: tt.state, 236 Paused: tt.paused, 237 ins: newInflights(256), 238 } 239 if g := p.isPaused(); g != tt.w { 240 t.Errorf("#%d: paused= %t, want %t", i, g, tt.w) 241 } 242 } 243} 244 245// TestProgressResume ensures that progress.maybeUpdate and progress.maybeDecrTo 246// will reset progress.paused. 247func TestProgressResume(t *testing.T) { 248 p := &Progress{ 249 Next: 2, 250 Paused: true, 251 } 252 p.maybeDecrTo(1, 1) 253 if p.Paused != false { 254 t.Errorf("paused= %v, want false", p.Paused) 255 } 256 p.Paused = true 257 p.maybeUpdate(2) 258 if p.Paused != false { 259 t.Errorf("paused= %v, want false", p.Paused) 260 } 261} 262 263// TestProgressResumeByHeartbeat ensures raft.heartbeat reset progress.paused by heartbeat. 264func TestProgressResumeByHeartbeat(t *testing.T) { 265 r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage()) 266 r.becomeCandidate() 267 r.becomeLeader() 268 r.prs[2].Paused = true 269 270 r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) 271 if r.prs[2].Paused != false { 272 t.Errorf("paused = %v, want false", r.prs[2].Paused) 273 } 274} 275 276func TestProgressPaused(t *testing.T) { 277 r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage()) 278 r.becomeCandidate() 279 r.becomeLeader() 280 r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) 281 r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) 282 r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) 283 284 ms := r.readMessages() 285 if len(ms) != 1 { 286 t.Errorf("len(ms) = %d, want 1", len(ms)) 287 } 288} 289 290func TestLeaderElection(t *testing.T) { 291 tests := []struct { 292 *network 293 state StateType 294 }{ 295 {newNetwork(nil, nil, nil), StateLeader}, 296 {newNetwork(nil, nil, nopStepper), StateLeader}, 297 {newNetwork(nil, nopStepper, nopStepper), StateCandidate}, 298 {newNetwork(nil, nopStepper, nopStepper, nil), StateCandidate}, 299 {newNetwork(nil, nopStepper, nopStepper, nil, nil), StateLeader}, 300 301 // three logs further along than 0 302 {newNetwork(nil, ents(1), ents(2), ents(1, 3), nil), StateFollower}, 303 304 // logs converge 305 {newNetwork(ents(1), nil, ents(2), ents(1), nil), StateLeader}, 306 } 307 308 for i, tt := range tests { 309 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 310 sm := tt.network.peers[1].(*raft) 311 if sm.state != tt.state { 312 t.Errorf("#%d: state = %s, want %s", i, sm.state, tt.state) 313 } 314 if g := sm.Term; g != 1 { 315 t.Errorf("#%d: term = %d, want %d", i, g, 1) 316 } 317 } 318} 319 320func TestLogReplication(t *testing.T) { 321 tests := []struct { 322 *network 323 msgs []pb.Message 324 wcommitted uint64 325 }{ 326 { 327 newNetwork(nil, nil, nil), 328 []pb.Message{ 329 {From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}, 330 }, 331 2, 332 }, 333 { 334 newNetwork(nil, nil, nil), 335 []pb.Message{ 336 {From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}, 337 {From: 1, To: 2, Type: pb.MsgHup}, 338 {From: 1, To: 2, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}, 339 }, 340 4, 341 }, 342 } 343 344 for i, tt := range tests { 345 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 346 347 for _, m := range tt.msgs { 348 tt.send(m) 349 } 350 351 for j, x := range tt.network.peers { 352 sm := x.(*raft) 353 354 if sm.raftLog.committed != tt.wcommitted { 355 t.Errorf("#%d.%d: committed = %d, want %d", i, j, sm.raftLog.committed, tt.wcommitted) 356 } 357 358 ents := []pb.Entry{} 359 for _, e := range nextEnts(sm, tt.network.storage[j]) { 360 if e.Data != nil { 361 ents = append(ents, e) 362 } 363 } 364 props := []pb.Message{} 365 for _, m := range tt.msgs { 366 if m.Type == pb.MsgProp { 367 props = append(props, m) 368 } 369 } 370 for k, m := range props { 371 if !bytes.Equal(ents[k].Data, m.Entries[0].Data) { 372 t.Errorf("#%d.%d: data = %d, want %d", i, j, ents[k].Data, m.Entries[0].Data) 373 } 374 } 375 } 376 } 377} 378 379func TestSingleNodeCommit(t *testing.T) { 380 tt := newNetwork(nil) 381 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 382 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}}) 383 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}}) 384 385 sm := tt.peers[1].(*raft) 386 if sm.raftLog.committed != 3 { 387 t.Errorf("committed = %d, want %d", sm.raftLog.committed, 3) 388 } 389} 390 391// TestCannotCommitWithoutNewTermEntry tests the entries cannot be committed 392// when leader changes, no new proposal comes in and ChangeTerm proposal is 393// filtered. 394func TestCannotCommitWithoutNewTermEntry(t *testing.T) { 395 tt := newNetwork(nil, nil, nil, nil, nil) 396 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 397 398 // 0 cannot reach 2,3,4 399 tt.cut(1, 3) 400 tt.cut(1, 4) 401 tt.cut(1, 5) 402 403 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}}) 404 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}}) 405 406 sm := tt.peers[1].(*raft) 407 if sm.raftLog.committed != 1 { 408 t.Errorf("committed = %d, want %d", sm.raftLog.committed, 1) 409 } 410 411 // network recovery 412 tt.recover() 413 // avoid committing ChangeTerm proposal 414 tt.ignore(pb.MsgApp) 415 416 // elect 2 as the new leader with term 2 417 tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup}) 418 419 // no log entries from previous term should be committed 420 sm = tt.peers[2].(*raft) 421 if sm.raftLog.committed != 1 { 422 t.Errorf("committed = %d, want %d", sm.raftLog.committed, 1) 423 } 424 425 tt.recover() 426 // send heartbeat; reset wait 427 tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgBeat}) 428 // append an entry at current term 429 tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}}) 430 // expect the committed to be advanced 431 if sm.raftLog.committed != 5 { 432 t.Errorf("committed = %d, want %d", sm.raftLog.committed, 5) 433 } 434} 435 436// TestCommitWithoutNewTermEntry tests the entries could be committed 437// when leader changes, no new proposal comes in. 438func TestCommitWithoutNewTermEntry(t *testing.T) { 439 tt := newNetwork(nil, nil, nil, nil, nil) 440 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 441 442 // 0 cannot reach 2,3,4 443 tt.cut(1, 3) 444 tt.cut(1, 4) 445 tt.cut(1, 5) 446 447 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}}) 448 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}}) 449 450 sm := tt.peers[1].(*raft) 451 if sm.raftLog.committed != 1 { 452 t.Errorf("committed = %d, want %d", sm.raftLog.committed, 1) 453 } 454 455 // network recovery 456 tt.recover() 457 458 // elect 1 as the new leader with term 2 459 // after append a ChangeTerm entry from the current term, all entries 460 // should be committed 461 tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup}) 462 463 if sm.raftLog.committed != 4 { 464 t.Errorf("committed = %d, want %d", sm.raftLog.committed, 4) 465 } 466} 467 468func TestDuelingCandidates(t *testing.T) { 469 a := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 470 b := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 471 c := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 472 473 nt := newNetwork(a, b, c) 474 nt.cut(1, 3) 475 476 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 477 nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup}) 478 479 nt.recover() 480 nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup}) 481 482 wlog := &raftLog{ 483 storage: &MemoryStorage{ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}}}, 484 committed: 1, 485 unstable: unstable{offset: 2}, 486 } 487 tests := []struct { 488 sm *raft 489 state StateType 490 term uint64 491 raftLog *raftLog 492 }{ 493 {a, StateFollower, 2, wlog}, 494 {b, StateFollower, 2, wlog}, 495 {c, StateFollower, 2, newLog(NewMemoryStorage(), raftLogger)}, 496 } 497 498 for i, tt := range tests { 499 if g := tt.sm.state; g != tt.state { 500 t.Errorf("#%d: state = %s, want %s", i, g, tt.state) 501 } 502 if g := tt.sm.Term; g != tt.term { 503 t.Errorf("#%d: term = %d, want %d", i, g, tt.term) 504 } 505 base := ltoa(tt.raftLog) 506 if sm, ok := nt.peers[1+uint64(i)].(*raft); ok { 507 l := ltoa(sm.raftLog) 508 if g := diffu(base, l); g != "" { 509 t.Errorf("#%d: diff:\n%s", i, g) 510 } 511 } else { 512 t.Logf("#%d: empty log", i) 513 } 514 } 515} 516 517func TestCandidateConcede(t *testing.T) { 518 tt := newNetwork(nil, nil, nil) 519 tt.isolate(1) 520 521 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 522 tt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup}) 523 524 // heal the partition 525 tt.recover() 526 // send heartbeat; reset wait 527 tt.send(pb.Message{From: 3, To: 3, Type: pb.MsgBeat}) 528 529 data := []byte("force follower") 530 // send a proposal to 3 to flush out a MsgApp to 1 531 tt.send(pb.Message{From: 3, To: 3, Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}}) 532 // send heartbeat; flush out commit 533 tt.send(pb.Message{From: 3, To: 3, Type: pb.MsgBeat}) 534 535 a := tt.peers[1].(*raft) 536 if g := a.state; g != StateFollower { 537 t.Errorf("state = %s, want %s", g, StateFollower) 538 } 539 if g := a.Term; g != 1 { 540 t.Errorf("term = %d, want %d", g, 1) 541 } 542 wantLog := ltoa(&raftLog{ 543 storage: &MemoryStorage{ 544 ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}}, 545 }, 546 unstable: unstable{offset: 3}, 547 committed: 2, 548 }) 549 for i, p := range tt.peers { 550 if sm, ok := p.(*raft); ok { 551 l := ltoa(sm.raftLog) 552 if g := diffu(wantLog, l); g != "" { 553 t.Errorf("#%d: diff:\n%s", i, g) 554 } 555 } else { 556 t.Logf("#%d: empty log", i) 557 } 558 } 559} 560 561func TestSingleNodeCandidate(t *testing.T) { 562 tt := newNetwork(nil) 563 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 564 565 sm := tt.peers[1].(*raft) 566 if sm.state != StateLeader { 567 t.Errorf("state = %d, want %d", sm.state, StateLeader) 568 } 569} 570 571func TestOldMessages(t *testing.T) { 572 tt := newNetwork(nil, nil, nil) 573 // make 0 leader @ term 3 574 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 575 tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup}) 576 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 577 // pretend we're an old leader trying to make progress; this entry is expected to be ignored. 578 tt.send(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 2, Entries: []pb.Entry{{Index: 3, Term: 2}}}) 579 // commit a new entry 580 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) 581 582 ilog := &raftLog{ 583 storage: &MemoryStorage{ 584 ents: []pb.Entry{ 585 {}, {Data: nil, Term: 1, Index: 1}, 586 {Data: nil, Term: 2, Index: 2}, {Data: nil, Term: 3, Index: 3}, 587 {Data: []byte("somedata"), Term: 3, Index: 4}, 588 }, 589 }, 590 unstable: unstable{offset: 5}, 591 committed: 4, 592 } 593 base := ltoa(ilog) 594 for i, p := range tt.peers { 595 if sm, ok := p.(*raft); ok { 596 l := ltoa(sm.raftLog) 597 if g := diffu(base, l); g != "" { 598 t.Errorf("#%d: diff:\n%s", i, g) 599 } 600 } else { 601 t.Logf("#%d: empty log", i) 602 } 603 } 604} 605 606// TestOldMessagesReply - optimization - reply with new term. 607 608func TestProposal(t *testing.T) { 609 tests := []struct { 610 *network 611 success bool 612 }{ 613 {newNetwork(nil, nil, nil), true}, 614 {newNetwork(nil, nil, nopStepper), true}, 615 {newNetwork(nil, nopStepper, nopStepper), false}, 616 {newNetwork(nil, nopStepper, nopStepper, nil), false}, 617 {newNetwork(nil, nopStepper, nopStepper, nil, nil), true}, 618 } 619 620 for j, tt := range tests { 621 send := func(m pb.Message) { 622 defer func() { 623 // only recover is we expect it to panic so 624 // panics we don't expect go up. 625 if !tt.success { 626 e := recover() 627 if e != nil { 628 t.Logf("#%d: err: %s", j, e) 629 } 630 } 631 }() 632 tt.send(m) 633 } 634 635 data := []byte("somedata") 636 637 // promote 0 the leader 638 send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 639 send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}}) 640 641 wantLog := newLog(NewMemoryStorage(), raftLogger) 642 if tt.success { 643 wantLog = &raftLog{ 644 storage: &MemoryStorage{ 645 ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}}, 646 }, 647 unstable: unstable{offset: 3}, 648 committed: 2} 649 } 650 base := ltoa(wantLog) 651 for i, p := range tt.peers { 652 if sm, ok := p.(*raft); ok { 653 l := ltoa(sm.raftLog) 654 if g := diffu(base, l); g != "" { 655 t.Errorf("#%d: diff:\n%s", i, g) 656 } 657 } else { 658 t.Logf("#%d: empty log", i) 659 } 660 } 661 sm := tt.network.peers[1].(*raft) 662 if g := sm.Term; g != 1 { 663 t.Errorf("#%d: term = %d, want %d", j, g, 1) 664 } 665 } 666} 667 668func TestProposalByProxy(t *testing.T) { 669 data := []byte("somedata") 670 tests := []*network{ 671 newNetwork(nil, nil, nil), 672 newNetwork(nil, nil, nopStepper), 673 } 674 675 for j, tt := range tests { 676 // promote 0 the leader 677 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 678 679 // propose via follower 680 tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) 681 682 wantLog := &raftLog{ 683 storage: &MemoryStorage{ 684 ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Data: data, Index: 2}}, 685 }, 686 unstable: unstable{offset: 3}, 687 committed: 2} 688 base := ltoa(wantLog) 689 for i, p := range tt.peers { 690 if sm, ok := p.(*raft); ok { 691 l := ltoa(sm.raftLog) 692 if g := diffu(base, l); g != "" { 693 t.Errorf("#%d: diff:\n%s", i, g) 694 } 695 } else { 696 t.Logf("#%d: empty log", i) 697 } 698 } 699 sm := tt.peers[1].(*raft) 700 if g := sm.Term; g != 1 { 701 t.Errorf("#%d: term = %d, want %d", j, g, 1) 702 } 703 } 704} 705 706func TestCommit(t *testing.T) { 707 tests := []struct { 708 matches []uint64 709 logs []pb.Entry 710 smTerm uint64 711 w uint64 712 }{ 713 // single 714 {[]uint64{1}, []pb.Entry{{Index: 1, Term: 1}}, 1, 1}, 715 {[]uint64{1}, []pb.Entry{{Index: 1, Term: 1}}, 2, 0}, 716 {[]uint64{2}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}, 2, 2}, 717 {[]uint64{1}, []pb.Entry{{Index: 1, Term: 2}}, 2, 1}, 718 719 // odd 720 {[]uint64{2, 1, 1}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}, 1, 1}, 721 {[]uint64{2, 1, 1}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}}, 2, 0}, 722 {[]uint64{2, 1, 2}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}, 2, 2}, 723 {[]uint64{2, 1, 2}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}}, 2, 0}, 724 725 // even 726 {[]uint64{2, 1, 1, 1}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}, 1, 1}, 727 {[]uint64{2, 1, 1, 1}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}}, 2, 0}, 728 {[]uint64{2, 1, 1, 2}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}, 1, 1}, 729 {[]uint64{2, 1, 1, 2}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}}, 2, 0}, 730 {[]uint64{2, 1, 2, 2}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}, 2, 2}, 731 {[]uint64{2, 1, 2, 2}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}}, 2, 0}, 732 } 733 734 for i, tt := range tests { 735 storage := NewMemoryStorage() 736 storage.Append(tt.logs) 737 storage.hardState = pb.HardState{Term: tt.smTerm} 738 739 sm := newTestRaft(1, []uint64{1}, 5, 1, storage) 740 for j := 0; j < len(tt.matches); j++ { 741 sm.setProgress(uint64(j)+1, tt.matches[j], tt.matches[j]+1) 742 } 743 sm.maybeCommit() 744 if g := sm.raftLog.committed; g != tt.w { 745 t.Errorf("#%d: committed = %d, want %d", i, g, tt.w) 746 } 747 } 748} 749 750func TestIsElectionTimeout(t *testing.T) { 751 tests := []struct { 752 elapse int 753 wprobability float64 754 round bool 755 }{ 756 {5, 0, false}, 757 {13, 0.3, true}, 758 {15, 0.5, true}, 759 {18, 0.8, true}, 760 {20, 1, false}, 761 } 762 763 for i, tt := range tests { 764 sm := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) 765 sm.electionElapsed = tt.elapse 766 c := 0 767 for j := 0; j < 10000; j++ { 768 if sm.isElectionTimeout() { 769 c++ 770 } 771 } 772 got := float64(c) / 10000.0 773 if tt.round { 774 got = math.Floor(got*10+0.5) / 10.0 775 } 776 if got != tt.wprobability { 777 t.Errorf("#%d: possibility = %v, want %v", i, got, tt.wprobability) 778 } 779 } 780} 781 782// ensure that the Step function ignores the message from old term and does not pass it to the 783// actual stepX function. 784func TestStepIgnoreOldTermMsg(t *testing.T) { 785 called := false 786 fakeStep := func(r *raft, m pb.Message) { 787 called = true 788 } 789 sm := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) 790 sm.step = fakeStep 791 sm.Term = 2 792 sm.Step(pb.Message{Type: pb.MsgApp, Term: sm.Term - 1}) 793 if called == true { 794 t.Errorf("stepFunc called = %v , want %v", called, false) 795 } 796} 797 798// TestHandleMsgApp ensures: 799// 1. Reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm. 800// 2. If an existing entry conflicts with a new one (same index but different terms), 801// delete the existing entry and all that follow it; append any new entries not already in the log. 802// 3. If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry). 803func TestHandleMsgApp(t *testing.T) { 804 tests := []struct { 805 m pb.Message 806 wIndex uint64 807 wCommit uint64 808 wReject bool 809 }{ 810 // Ensure 1 811 {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 3, Index: 2, Commit: 3}, 2, 0, true}, // previous log mismatch 812 {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 3, Index: 3, Commit: 3}, 2, 0, true}, // previous log non-exist 813 814 // Ensure 2 815 {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 1}, 2, 1, false}, 816 {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 0, Index: 0, Commit: 1, Entries: []pb.Entry{{Index: 1, Term: 2}}}, 1, 1, false}, 817 {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 3, Entries: []pb.Entry{{Index: 3, Term: 2}, {Index: 4, Term: 2}}}, 4, 3, false}, 818 {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 4, Entries: []pb.Entry{{Index: 3, Term: 2}}}, 3, 3, false}, 819 {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 4, Entries: []pb.Entry{{Index: 2, Term: 2}}}, 2, 2, false}, 820 821 // Ensure 3 822 {pb.Message{Type: pb.MsgApp, Term: 1, LogTerm: 1, Index: 1, Commit: 3}, 2, 1, false}, // match entry 1, commit up to last new entry 1 823 {pb.Message{Type: pb.MsgApp, Term: 1, LogTerm: 1, Index: 1, Commit: 3, Entries: []pb.Entry{{Index: 2, Term: 2}}}, 2, 2, false}, // match entry 1, commit up to last new entry 2 824 {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 3}, 2, 2, false}, // match entry 2, commit up to last new entry 2 825 {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 4}, 2, 2, false}, // commit up to log.last() 826 } 827 828 for i, tt := range tests { 829 storage := NewMemoryStorage() 830 storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}) 831 sm := newTestRaft(1, []uint64{1}, 10, 1, storage) 832 sm.becomeFollower(2, None) 833 834 sm.handleAppendEntries(tt.m) 835 if sm.raftLog.lastIndex() != tt.wIndex { 836 t.Errorf("#%d: lastIndex = %d, want %d", i, sm.raftLog.lastIndex(), tt.wIndex) 837 } 838 if sm.raftLog.committed != tt.wCommit { 839 t.Errorf("#%d: committed = %d, want %d", i, sm.raftLog.committed, tt.wCommit) 840 } 841 m := sm.readMessages() 842 if len(m) != 1 { 843 t.Fatalf("#%d: msg = nil, want 1", i) 844 } 845 if m[0].Reject != tt.wReject { 846 t.Errorf("#%d: reject = %v, want %v", i, m[0].Reject, tt.wReject) 847 } 848 } 849} 850 851// TestHandleHeartbeat ensures that the follower commits to the commit in the message. 852func TestHandleHeartbeat(t *testing.T) { 853 commit := uint64(2) 854 tests := []struct { 855 m pb.Message 856 wCommit uint64 857 }{ 858 {pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 2, Commit: commit + 1}, commit + 1}, 859 {pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 2, Commit: commit - 1}, commit}, // do not decrease commit 860 } 861 862 for i, tt := range tests { 863 storage := NewMemoryStorage() 864 storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}}) 865 sm := newTestRaft(1, []uint64{1, 2}, 5, 1, storage) 866 sm.becomeFollower(2, 2) 867 sm.raftLog.commitTo(commit) 868 sm.handleHeartbeat(tt.m) 869 if sm.raftLog.committed != tt.wCommit { 870 t.Errorf("#%d: committed = %d, want %d", i, sm.raftLog.committed, tt.wCommit) 871 } 872 m := sm.readMessages() 873 if len(m) != 1 { 874 t.Fatalf("#%d: msg = nil, want 1", i) 875 } 876 if m[0].Type != pb.MsgHeartbeatResp { 877 t.Errorf("#%d: type = %v, want MsgHeartbeatResp", i, m[0].Type) 878 } 879 } 880} 881 882// TestHandleHeartbeatResp ensures that we re-send log entries when we get a heartbeat response. 883func TestHandleHeartbeatResp(t *testing.T) { 884 storage := NewMemoryStorage() 885 storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}}) 886 sm := newTestRaft(1, []uint64{1, 2}, 5, 1, storage) 887 sm.becomeCandidate() 888 sm.becomeLeader() 889 sm.raftLog.commitTo(sm.raftLog.lastIndex()) 890 891 // A heartbeat response from a node that is behind; re-send MsgApp 892 sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp}) 893 msgs := sm.readMessages() 894 if len(msgs) != 1 { 895 t.Fatalf("len(msgs) = %d, want 1", len(msgs)) 896 } 897 if msgs[0].Type != pb.MsgApp { 898 t.Errorf("type = %v, want MsgApp", msgs[0].Type) 899 } 900 901 // A second heartbeat response with no AppResp does not re-send because we are in the wait state. 902 sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp}) 903 msgs = sm.readMessages() 904 if len(msgs) != 0 { 905 t.Fatalf("len(msgs) = %d, want 0", len(msgs)) 906 } 907 908 // Send a heartbeat to reset the wait state; next heartbeat will re-send MsgApp. 909 sm.bcastHeartbeat() 910 sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp}) 911 msgs = sm.readMessages() 912 if len(msgs) != 2 { 913 t.Fatalf("len(msgs) = %d, want 2", len(msgs)) 914 } 915 if msgs[0].Type != pb.MsgHeartbeat { 916 t.Errorf("type = %v, want MsgHeartbeat", msgs[0].Type) 917 } 918 if msgs[1].Type != pb.MsgApp { 919 t.Errorf("type = %v, want MsgApp", msgs[1].Type) 920 } 921 922 // Once we have an MsgAppResp, heartbeats no longer send MsgApp. 923 sm.Step(pb.Message{ 924 From: 2, 925 Type: pb.MsgAppResp, 926 Index: msgs[1].Index + uint64(len(msgs[1].Entries)), 927 }) 928 // Consume the message sent in response to MsgAppResp 929 sm.readMessages() 930 931 sm.bcastHeartbeat() // reset wait state 932 sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp}) 933 msgs = sm.readMessages() 934 if len(msgs) != 1 { 935 t.Fatalf("len(msgs) = %d, want 1: %+v", len(msgs), msgs) 936 } 937 if msgs[0].Type != pb.MsgHeartbeat { 938 t.Errorf("type = %v, want MsgHeartbeat", msgs[0].Type) 939 } 940} 941 942// TestMsgAppRespWaitReset verifies the resume behavior of a leader 943// MsgAppResp. 944func TestMsgAppRespWaitReset(t *testing.T) { 945 sm := newTestRaft(1, []uint64{1, 2, 3}, 5, 1, NewMemoryStorage()) 946 sm.becomeCandidate() 947 sm.becomeLeader() 948 949 // The new leader has just emitted a new Term 4 entry; consume those messages 950 // from the outgoing queue. 951 sm.bcastAppend() 952 sm.readMessages() 953 954 // Node 2 acks the first entry, making it committed. 955 sm.Step(pb.Message{ 956 From: 2, 957 Type: pb.MsgAppResp, 958 Index: 1, 959 }) 960 if sm.raftLog.committed != 1 { 961 t.Fatalf("expected committed to be 1, got %d", sm.raftLog.committed) 962 } 963 // Also consume the MsgApp messages that update Commit on the followers. 964 sm.readMessages() 965 966 // A new command is now proposed on node 1. 967 sm.Step(pb.Message{ 968 From: 1, 969 Type: pb.MsgProp, 970 Entries: []pb.Entry{{}}, 971 }) 972 973 // The command is broadcast to all nodes not in the wait state. 974 // Node 2 left the wait state due to its MsgAppResp, but node 3 is still waiting. 975 msgs := sm.readMessages() 976 if len(msgs) != 1 { 977 t.Fatalf("expected 1 message, got %d: %+v", len(msgs), msgs) 978 } 979 if msgs[0].Type != pb.MsgApp || msgs[0].To != 2 { 980 t.Errorf("expected MsgApp to node 2, got %v to %d", msgs[0].Type, msgs[0].To) 981 } 982 if len(msgs[0].Entries) != 1 || msgs[0].Entries[0].Index != 2 { 983 t.Errorf("expected to send entry 2, but got %v", msgs[0].Entries) 984 } 985 986 // Now Node 3 acks the first entry. This releases the wait and entry 2 is sent. 987 sm.Step(pb.Message{ 988 From: 3, 989 Type: pb.MsgAppResp, 990 Index: 1, 991 }) 992 msgs = sm.readMessages() 993 if len(msgs) != 1 { 994 t.Fatalf("expected 1 message, got %d: %+v", len(msgs), msgs) 995 } 996 if msgs[0].Type != pb.MsgApp || msgs[0].To != 3 { 997 t.Errorf("expected MsgApp to node 3, got %v to %d", msgs[0].Type, msgs[0].To) 998 } 999 if len(msgs[0].Entries) != 1 || msgs[0].Entries[0].Index != 2 { 1000 t.Errorf("expected to send entry 2, but got %v", msgs[0].Entries) 1001 } 1002} 1003 1004func TestRecvMsgVote(t *testing.T) { 1005 tests := []struct { 1006 state StateType 1007 i, term uint64 1008 voteFor uint64 1009 wreject bool 1010 }{ 1011 {StateFollower, 0, 0, None, true}, 1012 {StateFollower, 0, 1, None, true}, 1013 {StateFollower, 0, 2, None, true}, 1014 {StateFollower, 0, 3, None, false}, 1015 1016 {StateFollower, 1, 0, None, true}, 1017 {StateFollower, 1, 1, None, true}, 1018 {StateFollower, 1, 2, None, true}, 1019 {StateFollower, 1, 3, None, false}, 1020 1021 {StateFollower, 2, 0, None, true}, 1022 {StateFollower, 2, 1, None, true}, 1023 {StateFollower, 2, 2, None, false}, 1024 {StateFollower, 2, 3, None, false}, 1025 1026 {StateFollower, 3, 0, None, true}, 1027 {StateFollower, 3, 1, None, true}, 1028 {StateFollower, 3, 2, None, false}, 1029 {StateFollower, 3, 3, None, false}, 1030 1031 {StateFollower, 3, 2, 2, false}, 1032 {StateFollower, 3, 2, 1, true}, 1033 1034 {StateLeader, 3, 3, 1, true}, 1035 {StateCandidate, 3, 3, 1, true}, 1036 } 1037 1038 for i, tt := range tests { 1039 sm := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) 1040 sm.state = tt.state 1041 switch tt.state { 1042 case StateFollower: 1043 sm.step = stepFollower 1044 case StateCandidate: 1045 sm.step = stepCandidate 1046 case StateLeader: 1047 sm.step = stepLeader 1048 } 1049 sm.Vote = tt.voteFor 1050 sm.raftLog = &raftLog{ 1051 storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 2}, {Index: 2, Term: 2}}}, 1052 unstable: unstable{offset: 3}, 1053 } 1054 1055 sm.Step(pb.Message{Type: pb.MsgVote, From: 2, Index: tt.i, LogTerm: tt.term}) 1056 1057 msgs := sm.readMessages() 1058 if g := len(msgs); g != 1 { 1059 t.Fatalf("#%d: len(msgs) = %d, want 1", i, g) 1060 continue 1061 } 1062 if g := msgs[0].Reject; g != tt.wreject { 1063 t.Errorf("#%d, m.Reject = %v, want %v", i, g, tt.wreject) 1064 } 1065 } 1066} 1067 1068func TestStateTransition(t *testing.T) { 1069 tests := []struct { 1070 from StateType 1071 to StateType 1072 wallow bool 1073 wterm uint64 1074 wlead uint64 1075 }{ 1076 {StateFollower, StateFollower, true, 1, None}, 1077 {StateFollower, StateCandidate, true, 1, None}, 1078 {StateFollower, StateLeader, false, 0, None}, 1079 1080 {StateCandidate, StateFollower, true, 0, None}, 1081 {StateCandidate, StateCandidate, true, 1, None}, 1082 {StateCandidate, StateLeader, true, 0, 1}, 1083 1084 {StateLeader, StateFollower, true, 1, None}, 1085 {StateLeader, StateCandidate, false, 1, None}, 1086 {StateLeader, StateLeader, true, 0, 1}, 1087 } 1088 1089 for i, tt := range tests { 1090 func() { 1091 defer func() { 1092 if r := recover(); r != nil { 1093 if tt.wallow == true { 1094 t.Errorf("%d: allow = %v, want %v", i, false, true) 1095 } 1096 } 1097 }() 1098 1099 sm := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) 1100 sm.state = tt.from 1101 1102 switch tt.to { 1103 case StateFollower: 1104 sm.becomeFollower(tt.wterm, tt.wlead) 1105 case StateCandidate: 1106 sm.becomeCandidate() 1107 case StateLeader: 1108 sm.becomeLeader() 1109 } 1110 1111 if sm.Term != tt.wterm { 1112 t.Errorf("%d: term = %d, want %d", i, sm.Term, tt.wterm) 1113 } 1114 if sm.lead != tt.wlead { 1115 t.Errorf("%d: lead = %d, want %d", i, sm.lead, tt.wlead) 1116 } 1117 }() 1118 } 1119} 1120 1121func TestAllServerStepdown(t *testing.T) { 1122 tests := []struct { 1123 state StateType 1124 1125 wstate StateType 1126 wterm uint64 1127 windex uint64 1128 }{ 1129 {StateFollower, StateFollower, 3, 0}, 1130 {StateCandidate, StateFollower, 3, 0}, 1131 {StateLeader, StateFollower, 3, 1}, 1132 } 1133 1134 tmsgTypes := [...]pb.MessageType{pb.MsgVote, pb.MsgApp} 1135 tterm := uint64(3) 1136 1137 for i, tt := range tests { 1138 sm := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 1139 switch tt.state { 1140 case StateFollower: 1141 sm.becomeFollower(1, None) 1142 case StateCandidate: 1143 sm.becomeCandidate() 1144 case StateLeader: 1145 sm.becomeCandidate() 1146 sm.becomeLeader() 1147 } 1148 1149 for j, msgType := range tmsgTypes { 1150 sm.Step(pb.Message{From: 2, Type: msgType, Term: tterm, LogTerm: tterm}) 1151 1152 if sm.state != tt.wstate { 1153 t.Errorf("#%d.%d state = %v , want %v", i, j, sm.state, tt.wstate) 1154 } 1155 if sm.Term != tt.wterm { 1156 t.Errorf("#%d.%d term = %v , want %v", i, j, sm.Term, tt.wterm) 1157 } 1158 if uint64(sm.raftLog.lastIndex()) != tt.windex { 1159 t.Errorf("#%d.%d index = %v , want %v", i, j, sm.raftLog.lastIndex(), tt.windex) 1160 } 1161 if uint64(len(sm.raftLog.allEntries())) != tt.windex { 1162 t.Errorf("#%d.%d len(ents) = %v , want %v", i, j, len(sm.raftLog.allEntries()), tt.windex) 1163 } 1164 wlead := uint64(2) 1165 if msgType == pb.MsgVote { 1166 wlead = None 1167 } 1168 if sm.lead != wlead { 1169 t.Errorf("#%d, sm.lead = %d, want %d", i, sm.lead, None) 1170 } 1171 } 1172 } 1173} 1174 1175func TestLeaderStepdownWhenQuorumActive(t *testing.T) { 1176 sm := newTestRaft(1, []uint64{1, 2, 3}, 5, 1, NewMemoryStorage()) 1177 1178 sm.checkQuorum = true 1179 1180 sm.becomeCandidate() 1181 sm.becomeLeader() 1182 1183 for i := 0; i < sm.electionTimeout+1; i++ { 1184 sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp, Term: sm.Term}) 1185 sm.tick() 1186 } 1187 1188 if sm.state != StateLeader { 1189 t.Errorf("state = %v, want %v", sm.state, StateLeader) 1190 } 1191} 1192 1193func TestLeaderStepdownWhenQuorumLost(t *testing.T) { 1194 sm := newTestRaft(1, []uint64{1, 2, 3}, 5, 1, NewMemoryStorage()) 1195 1196 sm.checkQuorum = true 1197 1198 sm.becomeCandidate() 1199 sm.becomeLeader() 1200 1201 for i := 0; i < sm.electionTimeout+1; i++ { 1202 sm.tick() 1203 } 1204 1205 if sm.state != StateFollower { 1206 t.Errorf("state = %v, want %v", sm.state, StateFollower) 1207 } 1208} 1209 1210func TestLeaderAppResp(t *testing.T) { 1211 // initial progress: match = 0; next = 3 1212 tests := []struct { 1213 index uint64 1214 reject bool 1215 // progress 1216 wmatch uint64 1217 wnext uint64 1218 // message 1219 wmsgNum int 1220 windex uint64 1221 wcommitted uint64 1222 }{ 1223 {3, true, 0, 3, 0, 0, 0}, // stale resp; no replies 1224 {2, true, 0, 2, 1, 1, 0}, // denied resp; leader does not commit; decrease next and send probing msg 1225 {2, false, 2, 4, 2, 2, 2}, // accept resp; leader commits; broadcast with commit index 1226 {0, false, 0, 3, 0, 0, 0}, // ignore heartbeat replies 1227 } 1228 1229 for i, tt := range tests { 1230 // sm term is 1 after it becomes the leader. 1231 // thus the last log term must be 1 to be committed. 1232 sm := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 1233 sm.raftLog = &raftLog{ 1234 storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 0}, {Index: 2, Term: 1}}}, 1235 unstable: unstable{offset: 3}, 1236 } 1237 sm.becomeCandidate() 1238 sm.becomeLeader() 1239 sm.readMessages() 1240 sm.Step(pb.Message{From: 2, Type: pb.MsgAppResp, Index: tt.index, Term: sm.Term, Reject: tt.reject, RejectHint: tt.index}) 1241 1242 p := sm.prs[2] 1243 if p.Match != tt.wmatch { 1244 t.Errorf("#%d match = %d, want %d", i, p.Match, tt.wmatch) 1245 } 1246 if p.Next != tt.wnext { 1247 t.Errorf("#%d next = %d, want %d", i, p.Next, tt.wnext) 1248 } 1249 1250 msgs := sm.readMessages() 1251 1252 if len(msgs) != tt.wmsgNum { 1253 t.Errorf("#%d msgNum = %d, want %d", i, len(msgs), tt.wmsgNum) 1254 } 1255 for j, msg := range msgs { 1256 if msg.Index != tt.windex { 1257 t.Errorf("#%d.%d index = %d, want %d", i, j, msg.Index, tt.windex) 1258 } 1259 if msg.Commit != tt.wcommitted { 1260 t.Errorf("#%d.%d commit = %d, want %d", i, j, msg.Commit, tt.wcommitted) 1261 } 1262 } 1263 } 1264} 1265 1266// When the leader receives a heartbeat tick, it should 1267// send a MsgApp with m.Index = 0, m.LogTerm=0 and empty entries. 1268func TestBcastBeat(t *testing.T) { 1269 offset := uint64(1000) 1270 // make a state machine with log.offset = 1000 1271 s := pb.Snapshot{ 1272 Metadata: pb.SnapshotMetadata{ 1273 Index: offset, 1274 Term: 1, 1275 ConfState: pb.ConfState{Nodes: []uint64{1, 2, 3}}, 1276 }, 1277 } 1278 storage := NewMemoryStorage() 1279 storage.ApplySnapshot(s) 1280 sm := newTestRaft(1, nil, 10, 1, storage) 1281 sm.Term = 1 1282 1283 sm.becomeCandidate() 1284 sm.becomeLeader() 1285 for i := 0; i < 10; i++ { 1286 sm.appendEntry(pb.Entry{Index: uint64(i) + 1}) 1287 } 1288 // slow follower 1289 sm.prs[2].Match, sm.prs[2].Next = 5, 6 1290 // normal follower 1291 sm.prs[3].Match, sm.prs[3].Next = sm.raftLog.lastIndex(), sm.raftLog.lastIndex()+1 1292 1293 sm.Step(pb.Message{Type: pb.MsgBeat}) 1294 msgs := sm.readMessages() 1295 if len(msgs) != 2 { 1296 t.Fatalf("len(msgs) = %v, want 2", len(msgs)) 1297 } 1298 wantCommitMap := map[uint64]uint64{ 1299 2: min(sm.raftLog.committed, sm.prs[2].Match), 1300 3: min(sm.raftLog.committed, sm.prs[3].Match), 1301 } 1302 for i, m := range msgs { 1303 if m.Type != pb.MsgHeartbeat { 1304 t.Fatalf("#%d: type = %v, want = %v", i, m.Type, pb.MsgHeartbeat) 1305 } 1306 if m.Index != 0 { 1307 t.Fatalf("#%d: prevIndex = %d, want %d", i, m.Index, 0) 1308 } 1309 if m.LogTerm != 0 { 1310 t.Fatalf("#%d: prevTerm = %d, want %d", i, m.LogTerm, 0) 1311 } 1312 if wantCommitMap[m.To] == 0 { 1313 t.Fatalf("#%d: unexpected to %d", i, m.To) 1314 } else { 1315 if m.Commit != wantCommitMap[m.To] { 1316 t.Fatalf("#%d: commit = %d, want %d", i, m.Commit, wantCommitMap[m.To]) 1317 } 1318 delete(wantCommitMap, m.To) 1319 } 1320 if len(m.Entries) != 0 { 1321 t.Fatalf("#%d: len(entries) = %d, want 0", i, len(m.Entries)) 1322 } 1323 } 1324} 1325 1326// tests the output of the state machine when receiving MsgBeat 1327func TestRecvMsgBeat(t *testing.T) { 1328 tests := []struct { 1329 state StateType 1330 wMsg int 1331 }{ 1332 {StateLeader, 2}, 1333 // candidate and follower should ignore MsgBeat 1334 {StateCandidate, 0}, 1335 {StateFollower, 0}, 1336 } 1337 1338 for i, tt := range tests { 1339 sm := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) 1340 sm.raftLog = &raftLog{storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 0}, {Index: 2, Term: 1}}}} 1341 sm.Term = 1 1342 sm.state = tt.state 1343 switch tt.state { 1344 case StateFollower: 1345 sm.step = stepFollower 1346 case StateCandidate: 1347 sm.step = stepCandidate 1348 case StateLeader: 1349 sm.step = stepLeader 1350 } 1351 sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) 1352 1353 msgs := sm.readMessages() 1354 if len(msgs) != tt.wMsg { 1355 t.Errorf("%d: len(msgs) = %d, want %d", i, len(msgs), tt.wMsg) 1356 } 1357 for _, m := range msgs { 1358 if m.Type != pb.MsgHeartbeat { 1359 t.Errorf("%d: msg.type = %v, want %v", i, m.Type, pb.MsgHeartbeat) 1360 } 1361 } 1362 } 1363} 1364 1365func TestLeaderIncreaseNext(t *testing.T) { 1366 previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}} 1367 tests := []struct { 1368 // progress 1369 state ProgressStateType 1370 next uint64 1371 1372 wnext uint64 1373 }{ 1374 // state replicate, optimistically increase next 1375 // previous entries + noop entry + propose + 1 1376 {ProgressStateReplicate, 2, uint64(len(previousEnts) + 1 + 1 + 1)}, 1377 // state probe, not optimistically increase next 1378 {ProgressStateProbe, 2, 2}, 1379 } 1380 1381 for i, tt := range tests { 1382 sm := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) 1383 sm.raftLog.append(previousEnts...) 1384 sm.becomeCandidate() 1385 sm.becomeLeader() 1386 sm.prs[2].State = tt.state 1387 sm.prs[2].Next = tt.next 1388 sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) 1389 1390 p := sm.prs[2] 1391 if p.Next != tt.wnext { 1392 t.Errorf("#%d next = %d, want %d", i, p.Next, tt.wnext) 1393 } 1394 } 1395} 1396 1397func TestSendAppendForProgressProbe(t *testing.T) { 1398 r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) 1399 r.becomeCandidate() 1400 r.becomeLeader() 1401 r.readMessages() 1402 r.prs[2].becomeProbe() 1403 1404 // each round is a heartbeat 1405 for i := 0; i < 3; i++ { 1406 // we expect that raft will only send out one msgAPP per heartbeat timeout 1407 r.appendEntry(pb.Entry{Data: []byte("somedata")}) 1408 r.sendAppend(2) 1409 msg := r.readMessages() 1410 if len(msg) != 1 { 1411 t.Errorf("len(msg) = %d, want %d", len(msg), 1) 1412 } 1413 if msg[0].Index != 0 { 1414 t.Errorf("index = %d, want %d", msg[0].Index, 0) 1415 } 1416 1417 if r.prs[2].Paused != true { 1418 t.Errorf("paused = %v, want true", r.prs[2].Paused) 1419 } 1420 for j := 0; j < 10; j++ { 1421 r.appendEntry(pb.Entry{Data: []byte("somedata")}) 1422 r.sendAppend(2) 1423 if l := len(r.readMessages()); l != 0 { 1424 t.Errorf("len(msg) = %d, want %d", l, 0) 1425 } 1426 } 1427 1428 // do a heartbeat 1429 for j := 0; j < r.heartbeatTimeout; j++ { 1430 r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) 1431 } 1432 // consume the heartbeat 1433 msg = r.readMessages() 1434 if len(msg) != 1 { 1435 t.Errorf("len(msg) = %d, want %d", len(msg), 1) 1436 } 1437 if msg[0].Type != pb.MsgHeartbeat { 1438 t.Errorf("type = %v, want %v", msg[0].Type, pb.MsgHeartbeat) 1439 } 1440 } 1441} 1442 1443func TestSendAppendForProgressReplicate(t *testing.T) { 1444 r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) 1445 r.becomeCandidate() 1446 r.becomeLeader() 1447 r.readMessages() 1448 r.prs[2].becomeReplicate() 1449 1450 for i := 0; i < 10; i++ { 1451 r.appendEntry(pb.Entry{Data: []byte("somedata")}) 1452 r.sendAppend(2) 1453 msgs := r.readMessages() 1454 if len(msgs) != 1 { 1455 t.Errorf("len(msg) = %d, want %d", len(msgs), 1) 1456 } 1457 } 1458} 1459 1460func TestSendAppendForProgressSnapshot(t *testing.T) { 1461 r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) 1462 r.becomeCandidate() 1463 r.becomeLeader() 1464 r.readMessages() 1465 r.prs[2].becomeSnapshot(10) 1466 1467 for i := 0; i < 10; i++ { 1468 r.appendEntry(pb.Entry{Data: []byte("somedata")}) 1469 r.sendAppend(2) 1470 msgs := r.readMessages() 1471 if len(msgs) != 0 { 1472 t.Errorf("len(msg) = %d, want %d", len(msgs), 0) 1473 } 1474 } 1475} 1476 1477func TestRecvMsgUnreachable(t *testing.T) { 1478 previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}} 1479 s := NewMemoryStorage() 1480 s.Append(previousEnts) 1481 r := newTestRaft(1, []uint64{1, 2}, 10, 1, s) 1482 r.becomeCandidate() 1483 r.becomeLeader() 1484 r.readMessages() 1485 // set node 2 to state replicate 1486 r.prs[2].Match = 3 1487 r.prs[2].becomeReplicate() 1488 r.prs[2].optimisticUpdate(5) 1489 1490 r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgUnreachable}) 1491 1492 if r.prs[2].State != ProgressStateProbe { 1493 t.Errorf("state = %s, want %s", r.prs[2].State, ProgressStateProbe) 1494 } 1495 if wnext := r.prs[2].Match + 1; r.prs[2].Next != wnext { 1496 t.Errorf("next = %d, want %d", r.prs[2].Next, wnext) 1497 } 1498} 1499 1500func TestRestore(t *testing.T) { 1501 s := pb.Snapshot{ 1502 Metadata: pb.SnapshotMetadata{ 1503 Index: 11, // magic number 1504 Term: 11, // magic number 1505 ConfState: pb.ConfState{Nodes: []uint64{1, 2, 3}}, 1506 }, 1507 } 1508 1509 storage := NewMemoryStorage() 1510 sm := newTestRaft(1, []uint64{1, 2}, 10, 1, storage) 1511 if ok := sm.restore(s); !ok { 1512 t.Fatal("restore fail, want succeed") 1513 } 1514 1515 if sm.raftLog.lastIndex() != s.Metadata.Index { 1516 t.Errorf("log.lastIndex = %d, want %d", sm.raftLog.lastIndex(), s.Metadata.Index) 1517 } 1518 if mustTerm(sm.raftLog.term(s.Metadata.Index)) != s.Metadata.Term { 1519 t.Errorf("log.lastTerm = %d, want %d", mustTerm(sm.raftLog.term(s.Metadata.Index)), s.Metadata.Term) 1520 } 1521 sg := sm.nodes() 1522 if !reflect.DeepEqual(sg, s.Metadata.ConfState.Nodes) { 1523 t.Errorf("sm.Nodes = %+v, want %+v", sg, s.Metadata.ConfState.Nodes) 1524 } 1525 1526 if ok := sm.restore(s); ok { 1527 t.Fatal("restore succeed, want fail") 1528 } 1529} 1530 1531func TestRestoreIgnoreSnapshot(t *testing.T) { 1532 previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}} 1533 commit := uint64(1) 1534 storage := NewMemoryStorage() 1535 sm := newTestRaft(1, []uint64{1, 2}, 10, 1, storage) 1536 sm.raftLog.append(previousEnts...) 1537 sm.raftLog.commitTo(commit) 1538 1539 s := pb.Snapshot{ 1540 Metadata: pb.SnapshotMetadata{ 1541 Index: commit, 1542 Term: 1, 1543 ConfState: pb.ConfState{Nodes: []uint64{1, 2}}, 1544 }, 1545 } 1546 1547 // ignore snapshot 1548 if ok := sm.restore(s); ok { 1549 t.Errorf("restore = %t, want %t", ok, false) 1550 } 1551 if sm.raftLog.committed != commit { 1552 t.Errorf("commit = %d, want %d", sm.raftLog.committed, commit) 1553 } 1554 1555 // ignore snapshot and fast forward commit 1556 s.Metadata.Index = commit + 1 1557 if ok := sm.restore(s); ok { 1558 t.Errorf("restore = %t, want %t", ok, false) 1559 } 1560 if sm.raftLog.committed != commit+1 { 1561 t.Errorf("commit = %d, want %d", sm.raftLog.committed, commit+1) 1562 } 1563} 1564 1565func TestProvideSnap(t *testing.T) { 1566 // restore the state machine from a snapshot so it has a compacted log and a snapshot 1567 s := pb.Snapshot{ 1568 Metadata: pb.SnapshotMetadata{ 1569 Index: 11, // magic number 1570 Term: 11, // magic number 1571 ConfState: pb.ConfState{Nodes: []uint64{1, 2}}, 1572 }, 1573 } 1574 storage := NewMemoryStorage() 1575 sm := newTestRaft(1, []uint64{1}, 10, 1, storage) 1576 sm.restore(s) 1577 1578 sm.becomeCandidate() 1579 sm.becomeLeader() 1580 1581 // force set the next of node 2, so that node 2 needs a snapshot 1582 sm.prs[2].Next = sm.raftLog.firstIndex() 1583 sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs[2].Next - 1, Reject: true}) 1584 1585 msgs := sm.readMessages() 1586 if len(msgs) != 1 { 1587 t.Fatalf("len(msgs) = %d, want 1", len(msgs)) 1588 } 1589 m := msgs[0] 1590 if m.Type != pb.MsgSnap { 1591 t.Errorf("m.Type = %v, want %v", m.Type, pb.MsgSnap) 1592 } 1593} 1594 1595func TestIgnoreProvidingSnap(t *testing.T) { 1596 // restore the state machine from a snapshot so it has a compacted log and a snapshot 1597 s := pb.Snapshot{ 1598 Metadata: pb.SnapshotMetadata{ 1599 Index: 11, // magic number 1600 Term: 11, // magic number 1601 ConfState: pb.ConfState{Nodes: []uint64{1, 2}}, 1602 }, 1603 } 1604 storage := NewMemoryStorage() 1605 sm := newTestRaft(1, []uint64{1}, 10, 1, storage) 1606 sm.restore(s) 1607 1608 sm.becomeCandidate() 1609 sm.becomeLeader() 1610 1611 // force set the next of node 2, so that node 2 needs a snapshot 1612 // change node 2 to be inactive, expect node 1 ignore sending snapshot to 2 1613 sm.prs[2].Next = sm.raftLog.firstIndex() - 1 1614 sm.prs[2].RecentActive = false 1615 1616 sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) 1617 1618 msgs := sm.readMessages() 1619 if len(msgs) != 0 { 1620 t.Errorf("len(msgs) = %d, want 0", len(msgs)) 1621 } 1622} 1623 1624func TestRestoreFromSnapMsg(t *testing.T) { 1625 s := pb.Snapshot{ 1626 Metadata: pb.SnapshotMetadata{ 1627 Index: 11, // magic number 1628 Term: 11, // magic number 1629 ConfState: pb.ConfState{Nodes: []uint64{1, 2}}, 1630 }, 1631 } 1632 m := pb.Message{Type: pb.MsgSnap, From: 1, Term: 2, Snapshot: s} 1633 1634 sm := newTestRaft(2, []uint64{1, 2}, 10, 1, NewMemoryStorage()) 1635 sm.Step(m) 1636 1637 // TODO(bdarnell): what should this test? 1638} 1639 1640func TestSlowNodeRestore(t *testing.T) { 1641 nt := newNetwork(nil, nil, nil) 1642 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 1643 1644 nt.isolate(3) 1645 for j := 0; j <= 100; j++ { 1646 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) 1647 } 1648 lead := nt.peers[1].(*raft) 1649 nextEnts(lead, nt.storage[1]) 1650 nt.storage[1].CreateSnapshot(lead.raftLog.applied, &pb.ConfState{Nodes: lead.nodes()}, nil) 1651 nt.storage[1].Compact(lead.raftLog.applied) 1652 1653 nt.recover() 1654 // send heartbeats so that the leader can learn everyone is active. 1655 // node 3 will only be considered as active when node 1 receives a reply from it. 1656 for { 1657 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) 1658 if lead.prs[3].RecentActive { 1659 break 1660 } 1661 } 1662 1663 // trigger a snapshot 1664 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) 1665 1666 follower := nt.peers[3].(*raft) 1667 1668 // trigger a commit 1669 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) 1670 if follower.raftLog.committed != lead.raftLog.committed { 1671 t.Errorf("follower.committed = %d, want %d", follower.raftLog.committed, lead.raftLog.committed) 1672 } 1673} 1674 1675// TestStepConfig tests that when raft step msgProp in EntryConfChange type, 1676// it appends the entry to log and sets pendingConf to be true. 1677func TestStepConfig(t *testing.T) { 1678 // a raft that cannot make progress 1679 r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) 1680 r.becomeCandidate() 1681 r.becomeLeader() 1682 index := r.raftLog.lastIndex() 1683 r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}}) 1684 if g := r.raftLog.lastIndex(); g != index+1 { 1685 t.Errorf("index = %d, want %d", g, index+1) 1686 } 1687 if r.pendingConf != true { 1688 t.Errorf("pendingConf = %v, want true", r.pendingConf) 1689 } 1690} 1691 1692// TestStepIgnoreConfig tests that if raft step the second msgProp in 1693// EntryConfChange type when the first one is uncommitted, the node will set 1694// the proposal to noop and keep its original state. 1695func TestStepIgnoreConfig(t *testing.T) { 1696 // a raft that cannot make progress 1697 r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) 1698 r.becomeCandidate() 1699 r.becomeLeader() 1700 r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}}) 1701 index := r.raftLog.lastIndex() 1702 pendingConf := r.pendingConf 1703 r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}}) 1704 wents := []pb.Entry{{Type: pb.EntryNormal, Term: 1, Index: 3, Data: nil}} 1705 ents, err := r.raftLog.entries(index+1, noLimit) 1706 if err != nil { 1707 t.Fatalf("unexpected error %v", err) 1708 } 1709 if !reflect.DeepEqual(ents, wents) { 1710 t.Errorf("ents = %+v, want %+v", ents, wents) 1711 } 1712 if r.pendingConf != pendingConf { 1713 t.Errorf("pendingConf = %v, want %v", r.pendingConf, pendingConf) 1714 } 1715} 1716 1717// TestRecoverPendingConfig tests that new leader recovers its pendingConf flag 1718// based on uncommitted entries. 1719func TestRecoverPendingConfig(t *testing.T) { 1720 tests := []struct { 1721 entType pb.EntryType 1722 wpending bool 1723 }{ 1724 {pb.EntryNormal, false}, 1725 {pb.EntryConfChange, true}, 1726 } 1727 for i, tt := range tests { 1728 r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) 1729 r.appendEntry(pb.Entry{Type: tt.entType}) 1730 r.becomeCandidate() 1731 r.becomeLeader() 1732 if r.pendingConf != tt.wpending { 1733 t.Errorf("#%d: pendingConf = %v, want %v", i, r.pendingConf, tt.wpending) 1734 } 1735 } 1736} 1737 1738// TestRecoverDoublePendingConfig tests that new leader will panic if 1739// there exist two uncommitted config entries. 1740func TestRecoverDoublePendingConfig(t *testing.T) { 1741 func() { 1742 defer func() { 1743 if err := recover(); err == nil { 1744 t.Errorf("expect panic, but nothing happens") 1745 } 1746 }() 1747 r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) 1748 r.appendEntry(pb.Entry{Type: pb.EntryConfChange}) 1749 r.appendEntry(pb.Entry{Type: pb.EntryConfChange}) 1750 r.becomeCandidate() 1751 r.becomeLeader() 1752 }() 1753} 1754 1755// TestAddNode tests that addNode could update pendingConf and nodes correctly. 1756func TestAddNode(t *testing.T) { 1757 r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) 1758 r.pendingConf = true 1759 r.addNode(2) 1760 if r.pendingConf != false { 1761 t.Errorf("pendingConf = %v, want false", r.pendingConf) 1762 } 1763 nodes := r.nodes() 1764 wnodes := []uint64{1, 2} 1765 if !reflect.DeepEqual(nodes, wnodes) { 1766 t.Errorf("nodes = %v, want %v", nodes, wnodes) 1767 } 1768} 1769 1770// TestRemoveNode tests that removeNode could update pendingConf, nodes and 1771// and removed list correctly. 1772func TestRemoveNode(t *testing.T) { 1773 r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) 1774 r.pendingConf = true 1775 r.removeNode(2) 1776 if r.pendingConf != false { 1777 t.Errorf("pendingConf = %v, want false", r.pendingConf) 1778 } 1779 w := []uint64{1} 1780 if g := r.nodes(); !reflect.DeepEqual(g, w) { 1781 t.Errorf("nodes = %v, want %v", g, w) 1782 } 1783 1784 // remove all nodes from cluster 1785 r.removeNode(1) 1786 w = []uint64{} 1787 if g := r.nodes(); !reflect.DeepEqual(g, w) { 1788 t.Errorf("nodes = %v, want %v", g, w) 1789 } 1790} 1791 1792func TestPromotable(t *testing.T) { 1793 id := uint64(1) 1794 tests := []struct { 1795 peers []uint64 1796 wp bool 1797 }{ 1798 {[]uint64{1}, true}, 1799 {[]uint64{1, 2, 3}, true}, 1800 {[]uint64{}, false}, 1801 {[]uint64{2, 3}, false}, 1802 } 1803 for i, tt := range tests { 1804 r := newTestRaft(id, tt.peers, 5, 1, NewMemoryStorage()) 1805 if g := r.promotable(); g != tt.wp { 1806 t.Errorf("#%d: promotable = %v, want %v", i, g, tt.wp) 1807 } 1808 } 1809} 1810 1811func TestRaftNodes(t *testing.T) { 1812 tests := []struct { 1813 ids []uint64 1814 wids []uint64 1815 }{ 1816 { 1817 []uint64{1, 2, 3}, 1818 []uint64{1, 2, 3}, 1819 }, 1820 { 1821 []uint64{3, 2, 1}, 1822 []uint64{1, 2, 3}, 1823 }, 1824 } 1825 for i, tt := range tests { 1826 r := newTestRaft(1, tt.ids, 10, 1, NewMemoryStorage()) 1827 if !reflect.DeepEqual(r.nodes(), tt.wids) { 1828 t.Errorf("#%d: nodes = %+v, want %+v", i, r.nodes(), tt.wids) 1829 } 1830 } 1831} 1832 1833func TestCampaignWhileLeader(t *testing.T) { 1834 r := newTestRaft(1, []uint64{1}, 5, 1, NewMemoryStorage()) 1835 if r.state != StateFollower { 1836 t.Errorf("expected new node to be follower but got %s", r.state) 1837 } 1838 // We don't call campaign() directly because it comes after the check 1839 // for our current state. 1840 r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 1841 if r.state != StateLeader { 1842 t.Errorf("expected single-node election to become leader but got %s", r.state) 1843 } 1844 term := r.Term 1845 r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) 1846 if r.state != StateLeader { 1847 t.Errorf("expected to remain leader but got %s", r.state) 1848 } 1849 if r.Term != term { 1850 t.Errorf("expected to remain in term %v but got %v", term, r.Term) 1851 } 1852} 1853 1854// TestCommitAfterRemoveNode verifies that pending commands can become 1855// committed when a config change reduces the quorum requirements. 1856func TestCommitAfterRemoveNode(t *testing.T) { 1857 // Create a cluster with two nodes. 1858 s := NewMemoryStorage() 1859 r := newTestRaft(1, []uint64{1, 2}, 5, 1, s) 1860 r.becomeCandidate() 1861 r.becomeLeader() 1862 1863 // Begin to remove the second node. 1864 cc := pb.ConfChange{ 1865 Type: pb.ConfChangeRemoveNode, 1866 NodeID: 2, 1867 } 1868 ccData, err := cc.Marshal() 1869 if err != nil { 1870 t.Fatal(err) 1871 } 1872 r.Step(pb.Message{ 1873 Type: pb.MsgProp, 1874 Entries: []pb.Entry{ 1875 {Type: pb.EntryConfChange, Data: ccData}, 1876 }, 1877 }) 1878 // Stabilize the log and make sure nothing is committed yet. 1879 if ents := nextEnts(r, s); len(ents) > 0 { 1880 t.Fatalf("unexpected committed entries: %v", ents) 1881 } 1882 ccIndex := r.raftLog.lastIndex() 1883 1884 // While the config change is pending, make another proposal. 1885 r.Step(pb.Message{ 1886 Type: pb.MsgProp, 1887 Entries: []pb.Entry{ 1888 {Type: pb.EntryNormal, Data: []byte("hello")}, 1889 }, 1890 }) 1891 1892 // Node 2 acknowledges the config change, committing it. 1893 r.Step(pb.Message{ 1894 Type: pb.MsgAppResp, 1895 From: 2, 1896 Index: ccIndex, 1897 }) 1898 ents := nextEnts(r, s) 1899 if len(ents) != 2 { 1900 t.Fatalf("expected two committed entries, got %v", ents) 1901 } 1902 if ents[0].Type != pb.EntryNormal || ents[0].Data != nil { 1903 t.Fatalf("expected ents[0] to be empty, but got %v", ents[0]) 1904 } 1905 if ents[1].Type != pb.EntryConfChange { 1906 t.Fatalf("expected ents[1] to be EntryConfChange, got %v", ents[1]) 1907 } 1908 1909 // Apply the config change. This reduces quorum requirements so the 1910 // pending command can now commit. 1911 r.removeNode(2) 1912 ents = nextEnts(r, s) 1913 if len(ents) != 1 || ents[0].Type != pb.EntryNormal || 1914 string(ents[0].Data) != "hello" { 1915 t.Fatalf("expected one committed EntryNormal, got %v", ents) 1916 } 1917} 1918 1919func ents(terms ...uint64) *raft { 1920 storage := NewMemoryStorage() 1921 for i, term := range terms { 1922 storage.Append([]pb.Entry{{Index: uint64(i + 1), Term: term}}) 1923 } 1924 sm := newTestRaft(1, []uint64{}, 5, 1, storage) 1925 sm.reset(0) 1926 return sm 1927} 1928 1929type network struct { 1930 peers map[uint64]stateMachine 1931 storage map[uint64]*MemoryStorage 1932 dropm map[connem]float64 1933 ignorem map[pb.MessageType]bool 1934} 1935 1936// newNetwork initializes a network from peers. 1937// A nil node will be replaced with a new *stateMachine. 1938// A *stateMachine will get its k, id. 1939// When using stateMachine, the address list is always [1, n]. 1940func newNetwork(peers ...stateMachine) *network { 1941 size := len(peers) 1942 peerAddrs := idsBySize(size) 1943 1944 npeers := make(map[uint64]stateMachine, size) 1945 nstorage := make(map[uint64]*MemoryStorage, size) 1946 1947 for j, p := range peers { 1948 id := peerAddrs[j] 1949 switch v := p.(type) { 1950 case nil: 1951 nstorage[id] = NewMemoryStorage() 1952 sm := newTestRaft(id, peerAddrs, 10, 1, nstorage[id]) 1953 npeers[id] = sm 1954 case *raft: 1955 v.id = id 1956 v.prs = make(map[uint64]*Progress) 1957 for i := 0; i < size; i++ { 1958 v.prs[peerAddrs[i]] = &Progress{} 1959 } 1960 v.reset(0) 1961 npeers[id] = v 1962 case *blackHole: 1963 npeers[id] = v 1964 default: 1965 panic(fmt.Sprintf("unexpected state machine type: %T", p)) 1966 } 1967 } 1968 return &network{ 1969 peers: npeers, 1970 storage: nstorage, 1971 dropm: make(map[connem]float64), 1972 ignorem: make(map[pb.MessageType]bool), 1973 } 1974} 1975 1976func (nw *network) send(msgs ...pb.Message) { 1977 for len(msgs) > 0 { 1978 m := msgs[0] 1979 p := nw.peers[m.To] 1980 p.Step(m) 1981 msgs = append(msgs[1:], nw.filter(p.readMessages())...) 1982 } 1983} 1984 1985func (nw *network) drop(from, to uint64, perc float64) { 1986 nw.dropm[connem{from, to}] = perc 1987} 1988 1989func (nw *network) cut(one, other uint64) { 1990 nw.drop(one, other, 1) 1991 nw.drop(other, one, 1) 1992} 1993 1994func (nw *network) isolate(id uint64) { 1995 for i := 0; i < len(nw.peers); i++ { 1996 nid := uint64(i) + 1 1997 if nid != id { 1998 nw.drop(id, nid, 1.0) 1999 nw.drop(nid, id, 1.0) 2000 } 2001 } 2002} 2003 2004func (nw *network) ignore(t pb.MessageType) { 2005 nw.ignorem[t] = true 2006} 2007 2008func (nw *network) recover() { 2009 nw.dropm = make(map[connem]float64) 2010 nw.ignorem = make(map[pb.MessageType]bool) 2011} 2012 2013func (nw *network) filter(msgs []pb.Message) []pb.Message { 2014 mm := []pb.Message{} 2015 for _, m := range msgs { 2016 if nw.ignorem[m.Type] { 2017 continue 2018 } 2019 switch m.Type { 2020 case pb.MsgHup: 2021 // hups never go over the network, so don't drop them but panic 2022 panic("unexpected msgHup") 2023 default: 2024 perc := nw.dropm[connem{m.From, m.To}] 2025 if n := rand.Float64(); n < perc { 2026 continue 2027 } 2028 } 2029 mm = append(mm, m) 2030 } 2031 return mm 2032} 2033 2034type connem struct { 2035 from, to uint64 2036} 2037 2038type blackHole struct{} 2039 2040func (blackHole) Step(pb.Message) error { return nil } 2041func (blackHole) readMessages() []pb.Message { return nil } 2042 2043var nopStepper = &blackHole{} 2044 2045func idsBySize(size int) []uint64 { 2046 ids := make([]uint64, size) 2047 for i := 0; i < size; i++ { 2048 ids[i] = 1 + uint64(i) 2049 } 2050 return ids 2051} 2052 2053func newTestConfig(id uint64, peers []uint64, election, heartbeat int, storage Storage) *Config { 2054 return &Config{ 2055 ID: id, 2056 peers: peers, 2057 ElectionTick: election, 2058 HeartbeatTick: heartbeat, 2059 Storage: storage, 2060 MaxSizePerMsg: noLimit, 2061 MaxInflightMsgs: 256, 2062 } 2063} 2064 2065func newTestRaft(id uint64, peers []uint64, election, heartbeat int, storage Storage) *raft { 2066 return newRaft(newTestConfig(id, peers, election, heartbeat, storage)) 2067} 2068