1// Copyright 2015 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package etcdserver 16 17import ( 18 "encoding/json" 19 "expvar" 20 "sort" 21 "sync" 22 "sync/atomic" 23 "time" 24 25 pb "github.com/coreos/etcd/etcdserver/etcdserverpb" 26 "github.com/coreos/etcd/etcdserver/membership" 27 "github.com/coreos/etcd/pkg/contention" 28 "github.com/coreos/etcd/pkg/pbutil" 29 "github.com/coreos/etcd/pkg/types" 30 "github.com/coreos/etcd/raft" 31 "github.com/coreos/etcd/raft/raftpb" 32 "github.com/coreos/etcd/rafthttp" 33 "github.com/coreos/etcd/wal" 34 "github.com/coreos/etcd/wal/walpb" 35 "github.com/coreos/pkg/capnslog" 36) 37 38const ( 39 // Number of entries for slow follower to catch-up after compacting 40 // the raft storage entries. 41 // We expect the follower has a millisecond level latency with the leader. 42 // The max throughput is around 10K. Keep a 5K entries is enough for helping 43 // follower to catch up. 44 numberOfCatchUpEntries = 5000 45 46 // The max throughput of etcd will not exceed 100MB/s (100K * 1KB value). 47 // Assuming the RTT is around 10ms, 1MB max size is large enough. 48 maxSizePerMsg = 1 * 1024 * 1024 49 // Never overflow the rafthttp buffer, which is 4096. 50 // TODO: a better const? 
51 maxInflightMsgs = 4096 / 8 52) 53 54var ( 55 // protects raftStatus 56 raftStatusMu sync.Mutex 57 // indirection for expvar func interface 58 // expvar panics when publishing duplicate name 59 // expvar does not support remove a registered name 60 // so only register a func that calls raftStatus 61 // and change raftStatus as we need. 62 raftStatus func() raft.Status 63) 64 65func init() { 66 raft.SetLogger(capnslog.NewPackageLogger("github.com/coreos/etcd", "raft")) 67 expvar.Publish("raft.status", expvar.Func(func() interface{} { 68 raftStatusMu.Lock() 69 defer raftStatusMu.Unlock() 70 return raftStatus() 71 })) 72} 73 74type RaftTimer interface { 75 Index() uint64 76 Term() uint64 77} 78 79// apply contains entries, snapshot to be applied. Once 80// an apply is consumed, the entries will be persisted to 81// to raft storage concurrently; the application must read 82// raftDone before assuming the raft messages are stable. 83type apply struct { 84 entries []raftpb.Entry 85 snapshot raftpb.Snapshot 86 // notifyc synchronizes etcd server applies with the raft node 87 notifyc chan struct{} 88} 89 90type raftNode struct { 91 // Cache of the latest raft index and raft term the server has seen. 92 // These three unit64 fields must be the first elements to keep 64-bit 93 // alignment for atomic access to the fields. 
94 index uint64 95 term uint64 96 lead uint64 97 98 tickMu *sync.Mutex 99 raftNodeConfig 100 101 // a chan to send/receive snapshot 102 msgSnapC chan raftpb.Message 103 104 // a chan to send out apply 105 applyc chan apply 106 107 // a chan to send out readState 108 readStateC chan raft.ReadState 109 110 // utility 111 ticker *time.Ticker 112 // contention detectors for raft heartbeat message 113 td *contention.TimeoutDetector 114 115 stopped chan struct{} 116 done chan struct{} 117} 118 119type raftNodeConfig struct { 120 // to check if msg receiver is removed from cluster 121 isIDRemoved func(id uint64) bool 122 raft.Node 123 raftStorage *raft.MemoryStorage 124 storage Storage 125 heartbeat time.Duration // for logging 126 // transport specifies the transport to send and receive msgs to members. 127 // Sending messages MUST NOT block. It is okay to drop messages, since 128 // clients should timeout and reissue their messages. 129 // If transport is nil, server will panic. 130 transport rafthttp.Transporter 131} 132 133func newRaftNode(cfg raftNodeConfig) *raftNode { 134 r := &raftNode{ 135 tickMu: new(sync.Mutex), 136 raftNodeConfig: cfg, 137 // set up contention detectors for raft heartbeat message. 138 // expect to send a heartbeat within 2 heartbeat intervals. 139 td: contention.NewTimeoutDetector(2 * cfg.heartbeat), 140 readStateC: make(chan raft.ReadState, 1), 141 msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap), 142 applyc: make(chan apply), 143 stopped: make(chan struct{}), 144 done: make(chan struct{}), 145 } 146 if r.heartbeat == 0 { 147 r.ticker = &time.Ticker{} 148 } else { 149 r.ticker = time.NewTicker(r.heartbeat) 150 } 151 return r 152} 153 154// raft.Node does not have locks in Raft package 155func (r *raftNode) tick() { 156 r.tickMu.Lock() 157 r.Tick() 158 r.tickMu.Unlock() 159} 160 161// start prepares and starts raftNode in a new goroutine. It is no longer safe 162// to modify the fields after it has been started. 
//
// The goroutine runs until a value is received on r.stopped; on exit the
// deferred r.onStop tears down the node, ticker, transport and storage and
// closes r.done. Leadership and committed-index updates are reported to rh.
func (r *raftNode) start(rh *raftReadyHandler) {
	// Upper bound on how long a ReadState may wait for a receiver on readStateC.
	internalTimeout := time.Second

	go func() {
		defer r.onStop()
		// islead caches the role reported by the last SoftState; Ready batches
		// without a SoftState keep the previous value.
		islead := false

		for {
			select {
			case <-r.ticker.C:
				r.tick()
			case rd := <-r.Ready():
				if rd.SoftState != nil {
					// Counted as a leader change only when a leader is known and it
					// differs from the cached one.
					newLeader := rd.SoftState.Lead != raft.None && atomic.LoadUint64(&r.lead) != rd.SoftState.Lead
					if newLeader {
						leaderChanges.Inc()
					}

					if rd.SoftState.Lead == raft.None {
						hasLeader.Set(0)
					} else {
						hasLeader.Set(1)
					}

					atomic.StoreUint64(&r.lead, rd.SoftState.Lead)
					islead = rd.RaftState == raft.StateLeader
					if islead {
						isLeader.Set(1)
					} else {
						isLeader.Set(0)
					}
					rh.updateLeadership(newLeader)
					r.td.Reset()
				}

				// Only the most recent ReadState is forwarded; it is dropped with a
				// warning if nobody receives it within internalTimeout.
				if len(rd.ReadStates) != 0 {
					select {
					case r.readStateC <- rd.ReadStates[len(rd.ReadStates)-1]:
					case <-time.After(internalTimeout):
						plog.Warningf("timed out sending read state")
					case <-r.stopped:
						return
					}
				}

				// notifyc has capacity 1: the first signal below does not block,
				// any further signal blocks until the apply side has received the
				// previous one.
				notifyc := make(chan struct{}, 1)
				ap := apply{
					entries:  rd.CommittedEntries,
					snapshot: rd.Snapshot,
					notifyc:  notifyc,
				}

				updateCommittedIndex(&ap, rh)

				// Hand the batch to the apply side before it is persisted below.
				select {
				case r.applyc <- ap:
				case <-r.stopped:
					return
				}

				// the leader can write to its disk in parallel with replicating to the followers and them
				// writing to their disks.
				// For more details, check raft thesis 10.2.1
				if islead {
					// gofail: var raftBeforeLeaderSend struct{}
					r.transport.Send(r.processMessages(rd.Messages))
				}

				// gofail: var raftBeforeSave struct{}
				if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
					plog.Fatalf("raft save state and entries error: %v", err)
				}
				if !raft.IsEmptyHardState(rd.HardState) {
					proposalsCommitted.Set(float64(rd.HardState.Commit))
				}
				// gofail: var raftAfterSave struct{}

				if !raft.IsEmptySnap(rd.Snapshot) {
					// gofail: var raftBeforeSaveSnap struct{}
					if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
						plog.Fatalf("raft save snapshot error: %v", err)
					}
					// etcdserver now claims the snapshot has been persisted onto the disk
					notifyc <- struct{}{}

					// gofail: var raftAfterSaveSnap struct{}
					r.raftStorage.ApplySnapshot(rd.Snapshot)
					plog.Infof("raft applied incoming snapshot at index %d", rd.Snapshot.Metadata.Index)
					// gofail: var raftAfterApplySnap struct{}
				}

				r.raftStorage.Append(rd.Entries)

				if !islead {
					// finish processing incoming messages before we signal raftdone chan
					msgs := r.processMessages(rd.Messages)

					// now unblocks 'applyAll' that waits on Raft log disk writes before triggering snapshots
					notifyc <- struct{}{}

					// Candidate or follower needs to wait for all pending configuration
					// changes to be applied before sending messages.
					// Otherwise we might incorrectly count votes (e.g. votes from removed members).
					// Also slow machine's follower raft-layer could proceed to become the leader
					// on its own single-node cluster, before apply-layer applies the config change.
					// We simply wait for ALL pending entries to be applied for now.
					// We might improve this later on if it causes unnecessary long blocking issues.
					waitApply := false
					for _, ent := range rd.CommittedEntries {
						if ent.Type == raftpb.EntryConfChange {
							waitApply = true
							break
						}
					}
					if waitApply {
						// blocks until 'applyAll' calls 'applyWait.Trigger'
						// to be in sync with scheduled config-change job
						// (assume notifyc has cap of 1)
						select {
						case notifyc <- struct{}{}:
						case <-r.stopped:
							return
						}
					}

					// gofail: var raftBeforeFollowerSend struct{}
					r.transport.Send(msgs)
				} else {
					// leader already processed 'MsgSnap' and signaled
					notifyc <- struct{}{}
				}

				r.Advance()
			case <-r.stopped:
				return
			}
		}
	}()
}

// updateCommittedIndex reports to rh the highest index covered by ap: the
// larger of the last committed entry's index and the snapshot index. Nothing
// is reported when both are zero.
func updateCommittedIndex(ap *apply, rh *raftReadyHandler) {
	var ci uint64
	if len(ap.entries) != 0 {
		ci = ap.entries[len(ap.entries)-1].Index
	}
	if ap.snapshot.Metadata.Index > ci {
		ci = ap.snapshot.Metadata.Index
	}
	if ci != 0 {
		rh.updateCommittedIndex(ci)
	}
}

// processMessages filters ms in place before it is handed to the transport and
// returns the same slice. A message is suppressed by setting its To to 0
// (raft.None); presumably the transport skips such messages — confirm against
// rafthttp. Suppressed are: messages to removed members, every MsgAppResp
// except the last one in ms, and every MsgSnap (which is redirected to
// msgSnapC instead). Heartbeats are also fed to the contention detector.
func (r *raftNode) processMessages(ms []raftpb.Message) []raftpb.Message {
	sentAppResp := false
	// Walk backwards so the first MsgAppResp seen is the last one in ms.
	for i := len(ms) - 1; i >= 0; i-- {
		if r.isIDRemoved(ms[i].To) {
			ms[i].To = 0
		}

		if ms[i].Type == raftpb.MsgAppResp {
			if sentAppResp {
				ms[i].To = 0
			} else {
				sentAppResp = true
			}
		}

		if ms[i].Type == raftpb.MsgSnap {
			// There are two separate data store: the store for v2, and the KV for v3.
			// The msgSnap only contains the most recent snapshot of store without KV.
			// So we need to redirect the msgSnap to etcd server main loop for merging in the
			// current store snapshot and KV snapshot.
			select {
			case r.msgSnapC <- ms[i]:
			default:
				// drop msgSnap if the inflight chan is full.
			}
			ms[i].To = 0
		}
		if ms[i].Type == raftpb.MsgHeartbeat {
			ok, exceed := r.td.Observe(ms[i].To)
			if !ok {
				// TODO: limit request rate.
				// NOTE(review): the detector was built with 2*heartbeat in
				// newRaftNode, but the message prints r.heartbeat as the timeout.
				plog.Warningf("failed to send out heartbeat on time (exceeded the %v timeout for %v)", r.heartbeat, exceed)
				plog.Warningf("server is likely overloaded")
				heartbeatSendFailures.Inc()
			}
		}
	}
	return ms
}

// apply returns the channel on which start publishes apply batches.
func (r *raftNode) apply() chan apply {
	return r.applyc
}

// stop signals the start goroutine to exit and waits until onStop has
// closed r.done.
func (r *raftNode) stop() {
	r.stopped <- struct{}{}
	<-r.done
}

// onStop releases the node's resources: stops the raft.Node, the ticker and
// the transport, closes storage (panicking on error), then closes r.done.
func (r *raftNode) onStop() {
	r.Stop()
	r.ticker.Stop()
	r.transport.Stop()
	if err := r.storage.Close(); err != nil {
		plog.Panicf("raft close storage error: %v", err)
	}
	close(r.done)
}

// for testing
// pauseSending pauses the transport; it panics if the transport does not
// implement rafthttp.Pausable.
func (r *raftNode) pauseSending() {
	p := r.transport.(rafthttp.Pausable)
	p.Pause()
}

// resumeSending resumes the transport; it panics if the transport does not
// implement rafthttp.Pausable.
func (r *raftNode) resumeSending() {
	p := r.transport.(rafthttp.Pausable)
	p.Resume()
}

// advanceTicks advances ticks of Raft node.
// This can be used for fast-forwarding election
// ticks in multi data-center deployments, thus
// speeding up election process.
390func (r *raftNode) advanceTicks(ticks int) { 391 for i := 0; i < ticks; i++ { 392 r.tick() 393 } 394} 395 396func startNode(cfg *ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) { 397 var err error 398 member := cl.MemberByName(cfg.Name) 399 metadata := pbutil.MustMarshal( 400 &pb.Metadata{ 401 NodeID: uint64(member.ID), 402 ClusterID: uint64(cl.ID()), 403 }, 404 ) 405 if w, err = wal.Create(cfg.WALDir(), metadata); err != nil { 406 plog.Fatalf("create wal error: %v", err) 407 } 408 peers := make([]raft.Peer, len(ids)) 409 for i, id := range ids { 410 ctx, err := json.Marshal((*cl).Member(id)) 411 if err != nil { 412 plog.Panicf("marshal member should never fail: %v", err) 413 } 414 peers[i] = raft.Peer{ID: uint64(id), Context: ctx} 415 } 416 id = member.ID 417 plog.Infof("starting member %s in cluster %s", id, cl.ID()) 418 s = raft.NewMemoryStorage() 419 c := &raft.Config{ 420 ID: uint64(id), 421 ElectionTick: cfg.ElectionTicks, 422 HeartbeatTick: 1, 423 Storage: s, 424 MaxSizePerMsg: maxSizePerMsg, 425 MaxInflightMsgs: maxInflightMsgs, 426 CheckQuorum: true, 427 } 428 429 n = raft.StartNode(c, peers) 430 raftStatusMu.Lock() 431 raftStatus = n.Status 432 raftStatusMu.Unlock() 433 434 return id, n, s, w 435} 436 437func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) { 438 var walsnap walpb.Snapshot 439 if snapshot != nil { 440 walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term 441 } 442 w, id, cid, st, ents := readWAL(cfg.WALDir(), walsnap) 443 444 plog.Infof("restarting member %s in cluster %s at commit index %d", id, cid, st.Commit) 445 cl := membership.NewCluster("") 446 cl.SetID(cid) 447 s := raft.NewMemoryStorage() 448 if snapshot != nil { 449 s.ApplySnapshot(*snapshot) 450 } 451 s.SetHardState(st) 452 s.Append(ents) 453 c := &raft.Config{ 454 ID: uint64(id), 455 
ElectionTick: cfg.ElectionTicks, 456 HeartbeatTick: 1, 457 Storage: s, 458 MaxSizePerMsg: maxSizePerMsg, 459 MaxInflightMsgs: maxInflightMsgs, 460 CheckQuorum: true, 461 } 462 463 n := raft.RestartNode(c) 464 raftStatusMu.Lock() 465 raftStatus = n.Status 466 raftStatusMu.Unlock() 467 return id, cl, n, s, w 468} 469 470func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) { 471 var walsnap walpb.Snapshot 472 if snapshot != nil { 473 walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term 474 } 475 w, id, cid, st, ents := readWAL(cfg.WALDir(), walsnap) 476 477 // discard the previously uncommitted entries 478 for i, ent := range ents { 479 if ent.Index > st.Commit { 480 plog.Infof("discarding %d uncommitted WAL entries ", len(ents)-i) 481 ents = ents[:i] 482 break 483 } 484 } 485 486 // force append the configuration change entries 487 toAppEnts := createConfigChangeEnts(getIDs(snapshot, ents), uint64(id), st.Term, st.Commit) 488 ents = append(ents, toAppEnts...) 
489 490 // force commit newly appended entries 491 err := w.Save(raftpb.HardState{}, toAppEnts) 492 if err != nil { 493 plog.Fatalf("%v", err) 494 } 495 if len(ents) != 0 { 496 st.Commit = ents[len(ents)-1].Index 497 } 498 499 plog.Printf("forcing restart of member %s in cluster %s at commit index %d", id, cid, st.Commit) 500 cl := membership.NewCluster("") 501 cl.SetID(cid) 502 s := raft.NewMemoryStorage() 503 if snapshot != nil { 504 s.ApplySnapshot(*snapshot) 505 } 506 s.SetHardState(st) 507 s.Append(ents) 508 c := &raft.Config{ 509 ID: uint64(id), 510 ElectionTick: cfg.ElectionTicks, 511 HeartbeatTick: 1, 512 Storage: s, 513 MaxSizePerMsg: maxSizePerMsg, 514 MaxInflightMsgs: maxInflightMsgs, 515 CheckQuorum: true, 516 } 517 n := raft.RestartNode(c) 518 raftStatus = n.Status 519 return id, cl, n, s, w 520} 521 522// getIDs returns an ordered set of IDs included in the given snapshot and 523// the entries. The given snapshot/entries can contain two kinds of 524// ID-related entry: 525// - ConfChangeAddNode, in which case the contained ID will be added into the set. 526// - ConfChangeRemoveNode, in which case the contained ID will be removed from the set. 
527func getIDs(snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 { 528 ids := make(map[uint64]bool) 529 if snap != nil { 530 for _, id := range snap.Metadata.ConfState.Nodes { 531 ids[id] = true 532 } 533 } 534 for _, e := range ents { 535 if e.Type != raftpb.EntryConfChange { 536 continue 537 } 538 var cc raftpb.ConfChange 539 pbutil.MustUnmarshal(&cc, e.Data) 540 switch cc.Type { 541 case raftpb.ConfChangeAddNode: 542 ids[cc.NodeID] = true 543 case raftpb.ConfChangeRemoveNode: 544 delete(ids, cc.NodeID) 545 case raftpb.ConfChangeUpdateNode: 546 // do nothing 547 default: 548 plog.Panicf("ConfChange Type should be either ConfChangeAddNode or ConfChangeRemoveNode!") 549 } 550 } 551 sids := make(types.Uint64Slice, 0, len(ids)) 552 for id := range ids { 553 sids = append(sids, id) 554 } 555 sort.Sort(sids) 556 return []uint64(sids) 557} 558 559// createConfigChangeEnts creates a series of Raft entries (i.e. 560// EntryConfChange) to remove the set of given IDs from the cluster. The ID 561// `self` is _not_ removed, even if present in the set. 562// If `self` is not inside the given ids, it creates a Raft entry to add a 563// default member with the given `self`. 
564func createConfigChangeEnts(ids []uint64, self uint64, term, index uint64) []raftpb.Entry { 565 ents := make([]raftpb.Entry, 0) 566 next := index + 1 567 found := false 568 for _, id := range ids { 569 if id == self { 570 found = true 571 continue 572 } 573 cc := &raftpb.ConfChange{ 574 Type: raftpb.ConfChangeRemoveNode, 575 NodeID: id, 576 } 577 e := raftpb.Entry{ 578 Type: raftpb.EntryConfChange, 579 Data: pbutil.MustMarshal(cc), 580 Term: term, 581 Index: next, 582 } 583 ents = append(ents, e) 584 next++ 585 } 586 if !found { 587 m := membership.Member{ 588 ID: types.ID(self), 589 RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://localhost:2380"}}, 590 } 591 ctx, err := json.Marshal(m) 592 if err != nil { 593 plog.Panicf("marshal member should never fail: %v", err) 594 } 595 cc := &raftpb.ConfChange{ 596 Type: raftpb.ConfChangeAddNode, 597 NodeID: self, 598 Context: ctx, 599 } 600 e := raftpb.Entry{ 601 Type: raftpb.EntryConfChange, 602 Data: pbutil.MustMarshal(cc), 603 Term: term, 604 Index: next, 605 } 606 ents = append(ents, e) 607 } 608 return ents 609} 610