// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package etcdserver

import (
	"encoding/json"
	"expvar"
	"sort"
	"sync"
	"sync/atomic"
	"time"

	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
	"github.com/coreos/etcd/etcdserver/membership"
	"github.com/coreos/etcd/pkg/contention"
	"github.com/coreos/etcd/pkg/pbutil"
	"github.com/coreos/etcd/pkg/types"
	"github.com/coreos/etcd/raft"
	"github.com/coreos/etcd/raft/raftpb"
	"github.com/coreos/etcd/rafthttp"
	"github.com/coreos/etcd/wal"
	"github.com/coreos/etcd/wal/walpb"
	"github.com/coreos/pkg/capnslog"
)

const (
	// numberOfCatchUpEntries is the number of entries to keep after compacting
	// the raft storage entries, so that a slow follower can still catch up.
	// We expect the follower to have millisecond-level latency to the leader.
	// The max throughput is around 10K entries/s, so keeping 5K entries is
	// enough for a follower to catch up.
	numberOfCatchUpEntries = 5000

	// The max throughput of etcd will not exceed 100MB/s (100K ops * 1KB values).
	// Assuming the RTT is around 10ms, at most about 1MB (100MB/s * 10ms) is in
	// flight per round trip, so a 1MB max message size is large enough.
	maxSizePerMsg = 1 * 1024 * 1024
	// Never overflow the rafthttp buffer, which is 4096.
	// TODO: a better const?
	maxInflightMsgs = 4096 / 8
)

var (
	// raftStatusMu protects raftStatus.
	raftStatusMu sync.Mutex
	// raftStatus is an indirection for the expvar func interface.
	// expvar panics when publishing a duplicate name and does not support
	// removing a registered name, so we register a single func that calls
	// raftStatus and swap raftStatus out as needed.
	raftStatus func() raft.Status
)

func init() {
	raft.SetLogger(capnslog.NewPackageLogger("github.com/coreos/etcd", "raft"))
	expvar.Publish("raft.status", expvar.Func(func() interface{} {
		raftStatusMu.Lock()
		defer raftStatusMu.Unlock()
		return raftStatus()
	}))
}

type RaftTimer interface {
	Index() uint64
	Term() uint64
}

// apply contains entries and a snapshot to be applied. Once
// an apply is consumed, the entries will be persisted to
// raft storage concurrently; the application must read from
// notifyc before assuming the raft messages are stable.
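//
// A minimal sketch of the expected consumer handshake (illustrative only; the
// real apply loop lives in the etcd server code, not in this file):
//
//	ap := <-r.apply()  // receive committed entries and/or a snapshot
//	// ... apply ap.entries and ap.snapshot to the state machine ...
//	<-ap.notifyc       // wait until raft has persisted this batch to disk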
type apply struct {
	entries  []raftpb.Entry
	snapshot raftpb.Snapshot
	// notifyc synchronizes etcd server applies with the raft node
	notifyc chan struct{}
}

type raftNode struct {
	// Cache of the latest raft index, raft term, and leader the server has seen.
	// These three uint64 fields must be the first elements to keep 64-bit
	// alignment for atomic access to the fields.
	index uint64
	term  uint64
	lead  uint64

	tickMu *sync.Mutex
	raftNodeConfig

	// a chan to send/receive snapshot
	msgSnapC chan raftpb.Message

	// a chan to send out apply
	applyc chan apply

	// a chan to send out readState
	readStateC chan raft.ReadState

	// utility
	ticker *time.Ticker
	// contention detector for raft heartbeat messages
	td *contention.TimeoutDetector

	stopped chan struct{}
	done    chan struct{}
}

type raftNodeConfig struct {
	// to check if msg receiver is removed from cluster
	isIDRemoved func(id uint64) bool
	raft.Node
	raftStorage *raft.MemoryStorage
	storage     Storage
	heartbeat   time.Duration // for logging
	// transport specifies the transport to send and receive msgs to members.
	// Sending messages MUST NOT block. It is okay to drop messages, since
	// clients should timeout and reissue their messages.
	// If transport is nil, server will panic.
	transport rafthttp.Transporter
}

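// newRaftNode wires up a raftNode from the given raftNodeConfig. A minimal
// sketch of the expected wiring (field values are illustrative, not the exact
// etcd server setup):
//
//	r := newRaftNode(raftNodeConfig{
//		isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
//		Node:        n,               // from raft.StartNode or raft.RestartNode
//		raftStorage: s,               // *raft.MemoryStorage backing the node
//		storage:     NewStorage(w, ss),
//		heartbeat:   heartbeat,
//	})
//	r.transport = tr // a started rafthttp.Transporter; must be non-nil
//	r.start(rh)      // rh is the server's raftReadyHandler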
func newRaftNode(cfg raftNodeConfig) *raftNode {
	r := &raftNode{
		tickMu:         new(sync.Mutex),
		raftNodeConfig: cfg,
		// set up contention detector for raft heartbeat messages.
		// expect to send a heartbeat within 2 heartbeat intervals.
		td:         contention.NewTimeoutDetector(2 * cfg.heartbeat),
		readStateC: make(chan raft.ReadState, 1),
		msgSnapC:   make(chan raftpb.Message, maxInFlightMsgSnap),
		applyc:     make(chan apply),
		stopped:    make(chan struct{}),
		done:       make(chan struct{}),
	}
	if r.heartbeat == 0 {
		r.ticker = &time.Ticker{}
	} else {
		r.ticker = time.NewTicker(r.heartbeat)
	}
	return r
}

// tick serializes calls to Tick; raft.Node does not take locks in the raft package.
func (r *raftNode) tick() {
	r.tickMu.Lock()
	r.Tick()
	r.tickMu.Unlock()
}

// start prepares and starts raftNode in a new goroutine. It is no longer safe
// to modify the fields after it has been started.
func (r *raftNode) start(rh *raftReadyHandler) {
	internalTimeout := time.Second

	go func() {
		defer r.onStop()
		islead := false

		for {
			select {
			case <-r.ticker.C:
				r.tick()
			case rd := <-r.Ready():
				if rd.SoftState != nil {
					newLeader := rd.SoftState.Lead != raft.None && atomic.LoadUint64(&r.lead) != rd.SoftState.Lead
					if newLeader {
						leaderChanges.Inc()
					}

					if rd.SoftState.Lead == raft.None {
						hasLeader.Set(0)
					} else {
						hasLeader.Set(1)
					}

					atomic.StoreUint64(&r.lead, rd.SoftState.Lead)
					islead = rd.RaftState == raft.StateLeader
					if islead {
						isLeader.Set(1)
					} else {
						isLeader.Set(0)
					}
					rh.updateLeadership(newLeader)
					r.td.Reset()
				}

				if len(rd.ReadStates) != 0 {
					select {
					case r.readStateC <- rd.ReadStates[len(rd.ReadStates)-1]:
					case <-time.After(internalTimeout):
						plog.Warningf("timed out sending read state")
					case <-r.stopped:
						return
					}
				}

				notifyc := make(chan struct{}, 1)
				ap := apply{
					entries:  rd.CommittedEntries,
					snapshot: rd.Snapshot,
					notifyc:  notifyc,
				}

				updateCommittedIndex(&ap, rh)

				select {
				case r.applyc <- ap:
				case <-r.stopped:
					return
				}

				// the leader can write to its disk in parallel with replicating to the followers and the
				// followers writing to their disks.
				// For more details, check raft thesis section 10.2.1.
				if islead {
					// gofail: var raftBeforeLeaderSend struct{}
					r.transport.Send(r.processMessages(rd.Messages))
				}

				// gofail: var raftBeforeSave struct{}
				if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
					plog.Fatalf("raft save state and entries error: %v", err)
				}
				if !raft.IsEmptyHardState(rd.HardState) {
					proposalsCommitted.Set(float64(rd.HardState.Commit))
				}
				// gofail: var raftAfterSave struct{}

				if !raft.IsEmptySnap(rd.Snapshot) {
					// gofail: var raftBeforeSaveSnap struct{}
					if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
						plog.Fatalf("raft save snapshot error: %v", err)
					}
					// etcdserver now claims the snapshot has been persisted onto the disk
					notifyc <- struct{}{}

					// gofail: var raftAfterSaveSnap struct{}
					r.raftStorage.ApplySnapshot(rd.Snapshot)
					plog.Infof("raft applied incoming snapshot at index %d", rd.Snapshot.Metadata.Index)
					// gofail: var raftAfterApplySnap struct{}
				}

				r.raftStorage.Append(rd.Entries)

				if !islead {
					// finish processing this Ready's messages before we signal notifyc
					msgs := r.processMessages(rd.Messages)

					// now unblocks 'applyAll' that waits on Raft log disk writes before triggering snapshots
					notifyc <- struct{}{}

					// A candidate or follower needs to wait for all pending configuration
					// changes to be applied before sending messages.
					// Otherwise we might incorrectly count votes (e.g. votes from removed members).
					// Also, a slow follower's raft layer could proceed to become the leader
					// of its own single-node cluster before the apply layer applies the config change.
					// We simply wait for ALL pending entries to be applied for now.
					// We might improve this later on if it causes unnecessary long blocking issues.
					waitApply := false
					for _, ent := range rd.CommittedEntries {
						if ent.Type == raftpb.EntryConfChange {
							waitApply = true
							break
						}
					}
					if waitApply {
						// blocks until 'applyAll' calls 'applyWait.Trigger'
						// to be in sync with scheduled config-change job
						// (assume notifyc has cap of 1)
						select {
						case notifyc <- struct{}{}:
						case <-r.stopped:
							return
						}
					}

					// gofail: var raftBeforeFollowerSend struct{}
					r.transport.Send(msgs)
				} else {
					// leader already processed 'MsgSnap' and signaled
					notifyc <- struct{}{}
				}

				r.Advance()
			case <-r.stopped:
				return
			}
		}
	}()
}

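// updateCommittedIndex reports the highest committed index carried by the
// apply (the index of the last committed entry or of the snapshot, whichever
// is larger) back to the server through the raftReadyHandler.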
func updateCommittedIndex(ap *apply, rh *raftReadyHandler) {
	var ci uint64
	if len(ap.entries) != 0 {
		ci = ap.entries[len(ap.entries)-1].Index
	}
	if ap.snapshot.Metadata.Index > ci {
		ci = ap.snapshot.Metadata.Index
	}
	if ci != 0 {
		rh.updateCommittedIndex(ci)
	}
}

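// processMessages rewrites outgoing raft messages before they reach the
// transport: messages addressed to removed members are dropped, redundant
// MsgAppResp messages are collapsed to the most recent one, MsgSnap is
// rerouted to msgSnapC so the server can merge it with the v3 KV snapshot,
// and slow heartbeats are reported to the contention detector.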
func (r *raftNode) processMessages(ms []raftpb.Message) []raftpb.Message {
	sentAppResp := false
	for i := len(ms) - 1; i >= 0; i-- {
		if r.isIDRemoved(ms[i].To) {
			ms[i].To = 0
		}

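		// Only the most recent MsgAppResp in this batch is useful to the
		// leader; earlier ones report stale log positions, so drop them by
		// blanking the destination.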
		if ms[i].Type == raftpb.MsgAppResp {
			if sentAppResp {
				ms[i].To = 0
			} else {
				sentAppResp = true
			}
		}

		if ms[i].Type == raftpb.MsgSnap {
			// There are two separate data stores: the store for v2 and the KV for v3.
			// The msgSnap only contains the most recent snapshot of the v2 store, without the KV.
			// So we need to redirect the msgSnap to the etcd server main loop to merge in the
			// current store snapshot and KV snapshot.
			select {
			case r.msgSnapC <- ms[i]:
			default:
				// drop msgSnap if the inflight chan is full.
			}
			ms[i].To = 0
		}
		if ms[i].Type == raftpb.MsgHeartbeat {
			ok, exceed := r.td.Observe(ms[i].To)
			if !ok {
				// TODO: limit request rate.
				plog.Warningf("failed to send out heartbeat on time (exceeded the %v timeout for %v)", r.heartbeat, exceed)
				plog.Warningf("server is likely overloaded")
				heartbeatSendFailures.Inc()
			}
		}
	}
	return ms
}

func (r *raftNode) apply() chan apply {
	return r.applyc
}

func (r *raftNode) stop() {
	r.stopped <- struct{}{}
	<-r.done
}

func (r *raftNode) onStop() {
	r.Stop()
	r.ticker.Stop()
	r.transport.Stop()
	if err := r.storage.Close(); err != nil {
		plog.Panicf("raft close storage error: %v", err)
	}
	close(r.done)
}

// for testing
func (r *raftNode) pauseSending() {
	p := r.transport.(rafthttp.Pausable)
	p.Pause()
}

func (r *raftNode) resumeSending() {
	p := r.transport.(rafthttp.Pausable)
	p.Resume()
}

// advanceTicks advances ticks of Raft node.
// This can be used for fast-forwarding election
// ticks in multi data-center deployments, thus
// speeding up election process.
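//
// For example, the server could fast-forward a freshly started member so that
// it needs only one more tick to start an election (a sketch; the actual
// fast-forward policy lives in the caller):
//
//	r.advanceTicks(cfg.ElectionTicks - 1)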
func (r *raftNode) advanceTicks(ticks int) {
	for i := 0; i < ticks; i++ {
		r.tick()
	}
}

func startNode(cfg *ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) {
	var err error
	member := cl.MemberByName(cfg.Name)
	metadata := pbutil.MustMarshal(
		&pb.Metadata{
			NodeID:    uint64(member.ID),
			ClusterID: uint64(cl.ID()),
		},
	)
	if w, err = wal.Create(cfg.WALDir(), metadata); err != nil {
		plog.Fatalf("create wal error: %v", err)
	}
	peers := make([]raft.Peer, len(ids))
	for i, id := range ids {
		ctx, err := json.Marshal((*cl).Member(id))
		if err != nil {
			plog.Panicf("marshal member should never fail: %v", err)
		}
		peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
	}
	id = member.ID
	plog.Infof("starting member %s in cluster %s", id, cl.ID())
	s = raft.NewMemoryStorage()
	c := &raft.Config{
		ID:              uint64(id),
		ElectionTick:    cfg.ElectionTicks,
		HeartbeatTick:   1,
		Storage:         s,
		MaxSizePerMsg:   maxSizePerMsg,
		MaxInflightMsgs: maxInflightMsgs,
		CheckQuorum:     true,
	}

	n = raft.StartNode(c, peers)
	raftStatusMu.Lock()
	raftStatus = n.Status
	raftStatusMu.Unlock()

	return id, n, s, w
}

func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
	var walsnap walpb.Snapshot
	if snapshot != nil {
		walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
	}
	w, id, cid, st, ents := readWAL(cfg.WALDir(), walsnap)

	plog.Infof("restarting member %s in cluster %s at commit index %d", id, cid, st.Commit)
	cl := membership.NewCluster("")
	cl.SetID(cid)
	s := raft.NewMemoryStorage()
	if snapshot != nil {
		s.ApplySnapshot(*snapshot)
	}
	s.SetHardState(st)
	s.Append(ents)
	c := &raft.Config{
		ID:              uint64(id),
		ElectionTick:    cfg.ElectionTicks,
		HeartbeatTick:   1,
		Storage:         s,
		MaxSizePerMsg:   maxSizePerMsg,
		MaxInflightMsgs: maxInflightMsgs,
		CheckQuorum:     true,
	}

	n := raft.RestartNode(c)
	raftStatusMu.Lock()
	raftStatus = n.Status
	raftStatusMu.Unlock()
	return id, cl, n, s, w
}

func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
	var walsnap walpb.Snapshot
	if snapshot != nil {
		walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
	}
	w, id, cid, st, ents := readWAL(cfg.WALDir(), walsnap)

	// discard the previously uncommitted entries
	for i, ent := range ents {
		if ent.Index > st.Commit {
			plog.Infof("discarding %d uncommitted WAL entries", len(ents)-i)
			ents = ents[:i]
			break
		}
	}

	// force append the configuration change entries
	toAppEnts := createConfigChangeEnts(getIDs(snapshot, ents), uint64(id), st.Term, st.Commit)
	ents = append(ents, toAppEnts...)

	// force commit newly appended entries
	err := w.Save(raftpb.HardState{}, toAppEnts)
	if err != nil {
		plog.Fatalf("%v", err)
	}
	if len(ents) != 0 {
		st.Commit = ents[len(ents)-1].Index
	}

	plog.Printf("forcing restart of member %s in cluster %s at commit index %d", id, cid, st.Commit)
	cl := membership.NewCluster("")
	cl.SetID(cid)
	s := raft.NewMemoryStorage()
	if snapshot != nil {
		s.ApplySnapshot(*snapshot)
	}
	s.SetHardState(st)
	s.Append(ents)
	c := &raft.Config{
		ID:              uint64(id),
		ElectionTick:    cfg.ElectionTicks,
		HeartbeatTick:   1,
		Storage:         s,
		MaxSizePerMsg:   maxSizePerMsg,
		MaxInflightMsgs: maxInflightMsgs,
		CheckQuorum:     true,
	}
	n := raft.RestartNode(c)
	raftStatusMu.Lock()
	raftStatus = n.Status
	raftStatusMu.Unlock()
	return id, cl, n, s, w
}

// getIDs returns an ordered set of IDs included in the given snapshot and
// the entries. The given snapshot/entries can contain two kinds of
// ID-related entry:
// - ConfChangeAddNode, in which case the contained ID will be added into the set.
// - ConfChangeRemoveNode, in which case the contained ID will be removed from the set.
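//
// For example (illustrative values): a snapshot whose ConfState contains
// nodes {1, 2}, followed by entries that add node 3 and then remove node 2,
// yields [1, 3].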
func getIDs(snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 {
	ids := make(map[uint64]bool)
	if snap != nil {
		for _, id := range snap.Metadata.ConfState.Nodes {
			ids[id] = true
		}
	}
	for _, e := range ents {
		if e.Type != raftpb.EntryConfChange {
			continue
		}
		var cc raftpb.ConfChange
		pbutil.MustUnmarshal(&cc, e.Data)
		switch cc.Type {
		case raftpb.ConfChangeAddNode:
			ids[cc.NodeID] = true
		case raftpb.ConfChangeRemoveNode:
			delete(ids, cc.NodeID)
		case raftpb.ConfChangeUpdateNode:
			// do nothing
		default:
			plog.Panicf("unknown ConfChange Type: %v", cc.Type)
		}
	}
	sids := make(types.Uint64Slice, 0, len(ids))
	for id := range ids {
		sids = append(sids, id)
	}
	sort.Sort(sids)
	return []uint64(sids)
}

// createConfigChangeEnts creates a series of Raft entries (i.e.
// EntryConfChange) to remove the set of given IDs from the cluster. The ID
// `self` is _not_ removed, even if present in the set.
// If `self` is not inside the given ids, it creates a Raft entry to add a
// default member with the given `self`.
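//
// For example (illustrative values): ids=[1, 2, 3], self=1, term=5, index=10
// produces ConfChangeRemoveNode entries for nodes 2 and 3 at indexes 11 and
// 12; ids=[2], self=1 produces a remove entry for node 2 and an add entry for
// node 1.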
func createConfigChangeEnts(ids []uint64, self uint64, term, index uint64) []raftpb.Entry {
	ents := make([]raftpb.Entry, 0)
	next := index + 1
	found := false
	for _, id := range ids {
		if id == self {
			found = true
			continue
		}
		cc := &raftpb.ConfChange{
			Type:   raftpb.ConfChangeRemoveNode,
			NodeID: id,
		}
		e := raftpb.Entry{
			Type:  raftpb.EntryConfChange,
			Data:  pbutil.MustMarshal(cc),
			Term:  term,
			Index: next,
		}
		ents = append(ents, e)
		next++
	}
	if !found {
		m := membership.Member{
			ID:             types.ID(self),
			RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://localhost:2380"}},
		}
		ctx, err := json.Marshal(m)
		if err != nil {
			plog.Panicf("marshal member should never fail: %v", err)
		}
		cc := &raftpb.ConfChange{
			Type:    raftpb.ConfChangeAddNode,
			NodeID:  self,
			Context: ctx,
		}
		e := raftpb.Entry{
			Type:  raftpb.EntryConfChange,
			Data:  pbutil.MustMarshal(cc),
			Term:  term,
			Index: next,
		}
		ents = append(ents, e)
	}
	return ents
}