// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package etcdserver

import (
	"context"
	"encoding/json"
	"expvar"
	"fmt"
	"math"
	"math/rand"
	"net/http"
	"os"
	"path"
	"regexp"
	"sync"
	"sync/atomic"
	"time"

	"go.etcd.io/etcd/auth"
	"go.etcd.io/etcd/etcdserver/api"
	"go.etcd.io/etcd/etcdserver/api/membership"
	"go.etcd.io/etcd/etcdserver/api/membership/membershippb"
	"go.etcd.io/etcd/etcdserver/api/rafthttp"
	"go.etcd.io/etcd/etcdserver/api/snap"
	"go.etcd.io/etcd/etcdserver/api/v2discovery"
	"go.etcd.io/etcd/etcdserver/api/v2http/httptypes"
	stats "go.etcd.io/etcd/etcdserver/api/v2stats"
	"go.etcd.io/etcd/etcdserver/api/v2store"
	"go.etcd.io/etcd/etcdserver/api/v3alarm"
	"go.etcd.io/etcd/etcdserver/api/v3compactor"
	"go.etcd.io/etcd/etcdserver/cindex"
	pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
	"go.etcd.io/etcd/lease"
	"go.etcd.io/etcd/lease/leasehttp"
	"go.etcd.io/etcd/mvcc"
	"go.etcd.io/etcd/mvcc/backend"
	"go.etcd.io/etcd/pkg/fileutil"
	"go.etcd.io/etcd/pkg/idutil"
	"go.etcd.io/etcd/pkg/pbutil"
	"go.etcd.io/etcd/pkg/runtime"
	"go.etcd.io/etcd/pkg/schedule"
	"go.etcd.io/etcd/pkg/traceutil"
	"go.etcd.io/etcd/pkg/types"
	"go.etcd.io/etcd/pkg/wait"
	"go.etcd.io/etcd/raft"
	"go.etcd.io/etcd/raft/raftpb"
	"go.etcd.io/etcd/version"
	"go.etcd.io/etcd/wal"

	"github.com/coreos/go-semver/semver"
	humanize "github.com/dustin/go-humanize"
	"github.com/prometheus/client_golang/prometheus"
	"go.uber.org/zap"
)

const (
	DefaultSnapshotCount = 100000

	// DefaultSnapshotCatchUpEntries is the number of entries for a slow follower
	// to catch up after compacting the raft storage entries.
	// We expect the follower to have millisecond-level latency with the leader.
	// The max throughput is around 10K. Keeping 5K entries is enough to help a
	// follower catch up.
	DefaultSnapshotCatchUpEntries uint64 = 5000
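	// A rough sanity check of the default above (illustrative only, assuming
	// the ~10K entries/sec throughput stated in the comment):
	//
	//	5000 entries / 10000 entries-per-sec = 0.5s of follower lag covered
	//
	// i.e. a follower up to ~500ms behind can still catch up from the retained
	// log instead of requiring a full snapshot transfer.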

	StoreClusterPrefix = "/0"
	StoreKeysPrefix    = "/1"

	// HealthInterval is the minimum time the cluster should be healthy
	// before accepting add member requests.
	HealthInterval = 5 * time.Second

	purgeFileInterval = 30 * time.Second

	// monitorVersionInterval should be smaller than the timeout
	// on the connection; otherwise we will not be able to reuse the
	// connection (since it will time out).
	monitorVersionInterval = rafthttp.ConnWriteTimeout - time.Second

	// maxInFlightMsgSnap is the max number of in-flight snapshot messages
	// etcdserver allows to have. This number is more than enough for most
	// clusters with 5 machines.
	maxInFlightMsgSnap = 16

	releaseDelayAfterSnapshot = 30 * time.Second

	// maxPendingRevokes is the maximum number of outstanding expired lease revocations.
	maxPendingRevokes = 16

	recommendedMaxRequestBytes = 10 * 1024 * 1024

	readyPercent = 0.9
)

var (
	storeMemberAttributeRegexp = regexp.MustCompile(path.Join(membership.StoreMembersPrefix, "[[:xdigit:]]{1,16}", "attributes"))
)

func init() {
	rand.Seed(time.Now().UnixNano())

	expvar.Publish(
		"file_descriptor_limit",
		expvar.Func(
			func() interface{} {
				n, _ := runtime.FDLimit()
				return n
			},
		),
	)
}

type Response struct {
	Term    uint64
	Index   uint64
	Event   *v2store.Event
	Watcher v2store.Watcher
	Err     error
}

type ServerV2 interface {
	Server
	Leader() types.ID

	// Do takes a V2 request and attempts to fulfill it, returning a Response.
	Do(ctx context.Context, r pb.Request) (Response, error)
	stats.Stats
	ClientCertAuthEnabled() bool
}

type ServerV3 interface {
	Server
	RaftStatusGetter
}

func (s *EtcdServer) ClientCertAuthEnabled() bool { return s.Cfg.ClientCertAuthEnabled }

type Server interface {
	// AddMember attempts to add a member into the cluster. It will return
	// ErrIDRemoved if the member ID has been removed from the cluster, or
	// ErrIDExists if the member ID already exists in the cluster.
	AddMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error)
	// RemoveMember attempts to remove a member from the cluster. It will
	// return ErrIDRemoved if the member ID has been removed from the cluster,
	// or ErrIDNotFound if the member ID is not in the cluster.
	RemoveMember(ctx context.Context, id uint64) ([]*membership.Member, error)
	// UpdateMember attempts to update an existing member in the cluster. It will
	// return ErrIDNotFound if the member ID does not exist.
	UpdateMember(ctx context.Context, updateMemb membership.Member) ([]*membership.Member, error)
	// PromoteMember attempts to promote a non-voting node to a voting node.
	// It will return ErrIDNotFound if the member ID does not exist,
	// ErrLearnerNotReady if the member is not ready, and
	// ErrMemberNotLearner if the member is not a learner.
	PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error)

	// ClusterVersion is the cluster-wide minimum major.minor version.
	// The cluster version is set to the minimum version that an etcd member
	// is compatible with when the cluster is first bootstrapped.
	//
	// ClusterVersion is nil until the cluster is bootstrapped (has a quorum).
	//
	// During a rolling upgrade, the ClusterVersion will be updated
	// automatically after a sync (every 5 seconds by default).
	//
	// The API/raft component can utilize ClusterVersion to determine if
	// it can accept a client request or a raft RPC.
	// NOTE: ClusterVersion might be nil when etcd 2.1 works with etcd 2.0 and
	// the leader is etcd 2.0. An etcd 2.0 leader will not update the
	// clusterVersion, since this feature was introduced after 2.0.
	ClusterVersion() *semver.Version
	Cluster() api.Cluster
	Alarms() []*pb.AlarmMember
}
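// A note on the field ordering at the top of EtcdServer below: sync/atomic's
// 64-bit operations require 64-bit-aligned operands, and on 32-bit platforms
// only the first word of an allocated struct is guaranteed to be so aligned.
// Keeping the atomically-accessed uint64 fields first preserves that
// guarantee. A minimal sketch of the failure this avoids (hypothetical
// struct, not part of etcd):
//
//	type bad struct {
//		flag bool   // may push the next field off 64-bit alignment on 386/arm
//		n    uint64 // atomic.AddUint64(&b.n, 1) can then panic on 32-bit targets
//	}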
// EtcdServer is the production implementation of the Server interface.
type EtcdServer struct {
	// inflightSnapshots holds the number of snapshots currently in flight.
	inflightSnapshots int64  // must use atomic operations to access; keep 64-bit aligned.
	appliedIndex      uint64 // must use atomic operations to access; keep 64-bit aligned.
	committedIndex    uint64 // must use atomic operations to access; keep 64-bit aligned.
	term              uint64 // must use atomic operations to access; keep 64-bit aligned.
	lead              uint64 // must use atomic operations to access; keep 64-bit aligned.

	consistIndex cindex.ConsistentIndexer // consistIndex is used to get/set/save consistentIndex
	r            raftNode                 // uses 64-bit atomics; keep 64-bit aligned.

	readych chan struct{}
	Cfg     ServerConfig

	lgMu *sync.RWMutex
	lg   *zap.Logger

	w wait.Wait

	readMu sync.RWMutex
	// the read routine notifies the etcd server that it is waiting for a read
	// by sending an empty struct to readwaitc.
	readwaitc chan struct{}
	// readNotifier is used to notify the read routine that it can process the
	// request when there is no error.
	readNotifier *notifier

	// stop signals that the run goroutine should shut down.
	stop chan struct{}
	// stopping is closed by the run goroutine on shutdown.
	stopping chan struct{}
	// done is closed when all goroutines from start() complete.
	done chan struct{}
	// leaderChanged is used to notify the linearizable read loop to drop the old read requests.
	leaderChanged   chan struct{}
	leaderChangedMu sync.RWMutex

	errorc     chan error
	id         types.ID
	attributes membership.Attributes

	cluster *membership.RaftCluster

	v2store     v2store.Store
	snapshotter *snap.Snapshotter

	applyV2 ApplierV2

	// applyV3 is the applier with auth and quotas
	applyV3 applierV3
	// applyV3Base is the core applier without auth or quotas
	applyV3Base applierV3
	// applyV3Internal is the applier for internal requests
	applyV3Internal applierV3Internal
	applyWait       wait.WaitTime

	kv         mvcc.ConsistentWatchableKV
	lessor     lease.Lessor
	bemu       sync.Mutex
	be         backend.Backend
	authStore  auth.AuthStore
	alarmStore *v3alarm.AlarmStore

	stats  *stats.ServerStats
	lstats *stats.LeaderStats

	SyncTicker *time.Ticker
	// compactor is used to auto-compact the KV.
	compactor v3compactor.Compactor

	// peerRt is used to send requests (version, lease) to peers.
	peerRt   http.RoundTripper
	reqIDGen *idutil.Generator

	// forceVersionC is used to force the version monitor loop
	// to detect the cluster version immediately.
	forceVersionC chan struct{}

	// wgMu blocks concurrent waitgroup mutation while the server is stopping.
	wgMu sync.RWMutex
	// wg is used to wait for the goroutines that depend on the server state
	// to exit when stopping the server.
	wg sync.WaitGroup

	// ctx is used for etcd-initiated requests that may need to be canceled
	// on etcd server shutdown.
	ctx    context.Context
	cancel context.CancelFunc

	leadTimeMu      sync.RWMutex
	leadElectedTime time.Time

	*AccessController
}
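// NewServer's bootstrap logic below branches on two facts: whether a WAL
// already exists on disk and whether this member is configured to create a
// new cluster. That yields three supported paths: join an existing cluster
// (no WAL, !NewCluster), bootstrap a brand-new cluster (no WAL, NewCluster),
// and restart from existing state (WAL present). Any other combination is
// rejected as an unsupported bootstrap config.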
// NewServer creates a new EtcdServer from the supplied configuration. The
// configuration is considered static for the lifetime of the EtcdServer.
func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
	st := v2store.New(StoreClusterPrefix, StoreKeysPrefix)

	var (
		w  *wal.WAL
		n  raft.Node
		s  *raft.MemoryStorage
		id types.ID
		cl *membership.RaftCluster
	)

	if cfg.MaxRequestBytes > recommendedMaxRequestBytes {
		cfg.Logger.Warn(
			"exceeded recommended request limit",
			zap.Uint("max-request-bytes", cfg.MaxRequestBytes),
			zap.String("max-request-size", humanize.Bytes(uint64(cfg.MaxRequestBytes))),
			zap.Int("recommended-request-bytes", recommendedMaxRequestBytes),
			zap.String("recommended-request-size", humanize.Bytes(uint64(recommendedMaxRequestBytes))),
		)
	}

	if terr := fileutil.TouchDirAll(cfg.DataDir); terr != nil {
		return nil, fmt.Errorf("cannot access data directory: %v", terr)
	}

	haveWAL := wal.Exist(cfg.WALDir())

	if err = fileutil.TouchDirAll(cfg.SnapDir()); err != nil {
		cfg.Logger.Fatal(
			"failed to create snapshot directory",
			zap.String("path", cfg.SnapDir()),
			zap.Error(err),
		)
	}
	ss := snap.New(cfg.Logger, cfg.SnapDir())

	bepath := cfg.backendPath()
	beExist := fileutil.Exist(bepath)
	be := openBackend(cfg)

	defer func() {
		if err != nil {
			be.Close()
		}
	}()

	prt, err := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.peerDialTimeout())
	if err != nil {
		return nil, err
	}
	var (
		remotes  []*membership.Member
		snapshot *raftpb.Snapshot
	)

	switch {
	case !haveWAL && !cfg.NewCluster:
		if err = cfg.VerifyJoinExisting(); err != nil {
			return nil, err
		}
		cl, err = membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
		if err != nil {
			return nil, err
		}
		existingCluster, gerr := GetClusterFromRemotePeers(cfg.Logger, getRemotePeerURLs(cl, cfg.Name), prt)
		if gerr != nil {
			return nil, fmt.Errorf("cannot fetch cluster info from peer urls: %v", gerr)
		}
		if err = membership.ValidateClusterAndAssignIDs(cfg.Logger, cl, existingCluster); err != nil {
			return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err)
		}
		if !isCompatibleWithCluster(cfg.Logger, cl, cl.MemberByName(cfg.Name).ID, prt) {
			return nil, fmt.Errorf("incompatible with current running cluster")
		}

		remotes = existingCluster.Members()
		cl.SetID(types.ID(0), existingCluster.ID())
		cl.SetStore(st)
		cl.SetBackend(be)
		id, n, s, w = startNode(cfg, cl, nil)
		cl.SetID(id, existingCluster.ID())

	case !haveWAL && cfg.NewCluster:
		if err = cfg.VerifyBootstrap(); err != nil {
			return nil, err
		}
		cl, err = membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
		if err != nil {
			return nil, err
		}
		m := cl.MemberByName(cfg.Name)
		if isMemberBootstrapped(cfg.Logger, cl, cfg.Name, prt, cfg.bootstrapTimeout()) {
			return nil, fmt.Errorf("member %s has already been bootstrapped", m.ID)
		}
		if cfg.ShouldDiscover() {
			var str string
			str, err = v2discovery.JoinCluster(cfg.Logger, cfg.DiscoveryURL, cfg.DiscoveryProxy, m.ID, cfg.InitialPeerURLsMap.String())
			if err != nil {
				return nil, &DiscoveryError{Op: "join", Err: err}
			}
			var urlsmap types.URLsMap
			urlsmap, err = types.NewURLsMap(str)
			if err != nil {
				return nil, err
			}
			if checkDuplicateURL(urlsmap) {
				return nil, fmt.Errorf("discovery cluster %s has duplicate url", urlsmap)
			}
			if cl, err = membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, urlsmap); err != nil {
				return nil, err
			}
		}
		cl.SetStore(st)
		cl.SetBackend(be)
		id, n, s, w = startNode(cfg, cl, cl.MemberIDs())
		cl.SetID(id, cl.ID())

	case haveWAL:
		if err = fileutil.IsDirWriteable(cfg.MemberDir()); err != nil {
			return nil, fmt.Errorf("cannot write to member directory: %v", err)
		}

		if err = fileutil.IsDirWriteable(cfg.WALDir()); err != nil {
			return nil, fmt.Errorf("cannot write to WAL directory: %v", err)
		}

		if cfg.ShouldDiscover() {
			cfg.Logger.Warn(
				"discovery token is ignored since cluster already initialized; valid logs are found",
				zap.String("wal-dir", cfg.WALDir()),
			)
		}
		snapshot, err = ss.Load()
		if err != nil && err != snap.ErrNoSnapshot {
			return nil, err
		}
		if snapshot != nil {
			if err = st.Recovery(snapshot.Data); err != nil {
				cfg.Logger.Panic("failed to recover from snapshot", zap.Error(err))
			}

			cfg.Logger.Info(
				"recovered v2 store from snapshot",
				zap.Uint64("snapshot-index", snapshot.Metadata.Index),
				zap.String("snapshot-size", humanize.Bytes(uint64(snapshot.Size()))),
			)

			if be, err = recoverSnapshotBackend(cfg, be, *snapshot, beExist); err != nil {
				cfg.Logger.Panic("failed to recover v3 backend from snapshot", zap.Error(err))
			}
			s1, s2 := be.Size(), be.SizeInUse()
			cfg.Logger.Info(
				"recovered v3 backend from snapshot",
				zap.Int64("backend-size-bytes", s1),
				zap.String("backend-size", humanize.Bytes(uint64(s1))),
				zap.Int64("backend-size-in-use-bytes", s2),
				zap.String("backend-size-in-use", humanize.Bytes(uint64(s2))),
			)
		}

		if !cfg.ForceNewCluster {
			id, cl, n, s, w = restartNode(cfg, snapshot)
		} else {
			id, cl, n, s, w = restartAsStandaloneNode(cfg, snapshot)
		}

		cl.SetStore(st)
		cl.SetBackend(be)
		cl.Recover(api.UpdateCapability)
		if cl.Version() != nil && !cl.Version().LessThan(semver.Version{Major: 3}) && !beExist {
			os.RemoveAll(bepath)
			return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath)
		}

	default:
		return nil, fmt.Errorf("unsupported bootstrap config")
	}

	if terr := fileutil.TouchDirAll(cfg.MemberDir()); terr != nil {
		return nil, fmt.Errorf("cannot access member directory: %v", terr)
	}

	sstats := stats.NewServerStats(cfg.Name, id.String())
	lstats := stats.NewLeaderStats(cfg.Logger, id.String())

	heartbeat := time.Duration(cfg.TickMs) * time.Millisecond
	srv = &EtcdServer{
		readych:     make(chan struct{}),
		Cfg:         cfg,
		lgMu:        new(sync.RWMutex),
		lg:          cfg.Logger,
		errorc:      make(chan error, 1),
		v2store:     st,
		snapshotter: ss,
		r: *newRaftNode(
			raftNodeConfig{
				lg:          cfg.Logger,
				isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
				Node:        n,
				heartbeat:   heartbeat,
				raftStorage: s,
				storage:     NewStorage(w, ss),
			},
		),
		id:               id,
		attributes:       membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
		cluster:          cl,
		stats:            sstats,
		lstats:           lstats,
		SyncTicker:       time.NewTicker(500 * time.Millisecond),
		peerRt:           prt,
		reqIDGen:         idutil.NewGenerator(uint16(id), time.Now()),
		forceVersionC:    make(chan struct{}),
		AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist},
		consistIndex:     cindex.NewConsistentIndex(be.BatchTx()),
	}
	serverID.With(prometheus.Labels{"server_id": id.String()}).Set(1)

	srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}

	srv.be = be
	minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat
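	// minTTL works out to 1.5x the election timeout: with the common defaults
	// of TickMs=100 and ElectionTicks=10 (illustrative values, not mandated
	// here), (3*10/2) * 100ms = 1.5s, so no lease granted by the old leader
	// can expire before a new leader could plausibly have been elected.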
	// Always recover the lessor before the kv. When we recover the mvcc.KV it
	// will reattach keys to its leases. If we recover mvcc.KV first, it will
	// attach the keys to the wrong lessor before the lessor recovers.
	srv.lessor = lease.NewLessor(
		srv.getLogger(),
		srv.be,
		lease.LessorConfig{
			MinLeaseTTL:                int64(math.Ceil(minTTL.Seconds())),
			CheckpointInterval:         cfg.LeaseCheckpointInterval,
			ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
		})

	tp, err := auth.NewTokenProvider(cfg.Logger, cfg.AuthToken,
		func(index uint64) <-chan struct{} {
			return srv.applyWait.Wait(index)
		},
	)
	if err != nil {
		cfg.Logger.Warn("failed to create token provider", zap.Error(err))
		return nil, err
	}
	srv.kv = mvcc.New(srv.getLogger(), srv.be, srv.lessor, srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
	kvindex := srv.consistIndex.ConsistentIndex()
	srv.lg.Debug("restore consistentIndex",
		zap.Uint64("index", kvindex))
	if beExist {
		// TODO: remove kvindex != 0 checking when we do not expect users to
		// upgrade etcd from pre-3.0 release.
		if snapshot != nil && kvindex < snapshot.Metadata.Index {
			if kvindex != 0 {
				return nil, fmt.Errorf("database file (%v index %d) does not match with snapshot (index %d)", bepath, kvindex, snapshot.Metadata.Index)
			}
			cfg.Logger.Warn(
				"consistent index was never saved",
				zap.Uint64("snapshot-index", snapshot.Metadata.Index),
			)
		}
	}

	srv.authStore = auth.NewAuthStore(srv.getLogger(), srv.be, srv.consistIndex, tp, int(cfg.BcryptCost))

	newSrv := srv // since srv == nil in defer if srv is returned as nil
	defer func() {
		// closing the backend without first closing kv can cause
		// resumed compactions to fail with closed tx errors
		if err != nil {
			newSrv.kv.Close()
		}
	}()
	if num := cfg.AutoCompactionRetention; num != 0 {
		srv.compactor, err = v3compactor.New(cfg.Logger, cfg.AutoCompactionMode, num, srv.kv, srv)
		if err != nil {
			return nil, err
		}
		srv.compactor.Run()
	}

	srv.applyV3Base = srv.newApplierV3Backend()
	srv.applyV3Internal = srv.newApplierV3Internal()
	if err = srv.restoreAlarms(); err != nil {
		return nil, err
	}

	if srv.Cfg.EnableLeaseCheckpoint {
		// setting the checkpointer enables the lease checkpoint feature.
		srv.lessor.SetCheckpointer(func(ctx context.Context, cp *pb.LeaseCheckpointRequest) {
			srv.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseCheckpoint: cp})
		})
	}
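	// Background on the checkpointer wired above (a summary, not
	// authoritative; see the lease package for the exact behavior): the
	// lessor periodically hands the leader a LeaseCheckpointRequest carrying
	// the remaining TTLs of active leases, and routing it through
	// raftRequestOnce persists those TTLs via consensus. Without
	// checkpointing, a leader change resets each lease to its full TTL, so
	// frequently re-elected clusters could keep long leases alive
	// indefinitely.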
	// TODO: move transport initialization near the definition of remote
	tr := &rafthttp.Transport{
		Logger:      cfg.Logger,
		TLSInfo:     cfg.PeerTLSInfo,
		DialTimeout: cfg.peerDialTimeout(),
		ID:          id,
		URLs:        cfg.PeerURLs,
		ClusterID:   cl.ID(),
		Raft:        srv,
		Snapshotter: ss,
		ServerStats: sstats,
		LeaderStats: lstats,
		ErrorC:      srv.errorc,
	}
	if err = tr.Start(); err != nil {
		return nil, err
	}
	// add all remotes into transport
	for _, m := range remotes {
		if m.ID != id {
			tr.AddRemote(m.ID, m.PeerURLs)
		}
	}
	for _, m := range cl.Members() {
		if m.ID != id {
			tr.AddPeer(m.ID, m.PeerURLs)
		}
	}
	srv.r.transport = tr

	return srv, nil
}

func (s *EtcdServer) getLogger() *zap.Logger {
	s.lgMu.RLock()
	l := s.lg
	s.lgMu.RUnlock()
	return l
}

func tickToDur(ticks int, tickMs uint) string {
	return fmt.Sprintf("%v", time.Duration(ticks)*time.Duration(tickMs)*time.Millisecond)
}

func (s *EtcdServer) adjustTicks() {
	lg := s.getLogger()
	clusterN := len(s.cluster.Members())

	// single-node fresh start, or single node recovering from snapshot
	if clusterN == 1 {
		ticks := s.Cfg.ElectionTicks - 1
		lg.Info(
			"started as single-node; fast-forwarding election ticks",
			zap.String("local-member-id", s.ID().String()),
			zap.Int("forward-ticks", ticks),
			zap.String("forward-duration", tickToDur(ticks, s.Cfg.TickMs)),
			zap.Int("election-ticks", s.Cfg.ElectionTicks),
			zap.String("election-timeout", tickToDur(s.Cfg.ElectionTicks, s.Cfg.TickMs)),
		)
		s.r.advanceTicks(ticks)
		return
	}

	if !s.Cfg.InitialElectionTickAdvance {
		lg.Info("skipping initial election tick advance", zap.Int("election-ticks", s.Cfg.ElectionTicks))
		return
	}
	lg.Info("starting initial election tick advance", zap.Int("election-ticks", s.Cfg.ElectionTicks))

	// Retry for up to "rafthttp.ConnReadTimeout" (5s) until a peer connection
	// reports in. Otherwise, if:
	//  1. all connections failed, or
	//  2. there are no active peers, or
	//  3. this is a restarted single node with no snapshot
	// then do nothing, because advancing ticks would have no effect.
	waitTime := rafthttp.ConnReadTimeout
	itv := 50 * time.Millisecond
	for i := int64(0); i < int64(waitTime/itv); i++ {
		select {
		case <-time.After(itv):
		case <-s.stopping:
			return
		}

		peerN := s.r.transport.ActivePeers()
		if peerN > 1 {
			// multi-node received peer connection reports;
			// adjust ticks in case leader message receipt is slow
			ticks := s.Cfg.ElectionTicks - 2

			lg.Info(
				"initialized peer connections; fast-forwarding election ticks",
				zap.String("local-member-id", s.ID().String()),
				zap.Int("forward-ticks", ticks),
				zap.String("forward-duration", tickToDur(ticks, s.Cfg.TickMs)),
				zap.Int("election-ticks", s.Cfg.ElectionTicks),
				zap.String("election-timeout", tickToDur(s.Cfg.ElectionTicks, s.Cfg.TickMs)),
				zap.Int("active-remote-members", peerN),
			)

			s.r.advanceTicks(ticks)
			return
		}
	}
}
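// To see why the fast-forwarding in adjustTicks helps: with the common
// defaults of TickMs=100 and ElectionTicks=10 (illustrative values), a fresh
// single node would otherwise idle for ~1s (10 ticks) before campaigning;
// advancing ElectionTicks-1 = 9 ticks lets it start its first election after
// a single remaining tick (~100ms). The multi-node path leaves two ticks of
// headroom so a live leader's heartbeat can still arrive and suppress an
// unnecessary election.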
// Start performs any initialization of the Server necessary for it to
// begin serving requests. It must be called before Do or Process.
// Start must be non-blocking; any long-running server functionality
// should be implemented in goroutines.
func (s *EtcdServer) Start() {
	s.start()
	s.goAttach(func() { s.adjustTicks() })
	s.goAttach(func() { s.publish(s.Cfg.ReqTimeout()) })
	s.goAttach(s.purgeFile)
	s.goAttach(func() { monitorFileDescriptor(s.getLogger(), s.stopping) })
	s.goAttach(s.monitorVersions)
	s.goAttach(s.linearizableReadLoop)
	s.goAttach(s.monitorKVHash)
}

// start prepares and starts the server in a new goroutine. It is no longer
// safe to modify a server's fields after it has been sent to Start.
// This function is just used for testing.
func (s *EtcdServer) start() {
	lg := s.getLogger()

	if s.Cfg.SnapshotCount == 0 {
		lg.Info(
			"updating snapshot-count to default",
			zap.Uint64("given-snapshot-count", s.Cfg.SnapshotCount),
			zap.Uint64("updated-snapshot-count", DefaultSnapshotCount),
		)
		s.Cfg.SnapshotCount = DefaultSnapshotCount
	}
	if s.Cfg.SnapshotCatchUpEntries == 0 {
		lg.Info(
			"updating snapshot catch-up entries to default",
			zap.Uint64("given-snapshot-catchup-entries", s.Cfg.SnapshotCatchUpEntries),
			zap.Uint64("updated-snapshot-catchup-entries", DefaultSnapshotCatchUpEntries),
		)
		s.Cfg.SnapshotCatchUpEntries = DefaultSnapshotCatchUpEntries
	}

	s.w = wait.New()
	s.applyWait = wait.NewTimeList()
	s.done = make(chan struct{})
	s.stop = make(chan struct{})
	s.stopping = make(chan struct{})
	s.ctx, s.cancel = context.WithCancel(context.Background())
	s.readwaitc = make(chan struct{}, 1)
	s.readNotifier = newNotifier()
	s.leaderChanged = make(chan struct{})
	if s.ClusterVersion() != nil {
		lg.Info(
			"starting etcd server",
			zap.String("local-member-id", s.ID().String()),
			zap.String("local-server-version", version.Version),
			zap.String("cluster-id", s.Cluster().ID().String()),
			zap.String("cluster-version", version.Cluster(s.ClusterVersion().String())),
		)
		membership.ClusterVersionMetrics.With(prometheus.Labels{"cluster_version": version.Cluster(s.ClusterVersion().String())}).Set(1)
	} else {
		lg.Info(
			"starting etcd server",
			zap.String("local-member-id", s.ID().String()),
			zap.String("local-server-version", version.Version),
			zap.String("cluster-version", "to_be_decided"),
		)
	}

	// TODO: if this is an empty log, write all peer infos
	// into the first entry
	go s.run()
}

func (s *EtcdServer) purgeFile() {
	lg := s.getLogger()
	var dberrc, serrc, werrc <-chan error
	var dbdonec, sdonec, wdonec <-chan struct{}
	if s.Cfg.MaxSnapFiles > 0 {
		dbdonec, dberrc = fileutil.PurgeFileWithDoneNotify(lg, s.Cfg.SnapDir(), "snap.db", s.Cfg.MaxSnapFiles, purgeFileInterval, s.stopping)
		sdonec, serrc = fileutil.PurgeFileWithDoneNotify(lg, s.Cfg.SnapDir(), "snap", s.Cfg.MaxSnapFiles, purgeFileInterval, s.stopping)
	}
	if s.Cfg.MaxWALFiles > 0 {
		wdonec, werrc = fileutil.PurgeFileWithDoneNotify(lg, s.Cfg.WALDir(), "wal", s.Cfg.MaxWALFiles, purgeFileInterval, s.stopping)
	}

	select {
	case e := <-dberrc:
		lg.Fatal("failed to purge snap db file", zap.Error(e))
	case e := <-serrc:
		lg.Fatal("failed to purge snap file", zap.Error(e))
	case e := <-werrc:
		lg.Fatal("failed to purge wal file", zap.Error(e))
	case <-s.stopping:
		if dbdonec != nil {
			<-dbdonec
		}
		if sdonec != nil {
			<-sdonec
		}
		if wdonec != nil {
			<-wdonec
		}
		return
	}
}

func (s *EtcdServer) Cluster() api.Cluster { return s.cluster }

func (s *EtcdServer) ApplyWait() <-chan struct{} { return s.applyWait.Wait(s.getCommittedIndex()) }

type ServerPeer interface {
	ServerV2
	RaftHandler() http.Handler
	LeaseHandler() http.Handler
}

func (s *EtcdServer) LeaseHandler() http.Handler {
	if s.lessor == nil {
		return nil
	}
	return leasehttp.NewHandler(s.lessor, s.ApplyWait)
}

func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler() }

// Process takes a raft message and applies it to the server's raft state
// machine, respecting any timeout of the given context.
func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
	lg := s.getLogger()
	if s.cluster.IsIDRemoved(types.ID(m.From)) {
		lg.Warn(
			"rejected Raft message from removed member",
			zap.String("local-member-id", s.ID().String()),
			zap.String("removed-member-id", types.ID(m.From).String()),
		)
		return httptypes.NewHTTPError(http.StatusForbidden, "cannot process message from removed member")
	}
	if m.Type == raftpb.MsgApp {
		s.stats.RecvAppendReq(types.ID(m.From).String(), m.Size())
	}
	return s.r.Step(ctx, m)
}

func (s *EtcdServer) IsIDRemoved(id uint64) bool { return s.cluster.IsIDRemoved(types.ID(id)) }

func (s *EtcdServer) ReportUnreachable(id uint64) { s.r.ReportUnreachable(id) }

// ReportSnapshot reports snapshot sent status to the raft state machine,
// and clears the used snapshot from the snapshot store.
func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
	s.r.ReportSnapshot(id, status)
}
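// etcdProgress (below) is the apply goroutine's view of how far the server
// has gotten: the last applied term/index (appliedt/appliedi), the index of
// the last snapshot taken (snapi), and the current raft ConfState. It is what
// triggerSnapshot consults to decide when appliedi-snapi has exceeded
// SnapshotCount.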
type etcdProgress struct {
	confState raftpb.ConfState
	snapi     uint64
	appliedt  uint64
	appliedi  uint64
}

// raftReadyHandler contains a set of EtcdServer operations to be called by
// raftNode, and helps decouple state machine logic from Raft algorithms.
// TODO: add a state machine interface to apply the committed entries and do
// snapshot/recovery
type raftReadyHandler struct {
	getLead              func() (lead uint64)
	updateLead           func(lead uint64)
	updateLeadership     func(newLeader bool)
	updateCommittedIndex func(uint64)
}

func (s *EtcdServer) run() {
	lg := s.getLogger()

	sn, err := s.r.raftStorage.Snapshot()
	if err != nil {
		lg.Panic("failed to get snapshot from Raft storage", zap.Error(err))
	}

	// asynchronously accept apply packets, dispatch progress in-order
	sched := schedule.NewFIFOScheduler()

	var (
		smu   sync.RWMutex
		syncC <-chan time.Time
	)
	setSyncC := func(ch <-chan time.Time) {
		smu.Lock()
		syncC = ch
		smu.Unlock()
	}
	getSyncC := func() (ch <-chan time.Time) {
		smu.RLock()
		ch = syncC
		smu.RUnlock()
		return
	}
	rh := &raftReadyHandler{
		getLead:    func() (lead uint64) { return s.getLead() },
		updateLead: func(lead uint64) { s.setLead(lead) },
		updateLeadership: func(newLeader bool) {
			if !s.isLeader() {
				if s.lessor != nil {
					s.lessor.Demote()
				}
				if s.compactor != nil {
					s.compactor.Pause()
				}
				setSyncC(nil)
			} else {
				if newLeader {
					t := time.Now()
					s.leadTimeMu.Lock()
					s.leadElectedTime = t
					s.leadTimeMu.Unlock()
				}
				setSyncC(s.SyncTicker.C)
				if s.compactor != nil {
					s.compactor.Resume()
				}
			}
			if newLeader {
				s.leaderChangedMu.Lock()
				lc := s.leaderChanged
				s.leaderChanged = make(chan struct{})
				close(lc)
				s.leaderChangedMu.Unlock()
			}
			// TODO: remove the nil checking
			// current test utility does not provide the stats
			if s.stats != nil {
				s.stats.BecomeLeader()
			}
		},
		updateCommittedIndex: func(ci uint64) {
			cci := s.getCommittedIndex()
			if ci > cci {
				s.setCommittedIndex(ci)
			}
		},
	}
	s.r.start(rh)

	ep := etcdProgress{
		confState: sn.Metadata.ConfState,
		snapi:     sn.Metadata.Index,
		appliedt:  sn.Metadata.Term,
		appliedi:  sn.Metadata.Index,
	}

	defer func() {
		s.wgMu.Lock() // block concurrent waitgroup adds in goAttach while stopping
		close(s.stopping)
		s.wgMu.Unlock()
		s.cancel()

		sched.Stop()

		// wait for goroutines before closing raft so wal stays open
		s.wg.Wait()

		s.SyncTicker.Stop()

		// must stop raft after the scheduler; etcdserver can leak rafthttp
		// pipelines by adding a peer after raft stops the transport
		s.r.stop()

		// kv, lessor and backend can be nil if running without v3 enabled
		// or running unit tests.
		if s.lessor != nil {
			s.lessor.Stop()
		}
		if s.kv != nil {
			s.kv.Close()
		}
		if s.authStore != nil {
			s.authStore.Close()
		}
		if s.be != nil {
			s.be.Close()
		}
		if s.compactor != nil {
			s.compactor.Stop()
		}
		close(s.done)
	}()

	var expiredLeaseC <-chan []*lease.Lease
	if s.lessor != nil {
		expiredLeaseC = s.lessor.ExpiredLeasesC()
	}

	for {
		select {
		case ap := <-s.r.apply():
			f := func(context.Context) { s.applyAll(&ep, &ap) }
			sched.Schedule(f)
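		// The case below uses a classic bounded-concurrency pattern: the
		// buffered channel c acts as a semaphore of capacity
		// maxPendingRevokes, so at most 16 lease revocations are in flight at
		// once; each worker goroutine releases its slot by reading from c
		// when its revoke finishes.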
		case leases := <-expiredLeaseC:
			s.goAttach(func() {
				// increase the throughput of expired-lease deletion through parallelization
				c := make(chan struct{}, maxPendingRevokes)
				for _, lease := range leases {
					select {
					case c <- struct{}{}:
					case <-s.stopping:
						return
					}
					lid := lease.ID
					s.goAttach(func() {
						ctx := s.authStore.WithRoot(s.ctx)
						_, lerr := s.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: int64(lid)})
						if lerr == nil {
							leaseExpired.Inc()
						} else {
							lg.Warn(
								"failed to revoke lease",
								zap.String("lease-id", fmt.Sprintf("%016x", lid)),
								zap.Error(lerr),
							)
						}

						<-c
					})
				}
			})
		case err := <-s.errorc:
			lg.Warn("server error", zap.Error(err))
			lg.Warn("data-dir used by this member must be removed")
			return
		case <-getSyncC():
			if s.v2store.HasTTLKeys() {
				s.sync(s.Cfg.ReqTimeout())
			}
		case <-s.stop:
			return
		}
	}
}

func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
	s.applySnapshot(ep, apply)
	s.applyEntries(ep, apply)

	proposalsApplied.Set(float64(ep.appliedi))
	s.applyWait.Trigger(ep.appliedi)

	// Wait for the raft routine to finish the disk writes before triggering a
	// snapshot; otherwise the applied index might be greater than the last
	// index in raft storage, since the raft routine might be slower than the
	// apply routine.
	<-apply.notifyc

	s.triggerSnapshot(ep)
	select {
	// snapshot requested via send()
	case m := <-s.r.msgSnapC:
		merged := s.createMergedSnapshotMessage(m, ep.appliedt, ep.appliedi, ep.confState)
		s.sendMergedSnap(merged)
	default:
	}
}

func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
	if raft.IsEmptySnap(apply.snapshot) {
		return
	}
	applySnapshotInProgress.Inc()

	lg := s.getLogger()
	lg.Info(
		"applying snapshot",
		zap.Uint64("current-snapshot-index", ep.snapi),
		zap.Uint64("current-applied-index", ep.appliedi),
		zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),
		zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),
	)
	defer func() {
		lg.Info(
			"applied snapshot",
			zap.Uint64("current-snapshot-index", ep.snapi),
			zap.Uint64("current-applied-index", ep.appliedi),
			zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),
			zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),
		)
		applySnapshotInProgress.Dec()
	}()

	if apply.snapshot.Metadata.Index <= ep.appliedi {
		lg.Panic(
			"unexpected leader snapshot from outdated index",
			zap.Uint64("current-snapshot-index", ep.snapi),
			zap.Uint64("current-applied-index", ep.appliedi),
			zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),
			zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),
		)
	}

	// wait for raftNode to persist the snapshot onto the disk
	<-apply.notifyc

	newbe, err := openSnapshotBackend(s.Cfg, s.snapshotter, apply.snapshot)
	if err != nil {
		lg.Panic("failed to open snapshot backend", zap.Error(err))
	}

	// Always recover the lessor before the kv. When we recover the mvcc.KV it
	// will reattach keys to its leases. If we recover mvcc.KV first, it will
	// attach the keys to the wrong lessor before the lessor recovers.
	if s.lessor != nil {
		lg.Info("restoring lease store")

		s.lessor.Recover(newbe, func() lease.TxnDelete { return s.kv.Write(traceutil.TODO()) })

		lg.Info("restored lease store")
	}

	lg.Info("restoring mvcc store")

	if err := s.kv.Restore(newbe); err != nil {
		lg.Panic("failed to restore mvcc store", zap.Error(err))
	}

	s.consistIndex.SetConsistentIndex(s.kv.ConsistentIndex())
	lg.Info("restored mvcc store")

	// Closing the old backend might block until all the txns
	// on the backend are finished.
	// We do not want to wait on closing the old backend.
	s.bemu.Lock()
	oldbe := s.be
	go func() {
		lg.Info("closing old backend file")
		defer func() {
			lg.Info("closed old backend file")
		}()
		if err := oldbe.Close(); err != nil {
			lg.Panic("failed to close old backend", zap.Error(err))
		}
	}()

	s.be = newbe
	s.bemu.Unlock()

	lg.Info("restoring alarm store")

	if err := s.restoreAlarms(); err != nil {
		lg.Panic("failed to restore alarm store", zap.Error(err))
	}

	lg.Info("restored alarm store")

	if s.authStore != nil {
		lg.Info("restoring auth store")

		s.authStore.Recover(newbe)

		lg.Info("restored auth store")
	}

	lg.Info("restoring v2 store")
	if err := s.v2store.Recovery(apply.snapshot.Data); err != nil {
		lg.Panic("failed to restore v2 store", zap.Error(err))
	}

	lg.Info("restored v2 store")

	s.cluster.SetBackend(newbe)

	lg.Info("restoring cluster configuration")

	s.cluster.Recover(api.UpdateCapability)

	lg.Info("restored cluster configuration")
	lg.Info("removing old peers from network")

	// recover raft transport
	s.r.transport.RemoveAllPeers()

	lg.Info("removed old peers from network")
	lg.Info("adding peers from new cluster configuration")

	for _, m := range s.cluster.Members() {
		if m.ID == s.ID() {
			continue
		}
		s.r.transport.AddPeer(m.ID, m.PeerURLs)
	}

	lg.Info("added peers from new cluster configuration")

	ep.appliedt = apply.snapshot.Metadata.Term
	ep.appliedi = apply.snapshot.Metadata.Index
	ep.snapi = ep.appliedi
	ep.confState = apply.snapshot.Metadata.ConfState
}
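// A worked example of the slicing in applyEntries below: if ep.appliedi is 10
// and the incoming batch covers indexes 8..12, then firsti = 8, the offset
// ep.appliedi+1-firsti = 3, and ents = entries[3:] applies only indexes 11
// and 12; entries 8..10 were already applied. Conversely, firsti = 12 with
// appliedi = 10 would mean index 11 was never delivered, which is fatal.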
func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) {
	if len(apply.entries) == 0 {
		return
	}
	firsti := apply.entries[0].Index
	if firsti > ep.appliedi+1 {
		lg := s.getLogger()
		lg.Panic(
			"unexpected committed entry index",
			zap.Uint64("current-applied-index", ep.appliedi),
			zap.Uint64("first-committed-entry-index", firsti),
		)
	}
	var ents []raftpb.Entry
	if ep.appliedi+1-firsti < uint64(len(apply.entries)) {
		ents = apply.entries[ep.appliedi+1-firsti:]
	}
	if len(ents) == 0 {
		return
	}
	var shouldstop bool
	if ep.appliedt, ep.appliedi, shouldstop = s.apply(ents, &ep.confState); shouldstop {
		go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster"))
	}
}

func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
	if ep.appliedi-ep.snapi <= s.Cfg.SnapshotCount {
		return
	}

	lg := s.getLogger()
	lg.Info(
		"triggering snapshot",
		zap.String("local-member-id", s.ID().String()),
		zap.Uint64("local-member-applied-index", ep.appliedi),
		zap.Uint64("local-member-snapshot-index", ep.snapi),
		zap.Uint64("local-member-snapshot-count", s.Cfg.SnapshotCount),
	)

	s.snapshot(ep.appliedi, ep.confState)
	ep.snapi = ep.appliedi
}

func (s *EtcdServer) hasMultipleVotingMembers() bool {
	return s.cluster != nil && len(s.cluster.VotingMemberIDs()) > 1
}

func (s *EtcdServer) isLeader() bool {
	return uint64(s.ID()) == s.Lead()
}

// MoveLeader transfers the leader to the given transferee.
func (s *EtcdServer) MoveLeader(ctx context.Context, lead, transferee uint64) error {
	if !s.cluster.IsMemberExist(types.ID(transferee)) || s.cluster.Member(types.ID(transferee)).IsLearner {
		return ErrBadLeaderTransferee
	}

	now := time.Now()
	interval := time.Duration(s.Cfg.TickMs) * time.Millisecond

	lg := s.getLogger()
	lg.Info(
		"leadership transfer starting",
		zap.String("local-member-id", s.ID().String()),
		zap.String("current-leader-member-id", types.ID(lead).String()),
		zap.String("transferee-member-id", types.ID(transferee).String()),
	)

	s.r.TransferLeadership(ctx, lead, transferee)
	for s.Lead() != transferee {
		select {
		case <-ctx.Done(): // time out
			return ErrTimeoutLeaderTransfer
		case <-time.After(interval):
		}
	}

	// TODO: drain all requests, or drop all messages to the old leader
	lg.Info(
		"leadership transfer finished",
		zap.String("local-member-id", s.ID().String()),
		zap.String("old-leader-member-id", types.ID(lead).String()),
		zap.String("new-leader-member-id", types.ID(transferee).String()),
		zap.Duration("took", time.Since(now)),
	)
	return nil
}

// TransferLeadership transfers the leader to the chosen transferee.
func (s *EtcdServer) TransferLeadership() error {
	lg := s.getLogger()
	if !s.isLeader() {
		lg.Info(
			"skipped leadership transfer; local server is not leader",
			zap.String("local-member-id", s.ID().String()),
			zap.String("current-leader-member-id", types.ID(s.Lead()).String()),
		)
		return nil
	}

	if !s.hasMultipleVotingMembers() {
		lg.Info(
			"skipped leadership transfer for single voting member cluster",
			zap.String("local-member-id", s.ID().String()),
			zap.String("current-leader-member-id", types.ID(s.Lead()).String()),
		)
		return nil
	}

	transferee, ok := longestConnected(s.r.transport, s.cluster.VotingMemberIDs())
	if !ok {
		return ErrUnhealthy
	}

	tm := s.Cfg.ReqTimeout()
	ctx, cancel := context.WithTimeout(s.ctx, tm)
	err := s.MoveLeader(ctx, s.Lead(), uint64(transferee))
	cancel()
	return err
}

// HardStop stops the server without coordination with other members in the cluster.
func (s *EtcdServer) HardStop() {
	select {
	case s.stop <- struct{}{}:
	case <-s.done:
		return
	}
	<-s.done
}

// Stop stops the server gracefully, and shuts down the running goroutine.
// Stop should be called after a Start(s), otherwise it will block forever.
// When stopping the leader, Stop transfers its leadership to one of its peers
// before stopping the server.
// Stop terminates the Server and performs any necessary finalization.
// Do and Process cannot be called after Stop has been invoked.
func (s *EtcdServer) Stop() {
	lg := s.getLogger()
	if err := s.TransferLeadership(); err != nil {
		lg.Warn("leadership transfer failed", zap.String("local-member-id", s.ID().String()), zap.Error(err))
	}
	s.HardStop()
}

// ReadyNotify returns a channel that will be closed when the server
// is ready to serve client requests.
func (s *EtcdServer) ReadyNotify() <-chan struct{} { return s.readych }

func (s *EtcdServer) stopWithDelay(d time.Duration, err error) {
	select {
	case <-time.After(d):
	case <-s.done:
	}
	select {
	case s.errorc <- err:
	default:
	}
}

// StopNotify returns a channel that receives an empty struct
// when the server is stopped.
func (s *EtcdServer) StopNotify() <-chan struct{} { return s.done }

func (s *EtcdServer) SelfStats() []byte { return s.stats.JSON() }

func (s *EtcdServer) LeaderStats() []byte {
	lead := s.getLead()
	if lead != uint64(s.id) {
		return nil
	}
	return s.lstats.JSON()
}

func (s *EtcdServer) StoreStats() []byte { return s.v2store.JsonStats() }

func (s *EtcdServer) checkMembershipOperationPermission(ctx context.Context) error {
	if s.authStore == nil {
		// In the context of an ordinary etcd process, s.authStore will never
		// be nil. This branch is for handling cases in server_test.go.
		return nil
	}

	// Note that this permission check is done in the API layer,
	// so a TOCTOU problem could potentially arise with a schedule like this:
	// update membership with user A -> revoke root role of A -> apply the
	// membership change in the state machine layer.
	// However, both membership changes and role management require the root
	// privilege, so careful operation by admins can prevent the problem.
	authInfo, err := s.AuthInfoFromCtx(ctx)
	if err != nil {
		return err
	}

	return s.AuthStore().IsAdminPermitted(authInfo)
}
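// The membership-change methods below all follow the same flow: check the
// caller's admin permission, run the local safety check (mayAddMember,
// mayRemoveMember, ...), marshal the member data as the ConfChange context,
// and hand the ConfChange to configure(), which proposes it through raft and
// blocks until the change has been applied (or has failed).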
func (s *EtcdServer) AddMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error) {
	if err := s.checkMembershipOperationPermission(ctx); err != nil {
		return nil, err
	}

	// TODO: move Member to protobuf type
	b, err := json.Marshal(memb)
	if err != nil {
		return nil, err
	}

	// by default StrictReconfigCheck is enabled; reject new members if unhealthy.
	if err := s.mayAddMember(memb); err != nil {
		return nil, err
	}

	cc := raftpb.ConfChange{
		Type:    raftpb.ConfChangeAddNode,
		NodeID:  uint64(memb.ID),
		Context: b,
	}

	if memb.IsLearner {
		cc.Type = raftpb.ConfChangeAddLearnerNode
	}

	return s.configure(ctx, cc)
}

func (s *EtcdServer) mayAddMember(memb membership.Member) error {
	lg := s.getLogger()
	if !s.Cfg.StrictReconfigCheck {
		return nil
	}

	// protect quorum when adding a voting member
	if !memb.IsLearner && !s.cluster.IsReadyToAddVotingMember() {
		lg.Warn(
			"rejecting member add request; not enough healthy members",
			zap.String("local-member-id", s.ID().String()),
			zap.String("requested-member-add", fmt.Sprintf("%+v", memb)),
			zap.Error(ErrNotEnoughStartedMembers),
		)
		return ErrNotEnoughStartedMembers
	}

	if !isConnectedFullySince(s.r.transport, time.Now().Add(-HealthInterval), s.ID(), s.cluster.VotingMembers()) {
		lg.Warn(
			"rejecting member add request; local member has not been connected to all peers, reconfigure breaks active quorum",
			zap.String("local-member-id", s.ID().String()),
			zap.String("requested-member-add", fmt.Sprintf("%+v", memb)),
			zap.Error(ErrUnhealthy),
		)
		return ErrUnhealthy
	}

	return nil
}

func (s *EtcdServer) RemoveMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
	if err := s.checkMembershipOperationPermission(ctx); err != nil {
		return nil, err
	}

	// by default StrictReconfigCheck is enabled; reject removal if it leads
	// to quorum loss.
	if err := s.mayRemoveMember(types.ID(id)); err != nil {
		return nil, err
	}

	cc := raftpb.ConfChange{
		Type:   raftpb.ConfChangeRemoveNode,
		NodeID: id,
	}
	return s.configure(ctx, cc)
}

// PromoteMember promotes a learner node to a voting node.
func (s *EtcdServer) PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
	// Only the raft leader has information on whether the to-be-promoted
	// learner node is ready. If the promoteMember call fails with
	// ErrNotLeader, forward the request to the leader node via HTTP. If it
	// fails with an error other than ErrNotLeader, return the error.
	resp, err := s.promoteMember(ctx, id)
	if err == nil {
		learnerPromoteSucceed.Inc()
		return resp, nil
	}
	if err != ErrNotLeader {
		learnerPromoteFailed.WithLabelValues(err.Error()).Inc()
		return resp, err
	}

	cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
	defer cancel()
	// forward to leader
	for cctx.Err() == nil {
		leader, err := s.waitLeader(cctx)
		if err != nil {
			return nil, err
		}
		for _, url := range leader.PeerURLs {
			resp, err := promoteMemberHTTP(cctx, url, id, s.peerRt)
			if err == nil {
				return resp, nil
			}
			// If member promotion failed, return early. Otherwise keep retrying.
			if err == ErrLearnerNotReady || err == membership.ErrIDNotFound || err == membership.ErrMemberNotLearner {
				return nil, err
			}
		}
	}

	if cctx.Err() == context.DeadlineExceeded {
		return nil, ErrTimeout
	}
	return nil, ErrCanceled
}

// promoteMember checks whether the to-be-promoted learner node is ready
// before sending the promote request to raft.
// It returns ErrNotLeader if the local node is not the raft leader (and
// therefore does not have enough information to determine if the learner
// node is ready), and ErrLearnerNotReady if the local node is the leader
// (and therefore has enough information) but has decided the learner node is
// not ready to be promoted.
func (s *EtcdServer) promoteMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
	if err := s.checkMembershipOperationPermission(ctx); err != nil {
		return nil, err
	}

	// check if we can promote this learner.
	if err := s.mayPromoteMember(types.ID(id)); err != nil {
		return nil, err
	}

	// build the context for the promote confChange. mark IsLearner to false and IsPromote to true.
	promoteChangeContext := membership.ConfigChangeContext{
		Member: membership.Member{
			ID: types.ID(id),
		},
		IsPromote: true,
	}

	b, err := json.Marshal(promoteChangeContext)
	if err != nil {
		return nil, err
	}

	cc := raftpb.ConfChange{
		Type:    raftpb.ConfChangeAddNode,
		NodeID:  id,
		Context: b,
	}

	return s.configure(ctx, cc)
}

func (s *EtcdServer) mayPromoteMember(id types.ID) error {
	lg := s.getLogger()
	err := s.isLearnerReady(uint64(id))
	if err != nil {
		return err
	}

	if !s.Cfg.StrictReconfigCheck {
		return nil
	}
	if !s.cluster.IsReadyToPromoteMember(uint64(id)) {
		lg.Warn(
			"rejecting member promote request; not enough healthy members",
			zap.String("local-member-id", s.ID().String()),
			zap.String("requested-member-promote-id", id.String()),
			zap.Error(ErrNotEnoughStartedMembers),
		)
		return ErrNotEnoughStartedMembers
	}

	return nil
}
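// For the readiness check below, "caught up" is defined by readyPercent: the
// learner's Match index must be at least 90% of the leader's. For example,
// with the leader's log at index 10000, a learner at Match >= 9000 is
// considered ready to promote; one at 8500 is rejected with
// ErrLearnerNotReady.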
// isLearnerReady checks whether the learner has caught up with the leader.
// Note: it returns nil if the member is not found in the cluster or is not a
// learner. These two conditions are checked again later, before the apply
// phase.
func (s *EtcdServer) isLearnerReady(id uint64) error {
	rs := s.raftStatus()

	// leader's raftStatus.Progress is not nil
	if rs.Progress == nil {
		return ErrNotLeader
	}

	var learnerMatch uint64
	isFound := false
	leaderID := rs.ID
	for memberID, progress := range rs.Progress {
		if id == memberID {
			// check its status
			learnerMatch = progress.Match
			isFound = true
			break
		}
	}

	if isFound {
		leaderMatch := rs.Progress[leaderID].Match
		// the learner's Match has not caught up with the leader yet
		if float64(learnerMatch) < float64(leaderMatch)*readyPercent {
			return ErrLearnerNotReady
		}
	}

	return nil
}

func (s *EtcdServer) mayRemoveMember(id types.ID) error {
	if !s.Cfg.StrictReconfigCheck {
		return nil
	}

	lg := s.getLogger()
	isLearner := s.cluster.IsMemberExist(id) && s.cluster.Member(id).IsLearner
	// no need to check quorum when removing a non-voting member
	if isLearner {
		return nil
	}

	if !s.cluster.IsReadyToRemoveVotingMember(uint64(id)) {
		lg.Warn(
			"rejecting member remove request; not enough healthy members",
			zap.String("local-member-id", s.ID().String()),
			zap.String("requested-member-remove-id", id.String()),
			zap.Error(ErrNotEnoughStartedMembers),
		)
		return ErrNotEnoughStartedMembers
	}

	// a downed member is safe to remove since it's not part of the active quorum
	if t := s.r.transport.ActiveSince(id); id != s.ID() && t.IsZero() {
		return nil
	}
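	// The arithmetic below: removing an active member costs one active peer,
	// and the remaining actives must still reach the quorum of the current
	// voting set, 1+((len(m)-1)/2). E.g. with 5 voting members the quorum is
	// 3; removing an active member with 4 actives leaves 3 (allowed), but
	// with only 3 actives it would leave 2 (rejected with ErrUnhealthy).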
	// protect quorum if some members are down
	m := s.cluster.VotingMembers()
	active := numConnectedSince(s.r.transport, time.Now().Add(-HealthInterval), s.ID(), m)
	if (active - 1) < 1+((len(m)-1)/2) {
		lg.Warn(
			"rejecting member remove request; local member has not been connected to all peers, reconfigure breaks active quorum",
			zap.String("local-member-id", s.ID().String()),
			zap.String("requested-member-remove", id.String()),
			zap.Int("active-peers", active),
			zap.Error(ErrUnhealthy),
		)
		return ErrUnhealthy
	}

	return nil
}

func (s *EtcdServer) UpdateMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error) {
	b, merr := json.Marshal(memb)
	if merr != nil {
		return nil, merr
	}

	if err := s.checkMembershipOperationPermission(ctx); err != nil {
		return nil, err
	}
	cc := raftpb.ConfChange{
		Type:    raftpb.ConfChangeUpdateNode,
		NodeID:  uint64(memb.ID),
		Context: b,
	}
	return s.configure(ctx, cc)
}

func (s *EtcdServer) setCommittedIndex(v uint64) {
	atomic.StoreUint64(&s.committedIndex, v)
}

func (s *EtcdServer) getCommittedIndex() uint64 {
	return atomic.LoadUint64(&s.committedIndex)
}

func (s *EtcdServer) setAppliedIndex(v uint64) {
	atomic.StoreUint64(&s.appliedIndex, v)
}

func (s *EtcdServer) getAppliedIndex() uint64 {
	return atomic.LoadUint64(&s.appliedIndex)
}

func (s *EtcdServer) setTerm(v uint64) {
	atomic.StoreUint64(&s.term, v)
}

func (s *EtcdServer) getTerm() uint64 {
	return atomic.LoadUint64(&s.term)
}

func (s *EtcdServer) setLead(v uint64) {
	atomic.StoreUint64(&s.lead, v)
}

func (s *EtcdServer) getLead() uint64 {
	return atomic.LoadUint64(&s.lead)
}

func (s *EtcdServer) leaderChangedNotify() <-chan struct{} {
	s.leaderChangedMu.RLock()
	defer s.leaderChangedMu.RUnlock()
	return s.leaderChanged
}

// RaftStatusGetter represents etcd server and Raft progress.
type RaftStatusGetter interface {
	ID() types.ID
	Leader() types.ID
	CommittedIndex() uint64
	AppliedIndex() uint64
	Term() uint64
}

func (s *EtcdServer) ID() types.ID { return s.id }

func (s *EtcdServer) Leader() types.ID { return types.ID(s.getLead()) }

func (s *EtcdServer) Lead() uint64 { return s.getLead() }

func (s *EtcdServer) CommittedIndex() uint64 { return s.getCommittedIndex() }

func (s *EtcdServer) AppliedIndex() uint64 { return s.getAppliedIndex() }

func (s *EtcdServer) Term() uint64 { return s.getTerm() }

type confChangeResponse struct {
	membs []*membership.Member
	err   error
}

// configure sends a configuration change through consensus and
// then waits for it to be applied to the server. It
// will block until the change is performed or there is an error.
func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) ([]*membership.Member, error) {
	lg := s.getLogger()
	cc.ID = s.reqIDGen.Next()
	ch := s.w.Register(cc.ID)

	start := time.Now()
	if err := s.r.ProposeConfChange(ctx, cc); err != nil {
		s.w.Trigger(cc.ID, nil)
		return nil, err
	}

	select {
	case x := <-ch:
		if x == nil {
			lg.Panic("failed to configure")
		}
		resp := x.(*confChangeResponse)
		lg.Info(
			"applied a configuration change through raft",
			zap.String("local-member-id", s.ID().String()),
			zap.String("raft-conf-change", cc.Type.String()),
			zap.String("raft-conf-change-node-id", types.ID(cc.NodeID).String()),
		)
		return resp.membs, resp.err

	case <-ctx.Done():
		s.w.Trigger(cc.ID, nil) // GC wait
		return nil, s.parseProposeCtxErr(ctx.Err(), start)

	case <-s.stopping:
		return nil, ErrStopped
	}
}

// sync proposes a SYNC request and is non-blocking.
// This makes no guarantee that the request will be proposed or performed.
// The request will be canceled after the given timeout.
func (s *EtcdServer) sync(timeout time.Duration) {
	req := pb.Request{
		Method: "SYNC",
		ID:     s.reqIDGen.Next(),
		Time:   time.Now().UnixNano(),
	}
	data := pbutil.MustMarshal(&req)
	// There is no promise that the node has a leader when it makes a SYNC
	// request, so it proposes in a goroutine.
	ctx, cancel := context.WithTimeout(s.ctx, timeout)
	s.goAttach(func() {
		s.r.Propose(ctx, data)
		cancel()
	})
}

// publishV3 registers server information into the cluster using a v3 request.
// The information is the JSON representation of this server's member struct,
// updated with the static clientURLs of the server.
// The function keeps attempting to register until it succeeds,
// or its server is stopped.
// TODO: replace publish() in 3.6
func (s *EtcdServer) publishV3(timeout time.Duration) {
	req := &membershippb.ClusterMemberAttrSetRequest{
		Member_ID: uint64(s.id),
		MemberAttributes: &membershippb.Attributes{
			Name:       s.attributes.Name,
			ClientUrls: s.attributes.ClientURLs,
		},
	}
	lg := s.getLogger()
	for {
		select {
		case <-s.stopping:
			lg.Warn(
				"stopped publish because server is stopping",
				zap.String("local-member-id", s.ID().String()),
				zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
				zap.Duration("publish-timeout", timeout),
			)
			return

		default:
		}

		ctx, cancel := context.WithTimeout(s.ctx, timeout)
		_, err := s.raftRequest(ctx, pb.InternalRaftRequest{ClusterMemberAttrSet: req})
		cancel()
		switch err {
		case nil:
			close(s.readych)
			lg.Info(
				"published local member to cluster through raft",
				zap.String("local-member-id", s.ID().String()),
				zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
				zap.String("cluster-id", s.cluster.ID().String()),
				zap.Duration("publish-timeout", timeout),
			)
			return

		default:
			lg.Warn(
				"failed to publish local member to cluster through raft",
				zap.String("local-member-id", s.ID().String()),
				zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
				zap.Duration("publish-timeout", timeout),
				zap.Error(err),
			)
		}
	}
}

// publish registers server information into the cluster. The information
// is the JSON representation of this server's member struct, updated with the
// static clientURLs of the server.
// The function keeps attempting to register until it succeeds,
// or its server is stopped.
//
// The member attributes are encoded in the v2 store and applied through Raft,
// but the request does not go through the v2 API endpoint; even with the v2
// client handler disabled (e.g. --enable-v2=false), the cluster can still
// process publish requests through rafthttp.
// TODO: Deprecate v2 store in 3.6
func (s *EtcdServer) publish(timeout time.Duration) {
	lg := s.getLogger()
	b, err := json.Marshal(s.attributes)
	if err != nil {
		lg.Panic("failed to marshal JSON", zap.Error(err))
		return
	}
	req := pb.Request{
		Method: "PUT",
		Path:   membership.MemberAttributesStorePath(s.id),
		Val:    string(b),
	}

	for {
		ctx, cancel := context.WithTimeout(s.ctx, timeout)
		_, err := s.Do(ctx, req)
		cancel()
		switch err {
		case nil:
			close(s.readych)
			lg.Info(
				"published local member to cluster through raft",
				zap.String("local-member-id", s.ID().String()),
				zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
				zap.String("request-path", req.Path),
				zap.String("cluster-id", s.cluster.ID().String()),
				zap.Duration("publish-timeout", timeout),
			)
			return

		case ErrStopped:
			lg.Warn(
				"stopped publish because server is stopped",
				zap.String("local-member-id", s.ID().String()),
				zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
				zap.Duration("publish-timeout", timeout),
				zap.Error(err),
			)
			return

		default:
			lg.Warn(
				"failed to publish local member to cluster through raft",
				zap.String("local-member-id", s.ID().String()),
				zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
				zap.String("request-path", req.Path),
				zap.Duration("publish-timeout", timeout),
				zap.Error(err),
			)
		}
	}
}

func (s *EtcdServer) sendMergedSnap(merged snap.Message) {
	atomic.AddInt64(&s.inflightSnapshots, 1)

	lg := s.getLogger()
	fields := []zap.Field{
		zap.String("from", s.ID().String()),
		zap.String("to", types.ID(merged.To).String()),
		zap.Int64("bytes", merged.TotalSize),
		zap.String("size", humanize.Bytes(uint64(merged.TotalSize))),
	}

	now := time.Now()
	s.r.transport.SendSnapshot(merged)
	lg.Info("sending merged snapshot", fields...)

	s.goAttach(func() {
		select {
		case ok := <-merged.CloseNotify():
			// delay releasing the inflight snapshot for another 30 seconds to
			// block log compaction.
			// If the follower still fails to catch up, it is probably just too
			// slow to catch up. We cannot avoid the snapshot cycle anyway.
			if ok {
				select {
				case <-time.After(releaseDelayAfterSnapshot):
				case <-s.stopping:
				}
			}

			atomic.AddInt64(&s.inflightSnapshots, -1)

			lg.Info("sent merged snapshot", append(fields, zap.Duration("took", time.Since(now)))...)

		case <-s.stopping:
			lg.Warn("canceled sending merged snapshot; server stopping", fields...)
			return
		}
	})
}
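
// Inflight-snapshot accounting sketch (illustrative): sendMergedSnap bumps
// s.inflightSnapshots before handing the message to the transport, and the
// snapshot() path (below) skips raft log compaction while the counter is
// non-zero, so a slow follower can still fetch the entries that immediately
// follow the snapshot index. The counter is only decremented after
// releaseDelayAfterSnapshot (30s) has elapsed:
//
//	atomic.AddInt64(&s.inflightSnapshots, 1)  // on send
//	// ...snapshot delivered, wait releaseDelayAfterSnapshot...
//	atomic.AddInt64(&s.inflightSnapshots, -1) // compaction may resume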

// apply takes entries received from Raft (after they have been committed) and
// applies them to the current state of the EtcdServer.
// The given entries should not be empty.
func (s *EtcdServer) apply(
	es []raftpb.Entry,
	confState *raftpb.ConfState,
) (appliedt uint64, appliedi uint64, shouldStop bool) {
	for i := range es {
		e := es[i]
		switch e.Type {
		case raftpb.EntryNormal:
			s.applyEntryNormal(&e)
			s.setAppliedIndex(e.Index)
			s.setTerm(e.Term)

		case raftpb.EntryConfChange:
			// set the consistent index of the currently executing entry
			if e.Index > s.consistIndex.ConsistentIndex() {
				s.consistIndex.SetConsistentIndex(e.Index)
			}
			var cc raftpb.ConfChange
			pbutil.MustUnmarshal(&cc, e.Data)
			removedSelf, err := s.applyConfChange(cc, confState)
			s.setAppliedIndex(e.Index)
			s.setTerm(e.Term)
			shouldStop = shouldStop || removedSelf
			s.w.Trigger(cc.ID, &confChangeResponse{s.cluster.Members(), err})

		default:
			lg := s.getLogger()
			lg.Panic(
				"unknown entry type; must be either EntryNormal or EntryConfChange",
				zap.String("type", e.Type.String()),
			)
		}
		appliedi, appliedt = e.Index, e.Term
	}
	return appliedt, appliedi, shouldStop
}
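
// Replay-guard sketch (illustrative): after a restart, raft replays committed
// entries from the last snapshot onward, but the backend may already contain
// the effects of entries up to the persisted consistent index. The apply path
// therefore only advances state for entries whose index moves past that mark:
//
//	if e.Index > s.consistIndex.ConsistentIndex() {
//		s.consistIndex.SetConsistentIndex(e.Index)
//		// ...apply the entry's effects...
//	} // otherwise the entry was already applied before the restart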

// applyEntryNormal applies an EntryNormal type raftpb request to the EtcdServer.
func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
	shouldApplyV3 := false
	index := s.consistIndex.ConsistentIndex()
	if e.Index > index {
		// set the consistent index of the currently executing entry
		s.consistIndex.SetConsistentIndex(e.Index)
		shouldApplyV3 = true
	}
	s.lg.Debug("apply entry normal",
		zap.Uint64("consistent-index", index),
		zap.Uint64("entry-index", e.Index),
		zap.Bool("should-applyV3", shouldApplyV3))

	// The raft state machine may generate a noop entry upon leader confirmation.
	// Skip it in advance to avoid potential bugs in the future.
	if len(e.Data) == 0 {
		select {
		case s.forceVersionC <- struct{}{}:
		default:
		}
		// promote lessor when the local member is leader and finished
		// applying all entries from the last term.
		if s.isLeader() {
			s.lessor.Promote(s.Cfg.electionTimeout())
		}
		return
	}

	var raftReq pb.InternalRaftRequest
	if !pbutil.MaybeUnmarshal(&raftReq, e.Data) { // backward compatible
		var r pb.Request
		rp := &r
		pbutil.MustUnmarshal(rp, e.Data)
		s.w.Trigger(r.ID, s.applyV2Request((*RequestV2)(rp)))
		return
	}
	if raftReq.V2 != nil {
		req := (*RequestV2)(raftReq.V2)
		s.w.Trigger(req.ID, s.applyV2Request(req))
		return
	}

	// do not re-apply applied entries.
	if !shouldApplyV3 {
		return
	}

	id := raftReq.ID
	if id == 0 {
		id = raftReq.Header.ID
	}

	var ar *applyResult
	needResult := s.w.IsRegistered(id)
	if needResult || !noSideEffect(&raftReq) {
		if !needResult && raftReq.Txn != nil {
			removeNeedlessRangeReqs(raftReq.Txn)
		}
		ar = s.applyV3.Apply(&raftReq)
	}

	if ar == nil {
		return
	}

	if ar.err != ErrNoSpace || len(s.alarmStore.Get(pb.AlarmType_NOSPACE)) > 0 {
		s.w.Trigger(id, ar)
		return
	}

	lg := s.getLogger()
	lg.Warn(
		"message exceeded backend quota; raising alarm",
		zap.Int64("quota-size-bytes", s.Cfg.QuotaBackendBytes),
		zap.String("quota-size", humanize.Bytes(uint64(s.Cfg.QuotaBackendBytes))),
		zap.Error(ar.err),
	)

	s.goAttach(func() {
		a := &pb.AlarmRequest{
			MemberID: uint64(s.ID()),
			Action:   pb.AlarmRequest_ACTIVATE,
			Alarm:    pb.AlarmType_NOSPACE,
		}
		s.raftRequest(s.ctx, pb.InternalRaftRequest{Alarm: a})
		s.w.Trigger(id, ar)
	})
}

// applyConfChange applies a ConfChange to the server. It is only
// invoked with a ConfChange that has already passed through Raft.
func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState) (bool, error) {
	if err := s.cluster.ValidateConfigurationChange(cc); err != nil {
		cc.NodeID = raft.None
		s.r.ApplyConfChange(cc)
		return false, err
	}

	lg := s.getLogger()
	*confState = *s.r.ApplyConfChange(cc)
	switch cc.Type {
	case raftpb.ConfChangeAddNode, raftpb.ConfChangeAddLearnerNode:
		confChangeContext := new(membership.ConfigChangeContext)
		if err := json.Unmarshal(cc.Context, confChangeContext); err != nil {
			lg.Panic("failed to unmarshal member", zap.Error(err))
		}
		if cc.NodeID != uint64(confChangeContext.Member.ID) {
			lg.Panic(
				"got different member ID",
				zap.String("member-id-from-config-change-entry", types.ID(cc.NodeID).String()),
				zap.String("member-id-from-message", confChangeContext.Member.ID.String()),
			)
		}
		if confChangeContext.IsPromote {
			s.cluster.PromoteMember(confChangeContext.Member.ID)
		} else {
			s.cluster.AddMember(&confChangeContext.Member)

			if confChangeContext.Member.ID != s.id {
				s.r.transport.AddPeer(confChangeContext.Member.ID, confChangeContext.PeerURLs)
			}
		}

		// update the isLearner metric when this server's ID equals the ID in the conf change
		if confChangeContext.Member.ID == s.id {
			if cc.Type == raftpb.ConfChangeAddLearnerNode {
				isLearner.Set(1)
			} else {
				isLearner.Set(0)
			}
		}

	case raftpb.ConfChangeRemoveNode:
		id := types.ID(cc.NodeID)
		s.cluster.RemoveMember(id)
		if id == s.id {
			return true, nil
		}
		s.r.transport.RemovePeer(id)

	case raftpb.ConfChangeUpdateNode:
		m := new(membership.Member)
		if err := json.Unmarshal(cc.Context, m); err != nil {
			lg.Panic("failed to unmarshal member", zap.Error(err))
		}
		if cc.NodeID != uint64(m.ID) {
			lg.Panic(
				"got different member ID",
				zap.String("member-id-from-config-change-entry", types.ID(cc.NodeID).String()),
				zap.String("member-id-from-message", m.ID.String()),
			)
		}
		s.cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes)
		if m.ID != s.id {
			s.r.transport.UpdatePeer(m.ID, m.PeerURLs)
		}
	}
	return false, nil
}
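
// End-to-end sketch of an add-member proposal as configure/applyConfChange
// expect it (illustrative; the member name and peer URL are hypothetical, and
// error handling is elided):
//
//	ctx := context.TODO()
//	urls, _ := types.NewURLs([]string{"http://10.0.0.2:2380"})
//	m := membership.NewMember("infra2", urls, "", nil)
//	b, _ := json.Marshal(membership.ConfigChangeContext{Member: *m})
//	cc := raftpb.ConfChange{
//		Type:    raftpb.ConfChangeAddNode,
//		NodeID:  uint64(m.ID),
//		Context: b, // applyConfChange unmarshals this back into a ConfigChangeContext
//	}
//	members, err := s.configure(ctx, cc)
//	_, _ = members, err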

// TODO: non-blocking snapshot
func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
	clone := s.v2store.Clone()
	// commit kv to write metadata (for example: consistent index) to disk.
	// KV().commit() updates the consistent index in the backend.
	// All operations that update the consistent index must be called sequentially
	// from the applyAll function.
	// So KV().Commit() cannot run in parallel with apply. It has to be called
	// outside the goroutine created below.
	s.KV().Commit()

	s.goAttach(func() {
		lg := s.getLogger()

		d, err := clone.SaveNoCopy()
		// TODO: the current store will never fail to do a snapshot.
		// What should we do if the store might fail?
		if err != nil {
			lg.Panic("failed to save v2 store", zap.Error(err))
		}
		snap, err := s.r.raftStorage.CreateSnapshot(snapi, &confState, d)
		if err != nil {
			// the snapshot was done asynchronously with the progress of raft.
			// raft might have already gotten a newer snapshot.
			if err == raft.ErrSnapOutOfDate {
				return
			}
			lg.Panic("failed to create snapshot", zap.Error(err))
		}
		// SaveSnap saves the snapshot and releases the locked wal files
		// to the snapshot index.
		if err = s.r.storage.SaveSnap(snap); err != nil {
			lg.Panic("failed to save snapshot", zap.Error(err))
		}
		lg.Info(
			"saved snapshot",
			zap.Uint64("snapshot-index", snap.Metadata.Index),
		)

		// When sending a snapshot, etcd will pause compaction.
		// After receiving a snapshot, the slow follower needs to get all the
		// entries right after the sent snapshot to catch up. If we do not pause
		// compaction, the log entries right after the sent snapshot might
		// already be compacted. This happens when the snapshot takes a long
		// time to send and save. Pausing compaction avoids triggering a
		// snapshot-sending cycle.
		if atomic.LoadInt64(&s.inflightSnapshots) != 0 {
			lg.Info("skip compaction since there is an inflight snapshot")
			return
		}

		// keep some in-memory log entries for slow followers.
		compacti := uint64(1)
		if snapi > s.Cfg.SnapshotCatchUpEntries {
			compacti = snapi - s.Cfg.SnapshotCatchUpEntries
		}

		err = s.r.raftStorage.Compact(compacti)
		if err != nil {
			// the compaction was done asynchronously with the progress of raft.
			// the raft log might have already been compacted.
			if err == raft.ErrCompacted {
				return
			}
			lg.Panic("failed to compact", zap.Error(err))
		}
		lg.Info(
			"compacted Raft logs",
			zap.Uint64("compact-index", compacti),
		)
	})
}

// CutPeer drops messages to the specified peer.
func (s *EtcdServer) CutPeer(id types.ID) {
	tr, ok := s.r.transport.(*rafthttp.Transport)
	if ok {
		tr.CutPeer(id)
	}
}

// MendPeer recovers the message dropping behavior of the given peer.
func (s *EtcdServer) MendPeer(id types.ID) {
	tr, ok := s.r.transport.(*rafthttp.Transport)
	if ok {
		tr.MendPeer(id)
	}
}

func (s *EtcdServer) PauseSending() { s.r.pauseSending() }

func (s *EtcdServer) ResumeSending() { s.r.resumeSending() }

func (s *EtcdServer) ClusterVersion() *semver.Version {
	if s.cluster == nil {
		return nil
	}
	return s.cluster.Version()
}
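
// Version-gating sketch (illustrative, not a call site from this file):
// callers can use ClusterVersion to hold back a feature until every member
// has been upgraded, keeping in mind that it may be nil before bootstrap:
//
//	if cv := s.ClusterVersion(); cv == nil || cv.LessThan(semver.Version{Major: 3, Minor: 4}) {
//		// cluster not yet known to be >= 3.4; keep the feature disabled
//	}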

// monitorVersions checks the members' versions every monitorVersionInterval.
// It updates the cluster version if all members agree on a higher one.
// It logs a message if there is a member with a higher version than the
// local version.
func (s *EtcdServer) monitorVersions() {
	for {
		select {
		case <-s.forceVersionC:
		case <-time.After(monitorVersionInterval):
		case <-s.stopping:
			return
		}

		if s.Leader() != s.ID() {
			continue
		}

		v := decideClusterVersion(s.getLogger(), getVersions(s.getLogger(), s.cluster, s.id, s.peerRt))
		if v != nil {
			// only keep major.minor version for comparison
			v = &semver.Version{
				Major: v.Major,
				Minor: v.Minor,
			}
		}

		// if the current version is nil:
		// 1. use the decided version if possible
		// 2. or use the min cluster version
		if s.cluster.Version() == nil {
			verStr := version.MinClusterVersion
			if v != nil {
				verStr = v.String()
			}
			s.goAttach(func() { s.updateClusterVersion(verStr) })
			continue
		}

		// update cluster version only if the decided version is greater than
		// the current cluster version
		if v != nil && s.cluster.Version().LessThan(*v) {
			s.goAttach(func() { s.updateClusterVersion(v.String()) })
		}
	}
}

func (s *EtcdServer) updateClusterVersion(ver string) {
	lg := s.getLogger()

	if s.cluster.Version() == nil {
		lg.Info(
			"setting up initial cluster version",
			zap.String("cluster-version", version.Cluster(ver)),
		)
	} else {
		lg.Info(
			"updating cluster version",
			zap.String("from", version.Cluster(s.cluster.Version().String())),
			zap.String("to", version.Cluster(ver)),
		)
	}

	req := membershippb.ClusterVersionSetRequest{Ver: ver}

	ctx, cancel := context.WithTimeout(s.ctx, s.Cfg.ReqTimeout())
	_, err := s.raftRequest(ctx, pb.InternalRaftRequest{ClusterVersionSet: &req})
	cancel()

	switch err {
	case nil:
		lg.Info("cluster version is updated", zap.String("cluster-version", version.Cluster(ver)))
		return

	case ErrStopped:
		lg.Warn("aborting cluster version update; server is stopped", zap.Error(err))
		return

	default:
		lg.Warn("failed to update cluster version", zap.Error(err))
	}
}

func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error {
	switch err {
	case context.Canceled:
		return ErrCanceled

	case context.DeadlineExceeded:
		s.leadTimeMu.RLock()
		curLeadElected := s.leadElectedTime
		s.leadTimeMu.RUnlock()
		prevLeadLost := curLeadElected.Add(-2 * time.Duration(s.Cfg.ElectionTicks) * time.Duration(s.Cfg.TickMs) * time.Millisecond)
		if start.After(prevLeadLost) && start.Before(curLeadElected) {
			return ErrTimeoutDueToLeaderFail
		}
		lead := types.ID(s.getLead())
		switch lead {
		case types.ID(raft.None):
			// TODO: return a specific error to indicate that the cluster has no leader now
		case s.ID():
			if !isConnectedToQuorumSince(s.r.transport, start, s.ID(), s.cluster.Members()) {
				return ErrTimeoutDueToConnectionLost
			}
		default:
			if !isConnectedSince(s.r.transport, start, lead) {
				return ErrTimeoutDueToConnectionLost
			}
		}
		return ErrTimeout

	default:
		return err
	}
}

func (s *EtcdServer) KV() mvcc.ConsistentWatchableKV { return s.kv }
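
// Worked example for the leader-failure window in parseProposeCtxErr above
// (assumed illustrative config): with TickMs=100 and ElectionTicks=10,
//
//	prevLeadLost = leadElectedTime - 2*10*100ms = leadElectedTime - 2s
//
// so a proposal that started within the 2s preceding the current leader's
// election and then hit its deadline is classified as
// ErrTimeoutDueToLeaderFail rather than as a connectivity problem.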

func (s *EtcdServer) Backend() backend.Backend {
	s.bemu.Lock()
	defer s.bemu.Unlock()
	return s.be
}

func (s *EtcdServer) AuthStore() auth.AuthStore { return s.authStore }

func (s *EtcdServer) restoreAlarms() error {
	s.applyV3 = s.newApplierV3()
	as, err := v3alarm.NewAlarmStore(s.lg, s)
	if err != nil {
		return err
	}
	s.alarmStore = as
	if len(as.Get(pb.AlarmType_NOSPACE)) > 0 {
		s.applyV3 = newApplierV3Capped(s.applyV3)
	}
	if len(as.Get(pb.AlarmType_CORRUPT)) > 0 {
		s.applyV3 = newApplierV3Corrupt(s.applyV3)
	}
	return nil
}

// goAttach creates a goroutine on a given function and tracks it using
// the etcdserver waitgroup.
func (s *EtcdServer) goAttach(f func()) {
	s.wgMu.RLock() // this blocks with ongoing close(s.stopping)
	defer s.wgMu.RUnlock()
	select {
	case <-s.stopping:
		lg := s.getLogger()
		lg.Warn("server has stopped; skipping goAttach")
		return
	default:
	}

	// now safe to add since waitgroup wait has not started yet
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		f()
	}()
}

func (s *EtcdServer) Alarms() []*pb.AlarmMember {
	return s.alarmStore.Get(pb.AlarmType_NONE)
}

func (s *EtcdServer) Logger() *zap.Logger {
	return s.lg
}

// IsLearner returns whether the local member is a raft learner.
func (s *EtcdServer) IsLearner() bool {
	return s.cluster.IsLocalMemberLearner()
}

// IsMemberExist returns whether the member with the given id exists in the cluster.
func (s *EtcdServer) IsMemberExist(id types.ID) bool {
	return s.cluster.IsMemberExist(id)
}

// raftStatus returns the raft status of this etcd node.
func (s *EtcdServer) raftStatus() raft.Status {
	return s.r.Node.Status()
}
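
// Illustrative goAttach usage (the background task is hypothetical): every
// goroutine started this way is counted in the server waitgroup, and it
// should watch s.stopping so shutdown is not delayed:
//
//	s.goAttach(func() {
//		select {
//		case <-time.After(time.Minute): // hypothetical periodic work
//		case <-s.stopping: // exit promptly once the server begins to stop
//		}
//	})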