1// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package etcdserver
16
17import (
18	"encoding/json"
19	"expvar"
20	"fmt"
21	"math"
22	"math/rand"
23	"net/http"
24	"os"
25	"path"
26	"regexp"
27	"sync"
28	"sync/atomic"
29	"time"
30
31	"github.com/coreos/etcd/alarm"
32	"github.com/coreos/etcd/auth"
33	"github.com/coreos/etcd/compactor"
34	"github.com/coreos/etcd/discovery"
35	"github.com/coreos/etcd/etcdserver/api"
36	"github.com/coreos/etcd/etcdserver/api/v2http/httptypes"
37	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
38	"github.com/coreos/etcd/etcdserver/membership"
39	"github.com/coreos/etcd/etcdserver/stats"
40	"github.com/coreos/etcd/lease"
41	"github.com/coreos/etcd/mvcc"
42	"github.com/coreos/etcd/mvcc/backend"
43	"github.com/coreos/etcd/pkg/fileutil"
44	"github.com/coreos/etcd/pkg/idutil"
45	"github.com/coreos/etcd/pkg/pbutil"
46	"github.com/coreos/etcd/pkg/runtime"
47	"github.com/coreos/etcd/pkg/schedule"
48	"github.com/coreos/etcd/pkg/types"
49	"github.com/coreos/etcd/pkg/wait"
50	"github.com/coreos/etcd/raft"
51	"github.com/coreos/etcd/raft/raftpb"
52	"github.com/coreos/etcd/rafthttp"
53	"github.com/coreos/etcd/snap"
54	"github.com/coreos/etcd/store"
55	"github.com/coreos/etcd/version"
56	"github.com/coreos/etcd/wal"
57
58	"github.com/coreos/go-semver/semver"
59	"github.com/coreos/pkg/capnslog"
60	"github.com/prometheus/client_golang/prometheus"
61	"golang.org/x/net/context"
62)
63
64const (
65	DefaultSnapCount = 100000
66
67	StoreClusterPrefix = "/0"
68	StoreKeysPrefix    = "/1"
69
70	// HealthInterval is the minimum time the cluster should be healthy
71	// before accepting add member requests.
72	HealthInterval = 5 * time.Second
73
74	purgeFileInterval = 30 * time.Second
75	// monitorVersionInterval should be smaller than the timeout
76	// on the connection. Or we will not be able to reuse the connection
77	// (since it will timeout).
78	monitorVersionInterval = rafthttp.ConnWriteTimeout - time.Second
79
	// maxInFlightMsgSnap is the maximum number of in-flight snapshot messages
	// etcdserver allows. This number is more than enough for most clusters
	// with 5 machines.
82	maxInFlightMsgSnap = 16
83
84	releaseDelayAfterSnapshot = 30 * time.Second
85
86	// maxPendingRevokes is the maximum number of outstanding expired lease revocations.
87	maxPendingRevokes          = 16
88	recommendedMaxRequestBytes = 10 * 1024 * 1024
89)
90
91var (
92	plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "etcdserver")
93
94	storeMemberAttributeRegexp = regexp.MustCompile(path.Join(membership.StoreMembersPrefix, "[[:xdigit:]]{1,16}", "attributes"))
95)
96
97func init() {
98	rand.Seed(time.Now().UnixNano())
99
100	expvar.Publish(
101		"file_descriptor_limit",
102		expvar.Func(
103			func() interface{} {
104				n, _ := runtime.FDLimit()
105				return n
106			},
107		),
108	)
109}
110
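// Response holds the result of a v2 request: Event is set for regular store
// operations, Watcher for watch ("wait") requests, and err carries any error
// from applying the request.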
111type Response struct {
112	Event   *store.Event
113	Watcher store.Watcher
114	err     error
115}
116
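// Server is the interface that wraps the core functionality of an etcd
// server, as consumed by the API layers.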
117type Server interface {
118	// Start performs any initialization of the Server necessary for it to
119	// begin serving requests. It must be called before Do or Process.
120	// Start must be non-blocking; any long-running server functionality
121	// should be implemented in goroutines.
122	Start()
123	// Stop terminates the Server and performs any necessary finalization.
124	// Do and Process cannot be called after Stop has been invoked.
125	Stop()
126	// ID returns the ID of the Server.
127	ID() types.ID
128	// Leader returns the ID of the leader Server.
129	Leader() types.ID
130	// Do takes a request and attempts to fulfill it, returning a Response.
131	Do(ctx context.Context, r pb.Request) (Response, error)
132	// Process takes a raft message and applies it to the server's raft state
133	// machine, respecting any timeout of the given context.
134	Process(ctx context.Context, m raftpb.Message) error
135	// AddMember attempts to add a member into the cluster. It will return
136	// ErrIDRemoved if member ID is removed from the cluster, or return
137	// ErrIDExists if member ID exists in the cluster.
138	AddMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error)
139	// RemoveMember attempts to remove a member from the cluster. It will
140	// return ErrIDRemoved if member ID is removed from the cluster, or return
141	// ErrIDNotFound if member ID is not in the cluster.
142	RemoveMember(ctx context.Context, id uint64) ([]*membership.Member, error)
143
144	// UpdateMember attempts to update an existing member in the cluster. It will
145	// return ErrIDNotFound if the member ID does not exist.
146	UpdateMember(ctx context.Context, updateMemb membership.Member) ([]*membership.Member, error)
147
	// ClusterVersion is the cluster-wide minimum major.minor version.
	// The cluster version is set to the minimum version that an etcd member is
	// compatible with when the cluster is first bootstrapped.
	//
	// ClusterVersion is nil until the cluster is bootstrapped (has a quorum).
	//
	// During a rolling upgrade, the ClusterVersion is updated automatically
	// after a sync (every 5 seconds by default).
	//
	// The API/raft component can use ClusterVersion to determine whether
	// it can accept a client request or a raft RPC.
	// NOTE: ClusterVersion might be nil when etcd 2.1 works with etcd 2.0 and
	// the leader is etcd 2.0. An etcd 2.0 leader will not update clusterVersion
	// since this feature was introduced after 2.0.
162	ClusterVersion() *semver.Version
163}
164
165// EtcdServer is the production implementation of the Server interface
166type EtcdServer struct {
	// inflightSnapshots holds the number of snapshots currently in flight.
168	inflightSnapshots int64  // must use atomic operations to access; keep 64-bit aligned.
169	appliedIndex      uint64 // must use atomic operations to access; keep 64-bit aligned.
170	committedIndex    uint64 // must use atomic operations to access; keep 64-bit aligned.
	// consistIndex holds the index of the entry currently being applied.
	// It is initialized to 0 before any entry is applied.
173	consistIndex consistentIndex // must use atomic operations to access; keep 64-bit aligned.
174	Cfg          *ServerConfig
175
176	readych chan struct{}
177	r       raftNode
178
179	snapCount uint64
180
181	w wait.Wait
182
183	readMu sync.RWMutex
	// the read routine notifies the etcd server that it is waiting to read by
	// sending an empty struct to readwaitc.
186	readwaitc chan struct{}
	// readNotifier is used to notify the read routine that it can process the
	// request when there is no error.
189	readNotifier *notifier
190
191	// stop signals the run goroutine should shutdown.
192	stop chan struct{}
193	// stopping is closed by run goroutine on shutdown.
194	stopping chan struct{}
195	// done is closed when all goroutines from start() complete.
196	done chan struct{}
197
198	errorc     chan error
199	id         types.ID
200	attributes membership.Attributes
201
202	cluster *membership.RaftCluster
203
204	store       store.Store
205	snapshotter *snap.Snapshotter
206
207	applyV2 ApplierV2
208
209	// applyV3 is the applier with auth and quotas
210	applyV3 applierV3
211	// applyV3Base is the core applier without auth or quotas
212	applyV3Base applierV3
213	applyWait   wait.WaitTime
214
215	kv         mvcc.ConsistentWatchableKV
216	lessor     lease.Lessor
217	bemu       sync.Mutex
218	be         backend.Backend
219	authStore  auth.AuthStore
220	alarmStore *alarm.AlarmStore
221
222	stats  *stats.ServerStats
223	lstats *stats.LeaderStats
224
225	SyncTicker *time.Ticker
226	// compactor is used to auto-compact the KV.
227	compactor *compactor.Periodic
228
	// peerRt is used to send requests (version, lease) to peers.
230	peerRt   http.RoundTripper
231	reqIDGen *idutil.Generator
232
233	// forceVersionC is used to force the version monitor loop
234	// to detect the cluster version immediately.
235	forceVersionC chan struct{}
236
	// wgMu blocks concurrent waitgroup mutation while the server is stopping.
238	wgMu sync.RWMutex
	// wg is used to wait for the goroutines that depend on the server state
	// to exit when stopping the server.
241	wg sync.WaitGroup
242
243	// ctx is used for etcd-initiated requests that may need to be canceled
244	// on etcd server shutdown.
245	ctx    context.Context
246	cancel context.CancelFunc
247
248	leadTimeMu      sync.RWMutex
249	leadElectedTime time.Time
250}
251
252// NewServer creates a new EtcdServer from the supplied configuration. The
253// configuration is considered static for the lifetime of the EtcdServer.
254func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
255	st := store.New(StoreClusterPrefix, StoreKeysPrefix)
256
257	var (
258		w  *wal.WAL
259		n  raft.Node
260		s  *raft.MemoryStorage
261		id types.ID
262		cl *membership.RaftCluster
263	)
264
265	if cfg.MaxRequestBytes > recommendedMaxRequestBytes {
266		plog.Warningf("MaxRequestBytes %v exceeds maximum recommended size %v", cfg.MaxRequestBytes, recommendedMaxRequestBytes)
267	}
268
269	if terr := fileutil.TouchDirAll(cfg.DataDir); terr != nil {
270		return nil, fmt.Errorf("cannot access data directory: %v", terr)
271	}
272
273	haveWAL := wal.Exist(cfg.WALDir())
274
275	if err = fileutil.TouchDirAll(cfg.SnapDir()); err != nil {
276		plog.Fatalf("create snapshot directory error: %v", err)
277	}
278	ss := snap.New(cfg.SnapDir())
279
280	bepath := cfg.backendPath()
281	beExist := fileutil.Exist(bepath)
282	be := openBackend(cfg)
283
284	defer func() {
285		if err != nil {
286			be.Close()
287		}
288	}()
289
290	prt, err := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.peerDialTimeout())
291	if err != nil {
292		return nil, err
293	}
294	var (
295		remotes  []*membership.Member
296		snapshot *raftpb.Snapshot
297	)
298
299	switch {
300	case !haveWAL && !cfg.NewCluster:
301		if err = cfg.VerifyJoinExisting(); err != nil {
302			return nil, err
303		}
304		cl, err = membership.NewClusterFromURLsMap(cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
305		if err != nil {
306			return nil, err
307		}
308		existingCluster, gerr := GetClusterFromRemotePeers(getRemotePeerURLs(cl, cfg.Name), prt)
309		if gerr != nil {
310			return nil, fmt.Errorf("cannot fetch cluster info from peer urls: %v", gerr)
311		}
312		if err = membership.ValidateClusterAndAssignIDs(cl, existingCluster); err != nil {
313			return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err)
314		}
315		if !isCompatibleWithCluster(cl, cl.MemberByName(cfg.Name).ID, prt) {
316			return nil, fmt.Errorf("incompatible with current running cluster")
317		}
318
319		remotes = existingCluster.Members()
320		cl.SetID(existingCluster.ID())
321		cl.SetStore(st)
322		cl.SetBackend(be)
323		cfg.Print()
324		id, n, s, w = startNode(cfg, cl, nil)
325	case !haveWAL && cfg.NewCluster:
326		if err = cfg.VerifyBootstrap(); err != nil {
327			return nil, err
328		}
329		cl, err = membership.NewClusterFromURLsMap(cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
330		if err != nil {
331			return nil, err
332		}
333		m := cl.MemberByName(cfg.Name)
334		if isMemberBootstrapped(cl, cfg.Name, prt, cfg.bootstrapTimeout()) {
335			return nil, fmt.Errorf("member %s has already been bootstrapped", m.ID)
336		}
337		if cfg.ShouldDiscover() {
338			var str string
339			str, err = discovery.JoinCluster(cfg.DiscoveryURL, cfg.DiscoveryProxy, m.ID, cfg.InitialPeerURLsMap.String())
340			if err != nil {
341				return nil, &DiscoveryError{Op: "join", Err: err}
342			}
343			var urlsmap types.URLsMap
344			urlsmap, err = types.NewURLsMap(str)
345			if err != nil {
346				return nil, err
347			}
348			if checkDuplicateURL(urlsmap) {
349				return nil, fmt.Errorf("discovery cluster %s has duplicate url", urlsmap)
350			}
351			if cl, err = membership.NewClusterFromURLsMap(cfg.InitialClusterToken, urlsmap); err != nil {
352				return nil, err
353			}
354		}
355		cl.SetStore(st)
356		cl.SetBackend(be)
357		cfg.PrintWithInitial()
358		id, n, s, w = startNode(cfg, cl, cl.MemberIDs())
359	case haveWAL:
360		if err = fileutil.IsDirWriteable(cfg.MemberDir()); err != nil {
361			return nil, fmt.Errorf("cannot write to member directory: %v", err)
362		}
363
364		if err = fileutil.IsDirWriteable(cfg.WALDir()); err != nil {
365			return nil, fmt.Errorf("cannot write to WAL directory: %v", err)
366		}
367
368		if cfg.ShouldDiscover() {
369			plog.Warningf("discovery token ignored since a cluster has already been initialized. Valid log found at %q", cfg.WALDir())
370		}
371		snapshot, err = ss.Load()
372		if err != nil && err != snap.ErrNoSnapshot {
373			return nil, err
374		}
375		if snapshot != nil {
376			if err = st.Recovery(snapshot.Data); err != nil {
377				plog.Panicf("recovered store from snapshot error: %v", err)
378			}
379			plog.Infof("recovered store from snapshot at index %d", snapshot.Metadata.Index)
380			if be, err = recoverSnapshotBackend(cfg, be, *snapshot); err != nil {
381				plog.Panicf("recovering backend from snapshot error: %v", err)
382			}
383		}
384		cfg.Print()
385		if !cfg.ForceNewCluster {
386			id, cl, n, s, w = restartNode(cfg, snapshot)
387		} else {
388			id, cl, n, s, w = restartAsStandaloneNode(cfg, snapshot)
389		}
390		cl.SetStore(st)
391		cl.SetBackend(be)
392		cl.Recover(api.UpdateCapability)
393		if cl.Version() != nil && !cl.Version().LessThan(semver.Version{Major: 3}) && !beExist {
394			os.RemoveAll(bepath)
395			return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath)
396		}
397	default:
398		return nil, fmt.Errorf("unsupported bootstrap config")
399	}
400
401	if terr := fileutil.TouchDirAll(cfg.MemberDir()); terr != nil {
402		return nil, fmt.Errorf("cannot access member directory: %v", terr)
403	}
404
405	sstats := stats.NewServerStats(cfg.Name, id.String())
406	lstats := stats.NewLeaderStats(id.String())
407
408	heartbeat := time.Duration(cfg.TickMs) * time.Millisecond
409	srv = &EtcdServer{
410		readych:     make(chan struct{}),
411		Cfg:         cfg,
412		snapCount:   cfg.SnapCount,
413		errorc:      make(chan error, 1),
414		store:       st,
415		snapshotter: ss,
416		r: *newRaftNode(
417			raftNodeConfig{
418				isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
419				Node:        n,
420				heartbeat:   heartbeat,
421				raftStorage: s,
422				storage:     NewStorage(w, ss),
423			},
424		),
425		id:            id,
426		attributes:    membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
427		cluster:       cl,
428		stats:         sstats,
429		lstats:        lstats,
430		SyncTicker:    time.NewTicker(500 * time.Millisecond),
431		peerRt:        prt,
432		reqIDGen:      idutil.NewGenerator(uint16(id), time.Now()),
433		forceVersionC: make(chan struct{}),
434	}
435	serverID.With(prometheus.Labels{"server_id": id.String()}).Set(1)
436
437	srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
438
439	srv.be = be
440	minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat
441
442	// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
443	// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
444	srv.lessor = lease.NewLessor(srv.be, int64(math.Ceil(minTTL.Seconds())))
445	srv.kv = mvcc.New(srv.be, srv.lessor, &srv.consistIndex)
446	if beExist {
447		kvindex := srv.kv.ConsistentIndex()
448		// TODO: remove kvindex != 0 checking when we do not expect users to upgrade
449		// etcd from pre-3.0 release.
450		if snapshot != nil && kvindex < snapshot.Metadata.Index {
451			if kvindex != 0 {
452				return nil, fmt.Errorf("database file (%v index %d) does not match with snapshot (index %d).", bepath, kvindex, snapshot.Metadata.Index)
453			}
454			plog.Warningf("consistent index never saved (snapshot index=%d)", snapshot.Metadata.Index)
455		}
456	}
457	newSrv := srv // since srv == nil in defer if srv is returned as nil
458	defer func() {
459		// closing backend without first closing kv can cause
460		// resumed compactions to fail with closed tx errors
461		if err != nil {
462			newSrv.kv.Close()
463		}
464	}()
465
466	srv.consistIndex.setConsistentIndex(srv.kv.ConsistentIndex())
467	tp, err := auth.NewTokenProvider(cfg.AuthToken,
468		func(index uint64) <-chan struct{} {
469			return srv.applyWait.Wait(index)
470		},
471	)
472	if err != nil {
473		plog.Errorf("failed to create token provider: %s", err)
474		return nil, err
475	}
476	srv.authStore = auth.NewAuthStore(srv.be, tp)
477	if h := cfg.AutoCompactionRetention; h != 0 {
478		srv.compactor = compactor.NewPeriodic(h, srv.kv, srv)
479		srv.compactor.Run()
480	}
481
482	srv.applyV3Base = &applierV3backend{srv}
483	if err = srv.restoreAlarms(); err != nil {
484		return nil, err
485	}
486
487	// TODO: move transport initialization near the definition of remote
488	tr := &rafthttp.Transport{
489		TLSInfo:     cfg.PeerTLSInfo,
490		DialTimeout: cfg.peerDialTimeout(),
491		ID:          id,
492		URLs:        cfg.PeerURLs,
493		ClusterID:   cl.ID(),
494		Raft:        srv,
495		Snapshotter: ss,
496		ServerStats: sstats,
497		LeaderStats: lstats,
498		ErrorC:      srv.errorc,
499	}
500	if err = tr.Start(); err != nil {
501		return nil, err
502	}
503	// add all remotes into transport
504	for _, m := range remotes {
505		if m.ID != id {
506			tr.AddRemote(m.ID, m.PeerURLs)
507		}
508	}
509	for _, m := range cl.Members() {
510		if m.ID != id {
511			tr.AddPeer(m.ID, m.PeerURLs)
512		}
513	}
514	srv.r.transport = tr
515
516	return srv, nil
517}
518
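// adjustTicks fast-forwards election ticks so that a freshly started or
// restarted member can campaign sooner. A single-node cluster advances to one
// tick before the election timeout; a multi-node member waits for an active
// peer connection report before advancing.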
519func (s *EtcdServer) adjustTicks() {
520	clusterN := len(s.cluster.Members())
521
	// single-node fresh start, or a single node recovering from a snapshot
523	if clusterN == 1 {
524		ticks := s.Cfg.ElectionTicks - 1
525		plog.Infof("%s as single-node; fast-forwarding %d ticks (election ticks %d)", s.ID(), ticks, s.Cfg.ElectionTicks)
526		s.r.advanceTicks(ticks)
527		return
528	}
529
530	if !s.Cfg.InitialElectionTickAdvance {
531		plog.Infof("skipping initial election tick advance (election tick %d)", s.Cfg.ElectionTicks)
532		return
533	}
534
	// retry for up to "rafthttp.ConnReadTimeout" (5 seconds) waiting for a peer
	// connection report. If none arrives because:
	// 1. all connections failed, or
	// 2. there are no active peers, or
	// 3. this is a restarted single node with no snapshot,
	// then do nothing, since advancing ticks would have no effect.
541	waitTime := rafthttp.ConnReadTimeout
542	itv := 50 * time.Millisecond
543	for i := int64(0); i < int64(waitTime/itv); i++ {
544		select {
545		case <-time.After(itv):
546		case <-s.stopping:
547			return
548		}
549
550		peerN := s.r.transport.ActivePeers()
551		if peerN > 1 {
			// multi-node cluster received a peer connection report;
			// adjust ticks, leaving headroom in case leader messages arrive slowly
554			ticks := s.Cfg.ElectionTicks - 2
			plog.Infof("%s initialized peer connection; fast-forwarding %d ticks (election ticks %d) with %d active peer(s)", s.ID(), ticks, s.Cfg.ElectionTicks, peerN)
556			s.r.advanceTicks(ticks)
557			return
558		}
559	}
560}
561
562// Start performs any initialization of the Server necessary for it to
563// begin serving requests. It must be called before Do or Process.
564// Start must be non-blocking; any long-running server functionality
565// should be implemented in goroutines.
566func (s *EtcdServer) Start() {
567	s.start()
568	s.goAttach(func() { s.adjustTicks() })
569	s.goAttach(func() { s.publish(s.Cfg.ReqTimeout()) })
570	s.goAttach(s.purgeFile)
571	s.goAttach(func() { monitorFileDescriptor(s.stopping) })
572	s.goAttach(s.monitorVersions)
573	s.goAttach(s.linearizableReadLoop)
574}
575
// start prepares and starts the server in a new goroutine. It is no longer
// safe to modify the server's fields after it has been sent to Start.
// Besides Start, this function is called directly only by tests.
579func (s *EtcdServer) start() {
580	if s.snapCount == 0 {
581		plog.Infof("set snapshot count to default %d", DefaultSnapCount)
582		s.snapCount = DefaultSnapCount
583	}
584	s.w = wait.New()
585	s.applyWait = wait.NewTimeList()
586	s.done = make(chan struct{})
587	s.stop = make(chan struct{})
588	s.stopping = make(chan struct{})
589	s.ctx, s.cancel = context.WithCancel(context.Background())
590	s.readwaitc = make(chan struct{}, 1)
591	s.readNotifier = newNotifier()
592	if s.ClusterVersion() != nil {
593		plog.Infof("starting server... [version: %v, cluster version: %v]", version.Version, version.Cluster(s.ClusterVersion().String()))
594		membership.ClusterVersionMetrics.With(prometheus.Labels{"cluster_version": version.Cluster(s.ClusterVersion().String())}).Set(1)
595	} else {
596		plog.Infof("starting server... [version: %v, cluster version: to_be_decided]", version.Version)
597	}
	// TODO: if this is an empty log, write all peer info
	// into the first entry
600	go s.run()
601}
602
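// purgeFile purges old snapshot, snapshot db, and WAL files in the background
// until the server stops, keeping at most MaxSnapFiles snapshot files and
// MaxWALFiles WAL files on disk.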
603func (s *EtcdServer) purgeFile() {
604	var dberrc, serrc, werrc <-chan error
605	var dbdonec, sdonec, wdonec <-chan struct{}
606	if s.Cfg.MaxSnapFiles > 0 {
607		dbdonec, dberrc = fileutil.PurgeFileWithDoneNotify(s.Cfg.SnapDir(), "snap.db", s.Cfg.MaxSnapFiles, purgeFileInterval, s.stopping)
608		sdonec, serrc = fileutil.PurgeFileWithDoneNotify(s.Cfg.SnapDir(), "snap", s.Cfg.MaxSnapFiles, purgeFileInterval, s.stopping)
609	}
610	if s.Cfg.MaxWALFiles > 0 {
611		wdonec, werrc = fileutil.PurgeFileWithDoneNotify(s.Cfg.WALDir(), "wal", s.Cfg.MaxWALFiles, purgeFileInterval, s.stopping)
612	}
613	select {
614	case e := <-dberrc:
615		plog.Fatalf("failed to purge snap db file %v", e)
616	case e := <-serrc:
617		plog.Fatalf("failed to purge snap file %v", e)
618	case e := <-werrc:
619		plog.Fatalf("failed to purge wal file %v", e)
620	case <-s.stopping:
621		if dbdonec != nil {
622			<-dbdonec
623		}
624		if sdonec != nil {
625			<-sdonec
626		}
627		if wdonec != nil {
628			<-wdonec
629		}
630		return
631	}
632}
633
634func (s *EtcdServer) ID() types.ID { return s.id }
635
636func (s *EtcdServer) Cluster() *membership.RaftCluster { return s.cluster }
637
638func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler() }
639
640func (s *EtcdServer) Lessor() lease.Lessor { return s.lessor }
641
642func (s *EtcdServer) ApplyWait() <-chan struct{} { return s.applyWait.Wait(s.getCommittedIndex()) }
643
644func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
645	if s.cluster.IsIDRemoved(types.ID(m.From)) {
646		plog.Warningf("reject message from removed member %s", types.ID(m.From).String())
647		return httptypes.NewHTTPError(http.StatusForbidden, "cannot process message from removed member")
648	}
649	if m.Type == raftpb.MsgApp {
650		s.stats.RecvAppendReq(types.ID(m.From).String(), m.Size())
651	}
652	return s.r.Step(ctx, m)
653}
654
655func (s *EtcdServer) IsIDRemoved(id uint64) bool { return s.cluster.IsIDRemoved(types.ID(id)) }
656
657func (s *EtcdServer) ReportUnreachable(id uint64) { s.r.ReportUnreachable(id) }
658
659// ReportSnapshot reports snapshot sent status to the raft state machine,
660// and clears the used snapshot from the snapshot store.
661func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
662	s.r.ReportSnapshot(id, status)
663}
664
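// etcdProgress records the apply progress of the server: the latest raft
// ConfState, the index of the last snapshot, and the term and index of the
// last applied entry.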
665type etcdProgress struct {
666	confState raftpb.ConfState
667	snapi     uint64
668	appliedt  uint64
669	appliedi  uint64
670}
671
672// raftReadyHandler contains a set of EtcdServer operations to be called by raftNode,
673// and helps decouple state machine logic from Raft algorithms.
674// TODO: add a state machine interface to apply the commit entries and do snapshot/recover
675type raftReadyHandler struct {
676	updateLeadership     func(newLeader bool)
677	updateCommittedIndex func(uint64)
678}
679
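// run is the main loop of the server. It starts the raft node, applies
// committed entries and snapshots in order through the FIFO scheduler,
// revokes expired leases, proposes SYNC requests while leading, and releases
// server resources on shutdown.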
680func (s *EtcdServer) run() {
681	sn, err := s.r.raftStorage.Snapshot()
682	if err != nil {
683		plog.Panicf("get snapshot from raft storage error: %v", err)
684	}
685
686	// asynchronously accept apply packets, dispatch progress in-order
687	sched := schedule.NewFIFOScheduler()
688
689	var (
690		smu   sync.RWMutex
691		syncC <-chan time.Time
692	)
693	setSyncC := func(ch <-chan time.Time) {
694		smu.Lock()
695		syncC = ch
696		smu.Unlock()
697	}
698	getSyncC := func() (ch <-chan time.Time) {
699		smu.RLock()
700		ch = syncC
701		smu.RUnlock()
702		return
703	}
704	rh := &raftReadyHandler{
705		updateLeadership: func(newLeader bool) {
706			if !s.isLeader() {
707				if s.lessor != nil {
708					s.lessor.Demote()
709				}
710				if s.compactor != nil {
711					s.compactor.Pause()
712				}
713				setSyncC(nil)
714			} else {
715				if newLeader {
716					t := time.Now()
717					s.leadTimeMu.Lock()
718					s.leadElectedTime = t
719					s.leadTimeMu.Unlock()
720				}
721				setSyncC(s.SyncTicker.C)
722				if s.compactor != nil {
723					s.compactor.Resume()
724				}
725			}
726
727			// TODO: remove the nil checking
728			// current test utility does not provide the stats
729			if s.stats != nil {
730				s.stats.BecomeLeader()
731			}
732		},
733		updateCommittedIndex: func(ci uint64) {
734			cci := s.getCommittedIndex()
735			if ci > cci {
736				s.setCommittedIndex(ci)
737			}
738		},
739	}
740	s.r.start(rh)
741
742	ep := etcdProgress{
743		confState: sn.Metadata.ConfState,
744		snapi:     sn.Metadata.Index,
745		appliedt:  sn.Metadata.Term,
746		appliedi:  sn.Metadata.Index,
747	}
748
749	defer func() {
750		s.wgMu.Lock() // block concurrent waitgroup adds in goAttach while stopping
751		close(s.stopping)
752		s.wgMu.Unlock()
753		s.cancel()
754
755		sched.Stop()
756
		// wait for goroutines before closing raft so the WAL stays open
758		s.wg.Wait()
759
760		s.SyncTicker.Stop()
761
		// must stop raft after the scheduler; etcdserver can leak rafthttp pipelines
		// by adding a peer after raft stops the transport
764		s.r.stop()
765
766		// kv, lessor and backend can be nil if running without v3 enabled
767		// or running unit tests.
768		if s.lessor != nil {
769			s.lessor.Stop()
770		}
771		if s.kv != nil {
772			s.kv.Close()
773		}
774		if s.authStore != nil {
775			s.authStore.Close()
776		}
777		if s.be != nil {
778			s.be.Close()
779		}
780		if s.compactor != nil {
781			s.compactor.Stop()
782		}
783		close(s.done)
784	}()
785
786	var expiredLeaseC <-chan []*lease.Lease
787	if s.lessor != nil {
788		expiredLeaseC = s.lessor.ExpiredLeasesC()
789	}
790
791	for {
792		select {
793		case ap := <-s.r.apply():
794			f := func(context.Context) { s.applyAll(&ep, &ap) }
795			sched.Schedule(f)
796		case leases := <-expiredLeaseC:
797			s.goAttach(func() {
				// increase throughput of the expired-lease deletion process through parallelization
799				c := make(chan struct{}, maxPendingRevokes)
800				for _, lease := range leases {
801					select {
802					case c <- struct{}{}:
803					case <-s.stopping:
804						return
805					}
806					lid := lease.ID
807					s.goAttach(func() {
808						_, lerr := s.LeaseRevoke(s.ctx, &pb.LeaseRevokeRequest{ID: int64(lid)})
809						if lerr == nil {
810							leaseExpired.Inc()
811						} else {
812							plog.Warningf("failed to revoke %016x (%q)", lid, lerr.Error())
813						}
814
815						<-c
816					})
817				}
818			})
819		case err := <-s.errorc:
820			plog.Errorf("%s", err)
821			plog.Infof("the data-dir used by this member must be removed.")
822			return
823		case <-getSyncC():
824			if s.store.HasTTLKeys() {
825				s.sync(s.Cfg.ReqTimeout())
826			}
827		case <-s.stop:
828			return
829		}
830	}
831}
832
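// applyAll applies the snapshot (if any) and the committed entries from a
// single apply packet, triggers a local snapshot once enough entries have
// been applied, and serves any pending merged-snapshot send request.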
833func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
834	s.applySnapshot(ep, apply)
835	s.applyEntries(ep, apply)
836
837	proposalsApplied.Set(float64(ep.appliedi))
838	s.applyWait.Trigger(ep.appliedi)
	// wait for the raft routine to finish the disk writes before triggering a
	// snapshot. Otherwise the applied index might be greater than the last index
	// in raft storage, since the raft routine might be slower than the apply routine.
842	<-apply.notifyc
843
844	s.triggerSnapshot(ep)
845	select {
846	// snapshot requested via send()
847	case m := <-s.r.msgSnapC:
848		merged := s.createMergedSnapshotMessage(m, ep.appliedt, ep.appliedi, ep.confState)
849		s.sendMergedSnap(merged)
850	default:
851	}
852}
853
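// applySnapshot restores the server state from an incoming snapshot: it opens
// the snapshot backend, recovers the lessor, mvcc store, alarms, auth store,
// v2 store, and cluster configuration from it, and rebuilds the raft
// transport peers from the recovered membership.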
854func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
855	if raft.IsEmptySnap(apply.snapshot) {
856		return
857	}
858
859	plog.Infof("applying snapshot at index %d...", ep.snapi)
860	defer plog.Infof("finished applying incoming snapshot at index %d", ep.snapi)
861
862	if apply.snapshot.Metadata.Index <= ep.appliedi {
		plog.Panicf("snapshot index [%d] should be greater than appliedi [%d]",
			apply.snapshot.Metadata.Index, ep.appliedi)
865	}
866
867	// wait for raftNode to persist snapshot onto the disk
868	<-apply.notifyc
869
870	newbe, err := openSnapshotBackend(s.Cfg, s.snapshotter, apply.snapshot)
871	if err != nil {
872		plog.Panic(err)
873	}
874
875	// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
876	// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
877	if s.lessor != nil {
878		plog.Info("recovering lessor...")
879		s.lessor.Recover(newbe, func() lease.TxnDelete { return s.kv.Write() })
880		plog.Info("finished recovering lessor")
881	}
882
883	plog.Info("restoring mvcc store...")
884
885	if err := s.kv.Restore(newbe); err != nil {
886		plog.Panicf("restore KV error: %v", err)
887	}
888	s.consistIndex.setConsistentIndex(s.kv.ConsistentIndex())
889
890	plog.Info("finished restoring mvcc store")
891
892	// Closing old backend might block until all the txns
893	// on the backend are finished.
894	// We do not want to wait on closing the old backend.
895	s.bemu.Lock()
896	oldbe := s.be
897	go func() {
898		plog.Info("closing old backend...")
899		defer plog.Info("finished closing old backend")
900
901		if err := oldbe.Close(); err != nil {
902			plog.Panicf("close backend error: %v", err)
903		}
904	}()
905
906	s.be = newbe
907	s.bemu.Unlock()
908
909	plog.Info("recovering alarms...")
910	if err := s.restoreAlarms(); err != nil {
911		plog.Panicf("restore alarms error: %v", err)
912	}
913	plog.Info("finished recovering alarms")
914
915	if s.authStore != nil {
916		plog.Info("recovering auth store...")
917		s.authStore.Recover(newbe)
918		plog.Info("finished recovering auth store")
919	}
920
921	plog.Info("recovering store v2...")
922	if err := s.store.Recovery(apply.snapshot.Data); err != nil {
923		plog.Panicf("recovery store error: %v", err)
924	}
925	plog.Info("finished recovering store v2")
926
927	s.cluster.SetBackend(s.be)
928	plog.Info("recovering cluster configuration...")
929	s.cluster.Recover(api.UpdateCapability)
930	plog.Info("finished recovering cluster configuration")
931
932	plog.Info("removing old peers from network...")
933	// recover raft transport
934	s.r.transport.RemoveAllPeers()
935	plog.Info("finished removing old peers from network")
936
937	plog.Info("adding peers from new cluster configuration into network...")
938	for _, m := range s.cluster.Members() {
939		if m.ID == s.ID() {
940			continue
941		}
942		s.r.transport.AddPeer(m.ID, m.PeerURLs)
943	}
	plog.Info("finished adding peers from new cluster configuration into network")
945
946	ep.appliedt = apply.snapshot.Metadata.Term
947	ep.appliedi = apply.snapshot.Metadata.Index
948	ep.snapi = ep.appliedi
949	ep.confState = apply.snapshot.Metadata.ConfState
950}
951
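// applyEntries applies the committed entries that have not been applied yet
// and schedules a delayed stop if this member has been removed from the
// cluster.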
952func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) {
953	if len(apply.entries) == 0 {
954		return
955	}
956	firsti := apply.entries[0].Index
957	if firsti > ep.appliedi+1 {
958		plog.Panicf("first index of committed entry[%d] should <= appliedi[%d] + 1", firsti, ep.appliedi)
959	}
960	var ents []raftpb.Entry
961	if ep.appliedi+1-firsti < uint64(len(apply.entries)) {
962		ents = apply.entries[ep.appliedi+1-firsti:]
963	}
964	if len(ents) == 0 {
965		return
966	}
967	var shouldstop bool
968	if ep.appliedt, ep.appliedi, shouldstop = s.apply(ents, &ep.confState); shouldstop {
969		go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster"))
970	}
971}
972
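// triggerSnapshot creates a new snapshot once more than snapCount entries
// have been applied since the last snapshot.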
973func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
974	if ep.appliedi-ep.snapi <= s.snapCount {
975		return
976	}
977
978	plog.Infof("start to snapshot (applied: %d, lastsnap: %d)", ep.appliedi, ep.snapi)
979	s.snapshot(ep.appliedi, ep.confState)
980	ep.snapi = ep.appliedi
981}
982
983func (s *EtcdServer) isMultiNode() bool {
984	return s.cluster != nil && len(s.cluster.MemberIDs()) > 1
985}
986
987func (s *EtcdServer) isLeader() bool {
988	return uint64(s.ID()) == s.Lead()
989}
990
991// transferLeadership transfers the leader to the given transferee.
992// TODO: maybe expose to client?
993func (s *EtcdServer) transferLeadership(ctx context.Context, lead, transferee uint64) error {
994	now := time.Now()
995	interval := time.Duration(s.Cfg.TickMs) * time.Millisecond
996
997	plog.Infof("%s starts leadership transfer from %s to %s", s.ID(), types.ID(lead), types.ID(transferee))
998	s.r.TransferLeadership(ctx, lead, transferee)
999	for s.Lead() != transferee {
1000		select {
1001		case <-ctx.Done(): // time out
1002			return ErrTimeoutLeaderTransfer
1003		case <-time.After(interval):
1004		}
1005	}
1006
1007	// TODO: drain all requests, or drop all messages to the old leader
1008
1009	plog.Infof("%s finished leadership transfer from %s to %s (took %v)", s.ID(), types.ID(lead), types.ID(transferee), time.Since(now))
1010	return nil
1011}
1012
1013// TransferLeadership transfers the leader to the chosen transferee.
1014func (s *EtcdServer) TransferLeadership() error {
1015	if !s.isLeader() {
1016		plog.Printf("skipped leadership transfer for stopping non-leader member")
1017		return nil
1018	}
1019
1020	if !s.isMultiNode() {
1021		plog.Printf("skipped leadership transfer for single member cluster")
1022		return nil
1023	}
1024
1025	transferee, ok := longestConnected(s.r.transport, s.cluster.MemberIDs())
1026	if !ok {
1027		return ErrUnhealthy
1028	}
1029
1030	tm := s.Cfg.ReqTimeout()
1031	ctx, cancel := context.WithTimeout(s.ctx, tm)
1032	err := s.transferLeadership(ctx, s.Lead(), uint64(transferee))
1033	cancel()
1034	return err
1035}
1036
1037// HardStop stops the server without coordination with other members in the cluster.
1038func (s *EtcdServer) HardStop() {
1039	select {
1040	case s.stop <- struct{}{}:
1041	case <-s.done:
1042		return
1043	}
1044	<-s.done
1045}
1046
1047// Stop stops the server gracefully, and shuts down the running goroutine.
1048// Stop should be called after a Start(s), otherwise it will block forever.
1049// When stopping leader, Stop transfers its leadership to one of its peers
1050// before stopping the server.
1051func (s *EtcdServer) Stop() {
1052	if err := s.TransferLeadership(); err != nil {
1053		plog.Warningf("%s failed to transfer leadership (%v)", s.ID(), err)
1054	}
1055	s.HardStop()
1056}
1057
1058// ReadyNotify returns a channel that will be closed when the server
1059// is ready to serve client requests
1060func (s *EtcdServer) ReadyNotify() <-chan struct{} { return s.readych }
1061
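// stopWithDelay waits for the given duration (or for the server to be done)
// and then reports err on errorc, which makes the run loop exit.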
1062func (s *EtcdServer) stopWithDelay(d time.Duration, err error) {
1063	select {
1064	case <-time.After(d):
1065	case <-s.done:
1066	}
1067	select {
1068	case s.errorc <- err:
1069	default:
1070	}
1071}
1072
// StopNotify returns a channel that receives an empty struct
// when the server is stopped.
1075func (s *EtcdServer) StopNotify() <-chan struct{} { return s.done }
1076
1077func (s *EtcdServer) SelfStats() []byte { return s.stats.JSON() }
1078
1079func (s *EtcdServer) LeaderStats() []byte {
1080	lead := atomic.LoadUint64(&s.r.lead)
1081	if lead != uint64(s.id) {
1082		return nil
1083	}
1084	return s.lstats.JSON()
1085}
1086
1087func (s *EtcdServer) StoreStats() []byte { return s.store.JsonStats() }
1088
1089func (s *EtcdServer) checkMembershipOperationPermission(ctx context.Context) error {
1090	if s.authStore == nil {
		// In an ordinary etcd process, s.authStore will never be nil.
		// This branch handles cases in server_test.go.
1093		return nil
1094	}
1095
	// Note that this permission check is done in the API layer,
	// so a TOCTOU problem could potentially arise in a schedule like this:
	// update membership with user A -> revoke root role of A -> apply membership
	// change in the state machine layer.
	// However, both membership change and role management require the root
	// privilege, so careful operation by admins can prevent the problem.
1102	authInfo, err := s.AuthInfoFromCtx(ctx)
1103	if err != nil {
1104		return err
1105	}
1106
1107	return s.AuthStore().IsAdminPermitted(authInfo)
1108}
1109
1110func (s *EtcdServer) AddMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error) {
1111	if err := s.checkMembershipOperationPermission(ctx); err != nil {
1112		return nil, err
1113	}
1114
1115	if s.Cfg.StrictReconfigCheck {
1116		// by default StrictReconfigCheck is enabled; reject new members if unhealthy
1117		if !s.cluster.IsReadyToAddNewMember() {
1118			plog.Warningf("not enough started members, rejecting member add %+v", memb)
1119			return nil, ErrNotEnoughStartedMembers
1120		}
1121		if !isConnectedFullySince(s.r.transport, time.Now().Add(-HealthInterval), s.ID(), s.cluster.Members()) {
1122			plog.Warningf("not healthy for reconfigure, rejecting member add %+v", memb)
1123			return nil, ErrUnhealthy
1124		}
1125	}
1126
1127	// TODO: move Member to protobuf type
1128	b, err := json.Marshal(memb)
1129	if err != nil {
1130		return nil, err
1131	}
1132	cc := raftpb.ConfChange{
1133		Type:    raftpb.ConfChangeAddNode,
1134		NodeID:  uint64(memb.ID),
1135		Context: b,
1136	}
1137	return s.configure(ctx, cc)
1138}
1139
1140func (s *EtcdServer) RemoveMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
1141	if err := s.checkMembershipOperationPermission(ctx); err != nil {
1142		return nil, err
1143	}
1144
1145	// by default StrictReconfigCheck is enabled; reject removal if leads to quorum loss
1146	if err := s.mayRemoveMember(types.ID(id)); err != nil {
1147		return nil, err
1148	}
1149
1150	cc := raftpb.ConfChange{
1151		Type:   raftpb.ConfChangeRemoveNode,
1152		NodeID: id,
1153	}
1154	return s.configure(ctx, cc)
1155}
1156
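// mayRemoveMember checks whether removing the given member would still leave
// enough started, connected members to keep quorum. It is a no-op when
// StrictReconfigCheck is disabled.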
1157func (s *EtcdServer) mayRemoveMember(id types.ID) error {
1158	if !s.Cfg.StrictReconfigCheck {
1159		return nil
1160	}
1161
1162	if !s.cluster.IsReadyToRemoveMember(uint64(id)) {
1163		plog.Warningf("not enough started members, rejecting remove member %s", id)
1164		return ErrNotEnoughStartedMembers
1165	}
1166
1167	// downed member is safe to remove since it's not part of the active quorum
1168	if t := s.r.transport.ActiveSince(id); id != s.ID() && t.IsZero() {
1169		return nil
1170	}
1171
1172	// protect quorum if some members are down
1173	m := s.cluster.Members()
1174	active := numConnectedSince(s.r.transport, time.Now().Add(-HealthInterval), s.ID(), m)
1175	if (active - 1) < 1+((len(m)-1)/2) {
1176		plog.Warningf("reconfigure breaks active quorum, rejecting remove member %s", id)
1177		return ErrUnhealthy
1178	}
1179
1180	return nil
1181}
1182
1183func (s *EtcdServer) UpdateMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error) {
1184	b, merr := json.Marshal(memb)
1185	if merr != nil {
1186		return nil, merr
1187	}
1188
1189	if err := s.checkMembershipOperationPermission(ctx); err != nil {
1190		return nil, err
1191	}
1192	cc := raftpb.ConfChange{
1193		Type:    raftpb.ConfChangeUpdateNode,
1194		NodeID:  uint64(memb.ID),
1195		Context: b,
1196	}
1197	return s.configure(ctx, cc)
1198}
1199
1200// Implement the RaftTimer interface
1201
1202func (s *EtcdServer) Index() uint64 { return atomic.LoadUint64(&s.r.index) }
1203
1204func (s *EtcdServer) Term() uint64 { return atomic.LoadUint64(&s.r.term) }
1205
1206// Lead is only for testing purposes.
1207// TODO: add Raft server interface to expose raft related info:
1208// Index, Term, Lead, Committed, Applied, LastIndex, etc.
1209func (s *EtcdServer) Lead() uint64 { return atomic.LoadUint64(&s.r.lead) }
1210
1211func (s *EtcdServer) Leader() types.ID { return types.ID(s.Lead()) }
1212
1213type confChangeResponse struct {
1214	membs []*membership.Member
1215	err   error
1216}
1217
1218// configure sends a configuration change through consensus and
1219// then waits for it to be applied to the server. It
1220// will block until the change is performed or there is an error.
1221func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) ([]*membership.Member, error) {
1222	cc.ID = s.reqIDGen.Next()
1223	ch := s.w.Register(cc.ID)
1224	start := time.Now()
1225	if err := s.r.ProposeConfChange(ctx, cc); err != nil {
1226		s.w.Trigger(cc.ID, nil)
1227		return nil, err
1228	}
1229	select {
1230	case x := <-ch:
1231		if x == nil {
1232			plog.Panicf("configure trigger value should never be nil")
1233		}
1234		resp := x.(*confChangeResponse)
1235		return resp.membs, resp.err
1236	case <-ctx.Done():
1237		s.w.Trigger(cc.ID, nil) // GC wait
1238		return nil, s.parseProposeCtxErr(ctx.Err(), start)
1239	case <-s.stopping:
1240		return nil, ErrStopped
1241	}
1242}
1243
1244// sync proposes a SYNC request and is non-blocking.
1245// This makes no guarantee that the request will be proposed or performed.
1246// The request will be canceled after the given timeout.
1247func (s *EtcdServer) sync(timeout time.Duration) {
1248	req := pb.Request{
1249		Method: "SYNC",
1250		ID:     s.reqIDGen.Next(),
1251		Time:   time.Now().UnixNano(),
1252	}
1253	data := pbutil.MustMarshal(&req)
	// There is no promise that the node has a leader when the SYNC request is
	// proposed, so a goroutine is used to propose it.
1256	ctx, cancel := context.WithTimeout(s.ctx, timeout)
1257	s.goAttach(func() {
1258		s.r.Propose(ctx, data)
1259		cancel()
1260	})
1261}
1262
1263// publish registers server information into the cluster. The information
1264// is the JSON representation of this server's member struct, updated with the
1265// static clientURLs of the server.
1266// The function keeps attempting to register until it succeeds,
1267// or its server is stopped.
1268func (s *EtcdServer) publish(timeout time.Duration) {
1269	b, err := json.Marshal(s.attributes)
1270	if err != nil {
1271		plog.Panicf("json marshal error: %v", err)
1272		return
1273	}
1274	req := pb.Request{
1275		Method: "PUT",
1276		Path:   membership.MemberAttributesStorePath(s.id),
1277		Val:    string(b),
1278	}
1279
1280	for {
1281		ctx, cancel := context.WithTimeout(s.ctx, timeout)
1282		_, err := s.Do(ctx, req)
1283		cancel()
1284		switch err {
1285		case nil:
1286			close(s.readych)
1287			plog.Infof("published %+v to cluster %s", s.attributes, s.cluster.ID())
1288			return
1289		case ErrStopped:
1290			plog.Infof("aborting publish because server is stopped")
1291			return
1292		default:
1293			plog.Errorf("publish error: %v", err)
1294		}
1295	}
1296}
1297
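// sendMergedSnap sends a merged snapshot message over the raft transport and
// tracks it as in-flight, delaying its release to block log compaction while
// the follower catches up.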
1298func (s *EtcdServer) sendMergedSnap(merged snap.Message) {
1299	atomic.AddInt64(&s.inflightSnapshots, 1)
1300
1301	s.r.transport.SendSnapshot(merged)
1302	s.goAttach(func() {
1303		select {
1304		case ok := <-merged.CloseNotify():
1305			// delay releasing inflight snapshot for another 30 seconds to
1306			// block log compaction.
1307			// If the follower still fails to catch up, it is probably just too slow
1308			// to catch up. We cannot avoid the snapshot cycle anyway.
1309			if ok {
1310				select {
1311				case <-time.After(releaseDelayAfterSnapshot):
1312				case <-s.stopping:
1313				}
1314			}
1315			atomic.AddInt64(&s.inflightSnapshots, -1)
1316		case <-s.stopping:
1317			return
1318		}
1319	})
1320}
1321
1322// apply takes entries received from Raft (after it has been committed) and
1323// applies them to the current state of the EtcdServer.
1324// The given entries should not be empty.
1325func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (appliedt uint64, appliedi uint64, shouldStop bool) {
1326	for i := range es {
1327		e := es[i]
1328		switch e.Type {
1329		case raftpb.EntryNormal:
1330			s.applyEntryNormal(&e)
1331		case raftpb.EntryConfChange:
			// set the consistent index of the entry currently being applied
1333			if e.Index > s.consistIndex.ConsistentIndex() {
1334				s.consistIndex.setConsistentIndex(e.Index)
1335			}
1336			var cc raftpb.ConfChange
1337			pbutil.MustUnmarshal(&cc, e.Data)
1338			removedSelf, err := s.applyConfChange(cc, confState)
1339			s.setAppliedIndex(e.Index)
1340			shouldStop = shouldStop || removedSelf
1341			s.w.Trigger(cc.ID, &confChangeResponse{s.cluster.Members(), err})
1342		default:
1343			plog.Panicf("entry type should be either EntryNormal or EntryConfChange")
1344		}
1345		atomic.StoreUint64(&s.r.index, e.Index)
1346		atomic.StoreUint64(&s.r.term, e.Term)
1347		appliedt = e.Term
1348		appliedi = e.Index
1349	}
1350	return appliedt, appliedi, shouldStop
1351}
1352
// applyEntryNormal applies an EntryNormal type raftpb request to the EtcdServer.
1354func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
1355	shouldApplyV3 := false
1356	if e.Index > s.consistIndex.ConsistentIndex() {
		// set the consistent index of the entry currently being applied
1358		s.consistIndex.setConsistentIndex(e.Index)
1359		shouldApplyV3 = true
1360	}
1361	defer s.setAppliedIndex(e.Index)
1362
	// the raft state machine may generate a no-op entry on leader confirmation.
	// skip it in advance to avoid potential bugs in the future.
1365	if len(e.Data) == 0 {
1366		select {
1367		case s.forceVersionC <- struct{}{}:
1368		default:
1369		}
1370		// promote lessor when the local member is leader and finished
1371		// applying all entries from the last term.
1372		if s.isLeader() {
1373			s.lessor.Promote(s.Cfg.electionTimeout())
1374		}
1375		return
1376	}
1377
1378	var raftReq pb.InternalRaftRequest
1379	if !pbutil.MaybeUnmarshal(&raftReq, e.Data) { // backward compatible
1380		var r pb.Request
1381		pbutil.MustUnmarshal(&r, e.Data)
1382		s.w.Trigger(r.ID, s.applyV2Request(&r))
1383		return
1384	}
1385	if raftReq.V2 != nil {
1386		req := raftReq.V2
1387		s.w.Trigger(req.ID, s.applyV2Request(req))
1388		return
1389	}
1390
1391	// do not re-apply applied entries.
1392	if !shouldApplyV3 {
1393		return
1394	}
1395
1396	id := raftReq.ID
1397	if id == 0 {
1398		id = raftReq.Header.ID
1399	}
1400
1401	var ar *applyResult
1402	needResult := s.w.IsRegistered(id)
1403	if needResult || !noSideEffect(&raftReq) {
1404		if !needResult && raftReq.Txn != nil {
1405			removeNeedlessRangeReqs(raftReq.Txn)
1406		}
1407		ar = s.applyV3.Apply(&raftReq)
1408	}
1409
1410	if ar == nil {
1411		return
1412	}
1413
1414	if ar.err != ErrNoSpace || len(s.alarmStore.Get(pb.AlarmType_NOSPACE)) > 0 {
1415		s.w.Trigger(id, ar)
1416		return
1417	}
1418
1419	plog.Errorf("applying raft message exceeded backend quota")
1420	s.goAttach(func() {
1421		a := &pb.AlarmRequest{
1422			MemberID: uint64(s.ID()),
1423			Action:   pb.AlarmRequest_ACTIVATE,
1424			Alarm:    pb.AlarmType_NOSPACE,
1425		}
1426		s.raftRequest(s.ctx, pb.InternalRaftRequest{Alarm: a})
1427		s.w.Trigger(id, ar)
1428	})
1429}
1430
1431// applyConfChange applies a ConfChange to the server. It is only
1432// invoked with a ConfChange that has already passed through Raft
1433func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState) (bool, error) {
1434	if err := s.cluster.ValidateConfigurationChange(cc); err != nil {
1435		cc.NodeID = raft.None
1436		s.r.ApplyConfChange(cc)
1437		return false, err
1438	}
1439	*confState = *s.r.ApplyConfChange(cc)
1440	switch cc.Type {
1441	case raftpb.ConfChangeAddNode:
1442		m := new(membership.Member)
1443		if err := json.Unmarshal(cc.Context, m); err != nil {
1444			plog.Panicf("unmarshal member should never fail: %v", err)
1445		}
1446		if cc.NodeID != uint64(m.ID) {
1447			plog.Panicf("nodeID should always be equal to member ID")
1448		}
1449		s.cluster.AddMember(m)
1450		if m.ID != s.id {
1451			s.r.transport.AddPeer(m.ID, m.PeerURLs)
1452		}
1453	case raftpb.ConfChangeRemoveNode:
1454		id := types.ID(cc.NodeID)
1455		s.cluster.RemoveMember(id)
1456		if id == s.id {
1457			return true, nil
1458		}
1459		s.r.transport.RemovePeer(id)
1460	case raftpb.ConfChangeUpdateNode:
1461		m := new(membership.Member)
1462		if err := json.Unmarshal(cc.Context, m); err != nil {
1463			plog.Panicf("unmarshal member should never fail: %v", err)
1464		}
1465		if cc.NodeID != uint64(m.ID) {
1466			plog.Panicf("nodeID should always be equal to member ID")
1467		}
1468		s.cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes)
1469		if m.ID != s.id {
1470			s.r.transport.UpdatePeer(m.ID, m.PeerURLs)
1471		}
1472	}
1473	return false, nil
1474}
1475
1476// TODO: non-blocking snapshot
1477func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
1478	clone := s.store.Clone()
1479	// commit kv to write metadata (for example: consistent index) to disk.
1480	// KV().commit() updates the consistent index in backend.
1481	// All operations that update consistent index must be called sequentially
1482	// from applyAll function.
1483	// So KV().Commit() cannot run in parallel with apply. It has to be called outside
	// the goroutine created below.
1485	s.KV().Commit()
1486
1487	s.goAttach(func() {
1488		d, err := clone.SaveNoCopy()
1489		// TODO: current store will never fail to do a snapshot
1490		// what should we do if the store might fail?
1491		if err != nil {
1492			plog.Panicf("store save should never fail: %v", err)
1493		}
1494		snap, err := s.r.raftStorage.CreateSnapshot(snapi, &confState, d)
1495		if err != nil {
			// the snapshot was done asynchronously with the progress of raft.
			// raft might have already gotten a newer snapshot.
1498			if err == raft.ErrSnapOutOfDate {
1499				return
1500			}
1501			plog.Panicf("unexpected create snapshot error %v", err)
1502		}
1503		// SaveSnap saves the snapshot and releases the locked wal files
1504		// to the snapshot index.
1505		if err = s.r.storage.SaveSnap(snap); err != nil {
1506			plog.Fatalf("save snapshot error: %v", err)
1507		}
1508		plog.Infof("saved snapshot at index %d", snap.Metadata.Index)
1509
		// When sending a snapshot, etcd will pause compaction.
		// After receiving a snapshot, the slow follower needs all the entries right
		// after the snapshot index to catch up. If we do not pause compaction, those
		// log entries might already be compacted by the time the snapshot is sent and
		// saved, which can take a long time. Pausing compaction avoids triggering a
		// snapshot-sending cycle.
1515		if atomic.LoadInt64(&s.inflightSnapshots) != 0 {
1516			plog.Infof("skip compaction since there is an inflight snapshot")
1517			return
1518		}
1519
1520		// keep some in memory log entries for slow followers.
1521		compacti := uint64(1)
1522		if snapi > numberOfCatchUpEntries {
1523			compacti = snapi - numberOfCatchUpEntries
1524		}
1525		err = s.r.raftStorage.Compact(compacti)
1526		if err != nil {
			// the compaction was done asynchronously with the progress of raft.
			// the raft log might already have been compacted.
1529			if err == raft.ErrCompacted {
1530				return
1531			}
1532			plog.Panicf("unexpected compaction error %v", err)
1533		}
1534		plog.Infof("compacted raft log at %d", compacti)
1535	})
1536}
1537
1538// CutPeer drops messages to the specified peer.
1539func (s *EtcdServer) CutPeer(id types.ID) {
1540	tr, ok := s.r.transport.(*rafthttp.Transport)
1541	if ok {
1542		tr.CutPeer(id)
1543	}
1544}
1545
1546// MendPeer recovers the message dropping behavior of the given peer.
1547func (s *EtcdServer) MendPeer(id types.ID) {
1548	tr, ok := s.r.transport.(*rafthttp.Transport)
1549	if ok {
1550		tr.MendPeer(id)
1551	}
1552}
1553
1554func (s *EtcdServer) PauseSending() { s.r.pauseSending() }
1555
1556func (s *EtcdServer) ResumeSending() { s.r.resumeSending() }
1557
1558func (s *EtcdServer) ClusterVersion() *semver.Version {
1559	if s.cluster == nil {
1560		return nil
1561	}
1562	return s.cluster.Version()
1563}
1564
// monitorVersions checks the member's version every monitorVersionInterval.
// It updates the cluster version if all members agree on a higher one.
// It logs a message if there is a member with a higher version than the
// local version.
1569func (s *EtcdServer) monitorVersions() {
1570	for {
1571		select {
1572		case <-s.forceVersionC:
1573		case <-time.After(monitorVersionInterval):
1574		case <-s.stopping:
1575			return
1576		}
1577
1578		if s.Leader() != s.ID() {
1579			continue
1580		}
1581
1582		v := decideClusterVersion(getVersions(s.cluster, s.id, s.peerRt))
1583		if v != nil {
1584			// only keep major.minor version for comparison
1585			v = &semver.Version{
1586				Major: v.Major,
1587				Minor: v.Minor,
1588			}
1589		}
1590
1591		// if the current version is nil:
1592		// 1. use the decided version if possible
1593		// 2. or use the min cluster version
1594		if s.cluster.Version() == nil {
1595			verStr := version.MinClusterVersion
1596			if v != nil {
1597				verStr = v.String()
1598			}
1599			s.goAttach(func() { s.updateClusterVersion(verStr) })
1600			continue
1601		}
1602
1603		// update cluster version only if the decided version is greater than
1604		// the current cluster version
1605		if v != nil && s.cluster.Version().LessThan(*v) {
1606			s.goAttach(func() { s.updateClusterVersion(v.String()) })
1607		}
1608	}
1609}
1610
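// updateClusterVersion proposes a v2 request to persist the given cluster
// version in the store.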
1611func (s *EtcdServer) updateClusterVersion(ver string) {
1612	if s.cluster.Version() == nil {
1613		plog.Infof("setting up the initial cluster version to %s", version.Cluster(ver))
1614	} else {
1615		plog.Infof("updating the cluster version from %s to %s", version.Cluster(s.cluster.Version().String()), version.Cluster(ver))
1616	}
1617	req := pb.Request{
1618		Method: "PUT",
1619		Path:   membership.StoreClusterVersionKey(),
1620		Val:    ver,
1621	}
1622	ctx, cancel := context.WithTimeout(s.ctx, s.Cfg.ReqTimeout())
1623	_, err := s.Do(ctx, req)
1624	cancel()
1625	switch err {
1626	case nil:
1627		return
1628	case ErrStopped:
1629		plog.Infof("aborting update cluster version because server is stopped")
1630		return
1631	default:
1632		plog.Errorf("error updating cluster version (%v)", err)
1633	}
1634}
1635
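// parseProposeCtxErr converts a context error from a proposal into a more
// specific server error, distinguishing leader loss and lost connectivity
// from a plain timeout.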
1636func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error {
1637	switch err {
1638	case context.Canceled:
1639		return ErrCanceled
1640	case context.DeadlineExceeded:
1641		s.leadTimeMu.RLock()
1642		curLeadElected := s.leadElectedTime
1643		s.leadTimeMu.RUnlock()
1644		prevLeadLost := curLeadElected.Add(-2 * time.Duration(s.Cfg.ElectionTicks) * time.Duration(s.Cfg.TickMs) * time.Millisecond)
1645		if start.After(prevLeadLost) && start.Before(curLeadElected) {
1646			return ErrTimeoutDueToLeaderFail
1647		}
1648
1649		lead := types.ID(atomic.LoadUint64(&s.r.lead))
1650		switch lead {
1651		case types.ID(raft.None):
1652			// TODO: return error to specify it happens because the cluster does not have leader now
1653		case s.ID():
1654			if !isConnectedToQuorumSince(s.r.transport, start, s.ID(), s.cluster.Members()) {
1655				return ErrTimeoutDueToConnectionLost
1656			}
1657		default:
1658			if !isConnectedSince(s.r.transport, start, lead) {
1659				return ErrTimeoutDueToConnectionLost
1660			}
1661		}
1662
1663		return ErrTimeout
1664	default:
1665		return err
1666	}
1667}
1668
1669func (s *EtcdServer) KV() mvcc.ConsistentWatchableKV { return s.kv }
1670func (s *EtcdServer) Backend() backend.Backend {
1671	s.bemu.Lock()
1672	defer s.bemu.Unlock()
1673	return s.be
1674}
1675
1676func (s *EtcdServer) AuthStore() auth.AuthStore { return s.authStore }
1677
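// restoreAlarms reloads the alarm store from the backend and caps the v3
// applier if a NOSPACE alarm is active.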
1678func (s *EtcdServer) restoreAlarms() error {
1679	s.applyV3 = s.newApplierV3()
1680	as, err := alarm.NewAlarmStore(s)
1681	if err != nil {
1682		return err
1683	}
1684	s.alarmStore = as
1685	if len(as.Get(pb.AlarmType_NOSPACE)) > 0 {
1686		s.applyV3 = newApplierV3Capped(s.applyV3)
1687	}
1688	return nil
1689}
1690
1691func (s *EtcdServer) getAppliedIndex() uint64 {
1692	return atomic.LoadUint64(&s.appliedIndex)
1693}
1694
1695func (s *EtcdServer) setAppliedIndex(v uint64) {
1696	atomic.StoreUint64(&s.appliedIndex, v)
1697}
1698
1699func (s *EtcdServer) getCommittedIndex() uint64 {
1700	return atomic.LoadUint64(&s.committedIndex)
1701}
1702
1703func (s *EtcdServer) setCommittedIndex(v uint64) {
1704	atomic.StoreUint64(&s.committedIndex, v)
1705}
1706
1707// goAttach creates a goroutine on a given function and tracks it using
1708// the etcdserver waitgroup.
1709func (s *EtcdServer) goAttach(f func()) {
1710	s.wgMu.RLock() // this blocks with ongoing close(s.stopping)
1711	defer s.wgMu.RUnlock()
1712	select {
1713	case <-s.stopping:
1714		plog.Warning("server has stopped (skipping goAttach)")
1715		return
1716	default:
1717	}
1718
1719	// now safe to add since waitgroup wait has not started yet
1720	s.wg.Add(1)
1721	go func() {
1722		defer s.wg.Done()
1723		f()
1724	}()
1725}
1726