1// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package etcdserver
16
17import (
18	"context"
19	"encoding/json"
20	"expvar"
21	"fmt"
22	"math"
23	"math/rand"
24	"net/http"
25	"os"
26	"path"
27	"regexp"
28	"sync"
29	"sync/atomic"
30	"time"
31
32	"go.etcd.io/etcd/auth"
33	"go.etcd.io/etcd/etcdserver/api"
34	"go.etcd.io/etcd/etcdserver/api/membership"
35	"go.etcd.io/etcd/etcdserver/api/membership/membershippb"
36	"go.etcd.io/etcd/etcdserver/api/rafthttp"
37	"go.etcd.io/etcd/etcdserver/api/snap"
38	"go.etcd.io/etcd/etcdserver/api/v2discovery"
39	"go.etcd.io/etcd/etcdserver/api/v2http/httptypes"
40	stats "go.etcd.io/etcd/etcdserver/api/v2stats"
41	"go.etcd.io/etcd/etcdserver/api/v2store"
42	"go.etcd.io/etcd/etcdserver/api/v3alarm"
43	"go.etcd.io/etcd/etcdserver/api/v3compactor"
44	"go.etcd.io/etcd/etcdserver/cindex"
45	pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
46	"go.etcd.io/etcd/lease"
47	"go.etcd.io/etcd/lease/leasehttp"
48	"go.etcd.io/etcd/mvcc"
49	"go.etcd.io/etcd/mvcc/backend"
50	"go.etcd.io/etcd/pkg/fileutil"
51	"go.etcd.io/etcd/pkg/idutil"
52	"go.etcd.io/etcd/pkg/pbutil"
53	"go.etcd.io/etcd/pkg/runtime"
54	"go.etcd.io/etcd/pkg/schedule"
55	"go.etcd.io/etcd/pkg/traceutil"
56	"go.etcd.io/etcd/pkg/types"
57	"go.etcd.io/etcd/pkg/wait"
58	"go.etcd.io/etcd/raft"
59	"go.etcd.io/etcd/raft/raftpb"
60	"go.etcd.io/etcd/version"
61	"go.etcd.io/etcd/wal"
62
63	"github.com/coreos/go-semver/semver"
64	humanize "github.com/dustin/go-humanize"
65	"github.com/prometheus/client_golang/prometheus"
66	"go.uber.org/zap"
67)
68
69const (
70	DefaultSnapshotCount = 100000
71
	// DefaultSnapshotCatchUpEntries is the number of entries for a slow follower
	// to catch up after compacting the raft storage entries.
	// We expect the follower has a millisecond-level latency with the leader.
	// The max throughput is around 10K. Keeping 5K entries is enough to help a
	// follower catch up.
77	DefaultSnapshotCatchUpEntries uint64 = 5000
78
79	StoreClusterPrefix = "/0"
80	StoreKeysPrefix    = "/1"
81
82	// HealthInterval is the minimum time the cluster should be healthy
83	// before accepting add member requests.
84	HealthInterval = 5 * time.Second
85
86	purgeFileInterval = 30 * time.Second
	// monitorVersionInterval should be smaller than the timeout
	// on the connection. Otherwise, the connection times out and
	// cannot be reused.
90	monitorVersionInterval = rafthttp.ConnWriteTimeout - time.Second
91
	// maxInFlightMsgSnap is the maximum number of in-flight snapshot messages
	// etcdserver allows. This number is more than enough for most clusters with
	// 5 machines.
94	maxInFlightMsgSnap = 16
95
96	releaseDelayAfterSnapshot = 30 * time.Second
97
98	// maxPendingRevokes is the maximum number of outstanding expired lease revocations.
99	maxPendingRevokes = 16
100
101	recommendedMaxRequestBytes = 10 * 1024 * 1024
102
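	// readyPercent is the fraction of the leader's match index that a learner must
	// reach before it is considered caught up and eligible for promotion
	// (see isLearnerReady).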
103	readyPercent = 0.9
104)
105
106var (
107	storeMemberAttributeRegexp = regexp.MustCompile(path.Join(membership.StoreMembersPrefix, "[[:xdigit:]]{1,16}", "attributes"))
108)
109
110func init() {
111	rand.Seed(time.Now().UnixNano())
112
113	expvar.Publish(
114		"file_descriptor_limit",
115		expvar.Func(
116			func() interface{} {
117				n, _ := runtime.FDLimit()
118				return n
119			},
120		),
121	)
122}
123
124type Response struct {
125	Term    uint64
126	Index   uint64
127	Event   *v2store.Event
128	Watcher v2store.Watcher
129	Err     error
130}
131
132type ServerV2 interface {
133	Server
134	Leader() types.ID
135
136	// Do takes a V2 request and attempts to fulfill it, returning a Response.
137	Do(ctx context.Context, r pb.Request) (Response, error)
138	stats.Stats
139	ClientCertAuthEnabled() bool
140}
141
142type ServerV3 interface {
143	Server
144	RaftStatusGetter
145}
146
147func (s *EtcdServer) ClientCertAuthEnabled() bool { return s.Cfg.ClientCertAuthEnabled }
148
149type Server interface {
	// AddMember attempts to add a member into the cluster. It will return
	// ErrIDRemoved if the member ID has been removed from the cluster, or
	// ErrIDExists if the member ID already exists in the cluster.
	AddMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error)
	// RemoveMember attempts to remove a member from the cluster. It will
	// return ErrIDRemoved if the member ID has been removed from the cluster, or
	// ErrIDNotFound if the member ID is not in the cluster.
	RemoveMember(ctx context.Context, id uint64) ([]*membership.Member, error)
	// UpdateMember attempts to update an existing member in the cluster. It will
	// return ErrIDNotFound if the member ID does not exist.
	UpdateMember(ctx context.Context, updateMemb membership.Member) ([]*membership.Member, error)
	// PromoteMember attempts to promote a non-voting node to a voting node. It will
	// return ErrIDNotFound if the member ID does not exist,
	// ErrLearnerNotReady if the member is not ready, or
	// ErrMemberNotLearner if the member is not a learner.
	PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error)
166
	// ClusterVersion is the cluster-wide minimum major.minor version.
	// Cluster version is set to the min version that an etcd member is
	// compatible with when the cluster is first bootstrapped.
	//
	// ClusterVersion is nil until the cluster is bootstrapped (has a quorum).
	//
	// During a rolling upgrade, the ClusterVersion will be updated
	// automatically after a sync (every 5 seconds by default).
	//
	// The API/raft component can utilize ClusterVersion to determine if
	// it can accept a client request or a raft RPC.
	// NOTE: ClusterVersion might be nil when etcd 2.1 works with etcd 2.0 and
	// the leader is etcd 2.0. An etcd 2.0 leader will not update clusterVersion since
	// this feature was introduced after 2.0.
181	ClusterVersion() *semver.Version
182	Cluster() api.Cluster
183	Alarms() []*pb.AlarmMember
184}
185
// EtcdServer is the production implementation of the Server interface.
type EtcdServer struct {
	// inflightSnapshots holds the number of snapshots currently in flight.
	inflightSnapshots int64  // must use atomic operations to access; keep 64-bit aligned.
190	appliedIndex      uint64 // must use atomic operations to access; keep 64-bit aligned.
191	committedIndex    uint64 // must use atomic operations to access; keep 64-bit aligned.
192	term              uint64 // must use atomic operations to access; keep 64-bit aligned.
193	lead              uint64 // must use atomic operations to access; keep 64-bit aligned.
194
195	consistIndex cindex.ConsistentIndexer // consistIndex is used to get/set/save consistentIndex
196	r            raftNode                 // uses 64-bit atomics; keep 64-bit aligned.
197
198	readych chan struct{}
199	Cfg     ServerConfig
200
201	lgMu *sync.RWMutex
202	lg   *zap.Logger
203
204	w wait.Wait
205
206	readMu sync.RWMutex
	// the read routine notifies the etcd server that it is waiting for a read by
	// sending an empty struct to readwaitc
209	readwaitc chan struct{}
210	// readNotifier is used to notify the read routine that it can process the request
211	// when there is no error
212	readNotifier *notifier
213
	// stop signals the run goroutine to shut down.
215	stop chan struct{}
216	// stopping is closed by run goroutine on shutdown.
217	stopping chan struct{}
218	// done is closed when all goroutines from start() complete.
219	done chan struct{}
220	// leaderChanged is used to notify the linearizable read loop to drop the old read requests.
221	leaderChanged   chan struct{}
222	leaderChangedMu sync.RWMutex
223
224	errorc     chan error
225	id         types.ID
226	attributes membership.Attributes
227
228	cluster *membership.RaftCluster
229
230	v2store     v2store.Store
231	snapshotter *snap.Snapshotter
232
233	applyV2 ApplierV2
234
235	// applyV3 is the applier with auth and quotas
236	applyV3 applierV3
237	// applyV3Base is the core applier without auth or quotas
238	applyV3Base applierV3
	// applyV3Internal is the applier for internal requests
240	applyV3Internal applierV3Internal
241	applyWait       wait.WaitTime
242
243	kv         mvcc.ConsistentWatchableKV
244	lessor     lease.Lessor
245	bemu       sync.Mutex
246	be         backend.Backend
247	authStore  auth.AuthStore
248	alarmStore *v3alarm.AlarmStore
249
250	stats  *stats.ServerStats
251	lstats *stats.LeaderStats
252
253	SyncTicker *time.Ticker
254	// compactor is used to auto-compact the KV.
255	compactor v3compactor.Compactor
256
257	// peerRt used to send requests (version, lease) to peers.
258	peerRt   http.RoundTripper
259	reqIDGen *idutil.Generator
260
261	// forceVersionC is used to force the version monitor loop
262	// to detect the cluster version immediately.
263	forceVersionC chan struct{}
264
	// wgMu blocks concurrent waitgroup mutation while the server is stopping
	wgMu sync.RWMutex
	// wg is used to wait for the goroutines that depend on the server state
	// to exit when stopping the server.
269	wg sync.WaitGroup
270
271	// ctx is used for etcd-initiated requests that may need to be canceled
272	// on etcd server shutdown.
273	ctx    context.Context
274	cancel context.CancelFunc
275
276	leadTimeMu      sync.RWMutex
277	leadElectedTime time.Time
278
279	*AccessController
280}
281
282// NewServer creates a new EtcdServer from the supplied configuration. The
283// configuration is considered static for the lifetime of the EtcdServer.
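//
// A rough usage sketch (illustrative only; only ServerConfig fields referenced in
// this file are shown, and real deployments typically go through the embed
// package rather than calling this directly):
//
//	cfg := etcdserver.ServerConfig{Name: "node1", DataDir: "/var/lib/etcd"}
//	srv, err := etcdserver.NewServer(cfg)
//	if err != nil {
//		// handle bootstrap/configuration errors
//	}
//	srv.Start()      // begin serving; must be called before Do or Process
//	defer srv.Stop() // transfers leadership (if leader) and shuts down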
284func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
285	st := v2store.New(StoreClusterPrefix, StoreKeysPrefix)
286
287	var (
288		w  *wal.WAL
289		n  raft.Node
290		s  *raft.MemoryStorage
291		id types.ID
292		cl *membership.RaftCluster
293	)
294
295	if cfg.MaxRequestBytes > recommendedMaxRequestBytes {
296		cfg.Logger.Warn(
297			"exceeded recommended request limit",
298			zap.Uint("max-request-bytes", cfg.MaxRequestBytes),
299			zap.String("max-request-size", humanize.Bytes(uint64(cfg.MaxRequestBytes))),
300			zap.Int("recommended-request-bytes", recommendedMaxRequestBytes),
301			zap.String("recommended-request-size", humanize.Bytes(uint64(recommendedMaxRequestBytes))),
302		)
303	}
304
305	if terr := fileutil.TouchDirAll(cfg.DataDir); terr != nil {
306		return nil, fmt.Errorf("cannot access data directory: %v", terr)
307	}
308
309	haveWAL := wal.Exist(cfg.WALDir())
310
311	if err = fileutil.TouchDirAll(cfg.SnapDir()); err != nil {
312		cfg.Logger.Fatal(
313			"failed to create snapshot directory",
314			zap.String("path", cfg.SnapDir()),
315			zap.Error(err),
316		)
317	}
318	ss := snap.New(cfg.Logger, cfg.SnapDir())
319
320	bepath := cfg.backendPath()
321	beExist := fileutil.Exist(bepath)
322	be := openBackend(cfg)
323
324	defer func() {
325		if err != nil {
326			be.Close()
327		}
328	}()
329
330	prt, err := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.peerDialTimeout())
331	if err != nil {
332		return nil, err
333	}
334	var (
335		remotes  []*membership.Member
336		snapshot *raftpb.Snapshot
337	)
338
339	switch {
340	case !haveWAL && !cfg.NewCluster:
341		if err = cfg.VerifyJoinExisting(); err != nil {
342			return nil, err
343		}
344		cl, err = membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
345		if err != nil {
346			return nil, err
347		}
348		existingCluster, gerr := GetClusterFromRemotePeers(cfg.Logger, getRemotePeerURLs(cl, cfg.Name), prt)
349		if gerr != nil {
350			return nil, fmt.Errorf("cannot fetch cluster info from peer urls: %v", gerr)
351		}
352		if err = membership.ValidateClusterAndAssignIDs(cfg.Logger, cl, existingCluster); err != nil {
353			return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err)
354		}
355		if !isCompatibleWithCluster(cfg.Logger, cl, cl.MemberByName(cfg.Name).ID, prt) {
356			return nil, fmt.Errorf("incompatible with current running cluster")
357		}
358
359		remotes = existingCluster.Members()
360		cl.SetID(types.ID(0), existingCluster.ID())
361		cl.SetStore(st)
362		cl.SetBackend(be)
363		id, n, s, w = startNode(cfg, cl, nil)
364		cl.SetID(id, existingCluster.ID())
365
366	case !haveWAL && cfg.NewCluster:
367		if err = cfg.VerifyBootstrap(); err != nil {
368			return nil, err
369		}
370		cl, err = membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
371		if err != nil {
372			return nil, err
373		}
374		m := cl.MemberByName(cfg.Name)
375		if isMemberBootstrapped(cfg.Logger, cl, cfg.Name, prt, cfg.bootstrapTimeout()) {
376			return nil, fmt.Errorf("member %s has already been bootstrapped", m.ID)
377		}
378		if cfg.ShouldDiscover() {
379			var str string
380			str, err = v2discovery.JoinCluster(cfg.Logger, cfg.DiscoveryURL, cfg.DiscoveryProxy, m.ID, cfg.InitialPeerURLsMap.String())
381			if err != nil {
382				return nil, &DiscoveryError{Op: "join", Err: err}
383			}
384			var urlsmap types.URLsMap
385			urlsmap, err = types.NewURLsMap(str)
386			if err != nil {
387				return nil, err
388			}
389			if checkDuplicateURL(urlsmap) {
390				return nil, fmt.Errorf("discovery cluster %s has duplicate url", urlsmap)
391			}
392			if cl, err = membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, urlsmap); err != nil {
393				return nil, err
394			}
395		}
396		cl.SetStore(st)
397		cl.SetBackend(be)
398		id, n, s, w = startNode(cfg, cl, cl.MemberIDs())
399		cl.SetID(id, cl.ID())
400
401	case haveWAL:
402		if err = fileutil.IsDirWriteable(cfg.MemberDir()); err != nil {
403			return nil, fmt.Errorf("cannot write to member directory: %v", err)
404		}
405
406		if err = fileutil.IsDirWriteable(cfg.WALDir()); err != nil {
407			return nil, fmt.Errorf("cannot write to WAL directory: %v", err)
408		}
409
410		if cfg.ShouldDiscover() {
411			cfg.Logger.Warn(
412				"discovery token is ignored since cluster already initialized; valid logs are found",
413				zap.String("wal-dir", cfg.WALDir()),
414			)
415		}
416		snapshot, err = ss.Load()
417		if err != nil && err != snap.ErrNoSnapshot {
418			return nil, err
419		}
420		if snapshot != nil {
421			if err = st.Recovery(snapshot.Data); err != nil {
422				cfg.Logger.Panic("failed to recover from snapshot", zap.Error(err))
423			}
424
425			cfg.Logger.Info(
426				"recovered v2 store from snapshot",
427				zap.Uint64("snapshot-index", snapshot.Metadata.Index),
428				zap.String("snapshot-size", humanize.Bytes(uint64(snapshot.Size()))),
429			)
430
431			if be, err = recoverSnapshotBackend(cfg, be, *snapshot, beExist); err != nil {
432				cfg.Logger.Panic("failed to recover v3 backend from snapshot", zap.Error(err))
433			}
434			s1, s2 := be.Size(), be.SizeInUse()
435			cfg.Logger.Info(
436				"recovered v3 backend from snapshot",
437				zap.Int64("backend-size-bytes", s1),
438				zap.String("backend-size", humanize.Bytes(uint64(s1))),
439				zap.Int64("backend-size-in-use-bytes", s2),
440				zap.String("backend-size-in-use", humanize.Bytes(uint64(s2))),
441			)
442		}
443
444		if !cfg.ForceNewCluster {
445			id, cl, n, s, w = restartNode(cfg, snapshot)
446		} else {
447			id, cl, n, s, w = restartAsStandaloneNode(cfg, snapshot)
448		}
449
450		cl.SetStore(st)
451		cl.SetBackend(be)
452		cl.Recover(api.UpdateCapability)
453		if cl.Version() != nil && !cl.Version().LessThan(semver.Version{Major: 3}) && !beExist {
454			os.RemoveAll(bepath)
455			return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath)
456		}
457
458	default:
459		return nil, fmt.Errorf("unsupported bootstrap config")
460	}
461
462	if terr := fileutil.TouchDirAll(cfg.MemberDir()); terr != nil {
463		return nil, fmt.Errorf("cannot access member directory: %v", terr)
464	}
465
466	sstats := stats.NewServerStats(cfg.Name, id.String())
467	lstats := stats.NewLeaderStats(cfg.Logger, id.String())
468
469	heartbeat := time.Duration(cfg.TickMs) * time.Millisecond
470	srv = &EtcdServer{
471		readych:     make(chan struct{}),
472		Cfg:         cfg,
473		lgMu:        new(sync.RWMutex),
474		lg:          cfg.Logger,
475		errorc:      make(chan error, 1),
476		v2store:     st,
477		snapshotter: ss,
478		r: *newRaftNode(
479			raftNodeConfig{
480				lg:          cfg.Logger,
481				isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
482				Node:        n,
483				heartbeat:   heartbeat,
484				raftStorage: s,
485				storage:     NewStorage(w, ss),
486			},
487		),
488		id:               id,
489		attributes:       membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
490		cluster:          cl,
491		stats:            sstats,
492		lstats:           lstats,
493		SyncTicker:       time.NewTicker(500 * time.Millisecond),
494		peerRt:           prt,
495		reqIDGen:         idutil.NewGenerator(uint16(id), time.Now()),
496		forceVersionC:    make(chan struct{}),
497		AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist},
498		consistIndex:     cindex.NewConsistentIndex(be.BatchTx()),
499	}
500	serverID.With(prometheus.Labels{"server_id": id.String()}).Set(1)
501
502	srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
503
504	srv.be = be
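	// minTTL works out to 1.5 election timeouts (ElectionTicks * heartbeat is one
	// election timeout); the intent, presumably, is that a lease granted with the
	// minimum TTL cannot expire in less time than a leader election takes.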
505	minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat
506
507	// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
508	// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
509	srv.lessor = lease.NewLessor(
510		srv.getLogger(),
511		srv.be,
512		lease.LessorConfig{
513			MinLeaseTTL:                int64(math.Ceil(minTTL.Seconds())),
514			CheckpointInterval:         cfg.LeaseCheckpointInterval,
515			ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
516		})
517
518	tp, err := auth.NewTokenProvider(cfg.Logger, cfg.AuthToken,
519		func(index uint64) <-chan struct{} {
520			return srv.applyWait.Wait(index)
521		},
522	)
523	if err != nil {
524		cfg.Logger.Warn("failed to create token provider", zap.Error(err))
525		return nil, err
526	}
527	srv.kv = mvcc.New(srv.getLogger(), srv.be, srv.lessor, srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
528	kvindex := srv.consistIndex.ConsistentIndex()
529	srv.lg.Debug("restore consistentIndex",
530		zap.Uint64("index", kvindex))
531	if beExist {
532		// TODO: remove kvindex != 0 checking when we do not expect users to upgrade
533		// etcd from pre-3.0 release.
534		if snapshot != nil && kvindex < snapshot.Metadata.Index {
535			if kvindex != 0 {
536				return nil, fmt.Errorf("database file (%v index %d) does not match with snapshot (index %d)", bepath, kvindex, snapshot.Metadata.Index)
537			}
538			cfg.Logger.Warn(
539				"consistent index was never saved",
540				zap.Uint64("snapshot-index", snapshot.Metadata.Index),
541			)
542		}
543	}
544
545	srv.authStore = auth.NewAuthStore(srv.getLogger(), srv.be, srv.consistIndex, tp, int(cfg.BcryptCost))
546
547	newSrv := srv // since srv == nil in defer if srv is returned as nil
548	defer func() {
549		// closing backend without first closing kv can cause
550		// resumed compactions to fail with closed tx errors
551		if err != nil {
552			newSrv.kv.Close()
553		}
554	}()
555	if num := cfg.AutoCompactionRetention; num != 0 {
556		srv.compactor, err = v3compactor.New(cfg.Logger, cfg.AutoCompactionMode, num, srv.kv, srv)
557		if err != nil {
558			return nil, err
559		}
560		srv.compactor.Run()
561	}
562
563	srv.applyV3Base = srv.newApplierV3Backend()
564	srv.applyV3Internal = srv.newApplierV3Internal()
565	if err = srv.restoreAlarms(); err != nil {
566		return nil, err
567	}
568
569	if srv.Cfg.EnableLeaseCheckpoint {
570		// setting checkpointer enables lease checkpoint feature.
571		srv.lessor.SetCheckpointer(func(ctx context.Context, cp *pb.LeaseCheckpointRequest) {
572			srv.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseCheckpoint: cp})
573		})
574	}
575
576	// TODO: move transport initialization near the definition of remote
577	tr := &rafthttp.Transport{
578		Logger:      cfg.Logger,
579		TLSInfo:     cfg.PeerTLSInfo,
580		DialTimeout: cfg.peerDialTimeout(),
581		ID:          id,
582		URLs:        cfg.PeerURLs,
583		ClusterID:   cl.ID(),
584		Raft:        srv,
585		Snapshotter: ss,
586		ServerStats: sstats,
587		LeaderStats: lstats,
588		ErrorC:      srv.errorc,
589	}
590	if err = tr.Start(); err != nil {
591		return nil, err
592	}
593	// add all remotes into transport
594	for _, m := range remotes {
595		if m.ID != id {
596			tr.AddRemote(m.ID, m.PeerURLs)
597		}
598	}
599	for _, m := range cl.Members() {
600		if m.ID != id {
601			tr.AddPeer(m.ID, m.PeerURLs)
602		}
603	}
604	srv.r.transport = tr
605
606	return srv, nil
607}
608
609func (s *EtcdServer) getLogger() *zap.Logger {
610	s.lgMu.RLock()
611	l := s.lg
612	s.lgMu.RUnlock()
613	return l
614}
615
616func tickToDur(ticks int, tickMs uint) string {
617	return fmt.Sprintf("%v", time.Duration(ticks)*time.Duration(tickMs)*time.Millisecond)
618}
619
620func (s *EtcdServer) adjustTicks() {
621	lg := s.getLogger()
622	clusterN := len(s.cluster.Members())
623
624	// single-node fresh start, or single-node recovers from snapshot
625	if clusterN == 1 {
626		ticks := s.Cfg.ElectionTicks - 1
627		lg.Info(
628			"started as single-node; fast-forwarding election ticks",
629			zap.String("local-member-id", s.ID().String()),
630			zap.Int("forward-ticks", ticks),
631			zap.String("forward-duration", tickToDur(ticks, s.Cfg.TickMs)),
632			zap.Int("election-ticks", s.Cfg.ElectionTicks),
633			zap.String("election-timeout", tickToDur(s.Cfg.ElectionTicks, s.Cfg.TickMs)),
634		)
635		s.r.advanceTicks(ticks)
636		return
637	}
638
639	if !s.Cfg.InitialElectionTickAdvance {
640		lg.Info("skipping initial election tick advance", zap.Int("election-ticks", s.Cfg.ElectionTicks))
641		return
642	}
643	lg.Info("starting initial election tick advance", zap.Int("election-ticks", s.Cfg.ElectionTicks))
644
	// retry up to "rafthttp.ConnReadTimeout", which is 5 seconds,
	// until a peer connection is reported; otherwise, if:
	// 1. all connections failed, or
	// 2. there are no active peers, or
	// 3. this is a restarted single node with no snapshot,
	// then do nothing, because advancing ticks would have no effect
651	waitTime := rafthttp.ConnReadTimeout
652	itv := 50 * time.Millisecond
653	for i := int64(0); i < int64(waitTime/itv); i++ {
654		select {
655		case <-time.After(itv):
656		case <-s.stopping:
657			return
658		}
659
660		peerN := s.r.transport.ActivePeers()
661		if peerN > 1 {
			// multi-node cluster received peer connection reports;
			// adjust ticks in case leader messages arrive slowly
664			ticks := s.Cfg.ElectionTicks - 2
665
666			lg.Info(
667				"initialized peer connections; fast-forwarding election ticks",
668				zap.String("local-member-id", s.ID().String()),
669				zap.Int("forward-ticks", ticks),
670				zap.String("forward-duration", tickToDur(ticks, s.Cfg.TickMs)),
671				zap.Int("election-ticks", s.Cfg.ElectionTicks),
672				zap.String("election-timeout", tickToDur(s.Cfg.ElectionTicks, s.Cfg.TickMs)),
673				zap.Int("active-remote-members", peerN),
674			)
675
676			s.r.advanceTicks(ticks)
677			return
678		}
679	}
680}
681
682// Start performs any initialization of the Server necessary for it to
683// begin serving requests. It must be called before Do or Process.
684// Start must be non-blocking; any long-running server functionality
685// should be implemented in goroutines.
686func (s *EtcdServer) Start() {
687	s.start()
688	s.goAttach(func() { s.adjustTicks() })
689	s.goAttach(func() { s.publish(s.Cfg.ReqTimeout()) })
690	s.goAttach(s.purgeFile)
691	s.goAttach(func() { monitorFileDescriptor(s.getLogger(), s.stopping) })
692	s.goAttach(s.monitorVersions)
693	s.goAttach(s.linearizableReadLoop)
694	s.goAttach(s.monitorKVHash)
695}
696
// start prepares and starts the server in a new goroutine. It is no longer safe
// to modify a server's fields after it has been sent to Start.
// This function is just used for testing.
700func (s *EtcdServer) start() {
701	lg := s.getLogger()
702
703	if s.Cfg.SnapshotCount == 0 {
704		lg.Info(
705			"updating snapshot-count to default",
706			zap.Uint64("given-snapshot-count", s.Cfg.SnapshotCount),
707			zap.Uint64("updated-snapshot-count", DefaultSnapshotCount),
708		)
709		s.Cfg.SnapshotCount = DefaultSnapshotCount
710	}
711	if s.Cfg.SnapshotCatchUpEntries == 0 {
712		lg.Info(
713			"updating snapshot catch-up entries to default",
714			zap.Uint64("given-snapshot-catchup-entries", s.Cfg.SnapshotCatchUpEntries),
715			zap.Uint64("updated-snapshot-catchup-entries", DefaultSnapshotCatchUpEntries),
716		)
717		s.Cfg.SnapshotCatchUpEntries = DefaultSnapshotCatchUpEntries
718	}
719
720	s.w = wait.New()
721	s.applyWait = wait.NewTimeList()
722	s.done = make(chan struct{})
723	s.stop = make(chan struct{})
724	s.stopping = make(chan struct{})
725	s.ctx, s.cancel = context.WithCancel(context.Background())
726	s.readwaitc = make(chan struct{}, 1)
727	s.readNotifier = newNotifier()
728	s.leaderChanged = make(chan struct{})
729	if s.ClusterVersion() != nil {
730		lg.Info(
731			"starting etcd server",
732			zap.String("local-member-id", s.ID().String()),
733			zap.String("local-server-version", version.Version),
734			zap.String("cluster-id", s.Cluster().ID().String()),
735			zap.String("cluster-version", version.Cluster(s.ClusterVersion().String())),
736		)
737		membership.ClusterVersionMetrics.With(prometheus.Labels{"cluster_version": version.Cluster(s.ClusterVersion().String())}).Set(1)
738	} else {
739		lg.Info(
740			"starting etcd server",
741			zap.String("local-member-id", s.ID().String()),
742			zap.String("local-server-version", version.Version),
743			zap.String("cluster-version", "to_be_decided"),
744		)
745	}
746
	// TODO: if this is an empty log, write all peer info
	// into the first entry
749	go s.run()
750}
751
752func (s *EtcdServer) purgeFile() {
753	lg := s.getLogger()
754	var dberrc, serrc, werrc <-chan error
755	var dbdonec, sdonec, wdonec <-chan struct{}
756	if s.Cfg.MaxSnapFiles > 0 {
757		dbdonec, dberrc = fileutil.PurgeFileWithDoneNotify(lg, s.Cfg.SnapDir(), "snap.db", s.Cfg.MaxSnapFiles, purgeFileInterval, s.stopping)
758		sdonec, serrc = fileutil.PurgeFileWithDoneNotify(lg, s.Cfg.SnapDir(), "snap", s.Cfg.MaxSnapFiles, purgeFileInterval, s.stopping)
759	}
760	if s.Cfg.MaxWALFiles > 0 {
761		wdonec, werrc = fileutil.PurgeFileWithDoneNotify(lg, s.Cfg.WALDir(), "wal", s.Cfg.MaxWALFiles, purgeFileInterval, s.stopping)
762	}
763
764	select {
765	case e := <-dberrc:
766		lg.Fatal("failed to purge snap db file", zap.Error(e))
767	case e := <-serrc:
768		lg.Fatal("failed to purge snap file", zap.Error(e))
769	case e := <-werrc:
770		lg.Fatal("failed to purge wal file", zap.Error(e))
771	case <-s.stopping:
772		if dbdonec != nil {
773			<-dbdonec
774		}
775		if sdonec != nil {
776			<-sdonec
777		}
778		if wdonec != nil {
779			<-wdonec
780		}
781		return
782	}
783}
784
785func (s *EtcdServer) Cluster() api.Cluster { return s.cluster }
786
787func (s *EtcdServer) ApplyWait() <-chan struct{} { return s.applyWait.Wait(s.getCommittedIndex()) }
788
789type ServerPeer interface {
790	ServerV2
791	RaftHandler() http.Handler
792	LeaseHandler() http.Handler
793}
794
795func (s *EtcdServer) LeaseHandler() http.Handler {
796	if s.lessor == nil {
797		return nil
798	}
799	return leasehttp.NewHandler(s.lessor, s.ApplyWait)
800}
801
802func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler() }
803
804// Process takes a raft message and applies it to the server's raft state
805// machine, respecting any timeout of the given context.
806func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
807	lg := s.getLogger()
808	if s.cluster.IsIDRemoved(types.ID(m.From)) {
809		lg.Warn(
810			"rejected Raft message from removed member",
811			zap.String("local-member-id", s.ID().String()),
812			zap.String("removed-member-id", types.ID(m.From).String()),
813		)
814		return httptypes.NewHTTPError(http.StatusForbidden, "cannot process message from removed member")
815	}
816	if m.Type == raftpb.MsgApp {
817		s.stats.RecvAppendReq(types.ID(m.From).String(), m.Size())
818	}
819	return s.r.Step(ctx, m)
820}
821
822func (s *EtcdServer) IsIDRemoved(id uint64) bool { return s.cluster.IsIDRemoved(types.ID(id)) }
823
824func (s *EtcdServer) ReportUnreachable(id uint64) { s.r.ReportUnreachable(id) }
825
826// ReportSnapshot reports snapshot sent status to the raft state machine,
827// and clears the used snapshot from the snapshot store.
828func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
829	s.r.ReportSnapshot(id, status)
830}
831
832type etcdProgress struct {
833	confState raftpb.ConfState
834	snapi     uint64
835	appliedt  uint64
836	appliedi  uint64
837}
838
839// raftReadyHandler contains a set of EtcdServer operations to be called by raftNode,
840// and helps decouple state machine logic from Raft algorithms.
841// TODO: add a state machine interface to apply the commit entries and do snapshot/recover
842type raftReadyHandler struct {
843	getLead              func() (lead uint64)
844	updateLead           func(lead uint64)
845	updateLeadership     func(newLeader bool)
846	updateCommittedIndex func(uint64)
847}
848
849func (s *EtcdServer) run() {
850	lg := s.getLogger()
851
852	sn, err := s.r.raftStorage.Snapshot()
853	if err != nil {
854		lg.Panic("failed to get snapshot from Raft storage", zap.Error(err))
855	}
856
857	// asynchronously accept apply packets, dispatch progress in-order
858	sched := schedule.NewFIFOScheduler()
859
860	var (
861		smu   sync.RWMutex
862		syncC <-chan time.Time
863	)
864	setSyncC := func(ch <-chan time.Time) {
865		smu.Lock()
866		syncC = ch
867		smu.Unlock()
868	}
869	getSyncC := func() (ch <-chan time.Time) {
870		smu.RLock()
871		ch = syncC
872		smu.RUnlock()
873		return
874	}
875	rh := &raftReadyHandler{
876		getLead:    func() (lead uint64) { return s.getLead() },
877		updateLead: func(lead uint64) { s.setLead(lead) },
878		updateLeadership: func(newLeader bool) {
879			if !s.isLeader() {
880				if s.lessor != nil {
881					s.lessor.Demote()
882				}
883				if s.compactor != nil {
884					s.compactor.Pause()
885				}
886				setSyncC(nil)
887			} else {
888				if newLeader {
889					t := time.Now()
890					s.leadTimeMu.Lock()
891					s.leadElectedTime = t
892					s.leadTimeMu.Unlock()
893				}
894				setSyncC(s.SyncTicker.C)
895				if s.compactor != nil {
896					s.compactor.Resume()
897				}
898			}
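			// Closing leaderChanged wakes up every goroutine blocked in
			// leaderChangedNotify(); a fresh channel is then installed for the
			// next leader change.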
899			if newLeader {
900				s.leaderChangedMu.Lock()
901				lc := s.leaderChanged
902				s.leaderChanged = make(chan struct{})
903				close(lc)
904				s.leaderChangedMu.Unlock()
905			}
906			// TODO: remove the nil checking
907			// current test utility does not provide the stats
908			if s.stats != nil {
909				s.stats.BecomeLeader()
910			}
911		},
912		updateCommittedIndex: func(ci uint64) {
913			cci := s.getCommittedIndex()
914			if ci > cci {
915				s.setCommittedIndex(ci)
916			}
917		},
918	}
919	s.r.start(rh)
920
921	ep := etcdProgress{
922		confState: sn.Metadata.ConfState,
923		snapi:     sn.Metadata.Index,
924		appliedt:  sn.Metadata.Term,
925		appliedi:  sn.Metadata.Index,
926	}
927
928	defer func() {
929		s.wgMu.Lock() // block concurrent waitgroup adds in goAttach while stopping
930		close(s.stopping)
931		s.wgMu.Unlock()
932		s.cancel()
933
934		sched.Stop()
935
		// wait for goroutines before closing raft so the WAL stays open
937		s.wg.Wait()
938
939		s.SyncTicker.Stop()
940
		// must stop raft after the scheduler; etcdserver can leak rafthttp pipelines
		// by adding a peer after raft stops the transport
943		s.r.stop()
944
945		// kv, lessor and backend can be nil if running without v3 enabled
946		// or running unit tests.
947		if s.lessor != nil {
948			s.lessor.Stop()
949		}
950		if s.kv != nil {
951			s.kv.Close()
952		}
953		if s.authStore != nil {
954			s.authStore.Close()
955		}
956		if s.be != nil {
957			s.be.Close()
958		}
959		if s.compactor != nil {
960			s.compactor.Stop()
961		}
962		close(s.done)
963	}()
964
965	var expiredLeaseC <-chan []*lease.Lease
966	if s.lessor != nil {
967		expiredLeaseC = s.lessor.ExpiredLeasesC()
968	}
969
970	for {
971		select {
972		case ap := <-s.r.apply():
973			f := func(context.Context) { s.applyAll(&ep, &ap) }
974			sched.Schedule(f)
975		case leases := <-expiredLeaseC:
976			s.goAttach(func() {
				// Increase throughput of the expired-lease deletion process through parallelization
978				c := make(chan struct{}, maxPendingRevokes)
979				for _, lease := range leases {
980					select {
981					case c <- struct{}{}:
982					case <-s.stopping:
983						return
984					}
985					lid := lease.ID
986					s.goAttach(func() {
987						ctx := s.authStore.WithRoot(s.ctx)
988						_, lerr := s.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: int64(lid)})
989						if lerr == nil {
990							leaseExpired.Inc()
991						} else {
992							lg.Warn(
993								"failed to revoke lease",
994								zap.String("lease-id", fmt.Sprintf("%016x", lid)),
995								zap.Error(lerr),
996							)
997						}
998
999						<-c
1000					})
1001				}
1002			})
1003		case err := <-s.errorc:
1004			lg.Warn("server error", zap.Error(err))
1005			lg.Warn("data-dir used by this member must be removed")
1006			return
1007		case <-getSyncC():
1008			if s.v2store.HasTTLKeys() {
1009				s.sync(s.Cfg.ReqTimeout())
1010			}
1011		case <-s.stop:
1012			return
1013		}
1014	}
1015}
1016
1017func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
1018	s.applySnapshot(ep, apply)
1019	s.applyEntries(ep, apply)
1020
1021	proposalsApplied.Set(float64(ep.appliedi))
1022	s.applyWait.Trigger(ep.appliedi)
1023
	// wait for the raft routine to finish the disk writes before triggering a
	// snapshot. Otherwise, the applied index might be greater than the last index
	// in raft storage, since the raft routine might be slower than the apply routine.
1027	<-apply.notifyc
1028
1029	s.triggerSnapshot(ep)
1030	select {
1031	// snapshot requested via send()
1032	case m := <-s.r.msgSnapC:
1033		merged := s.createMergedSnapshotMessage(m, ep.appliedt, ep.appliedi, ep.confState)
1034		s.sendMergedSnap(merged)
1035	default:
1036	}
1037}
1038
1039func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
1040	if raft.IsEmptySnap(apply.snapshot) {
1041		return
1042	}
1043	applySnapshotInProgress.Inc()
1044
1045	lg := s.getLogger()
1046	lg.Info(
1047		"applying snapshot",
1048		zap.Uint64("current-snapshot-index", ep.snapi),
1049		zap.Uint64("current-applied-index", ep.appliedi),
1050		zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),
1051		zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),
1052	)
1053	defer func() {
1054		lg.Info(
1055			"applied snapshot",
1056			zap.Uint64("current-snapshot-index", ep.snapi),
1057			zap.Uint64("current-applied-index", ep.appliedi),
1058			zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),
1059			zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),
1060		)
1061		applySnapshotInProgress.Dec()
1062	}()
1063
1064	if apply.snapshot.Metadata.Index <= ep.appliedi {
1065		lg.Panic(
1066			"unexpected leader snapshot from outdated index",
1067			zap.Uint64("current-snapshot-index", ep.snapi),
1068			zap.Uint64("current-applied-index", ep.appliedi),
1069			zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),
1070			zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),
1071		)
1072	}
1073
1074	// wait for raftNode to persist snapshot onto the disk
1075	<-apply.notifyc
1076
1077	newbe, err := openSnapshotBackend(s.Cfg, s.snapshotter, apply.snapshot)
1078	if err != nil {
1079		lg.Panic("failed to open snapshot backend", zap.Error(err))
1080	}
1081
1082	// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
1083	// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
1084	if s.lessor != nil {
1085		lg.Info("restoring lease store")
1086
1087		s.lessor.Recover(newbe, func() lease.TxnDelete { return s.kv.Write(traceutil.TODO()) })
1088
1089		lg.Info("restored lease store")
1090	}
1091
1092	lg.Info("restoring mvcc store")
1093
1094	if err := s.kv.Restore(newbe); err != nil {
1095		lg.Panic("failed to restore mvcc store", zap.Error(err))
1096	}
1097
1098	s.consistIndex.SetConsistentIndex(s.kv.ConsistentIndex())
1099	lg.Info("restored mvcc store")
1100
1101	// Closing old backend might block until all the txns
1102	// on the backend are finished.
1103	// We do not want to wait on closing the old backend.
1104	s.bemu.Lock()
1105	oldbe := s.be
1106	go func() {
1107		lg.Info("closing old backend file")
1108		defer func() {
1109			lg.Info("closed old backend file")
1110		}()
1111		if err := oldbe.Close(); err != nil {
1112			lg.Panic("failed to close old backend", zap.Error(err))
1113		}
1114	}()
1115
1116	s.be = newbe
1117	s.bemu.Unlock()
1118
1119	lg.Info("restoring alarm store")
1120
1121	if err := s.restoreAlarms(); err != nil {
1122		lg.Panic("failed to restore alarm store", zap.Error(err))
1123	}
1124
1125	lg.Info("restored alarm store")
1126
1127	if s.authStore != nil {
1128		lg.Info("restoring auth store")
1129
1130		s.authStore.Recover(newbe)
1131
1132		lg.Info("restored auth store")
1133	}
1134
1135	lg.Info("restoring v2 store")
1136	if err := s.v2store.Recovery(apply.snapshot.Data); err != nil {
1137		lg.Panic("failed to restore v2 store", zap.Error(err))
1138	}
1139
1140	lg.Info("restored v2 store")
1141
1142	s.cluster.SetBackend(newbe)
1143
1144	lg.Info("restoring cluster configuration")
1145
1146	s.cluster.Recover(api.UpdateCapability)
1147
1148	lg.Info("restored cluster configuration")
1149	lg.Info("removing old peers from network")
1150
1151	// recover raft transport
1152	s.r.transport.RemoveAllPeers()
1153
1154	lg.Info("removed old peers from network")
1155	lg.Info("adding peers from new cluster configuration")
1156
1157	for _, m := range s.cluster.Members() {
1158		if m.ID == s.ID() {
1159			continue
1160		}
1161		s.r.transport.AddPeer(m.ID, m.PeerURLs)
1162	}
1163
1164	lg.Info("added peers from new cluster configuration")
1165
1166	ep.appliedt = apply.snapshot.Metadata.Term
1167	ep.appliedi = apply.snapshot.Metadata.Index
1168	ep.snapi = ep.appliedi
1169	ep.confState = apply.snapshot.Metadata.ConfState
1170}
1171
1172func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) {
1173	if len(apply.entries) == 0 {
1174		return
1175	}
1176	firsti := apply.entries[0].Index
1177	if firsti > ep.appliedi+1 {
1178		lg := s.getLogger()
1179		lg.Panic(
1180			"unexpected committed entry index",
1181			zap.Uint64("current-applied-index", ep.appliedi),
1182			zap.Uint64("first-committed-entry-index", firsti),
1183		)
1184	}
1185	var ents []raftpb.Entry
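	// Entries at or below ep.appliedi have already been applied; keep only the
	// suffix of genuinely new entries.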
1186	if ep.appliedi+1-firsti < uint64(len(apply.entries)) {
1187		ents = apply.entries[ep.appliedi+1-firsti:]
1188	}
1189	if len(ents) == 0 {
1190		return
1191	}
1192	var shouldstop bool
1193	if ep.appliedt, ep.appliedi, shouldstop = s.apply(ents, &ep.confState); shouldstop {
1194		go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster"))
1195	}
1196}
1197
1198func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
1199	if ep.appliedi-ep.snapi <= s.Cfg.SnapshotCount {
1200		return
1201	}
1202
1203	lg := s.getLogger()
1204	lg.Info(
1205		"triggering snapshot",
1206		zap.String("local-member-id", s.ID().String()),
1207		zap.Uint64("local-member-applied-index", ep.appliedi),
1208		zap.Uint64("local-member-snapshot-index", ep.snapi),
1209		zap.Uint64("local-member-snapshot-count", s.Cfg.SnapshotCount),
1210	)
1211
1212	s.snapshot(ep.appliedi, ep.confState)
1213	ep.snapi = ep.appliedi
1214}
1215
1216func (s *EtcdServer) hasMultipleVotingMembers() bool {
1217	return s.cluster != nil && len(s.cluster.VotingMemberIDs()) > 1
1218}
1219
1220func (s *EtcdServer) isLeader() bool {
1221	return uint64(s.ID()) == s.Lead()
1222}
1223
1224// MoveLeader transfers the leader to the given transferee.
1225func (s *EtcdServer) MoveLeader(ctx context.Context, lead, transferee uint64) error {
1226	if !s.cluster.IsMemberExist(types.ID(transferee)) || s.cluster.Member(types.ID(transferee)).IsLearner {
1227		return ErrBadLeaderTransferee
1228	}
1229
1230	now := time.Now()
1231	interval := time.Duration(s.Cfg.TickMs) * time.Millisecond
1232
1233	lg := s.getLogger()
1234	lg.Info(
1235		"leadership transfer starting",
1236		zap.String("local-member-id", s.ID().String()),
1237		zap.String("current-leader-member-id", types.ID(lead).String()),
1238		zap.String("transferee-member-id", types.ID(transferee).String()),
1239	)
1240
1241	s.r.TransferLeadership(ctx, lead, transferee)
1242	for s.Lead() != transferee {
1243		select {
1244		case <-ctx.Done(): // time out
1245			return ErrTimeoutLeaderTransfer
1246		case <-time.After(interval):
1247		}
1248	}
1249
1250	// TODO: drain all requests, or drop all messages to the old leader
1251	lg.Info(
1252		"leadership transfer finished",
1253		zap.String("local-member-id", s.ID().String()),
1254		zap.String("old-leader-member-id", types.ID(lead).String()),
1255		zap.String("new-leader-member-id", types.ID(transferee).String()),
1256		zap.Duration("took", time.Since(now)),
1257	)
1258	return nil
1259}
1260
1261// TransferLeadership transfers the leader to the chosen transferee.
1262func (s *EtcdServer) TransferLeadership() error {
1263	lg := s.getLogger()
1264	if !s.isLeader() {
1265		lg.Info(
1266			"skipped leadership transfer; local server is not leader",
1267			zap.String("local-member-id", s.ID().String()),
1268			zap.String("current-leader-member-id", types.ID(s.Lead()).String()),
1269		)
1270		return nil
1271	}
1272
1273	if !s.hasMultipleVotingMembers() {
1274		lg.Info(
1275			"skipped leadership transfer for single voting member cluster",
1276			zap.String("local-member-id", s.ID().String()),
1277			zap.String("current-leader-member-id", types.ID(s.Lead()).String()),
1278		)
1279		return nil
1280	}
1281
1282	transferee, ok := longestConnected(s.r.transport, s.cluster.VotingMemberIDs())
1283	if !ok {
1284		return ErrUnhealthy
1285	}
1286
1287	tm := s.Cfg.ReqTimeout()
1288	ctx, cancel := context.WithTimeout(s.ctx, tm)
1289	err := s.MoveLeader(ctx, s.Lead(), uint64(transferee))
1290	cancel()
1291	return err
1292}
1293
1294// HardStop stops the server without coordination with other members in the cluster.
1295func (s *EtcdServer) HardStop() {
1296	select {
1297	case s.stop <- struct{}{}:
1298	case <-s.done:
1299		return
1300	}
1301	<-s.done
1302}
1303
1304// Stop stops the server gracefully, and shuts down the running goroutine.
1305// Stop should be called after a Start(s), otherwise it will block forever.
1306// When stopping leader, Stop transfers its leadership to one of its peers
1307// before stopping the server.
1308// Stop terminates the Server and performs any necessary finalization.
1309// Do and Process cannot be called after Stop has been invoked.
1310func (s *EtcdServer) Stop() {
1311	lg := s.getLogger()
1312	if err := s.TransferLeadership(); err != nil {
1313		lg.Warn("leadership transfer failed", zap.String("local-member-id", s.ID().String()), zap.Error(err))
1314	}
1315	s.HardStop()
1316}
1317
1318// ReadyNotify returns a channel that will be closed when the server
1319// is ready to serve client requests
1320func (s *EtcdServer) ReadyNotify() <-chan struct{} { return s.readych }
1321
1322func (s *EtcdServer) stopWithDelay(d time.Duration, err error) {
1323	select {
1324	case <-time.After(d):
1325	case <-s.done:
1326	}
1327	select {
1328	case s.errorc <- err:
1329	default:
1330	}
1331}
1332
// StopNotify returns a channel that receives an empty struct
// when the server is stopped.
1335func (s *EtcdServer) StopNotify() <-chan struct{} { return s.done }
1336
1337func (s *EtcdServer) SelfStats() []byte { return s.stats.JSON() }
1338
1339func (s *EtcdServer) LeaderStats() []byte {
1340	lead := s.getLead()
1341	if lead != uint64(s.id) {
1342		return nil
1343	}
1344	return s.lstats.JSON()
1345}
1346
1347func (s *EtcdServer) StoreStats() []byte { return s.v2store.JsonStats() }
1348
1349func (s *EtcdServer) checkMembershipOperationPermission(ctx context.Context) error {
1350	if s.authStore == nil {
1351		// In the context of ordinary etcd process, s.authStore will never be nil.
1352		// This branch is for handling cases in server_test.go
1353		return nil
1354	}
1355
	// Note that this permission check is done in the API layer,
	// so a TOCTOU problem could potentially arise with a schedule like this:
	// update membership with user A -> revoke root role of A -> apply membership change
	// in the state machine layer.
	// However, both membership change and role management require the root privilege,
	// so careful operation by admins can prevent the problem.
1362	authInfo, err := s.AuthInfoFromCtx(ctx)
1363	if err != nil {
1364		return err
1365	}
1366
1367	return s.AuthStore().IsAdminPermitted(authInfo)
1368}
1369
1370func (s *EtcdServer) AddMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error) {
1371	if err := s.checkMembershipOperationPermission(ctx); err != nil {
1372		return nil, err
1373	}
1374
1375	// TODO: move Member to protobuf type
1376	b, err := json.Marshal(memb)
1377	if err != nil {
1378		return nil, err
1379	}
1380
1381	// by default StrictReconfigCheck is enabled; reject new members if unhealthy.
1382	if err := s.mayAddMember(memb); err != nil {
1383		return nil, err
1384	}
1385
1386	cc := raftpb.ConfChange{
1387		Type:    raftpb.ConfChangeAddNode,
1388		NodeID:  uint64(memb.ID),
1389		Context: b,
1390	}
1391
1392	if memb.IsLearner {
1393		cc.Type = raftpb.ConfChangeAddLearnerNode
1394	}
1395
1396	return s.configure(ctx, cc)
1397}
1398
1399func (s *EtcdServer) mayAddMember(memb membership.Member) error {
1400	lg := s.getLogger()
1401	if !s.Cfg.StrictReconfigCheck {
1402		return nil
1403	}
1404
1405	// protect quorum when adding voting member
1406	if !memb.IsLearner && !s.cluster.IsReadyToAddVotingMember() {
1407		lg.Warn(
1408			"rejecting member add request; not enough healthy members",
1409			zap.String("local-member-id", s.ID().String()),
1410			zap.String("requested-member-add", fmt.Sprintf("%+v", memb)),
1411			zap.Error(ErrNotEnoughStartedMembers),
1412		)
1413		return ErrNotEnoughStartedMembers
1414	}
1415
1416	if !isConnectedFullySince(s.r.transport, time.Now().Add(-HealthInterval), s.ID(), s.cluster.VotingMembers()) {
1417		lg.Warn(
1418			"rejecting member add request; local member has not been connected to all peers, reconfigure breaks active quorum",
1419			zap.String("local-member-id", s.ID().String()),
1420			zap.String("requested-member-add", fmt.Sprintf("%+v", memb)),
1421			zap.Error(ErrUnhealthy),
1422		)
1423		return ErrUnhealthy
1424	}
1425
1426	return nil
1427}
1428
1429func (s *EtcdServer) RemoveMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
1430	if err := s.checkMembershipOperationPermission(ctx); err != nil {
1431		return nil, err
1432	}
1433
1434	// by default StrictReconfigCheck is enabled; reject removal if leads to quorum loss
1435	if err := s.mayRemoveMember(types.ID(id)); err != nil {
1436		return nil, err
1437	}
1438
1439	cc := raftpb.ConfChange{
1440		Type:   raftpb.ConfChangeRemoveNode,
1441		NodeID: id,
1442	}
1443	return s.configure(ctx, cc)
1444}
1445
1446// PromoteMember promotes a learner node to a voting node.
1447func (s *EtcdServer) PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
	// Only the raft leader has information on whether the to-be-promoted learner node is ready. If the promoteMember call
	// fails with ErrNotLeader, forward the request to the leader node via HTTP. If the promoteMember call fails with an
	// error other than ErrNotLeader, return the error.
1451	resp, err := s.promoteMember(ctx, id)
1452	if err == nil {
1453		learnerPromoteSucceed.Inc()
1454		return resp, nil
1455	}
1456	if err != ErrNotLeader {
1457		learnerPromoteFailed.WithLabelValues(err.Error()).Inc()
1458		return resp, err
1459	}
1460
1461	cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
1462	defer cancel()
1463	// forward to leader
1464	for cctx.Err() == nil {
1465		leader, err := s.waitLeader(cctx)
1466		if err != nil {
1467			return nil, err
1468		}
1469		for _, url := range leader.PeerURLs {
1470			resp, err := promoteMemberHTTP(cctx, url, id, s.peerRt)
1471			if err == nil {
1472				return resp, nil
1473			}
			// If member promotion failed, return early. Otherwise, keep retrying.
1475			if err == ErrLearnerNotReady || err == membership.ErrIDNotFound || err == membership.ErrMemberNotLearner {
1476				return nil, err
1477			}
1478		}
1479	}
1480
1481	if cctx.Err() == context.DeadlineExceeded {
1482		return nil, ErrTimeout
1483	}
1484	return nil, ErrCanceled
1485}
1486
// promoteMember checks whether the to-be-promoted learner node is ready before sending the promote
// request to raft.
// The function returns ErrNotLeader if the local node is not the raft leader (and therefore does not have
// enough information to determine if the learner node is ready), and returns ErrLearnerNotReady if the
// local node is the leader (and therefore has enough information) but decides the learner node is not ready
// to be promoted.
1493func (s *EtcdServer) promoteMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
1494	if err := s.checkMembershipOperationPermission(ctx); err != nil {
1495		return nil, err
1496	}
1497
1498	// check if we can promote this learner.
1499	if err := s.mayPromoteMember(types.ID(id)); err != nil {
1500		return nil, err
1501	}
1502
1503	// build the context for the promote confChange. mark IsLearner to false and IsPromote to true.
1504	promoteChangeContext := membership.ConfigChangeContext{
1505		Member: membership.Member{
1506			ID: types.ID(id),
1507		},
1508		IsPromote: true,
1509	}
1510
1511	b, err := json.Marshal(promoteChangeContext)
1512	if err != nil {
1513		return nil, err
1514	}
1515
1516	cc := raftpb.ConfChange{
1517		Type:    raftpb.ConfChangeAddNode,
1518		NodeID:  id,
1519		Context: b,
1520	}
1521
1522	return s.configure(ctx, cc)
1523}
1524
1525func (s *EtcdServer) mayPromoteMember(id types.ID) error {
1526	lg := s.getLogger()
1527	err := s.isLearnerReady(uint64(id))
1528	if err != nil {
1529		return err
1530	}
1531
1532	if !s.Cfg.StrictReconfigCheck {
1533		return nil
1534	}
1535	if !s.cluster.IsReadyToPromoteMember(uint64(id)) {
1536		lg.Warn(
1537			"rejecting member promote request; not enough healthy members",
1538			zap.String("local-member-id", s.ID().String()),
1539			zap.String("requested-member-remove-id", id.String()),
1540			zap.Error(ErrNotEnoughStartedMembers),
1541		)
1542		return ErrNotEnoughStartedMembers
1543	}
1544
1545	return nil
1546}
1547
// isLearnerReady checks whether the learner has caught up with the leader.
// Note: it will return nil if the member is not found in the cluster or if the member is not a learner.
// These two conditions will be checked later, before the apply phase.
1551func (s *EtcdServer) isLearnerReady(id uint64) error {
1552	rs := s.raftStatus()
1553
1554	// leader's raftStatus.Progress is not nil
1555	if rs.Progress == nil {
1556		return ErrNotLeader
1557	}
1558
1559	var learnerMatch uint64
1560	isFound := false
1561	leaderID := rs.ID
1562	for memberID, progress := range rs.Progress {
1563		if id == memberID {
1564			// check its status
1565			learnerMatch = progress.Match
1566			isFound = true
1567			break
1568		}
1569	}
1570
1571	if isFound {
1572		leaderMatch := rs.Progress[leaderID].Match
		// the learner's Match has not caught up with the leader's yet
1574		if float64(learnerMatch) < float64(leaderMatch)*readyPercent {
1575			return ErrLearnerNotReady
1576		}
1577	}
1578
1579	return nil
1580}
1581
1582func (s *EtcdServer) mayRemoveMember(id types.ID) error {
1583	if !s.Cfg.StrictReconfigCheck {
1584		return nil
1585	}
1586
1587	lg := s.getLogger()
1588	isLearner := s.cluster.IsMemberExist(id) && s.cluster.Member(id).IsLearner
1589	// no need to check quorum when removing non-voting member
1590	if isLearner {
1591		return nil
1592	}
1593
1594	if !s.cluster.IsReadyToRemoveVotingMember(uint64(id)) {
1595		lg.Warn(
1596			"rejecting member remove request; not enough healthy members",
1597			zap.String("local-member-id", s.ID().String()),
1598			zap.String("requested-member-remove-id", id.String()),
1599			zap.Error(ErrNotEnoughStartedMembers),
1600		)
1601		return ErrNotEnoughStartedMembers
1602	}
1603
1604	// downed member is safe to remove since it's not part of the active quorum
1605	if t := s.r.transport.ActiveSince(id); id != s.ID() && t.IsZero() {
1606		return nil
1607	}
1608
1609	// protect quorum if some members are down
1610	m := s.cluster.VotingMembers()
1611	active := numConnectedSince(s.r.transport, time.Now().Add(-HealthInterval), s.ID(), m)
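	// The member being removed still counts toward "active" here (an inactive
	// member would already have been allowed through above), so exclude it and
	// require that the remaining active voters still reach the post-removal
	// quorum, i.e. ((len(m)-1)/2)+1.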
1612	if (active - 1) < 1+((len(m)-1)/2) {
1613		lg.Warn(
1614			"rejecting member remove request; local member has not been connected to all peers, reconfigure breaks active quorum",
1615			zap.String("local-member-id", s.ID().String()),
1616			zap.String("requested-member-remove", id.String()),
1617			zap.Int("active-peers", active),
1618			zap.Error(ErrUnhealthy),
1619		)
1620		return ErrUnhealthy
1621	}
1622
1623	return nil
1624}
1625
1626func (s *EtcdServer) UpdateMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error) {
1627	b, merr := json.Marshal(memb)
1628	if merr != nil {
1629		return nil, merr
1630	}
1631
1632	if err := s.checkMembershipOperationPermission(ctx); err != nil {
1633		return nil, err
1634	}
1635	cc := raftpb.ConfChange{
1636		Type:    raftpb.ConfChangeUpdateNode,
1637		NodeID:  uint64(memb.ID),
1638		Context: b,
1639	}
1640	return s.configure(ctx, cc)
1641}
1642
1643func (s *EtcdServer) setCommittedIndex(v uint64) {
1644	atomic.StoreUint64(&s.committedIndex, v)
1645}
1646
1647func (s *EtcdServer) getCommittedIndex() uint64 {
1648	return atomic.LoadUint64(&s.committedIndex)
1649}
1650
1651func (s *EtcdServer) setAppliedIndex(v uint64) {
1652	atomic.StoreUint64(&s.appliedIndex, v)
1653}
1654
1655func (s *EtcdServer) getAppliedIndex() uint64 {
1656	return atomic.LoadUint64(&s.appliedIndex)
1657}
1658
1659func (s *EtcdServer) setTerm(v uint64) {
1660	atomic.StoreUint64(&s.term, v)
1661}
1662
1663func (s *EtcdServer) getTerm() uint64 {
1664	return atomic.LoadUint64(&s.term)
1665}
1666
1667func (s *EtcdServer) setLead(v uint64) {
1668	atomic.StoreUint64(&s.lead, v)
1669}
1670
1671func (s *EtcdServer) getLead() uint64 {
1672	return atomic.LoadUint64(&s.lead)
1673}
1674
1675func (s *EtcdServer) leaderChangedNotify() <-chan struct{} {
1676	s.leaderChangedMu.RLock()
1677	defer s.leaderChangedMu.RUnlock()
1678	return s.leaderChanged
1679}
1680
1681// RaftStatusGetter represents etcd server and Raft progress.
1682type RaftStatusGetter interface {
1683	ID() types.ID
1684	Leader() types.ID
1685	CommittedIndex() uint64
1686	AppliedIndex() uint64
1687	Term() uint64
1688}
1689
1690func (s *EtcdServer) ID() types.ID { return s.id }
1691
1692func (s *EtcdServer) Leader() types.ID { return types.ID(s.getLead()) }
1693
1694func (s *EtcdServer) Lead() uint64 { return s.getLead() }
1695
1696func (s *EtcdServer) CommittedIndex() uint64 { return s.getCommittedIndex() }
1697
1698func (s *EtcdServer) AppliedIndex() uint64 { return s.getAppliedIndex() }
1699
1700func (s *EtcdServer) Term() uint64 { return s.getTerm() }
1701
1702type confChangeResponse struct {
1703	membs []*membership.Member
1704	err   error
1705}
1706
1707// configure sends a configuration change through consensus and
1708// then waits for it to be applied to the server. It
1709// will block until the change is performed or there is an error.
1710func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) ([]*membership.Member, error) {
1711	lg := s.getLogger()
1712	cc.ID = s.reqIDGen.Next()
1713	ch := s.w.Register(cc.ID)
1714
1715	start := time.Now()
1716	if err := s.r.ProposeConfChange(ctx, cc); err != nil {
1717		s.w.Trigger(cc.ID, nil)
1718		return nil, err
1719	}
1720
1721	select {
1722	case x := <-ch:
1723		if x == nil {
1724			lg.Panic("failed to configure")
1725		}
1726		resp := x.(*confChangeResponse)
1727		lg.Info(
1728			"applied a configuration change through raft",
1729			zap.String("local-member-id", s.ID().String()),
1730			zap.String("raft-conf-change", cc.Type.String()),
1731			zap.String("raft-conf-change-node-id", types.ID(cc.NodeID).String()),
1732		)
1733		return resp.membs, resp.err
1734
1735	case <-ctx.Done():
1736		s.w.Trigger(cc.ID, nil) // GC wait
1737		return nil, s.parseProposeCtxErr(ctx.Err(), start)
1738
1739	case <-s.stopping:
1740		return nil, ErrStopped
1741	}
1742}
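
// waitSketch is a minimal, self-contained sketch of the register/trigger
// pattern that configure relies on: the proposer registers a channel under the
// proposal ID before proposing, and the apply path later triggers that ID with
// the result. This type is an assumption for illustration only; the real
// implementation used by the server is pkg/wait.Wait.
type waitSketch struct {
	mu sync.Mutex
	m  map[uint64]chan interface{}
}

// Register creates and remembers a buffered result channel for the given ID.
func (w *waitSketch) Register(id uint64) <-chan interface{} {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.m == nil {
		w.m = make(map[uint64]chan interface{})
	}
	ch := make(chan interface{}, 1)
	w.m[id] = ch
	return ch
}

// Trigger delivers the result to whoever registered the ID, if anyone did.
func (w *waitSketch) Trigger(id uint64, x interface{}) {
	w.mu.Lock()
	ch := w.m[id]
	delete(w.m, id)
	w.mu.Unlock()
	if ch != nil {
		ch <- x
		close(ch)
	}
}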
1743
1744// sync proposes a SYNC request and is non-blocking.
1745// This makes no guarantee that the request will be proposed or performed.
1746// The request will be canceled after the given timeout.
1747func (s *EtcdServer) sync(timeout time.Duration) {
1748	req := pb.Request{
1749		Method: "SYNC",
1750		ID:     s.reqIDGen.Next(),
1751		Time:   time.Now().UnixNano(),
1752	}
1753	data := pbutil.MustMarshal(&req)
1754	// There is no guarantee that the node has a leader when it makes a SYNC
1755	// request, so the proposal is made in a separate goroutine.
1756	ctx, cancel := context.WithTimeout(s.ctx, timeout)
1757	s.goAttach(func() {
1758		s.r.Propose(ctx, data)
1759		cancel()
1760	})
1761}
1762
1763// publishV3 registers server information into the cluster using a v3 request. The
1764// information is the JSON representation of this server's member struct, updated
1765// with the static clientURLs of the server.
1766// The function keeps attempting to register until it succeeds,
1767// or its server is stopped.
1768// TODO: replace publish() in 3.6
1769func (s *EtcdServer) publishV3(timeout time.Duration) {
1770	req := &membershippb.ClusterMemberAttrSetRequest{
1771		Member_ID: uint64(s.id),
1772		MemberAttributes: &membershippb.Attributes{
1773			Name:       s.attributes.Name,
1774			ClientUrls: s.attributes.ClientURLs,
1775		},
1776	}
1777	lg := s.getLogger()
1778	for {
1779		select {
1780		case <-s.stopping:
1781			lg.Warn(
1782				"stopped publish because server is stopping",
1783				zap.String("local-member-id", s.ID().String()),
1784				zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
1785				zap.Duration("publish-timeout", timeout),
1786			)
1787			return
1788
1789		default:
1790		}
1791
1792		ctx, cancel := context.WithTimeout(s.ctx, timeout)
1793		_, err := s.raftRequest(ctx, pb.InternalRaftRequest{ClusterMemberAttrSet: req})
1794		cancel()
1795		switch err {
1796		case nil:
1797			close(s.readych)
1798			lg.Info(
1799				"published local member to cluster through raft",
1800				zap.String("local-member-id", s.ID().String()),
1801				zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
1802				zap.String("cluster-id", s.cluster.ID().String()),
1803				zap.Duration("publish-timeout", timeout),
1804			)
1805			return
1806
1807		default:
1808			lg.Warn(
1809				"failed to publish local member to cluster through raft",
1810				zap.String("local-member-id", s.ID().String()),
1811				zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
1812				zap.Duration("publish-timeout", timeout),
1813				zap.Error(err),
1814			)
1815		}
1816	}
1817}
1818
1819// publish registers server information into the cluster. The information
1820// is the JSON representation of this server's member struct, updated with the
1821// static clientURLs of the server.
1822// The function keeps attempting to register until it succeeds,
1823// or its server is stopped.
1824//
1825// The member attributes are encoded in the v2 store and applied through Raft,
1826// but the request does not go through the v2 API endpoint, which means that even
1827// with the v2 client handler disabled (e.g. --enable-v2=false), the cluster can
1828// still process publish requests through rafthttp.
1829// TODO: Deprecate v2 store in 3.6
1830func (s *EtcdServer) publish(timeout time.Duration) {
1831	lg := s.getLogger()
1832	b, err := json.Marshal(s.attributes)
1833	if err != nil {
1834		lg.Panic("failed to marshal JSON", zap.Error(err))
1835		return
1836	}
1837	req := pb.Request{
1838		Method: "PUT",
1839		Path:   membership.MemberAttributesStorePath(s.id),
1840		Val:    string(b),
1841	}
1842
1843	for {
1844		ctx, cancel := context.WithTimeout(s.ctx, timeout)
1845		_, err := s.Do(ctx, req)
1846		cancel()
1847		switch err {
1848		case nil:
1849			close(s.readych)
1850			lg.Info(
1851				"published local member to cluster through raft",
1852				zap.String("local-member-id", s.ID().String()),
1853				zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
1854				zap.String("request-path", req.Path),
1855				zap.String("cluster-id", s.cluster.ID().String()),
1856				zap.Duration("publish-timeout", timeout),
1857			)
1858			return
1859
1860		case ErrStopped:
1861			lg.Warn(
1862				"stopped publish because server is stopped",
1863				zap.String("local-member-id", s.ID().String()),
1864				zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
1865				zap.Duration("publish-timeout", timeout),
1866				zap.Error(err),
1867			)
1868			return
1869
1870		default:
1871			lg.Warn(
1872				"failed to publish local member to cluster through raft",
1873				zap.String("local-member-id", s.ID().String()),
1874				zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
1875				zap.String("request-path", req.Path),
1876				zap.Duration("publish-timeout", timeout),
1877				zap.Error(err),
1878			)
1879		}
1880	}
1881}
1882
1883func (s *EtcdServer) sendMergedSnap(merged snap.Message) {
1884	atomic.AddInt64(&s.inflightSnapshots, 1)
1885
1886	lg := s.getLogger()
1887	fields := []zap.Field{
1888		zap.String("from", s.ID().String()),
1889		zap.String("to", types.ID(merged.To).String()),
1890		zap.Int64("bytes", merged.TotalSize),
1891		zap.String("size", humanize.Bytes(uint64(merged.TotalSize))),
1892	}
1893
1894	now := time.Now()
1895	s.r.transport.SendSnapshot(merged)
1896	lg.Info("sending merged snapshot", fields...)
1897
1898	s.goAttach(func() {
1899		select {
1900		case ok := <-merged.CloseNotify():
1901			// delay releasing the inflight snapshot for another 30 seconds to
1902			// block log compaction.
1903			// If the follower still fails to catch up after that, it is probably
1904			// just too slow. We cannot avoid the snapshot cycle anyway.
1905			if ok {
1906				select {
1907				case <-time.After(releaseDelayAfterSnapshot):
1908				case <-s.stopping:
1909				}
1910			}
1911
1912			atomic.AddInt64(&s.inflightSnapshots, -1)
1913
1914			lg.Info("sent merged snapshot", append(fields, zap.Duration("took", time.Since(now)))...)
1915
1916		case <-s.stopping:
1917			lg.Warn("canceled sending merged snapshot; server stopping", fields...)
1918			return
1919		}
1920	})
1921}
1922
1923// apply takes entries received from Raft (after they have been committed) and
1924// applies them to the current state of the EtcdServer.
1925// The given entries should not be empty.
1926func (s *EtcdServer) apply(
1927	es []raftpb.Entry,
1928	confState *raftpb.ConfState,
1929) (appliedt uint64, appliedi uint64, shouldStop bool) {
1930	for i := range es {
1931		e := es[i]
1932		switch e.Type {
1933		case raftpb.EntryNormal:
1934			s.applyEntryNormal(&e)
1935			s.setAppliedIndex(e.Index)
1936			s.setTerm(e.Term)
1937
1938		case raftpb.EntryConfChange:
1939			// set the consistent index of the currently executing entry
1940			if e.Index > s.consistIndex.ConsistentIndex() {
1941				s.consistIndex.SetConsistentIndex(e.Index)
1942			}
1943			var cc raftpb.ConfChange
1944			pbutil.MustUnmarshal(&cc, e.Data)
1945			removedSelf, err := s.applyConfChange(cc, confState)
1946			s.setAppliedIndex(e.Index)
1947			s.setTerm(e.Term)
1948			shouldStop = shouldStop || removedSelf
1949			s.w.Trigger(cc.ID, &confChangeResponse{s.cluster.Members(), err})
1950
1951		default:
1952			lg := s.getLogger()
1953			lg.Panic(
1954				"unknown entry type; must be either EntryNormal or EntryConfChange",
1955				zap.String("type", e.Type.String()),
1956			)
1957		}
1958		appliedi, appliedt = e.Index, e.Term
1959	}
1960	return appliedt, appliedi, shouldStop
1961}
1962
1963// applyEntryNormal applies an EntryNormal type raftpb request to the EtcdServer
1964func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
1965	shouldApplyV3 := false
1966	index := s.consistIndex.ConsistentIndex()
1967	if e.Index > index {
1968		// set the consistent index of the currently executing entry
1969		s.consistIndex.SetConsistentIndex(e.Index)
1970		shouldApplyV3 = true
1971	}
1972	s.lg.Debug("apply entry normal",
1973		zap.Uint64("consistent-index", index),
1974		zap.Uint64("entry-index", e.Index),
1975		zap.Bool("should-applyV3", shouldApplyV3))
1976
1977	// The raft state machine may generate a noop entry on leader confirmation.
1978	// Skip it in advance to avoid potential bugs in the future.
1979	if len(e.Data) == 0 {
1980		select {
1981		case s.forceVersionC <- struct{}{}:
1982		default:
1983		}
1984		// promote lessor when the local member is leader and finished
1985		// applying all entries from the last term.
1986		if s.isLeader() {
1987			s.lessor.Promote(s.Cfg.electionTimeout())
1988		}
1989		return
1990	}
1991
1992	var raftReq pb.InternalRaftRequest
1993	if !pbutil.MaybeUnmarshal(&raftReq, e.Data) { // backward compatible
1994		var r pb.Request
1995		rp := &r
1996		pbutil.MustUnmarshal(rp, e.Data)
1997		s.w.Trigger(r.ID, s.applyV2Request((*RequestV2)(rp)))
1998		return
1999	}
2000	if raftReq.V2 != nil {
2001		req := (*RequestV2)(raftReq.V2)
2002		s.w.Trigger(req.ID, s.applyV2Request(req))
2003		return
2004	}
2005	// do not re-apply applied entries.
2006	if !shouldApplyV3 {
2007		return
2008	}
2009
2010	id := raftReq.ID
2011	if id == 0 {
2012		id = raftReq.Header.ID
2013	}
2014
2015	var ar *applyResult
2016	needResult := s.w.IsRegistered(id)
2017	if needResult || !noSideEffect(&raftReq) {
2018		if !needResult && raftReq.Txn != nil {
2019			removeNeedlessRangeReqs(raftReq.Txn)
2020		}
2021		ar = s.applyV3.Apply(&raftReq)
2022	}
2023
2024	if ar == nil {
2025		return
2026	}
2027
2028	if ar.err != ErrNoSpace || len(s.alarmStore.Get(pb.AlarmType_NOSPACE)) > 0 {
2029		s.w.Trigger(id, ar)
2030		return
2031	}
2032
2033	lg := s.getLogger()
2034	lg.Warn(
2035		"message exceeded backend quota; raising alarm",
2036		zap.Int64("quota-size-bytes", s.Cfg.QuotaBackendBytes),
2037		zap.String("quota-size", humanize.Bytes(uint64(s.Cfg.QuotaBackendBytes))),
2038		zap.Error(ar.err),
2039	)
2040
2041	s.goAttach(func() {
2042		a := &pb.AlarmRequest{
2043			MemberID: uint64(s.ID()),
2044			Action:   pb.AlarmRequest_ACTIVATE,
2045			Alarm:    pb.AlarmType_NOSPACE,
2046		}
2047		s.raftRequest(s.ctx, pb.InternalRaftRequest{Alarm: a})
2048		s.w.Trigger(id, ar)
2049	})
2050}
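
// shouldApplyV3Sketch is an illustrative sketch of the consistent-index gate
// used in applyEntryNormal: a v3 request is applied to the backend only if the
// entry's raft index is greater than the consistent index already persisted,
// so entries replayed after a restart are not applied twice. The helper name
// is an assumption for illustration and is not used by the server.
func shouldApplyV3Sketch(consistentIndex, entryIndex uint64) bool {
	return entryIndex > consistentIndex
}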
2051
2052// applyConfChange applies a ConfChange to the server. It is only
2053// invoked with a ConfChange that has already passed through Raft.
2054func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState) (bool, error) {
2055	if err := s.cluster.ValidateConfigurationChange(cc); err != nil {
2056		cc.NodeID = raft.None
2057		s.r.ApplyConfChange(cc)
2058		return false, err
2059	}
2060
2061	lg := s.getLogger()
2062	*confState = *s.r.ApplyConfChange(cc)
2063	switch cc.Type {
2064	case raftpb.ConfChangeAddNode, raftpb.ConfChangeAddLearnerNode:
2065		confChangeContext := new(membership.ConfigChangeContext)
2066		if err := json.Unmarshal(cc.Context, confChangeContext); err != nil {
2067			lg.Panic("failed to unmarshal member", zap.Error(err))
2068		}
2069		if cc.NodeID != uint64(confChangeContext.Member.ID) {
2070			lg.Panic(
2071				"got different member ID",
2072				zap.String("member-id-from-config-change-entry", types.ID(cc.NodeID).String()),
2073				zap.String("member-id-from-message", confChangeContext.Member.ID.String()),
2074			)
2075		}
2076		if confChangeContext.IsPromote {
2077			s.cluster.PromoteMember(confChangeContext.Member.ID)
2078		} else {
2079			s.cluster.AddMember(&confChangeContext.Member)
2080
2081			if confChangeContext.Member.ID != s.id {
2082				s.r.transport.AddPeer(confChangeContext.Member.ID, confChangeContext.PeerURLs)
2083			}
2084		}
2085
2086		// update the isLearner metric when this server's ID is equal to the ID in the raft member conf change
2087		if confChangeContext.Member.ID == s.id {
2088			if cc.Type == raftpb.ConfChangeAddLearnerNode {
2089				isLearner.Set(1)
2090			} else {
2091				isLearner.Set(0)
2092			}
2093		}
2094
2095	case raftpb.ConfChangeRemoveNode:
2096		id := types.ID(cc.NodeID)
2097		s.cluster.RemoveMember(id)
2098		if id == s.id {
2099			return true, nil
2100		}
2101		s.r.transport.RemovePeer(id)
2102
2103	case raftpb.ConfChangeUpdateNode:
2104		m := new(membership.Member)
2105		if err := json.Unmarshal(cc.Context, m); err != nil {
2106			lg.Panic("failed to unmarshal member", zap.Error(err))
2107		}
2108		if cc.NodeID != uint64(m.ID) {
2109			lg.Panic(
2110				"got different member ID",
2111				zap.String("member-id-from-config-change-entry", types.ID(cc.NodeID).String()),
2112				zap.String("member-id-from-message", m.ID.String()),
2113			)
2114		}
2115		s.cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes)
2116		if m.ID != s.id {
2117			s.r.transport.UpdatePeer(m.ID, m.PeerURLs)
2118		}
2119	}
2120	return false, nil
2121}
2122
2123// TODO: non-blocking snapshot
2124func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
2125	clone := s.v2store.Clone()
2126	// commit kv to write metadata (for example: consistent index) to disk.
2127	// KV().commit() updates the consistent index in the backend.
2128	// All operations that update the consistent index must be called sequentially
2129	// from the applyAll function.
2130	// So KV().Commit() cannot run in parallel with apply. It has to be called outside
2131	// the goroutine created below.
2132	s.KV().Commit()
2133
2134	s.goAttach(func() {
2135		lg := s.getLogger()
2136
2137		d, err := clone.SaveNoCopy()
2138		// TODO: current store will never fail to do a snapshot
2139		// what should we do if the store might fail?
2140		if err != nil {
2141			lg.Panic("failed to save v2 store", zap.Error(err))
2142		}
2143		snap, err := s.r.raftStorage.CreateSnapshot(snapi, &confState, d)
2144		if err != nil {
2145			// the snapshot was done asynchronously with the progress of raft.
2146			// raft might already have a newer snapshot.
2147			if err == raft.ErrSnapOutOfDate {
2148				return
2149			}
2150			lg.Panic("failed to create snapshot", zap.Error(err))
2151		}
2152		// SaveSnap saves the snapshot and releases the locked wal files
2153		// to the snapshot index.
2154		if err = s.r.storage.SaveSnap(snap); err != nil {
2155			lg.Panic("failed to save snapshot", zap.Error(err))
2156		}
2157		lg.Info(
2158			"saved snapshot",
2159			zap.Uint64("snapshot-index", snap.Metadata.Index),
2160		)
2161
2162		// When sending a snapshot, etcd will pause compaction.
2163		// After receiving a snapshot, the slow follower needs to get all the entries right after
2164		// the sent snapshot in order to catch up. If we do not pause compaction, the log entries right after
2165		// the sent snapshot might already be compacted. This happens when the snapshot takes a long time
2166		// to send and save. Pausing compaction avoids triggering a snapshot sending cycle.
2167		if atomic.LoadInt64(&s.inflightSnapshots) != 0 {
2168			lg.Info("skip compaction since there is an inflight snapshot")
2169			return
2170		}
2171
2172		// keep some in-memory log entries for slow followers.
2173		compacti := uint64(1)
2174		if snapi > s.Cfg.SnapshotCatchUpEntries {
2175			compacti = snapi - s.Cfg.SnapshotCatchUpEntries
2176		}
2177
2178		err = s.r.raftStorage.Compact(compacti)
2179		if err != nil {
2180			// the compaction was done asynchronously with the progress of raft.
2181			// the raft log might have already been compacted.
2182			if err == raft.ErrCompacted {
2183				return
2184			}
2185			lg.Panic("failed to compact", zap.Error(err))
2186		}
2187		lg.Info(
2188			"compacted Raft logs",
2189			zap.Uint64("compact-index", compacti),
2190		)
2191	})
2192}
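
// compactIndexSketch is an illustrative sketch of how the compaction index is
// chosen above: keep SnapshotCatchUpEntries entries in memory for slow
// followers, but never compact below index 1. The helper name is an assumption
// made purely for illustration.
func compactIndexSketch(snapi, catchUpEntries uint64) uint64 {
	compacti := uint64(1)
	if snapi > catchUpEntries {
		compacti = snapi - catchUpEntries
	}
	return compacti
}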
2193
2194// CutPeer drops messages to the specified peer.
2195func (s *EtcdServer) CutPeer(id types.ID) {
2196	tr, ok := s.r.transport.(*rafthttp.Transport)
2197	if ok {
2198		tr.CutPeer(id)
2199	}
2200}
2201
2202// MendPeer recovers the message dropping behavior of the given peer.
2203func (s *EtcdServer) MendPeer(id types.ID) {
2204	tr, ok := s.r.transport.(*rafthttp.Transport)
2205	if ok {
2206		tr.MendPeer(id)
2207	}
2208}
2209
2210func (s *EtcdServer) PauseSending() { s.r.pauseSending() }
2211
2212func (s *EtcdServer) ResumeSending() { s.r.resumeSending() }
2213
2214func (s *EtcdServer) ClusterVersion() *semver.Version {
2215	if s.cluster == nil {
2216		return nil
2217	}
2218	return s.cluster.Version()
2219}
2220
2221// monitorVersions checks the member's version every monitorVersionInterval.
2222// It updates the cluster version if all members agree on a higher one.
2223// It logs a message if there is a member with a higher version than the
2224// local version.
2225func (s *EtcdServer) monitorVersions() {
2226	for {
2227		select {
2228		case <-s.forceVersionC:
2229		case <-time.After(monitorVersionInterval):
2230		case <-s.stopping:
2231			return
2232		}
2233
2234		if s.Leader() != s.ID() {
2235			continue
2236		}
2237
2238		v := decideClusterVersion(s.getLogger(), getVersions(s.getLogger(), s.cluster, s.id, s.peerRt))
2239		if v != nil {
2240			// only keep major.minor version for comparison
2241			v = &semver.Version{
2242				Major: v.Major,
2243				Minor: v.Minor,
2244			}
2245		}
2246
2247		// if the current version is nil:
2248		// 1. use the decided version if possible
2249		// 2. or use the min cluster version
2250		if s.cluster.Version() == nil {
2251			verStr := version.MinClusterVersion
2252			if v != nil {
2253				verStr = v.String()
2254			}
2255			s.goAttach(func() { s.updateClusterVersion(verStr) })
2256			continue
2257		}
2258
2259		// update cluster version only if the decided version is greater than
2260		// the current cluster version
2261		if v != nil && s.cluster.Version().LessThan(*v) {
2262			s.goAttach(func() { s.updateClusterVersion(v.String()) })
2263		}
2264	}
2265}
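
// majorMinorSketch is an illustrative sketch of the truncation done in
// monitorVersions: only the major and minor components of the decided version
// are compared, so patch releases never drive a cluster-version update. The
// helper name is an assumption made purely for illustration.
func majorMinorSketch(v *semver.Version) *semver.Version {
	if v == nil {
		return nil
	}
	return &semver.Version{Major: v.Major, Minor: v.Minor}
}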
2266
2267func (s *EtcdServer) updateClusterVersion(ver string) {
2268	lg := s.getLogger()
2269
2270	if s.cluster.Version() == nil {
2271		lg.Info(
2272			"setting up initial cluster version",
2273			zap.String("cluster-version", version.Cluster(ver)),
2274		)
2275	} else {
2276		lg.Info(
2277			"updating cluster version",
2278			zap.String("from", version.Cluster(s.cluster.Version().String())),
2279			zap.String("to", version.Cluster(ver)),
2280		)
2281	}
2282
2283	req := membershippb.ClusterVersionSetRequest{Ver: ver}
2284
2285	ctx, cancel := context.WithTimeout(s.ctx, s.Cfg.ReqTimeout())
2286	_, err := s.raftRequest(ctx, pb.InternalRaftRequest{ClusterVersionSet: &req})
2287	cancel()
2288
2289	switch err {
2290	case nil:
2291		lg.Info("cluster version is updated", zap.String("cluster-version", version.Cluster(ver)))
2292		return
2293
2294	case ErrStopped:
2295		lg.Warn("aborting cluster version update; server is stopped", zap.Error(err))
2296		return
2297
2298	default:
2299		lg.Warn("failed to update cluster version", zap.Error(err))
2300	}
2301}
2302
2303func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error {
2304	switch err {
2305	case context.Canceled:
2306		return ErrCanceled
2307
2308	case context.DeadlineExceeded:
2309		s.leadTimeMu.RLock()
2310		curLeadElected := s.leadElectedTime
2311		s.leadTimeMu.RUnlock()
2312		prevLeadLost := curLeadElected.Add(-2 * time.Duration(s.Cfg.ElectionTicks) * time.Duration(s.Cfg.TickMs) * time.Millisecond)
2313		if start.After(prevLeadLost) && start.Before(curLeadElected) {
2314			return ErrTimeoutDueToLeaderFail
2315		}
2316		lead := types.ID(s.getLead())
2317		switch lead {
2318		case types.ID(raft.None):
2319			// TODO: return error to specify it happens because the cluster does not have leader now
2320		case s.ID():
2321			if !isConnectedToQuorumSince(s.r.transport, start, s.ID(), s.cluster.Members()) {
2322				return ErrTimeoutDueToConnectionLost
2323			}
2324		default:
2325			if !isConnectedSince(s.r.transport, start, lead) {
2326				return ErrTimeoutDueToConnectionLost
2327			}
2328		}
2329		return ErrTimeout
2330
2331	default:
2332		return err
2333	}
2334}
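
// timedOutDuringLeaderChangeSketch is an illustrative sketch of the window
// test in parseProposeCtxErr: a proposal that started within two election
// timeouts before the current leader was elected is assumed to have timed out
// because of the leader change. The helper name and parameters are assumptions
// made purely for illustration.
func timedOutDuringLeaderChangeSketch(start, leadElected time.Time, electionTicks int, tickMs uint) bool {
	electionTimeout := time.Duration(electionTicks) * time.Duration(tickMs) * time.Millisecond
	prevLeadLost := leadElected.Add(-2 * electionTimeout)
	return start.After(prevLeadLost) && start.Before(leadElected)
}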
2335
2336func (s *EtcdServer) KV() mvcc.ConsistentWatchableKV { return s.kv }
2337func (s *EtcdServer) Backend() backend.Backend {
2338	s.bemu.Lock()
2339	defer s.bemu.Unlock()
2340	return s.be
2341}
2342
2343func (s *EtcdServer) AuthStore() auth.AuthStore { return s.authStore }
2344
2345func (s *EtcdServer) restoreAlarms() error {
2346	s.applyV3 = s.newApplierV3()
2347	as, err := v3alarm.NewAlarmStore(s.lg, s)
2348	if err != nil {
2349		return err
2350	}
2351	s.alarmStore = as
2352	if len(as.Get(pb.AlarmType_NOSPACE)) > 0 {
2353		s.applyV3 = newApplierV3Capped(s.applyV3)
2354	}
2355	if len(as.Get(pb.AlarmType_CORRUPT)) > 0 {
2356		s.applyV3 = newApplierV3Corrupt(s.applyV3)
2357	}
2358	return nil
2359}
2360
2361// goAttach creates a goroutine on a given function and tracks it using
2362// the etcdserver waitgroup.
2363func (s *EtcdServer) goAttach(f func()) {
2364	s.wgMu.RLock() // this blocks with ongoing close(s.stopping)
2365	defer s.wgMu.RUnlock()
2366	select {
2367	case <-s.stopping:
2368		lg := s.getLogger()
2369		lg.Warn("server has stopped; skipping goAttach")
2370		return
2371	default:
2372	}
2373
2374	// now safe to add since waitgroup wait has not started yet
2375	s.wg.Add(1)
2376	go func() {
2377		defer s.wg.Done()
2378		f()
2379	}()
2380}
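
// goAttachStopSketch is an illustrative sketch of the shutdown side of the
// goAttach pattern above: the stopper takes the write lock before closing the
// stopping channel and only then waits, so a concurrent goAttach either
// observes the closed channel or has already added itself to the waitgroup.
// The helper name is an assumption; the server's real stop path lives
// elsewhere in this package.
func goAttachStopSketch(wgMu *sync.RWMutex, wg *sync.WaitGroup, stopping chan struct{}) {
	wgMu.Lock() // blocks while any goAttach holds the read lock
	close(stopping)
	wgMu.Unlock()
	wg.Wait() // wait for all attached goroutines to finish
}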
2381
2382func (s *EtcdServer) Alarms() []*pb.AlarmMember {
2383	return s.alarmStore.Get(pb.AlarmType_NONE)
2384}
2385
2386func (s *EtcdServer) Logger() *zap.Logger {
2387	return s.lg
2388}
2389
2390// IsLearner returns whether the local member is a raft learner.
2391func (s *EtcdServer) IsLearner() bool {
2392	return s.cluster.IsLocalMemberLearner()
2393}
2394
2395// IsMemberExist returns whether the member with the given ID exists in the cluster.
2396func (s *EtcdServer) IsMemberExist(id types.ID) bool {
2397	return s.cluster.IsMemberExist(id)
2398}
2399
2400// raftStatus returns the raft status of this etcd node.
2401func (s *EtcdServer) raftStatus() raft.Status {
2402	return s.r.Node.Status()
2403}
2404