// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package etcdserver

import (
	"encoding/json"
	"expvar"
	"fmt"
	"math"
	"math/rand"
	"net/http"
	"os"
	"path"
	"regexp"
	"sync"
	"sync/atomic"
	"time"

	"github.com/coreos/etcd/alarm"
	"github.com/coreos/etcd/auth"
	"github.com/coreos/etcd/compactor"
	"github.com/coreos/etcd/discovery"
	"github.com/coreos/etcd/etcdserver/api"
	"github.com/coreos/etcd/etcdserver/api/v2http/httptypes"
	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
	"github.com/coreos/etcd/etcdserver/membership"
	"github.com/coreos/etcd/etcdserver/stats"
	"github.com/coreos/etcd/lease"
	"github.com/coreos/etcd/mvcc"
	"github.com/coreos/etcd/mvcc/backend"
	"github.com/coreos/etcd/pkg/fileutil"
	"github.com/coreos/etcd/pkg/idutil"
	"github.com/coreos/etcd/pkg/pbutil"
	"github.com/coreos/etcd/pkg/runtime"
	"github.com/coreos/etcd/pkg/schedule"
	"github.com/coreos/etcd/pkg/types"
	"github.com/coreos/etcd/pkg/wait"
	"github.com/coreos/etcd/raft"
	"github.com/coreos/etcd/raft/raftpb"
	"github.com/coreos/etcd/rafthttp"
	"github.com/coreos/etcd/snap"
	"github.com/coreos/etcd/store"
	"github.com/coreos/etcd/version"
	"github.com/coreos/etcd/wal"

	"github.com/coreos/go-semver/semver"
	"github.com/coreos/pkg/capnslog"
	"github.com/prometheus/client_golang/prometheus"
	"golang.org/x/net/context"
)

const (
	DefaultSnapCount = 100000

	StoreClusterPrefix = "/0"
	StoreKeysPrefix    = "/1"

	// HealthInterval is the minimum time the cluster should be healthy
	// before accepting add member requests.
	HealthInterval = 5 * time.Second

	purgeFileInterval = 30 * time.Second
	// monitorVersionInterval should be smaller than the timeout
	// on the connection. Otherwise we will not be able to reuse the connection
	// (since it will time out).
	monitorVersionInterval = rafthttp.ConnWriteTimeout - time.Second

	// maxInFlightMsgSnap is the max number of in-flight snapshot messages
	// etcdserver allows to have. This number is more than enough for most
	// clusters with 5 machines.
	maxInFlightMsgSnap = 16

	releaseDelayAfterSnapshot = 30 * time.Second

	// maxPendingRevokes is the maximum number of outstanding expired lease revocations.
	maxPendingRevokes          = 16
	recommendedMaxRequestBytes = 10 * 1024 * 1024
)

var (
	plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "etcdserver")

	storeMemberAttributeRegexp = regexp.MustCompile(path.Join(membership.StoreMembersPrefix, "[[:xdigit:]]{1,16}", "attributes"))
)

func init() {
	rand.Seed(time.Now().UnixNano())

	expvar.Publish(
		"file_descriptor_limit",
		expvar.Func(
			func() interface{} {
				n, _ := runtime.FDLimit()
				return n
			},
		),
	)
}

type Response struct {
	Event   *store.Event
	Watcher store.Watcher
	err     error
}

type Server interface {
	// Start performs any initialization of the Server necessary for it to
	// begin serving requests. It must be called before Do or Process.
	// Start must be non-blocking; any long-running server functionality
	// should be implemented in goroutines.
	Start()
	// Stop terminates the Server and performs any necessary finalization.
	// Do and Process cannot be called after Stop has been invoked.
	Stop()
	// ID returns the ID of the Server.
	ID() types.ID
	// Leader returns the ID of the leader Server.
	Leader() types.ID
	// Do takes a request and attempts to fulfill it, returning a Response.
	Do(ctx context.Context, r pb.Request) (Response, error)
	// Process takes a raft message and applies it to the server's raft state
	// machine, respecting any timeout of the given context.
	Process(ctx context.Context, m raftpb.Message) error
	// AddMember attempts to add a member into the cluster. It will return
	// ErrIDRemoved if member ID is removed from the cluster, or return
	// ErrIDExists if member ID exists in the cluster.
	AddMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error)
	// RemoveMember attempts to remove a member from the cluster. It will
	// return ErrIDRemoved if member ID is removed from the cluster, or return
	// ErrIDNotFound if member ID is not in the cluster.
	RemoveMember(ctx context.Context, id uint64) ([]*membership.Member, error)

	// UpdateMember attempts to update an existing member in the cluster. It will
	// return ErrIDNotFound if the member ID does not exist.
	UpdateMember(ctx context.Context, updateMemb membership.Member) ([]*membership.Member, error)

	// ClusterVersion is the cluster-wide minimum major.minor version.
	// Cluster version is set to the min version that an etcd member is
	// compatible with when first bootstrapped.
	//
	// ClusterVersion is nil until the cluster is bootstrapped (has a quorum).
	//
	// During a rolling upgrade, the ClusterVersion will be updated
	// automatically after a sync (5 seconds by default).
	//
	// The API/raft component can utilize ClusterVersion to determine if
	// it can accept a client request or a raft RPC.
	// NOTE: ClusterVersion might be nil when etcd 2.1 works with etcd 2.0 and
	// the leader is etcd 2.0. The etcd 2.0 leader will not update clusterVersion
	// since this feature was introduced post 2.0.
	ClusterVersion() *semver.Version
}

// EtcdServer is the production implementation of the Server interface
type EtcdServer struct {
	// inflightSnapshots holds the number of snapshots currently in flight.
	inflightSnapshots int64  // must use atomic operations to access; keep 64-bit aligned.
	appliedIndex      uint64 // must use atomic operations to access; keep 64-bit aligned.
	committedIndex    uint64 // must use atomic operations to access; keep 64-bit aligned.
	// consistIndex is used to hold the offset of the currently executing entry.
	// It is initialized to 0 before executing any entry.
	consistIndex consistentIndex // must use atomic operations to access; keep 64-bit aligned.
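
	// NOTE: the atomically accessed fields above are intentionally declared
	// first so they stay 64-bit aligned within the struct. On 32-bit
	// platforms, the 64-bit functions in sync/atomic panic when their operand
	// is not 64-bit aligned (see the "Bugs" section of the sync/atomic
	// package documentation).
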
	Cfg *ServerConfig

	readych chan struct{}
	r       raftNode

	snapCount uint64

	w wait.Wait

	readMu sync.RWMutex
	// read routine notifies etcd server that it waits for reading by sending
	// an empty struct to readwaitc
	readwaitc chan struct{}
	// readNotifier is used to notify the read routine that it can process the
	// request when there is no error
	readNotifier *notifier

	// stop signals the run goroutine should shutdown.
	stop chan struct{}
	// stopping is closed by run goroutine on shutdown.
	stopping chan struct{}
	// done is closed when all goroutines from start() complete.
	done chan struct{}

	errorc     chan error
	id         types.ID
	attributes membership.Attributes

	cluster *membership.RaftCluster

	store       store.Store
	snapshotter *snap.Snapshotter

	applyV2 ApplierV2

	// applyV3 is the applier with auth and quotas
	applyV3 applierV3
	// applyV3Base is the core applier without auth or quotas
	applyV3Base applierV3
	applyWait   wait.WaitTime

	kv         mvcc.ConsistentWatchableKV
	lessor     lease.Lessor
	bemu       sync.Mutex
	be         backend.Backend
	authStore  auth.AuthStore
	alarmStore *alarm.AlarmStore

	stats  *stats.ServerStats
	lstats *stats.LeaderStats

	SyncTicker *time.Ticker
	// compactor is used to auto-compact the KV.
	compactor *compactor.Periodic

	// peerRt is used to send requests (version, lease) to peers.
	peerRt   http.RoundTripper
	reqIDGen *idutil.Generator

	// forceVersionC is used to force the version monitor loop
	// to detect the cluster version immediately.
	forceVersionC chan struct{}

	// wgMu blocks concurrent waitgroup mutation while the server is stopping.
	wgMu sync.RWMutex
	// wg is used to wait for the goroutines that depend on the server state
	// to exit when stopping the server.
	wg sync.WaitGroup

	// ctx is used for etcd-initiated requests that may need to be canceled
	// on etcd server shutdown.
	ctx    context.Context
	cancel context.CancelFunc

	leadTimeMu      sync.RWMutex
	leadElectedTime time.Time
}
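
// A typical lifecycle, as a rough sketch (error handling elided, and the
// ServerConfig assumed to be fully populated by the caller):
//
//	srv, err := NewServer(cfg)
//	if err != nil { ... }
//	srv.Start()
//	defer srv.Stop()
//	<-srv.ReadyNotify() // closed once this member has published itself
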
// NewServer creates a new EtcdServer from the supplied configuration. The
// configuration is considered static for the lifetime of the EtcdServer.
func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
	st := store.New(StoreClusterPrefix, StoreKeysPrefix)

	var (
		w  *wal.WAL
		n  raft.Node
		s  *raft.MemoryStorage
		id types.ID
		cl *membership.RaftCluster
	)

	if cfg.MaxRequestBytes > recommendedMaxRequestBytes {
		plog.Warningf("MaxRequestBytes %v exceeds maximum recommended size %v", cfg.MaxRequestBytes, recommendedMaxRequestBytes)
	}

	if terr := fileutil.TouchDirAll(cfg.DataDir); terr != nil {
		return nil, fmt.Errorf("cannot access data directory: %v", terr)
	}

	haveWAL := wal.Exist(cfg.WALDir())

	if err = fileutil.TouchDirAll(cfg.SnapDir()); err != nil {
		plog.Fatalf("create snapshot directory error: %v", err)
	}
	ss := snap.New(cfg.SnapDir())

	bepath := cfg.backendPath()
	beExist := fileutil.Exist(bepath)
	be := openBackend(cfg)

	defer func() {
		if err != nil {
			be.Close()
		}
	}()

	prt, err := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.peerDialTimeout())
	if err != nil {
		return nil, err
	}
	var (
		remotes  []*membership.Member
		snapshot *raftpb.Snapshot
	)

	switch {
	case !haveWAL && !cfg.NewCluster:
		if err = cfg.VerifyJoinExisting(); err != nil {
			return nil, err
		}
		cl, err = membership.NewClusterFromURLsMap(cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
		if err != nil {
			return nil, err
		}
		existingCluster, gerr := GetClusterFromRemotePeers(getRemotePeerURLs(cl, cfg.Name), prt)
		if gerr != nil {
			return nil, fmt.Errorf("cannot fetch cluster info from peer urls: %v", gerr)
		}
		if err = membership.ValidateClusterAndAssignIDs(cl, existingCluster); err != nil {
			return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err)
		}
		if !isCompatibleWithCluster(cl, cl.MemberByName(cfg.Name).ID, prt) {
			return nil, fmt.Errorf("incompatible with current running cluster")
		}

		remotes = existingCluster.Members()
		cl.SetID(existingCluster.ID())
		cl.SetStore(st)
		cl.SetBackend(be)
		cfg.Print()
		id, n, s, w = startNode(cfg, cl, nil)
	case !haveWAL && cfg.NewCluster:
		if err = cfg.VerifyBootstrap(); err != nil {
			return nil, err
		}
		cl, err = membership.NewClusterFromURLsMap(cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
		if err != nil {
			return nil, err
		}
		m := cl.MemberByName(cfg.Name)
		if isMemberBootstrapped(cl, cfg.Name, prt, cfg.bootstrapTimeout()) {
			return nil, fmt.Errorf("member %s has already been bootstrapped", m.ID)
		}
		if cfg.ShouldDiscover() {
			var str string
			str, err = discovery.JoinCluster(cfg.DiscoveryURL, cfg.DiscoveryProxy, m.ID, cfg.InitialPeerURLsMap.String())
			if err != nil {
				return nil, &DiscoveryError{Op: "join", Err: err}
			}
			var urlsmap types.URLsMap
			urlsmap, err = types.NewURLsMap(str)
			if err != nil {
				return nil, err
			}
			if checkDuplicateURL(urlsmap) {
				return nil, fmt.Errorf("discovery cluster %s has duplicate url", urlsmap)
			}
			if cl, err = membership.NewClusterFromURLsMap(cfg.InitialClusterToken, urlsmap); err != nil {
				return nil, err
			}
		}
		cl.SetStore(st)
		cl.SetBackend(be)
		cfg.PrintWithInitial()
		id, n, s, w = startNode(cfg, cl, cl.MemberIDs())
	case haveWAL:
		if err = fileutil.IsDirWriteable(cfg.MemberDir()); err != nil {
			return nil, fmt.Errorf("cannot write to member directory: %v", err)
		}

		if err = fileutil.IsDirWriteable(cfg.WALDir()); err != nil {
			return nil, fmt.Errorf("cannot write to WAL directory: %v", err)
		}

		if cfg.ShouldDiscover() {
			plog.Warningf("discovery token ignored since a cluster has already been initialized. Valid log found at %q", cfg.WALDir())
		}
		snapshot, err = ss.Load()
		if err != nil && err != snap.ErrNoSnapshot {
			return nil, err
		}
		if snapshot != nil {
			if err = st.Recovery(snapshot.Data); err != nil {
				plog.Panicf("recovered store from snapshot error: %v", err)
			}
			plog.Infof("recovered store from snapshot at index %d", snapshot.Metadata.Index)
			if be, err = recoverSnapshotBackend(cfg, be, *snapshot); err != nil {
				plog.Panicf("recovering backend from snapshot error: %v", err)
			}
		}
		cfg.Print()
		if !cfg.ForceNewCluster {
			id, cl, n, s, w = restartNode(cfg, snapshot)
		} else {
			id, cl, n, s, w = restartAsStandaloneNode(cfg, snapshot)
		}
		cl.SetStore(st)
		cl.SetBackend(be)
		cl.Recover(api.UpdateCapability)
		if cl.Version() != nil && !cl.Version().LessThan(semver.Version{Major: 3}) && !beExist {
			os.RemoveAll(bepath)
			return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath)
		}
	default:
		return nil, fmt.Errorf("unsupported bootstrap config")
	}

	if terr := fileutil.TouchDirAll(cfg.MemberDir()); terr != nil {
		return nil, fmt.Errorf("cannot access member directory: %v", terr)
	}

	sstats := stats.NewServerStats(cfg.Name, id.String())
	lstats := stats.NewLeaderStats(id.String())

	heartbeat := time.Duration(cfg.TickMs) * time.Millisecond
	srv = &EtcdServer{
		readych:     make(chan struct{}),
		Cfg:         cfg,
		snapCount:   cfg.SnapCount,
		errorc:      make(chan error, 1),
		store:       st,
		snapshotter: ss,
		r: *newRaftNode(
			raftNodeConfig{
				isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
				Node:        n,
				heartbeat:   heartbeat,
				raftStorage: s,
				storage:     NewStorage(w, ss),
			},
		),
		id:            id,
		attributes:    membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
		cluster:       cl,
		stats:         sstats,
		lstats:        lstats,
		SyncTicker:    time.NewTicker(500 * time.Millisecond),
		peerRt:        prt,
		reqIDGen:      idutil.NewGenerator(uint16(id), time.Now()),
		forceVersionC: make(chan struct{}),
	}
	serverID.With(prometheus.Labels{"server_id": id.String()}).Set(1)

	srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}

	srv.be = be
	minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat

	// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
	// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
	srv.lessor = lease.NewLessor(srv.be, int64(math.Ceil(minTTL.Seconds())))
	srv.kv = mvcc.New(srv.be, srv.lessor, &srv.consistIndex)
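
	// Sanity-check a pre-existing backend against the snapshot: if the
	// backend's consistent index is behind the snapshot index, the database
	// file predates the snapshot and the two cannot be reconciled.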
	if beExist {
		kvindex := srv.kv.ConsistentIndex()
		// TODO: remove the kvindex != 0 check when we do not expect users to
		// upgrade etcd from a pre-3.0 release.
		if snapshot != nil && kvindex < snapshot.Metadata.Index {
			if kvindex != 0 {
				return nil, fmt.Errorf("database file (%v index %d) does not match snapshot (index %d)", bepath, kvindex, snapshot.Metadata.Index)
			}
			plog.Warningf("consistent index never saved (snapshot index=%d)", snapshot.Metadata.Index)
		}
	}
	newSrv := srv // since srv == nil in defer if srv is returned as nil
	defer func() {
		// closing backend without first closing kv can cause
		// resumed compactions to fail with closed tx errors
		if err != nil {
			newSrv.kv.Close()
		}
	}()

	srv.consistIndex.setConsistentIndex(srv.kv.ConsistentIndex())
	tp, err := auth.NewTokenProvider(cfg.AuthToken,
		func(index uint64) <-chan struct{} {
			return srv.applyWait.Wait(index)
		},
	)
	if err != nil {
		plog.Errorf("failed to create token provider: %s", err)
		return nil, err
	}
	srv.authStore = auth.NewAuthStore(srv.be, tp)
	if h := cfg.AutoCompactionRetention; h != 0 {
		srv.compactor = compactor.NewPeriodic(h, srv.kv, srv)
		srv.compactor.Run()
	}

	srv.applyV3Base = &applierV3backend{srv}
	if err = srv.restoreAlarms(); err != nil {
		return nil, err
	}

	// TODO: move transport initialization near the definition of remote
	tr := &rafthttp.Transport{
		TLSInfo:     cfg.PeerTLSInfo,
		DialTimeout: cfg.peerDialTimeout(),
		ID:          id,
		URLs:        cfg.PeerURLs,
		ClusterID:   cl.ID(),
		Raft:        srv,
		Snapshotter: ss,
		ServerStats: sstats,
		LeaderStats: lstats,
		ErrorC:      srv.errorc,
	}
	if err = tr.Start(); err != nil {
		return nil, err
	}
	// add all remotes into transport
	for _, m := range remotes {
		if m.ID != id {
			tr.AddRemote(m.ID, m.PeerURLs)
		}
	}
	for _, m := range cl.Members() {
		if m.ID != id {
			tr.AddPeer(m.ID, m.PeerURLs)
		}
	}
	srv.r.transport = tr

	return srv, nil
}

func (s *EtcdServer) adjustTicks() {
	clusterN := len(s.cluster.Members())

	// single-node fresh start, or single node recovering from a snapshot
	if clusterN == 1 {
		ticks := s.Cfg.ElectionTicks - 1
		plog.Infof("%s as single-node; fast-forwarding %d ticks (election ticks %d)", s.ID(), ticks, s.Cfg.ElectionTicks)
		s.r.advanceTicks(ticks)
		return
	}

	if !s.Cfg.InitialElectionTickAdvance {
		plog.Infof("skipping initial election tick advance (election tick %d)", s.Cfg.ElectionTicks)
		return
	}

	// retry up to "rafthttp.ConnReadTimeout" (5s) until a peer connection
	// reports activity; otherwise:
	// 1. all connections failed, or
	// 2. no active peers, or
	// 3. restarted single-node with no snapshot
	// then do nothing, because advancing ticks would have no effect
	waitTime := rafthttp.ConnReadTimeout
	itv := 50 * time.Millisecond
	for i := int64(0); i < int64(waitTime/itv); i++ {
		select {
		case <-time.After(itv):
		case <-s.stopping:
			return
		}

		peerN := s.r.transport.ActivePeers()
		if peerN > 1 {
			// multi-node received peer connection reports;
			// adjust ticks in case the leader is slow to send heartbeats
			ticks := s.Cfg.ElectionTicks - 2
			plog.Infof("%s initialized peer connection; fast-forwarding %d ticks (election ticks %d) with %d active peer(s)", s.ID(), ticks, s.Cfg.ElectionTicks, peerN)
			s.r.advanceTicks(ticks)
			return
		}
	}
}
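
// For illustration: assuming the common configuration of ElectionTicks=10
// with TickMs=100, a fresh single-node server fast-forwards 9 ticks in
// adjustTicks above, so it can campaign after roughly one remaining tick
// (~100ms) instead of a full election timeout (~1s). A multi-node member
// that observes an active peer fast-forwards 8 ticks, leaving two full ticks
// of headroom for slow leader heartbeats before it would start an election.
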
// Start performs any initialization of the Server necessary for it to
// begin serving requests. It must be called before Do or Process.
// Start must be non-blocking; any long-running server functionality
// should be implemented in goroutines.
func (s *EtcdServer) Start() {
	s.start()
	s.goAttach(func() { s.adjustTicks() })
	s.goAttach(func() { s.publish(s.Cfg.ReqTimeout()) })
	s.goAttach(s.purgeFile)
	s.goAttach(func() { monitorFileDescriptor(s.stopping) })
	s.goAttach(s.monitorVersions)
	s.goAttach(s.linearizableReadLoop)
}

// start prepares and starts the server in a new goroutine. It is no longer
// safe to modify a server's fields after it has been sent to Start.
// This function is just used for testing.
func (s *EtcdServer) start() {
	if s.snapCount == 0 {
		plog.Infof("set snapshot count to default %d", DefaultSnapCount)
		s.snapCount = DefaultSnapCount
	}
	s.w = wait.New()
	s.applyWait = wait.NewTimeList()
	s.done = make(chan struct{})
	s.stop = make(chan struct{})
	s.stopping = make(chan struct{})
	s.ctx, s.cancel = context.WithCancel(context.Background())
	s.readwaitc = make(chan struct{}, 1)
	s.readNotifier = newNotifier()
	if s.ClusterVersion() != nil {
		plog.Infof("starting server... [version: %v, cluster version: %v]", version.Version, version.Cluster(s.ClusterVersion().String()))
		membership.ClusterVersionMetrics.With(prometheus.Labels{"cluster_version": version.Cluster(s.ClusterVersion().String())}).Set(1)
	} else {
		plog.Infof("starting server... [version: %v, cluster version: to_be_decided]", version.Version)
	}
	// TODO: if this is an empty log, writes all peer infos
	// into the first entry
	go s.run()
}

func (s *EtcdServer) purgeFile() {
	var dberrc, serrc, werrc <-chan error
	var dbdonec, sdonec, wdonec <-chan struct{}
	if s.Cfg.MaxSnapFiles > 0 {
		dbdonec, dberrc = fileutil.PurgeFileWithDoneNotify(s.Cfg.SnapDir(), "snap.db", s.Cfg.MaxSnapFiles, purgeFileInterval, s.stopping)
		sdonec, serrc = fileutil.PurgeFileWithDoneNotify(s.Cfg.SnapDir(), "snap", s.Cfg.MaxSnapFiles, purgeFileInterval, s.stopping)
	}
	if s.Cfg.MaxWALFiles > 0 {
		wdonec, werrc = fileutil.PurgeFileWithDoneNotify(s.Cfg.WALDir(), "wal", s.Cfg.MaxWALFiles, purgeFileInterval, s.stopping)
	}
	select {
	case e := <-dberrc:
		plog.Fatalf("failed to purge snap db file %v", e)
	case e := <-serrc:
		plog.Fatalf("failed to purge snap file %v", e)
	case e := <-werrc:
		plog.Fatalf("failed to purge wal file %v", e)
	case <-s.stopping:
		if dbdonec != nil {
			<-dbdonec
		}
		if sdonec != nil {
			<-sdonec
		}
		if wdonec != nil {
			<-wdonec
		}
		return
	}
}

func (s *EtcdServer) ID() types.ID { return s.id }

func (s *EtcdServer) Cluster() *membership.RaftCluster { return s.cluster }

func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler() }

func (s *EtcdServer) Lessor() lease.Lessor { return s.lessor }

func (s *EtcdServer) ApplyWait() <-chan struct{} { return s.applyWait.Wait(s.getCommittedIndex()) }

func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
	if s.cluster.IsIDRemoved(types.ID(m.From)) {
		plog.Warningf("reject message from removed member %s", types.ID(m.From).String())
		return httptypes.NewHTTPError(http.StatusForbidden, "cannot process message from removed member")
	}
	if m.Type == raftpb.MsgApp {
		s.stats.RecvAppendReq(types.ID(m.From).String(), m.Size())
	}
	return s.r.Step(ctx, m)
}
func (s *EtcdServer) IsIDRemoved(id uint64) bool { return s.cluster.IsIDRemoved(types.ID(id)) }

func (s *EtcdServer) ReportUnreachable(id uint64) { s.r.ReportUnreachable(id) }

// ReportSnapshot reports snapshot sent status to the raft state machine,
// and clears the used snapshot from the snapshot store.
func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
	s.r.ReportSnapshot(id, status)
}

type etcdProgress struct {
	confState raftpb.ConfState
	snapi     uint64
	appliedt  uint64
	appliedi  uint64
}

// raftReadyHandler contains a set of EtcdServer operations to be called by raftNode,
// and helps decouple state machine logic from Raft algorithms.
// TODO: add a state machine interface to apply the commit entries and do snapshot/recover
type raftReadyHandler struct {
	updateLeadership     func(newLeader bool)
	updateCommittedIndex func(uint64)
}

func (s *EtcdServer) run() {
	sn, err := s.r.raftStorage.Snapshot()
	if err != nil {
		plog.Panicf("get snapshot from raft storage error: %v", err)
	}

	// asynchronously accept apply packets, dispatch progress in-order
	sched := schedule.NewFIFOScheduler()

	var (
		smu   sync.RWMutex
		syncC <-chan time.Time
	)
	setSyncC := func(ch <-chan time.Time) {
		smu.Lock()
		syncC = ch
		smu.Unlock()
	}
	getSyncC := func() (ch <-chan time.Time) {
		smu.RLock()
		ch = syncC
		smu.RUnlock()
		return
	}
	rh := &raftReadyHandler{
		updateLeadership: func(newLeader bool) {
			if !s.isLeader() {
				if s.lessor != nil {
					s.lessor.Demote()
				}
				if s.compactor != nil {
					s.compactor.Pause()
				}
				setSyncC(nil)
			} else {
				if newLeader {
					t := time.Now()
					s.leadTimeMu.Lock()
					s.leadElectedTime = t
					s.leadTimeMu.Unlock()
				}
				setSyncC(s.SyncTicker.C)
				if s.compactor != nil {
					s.compactor.Resume()
				}
			}

			// TODO: remove the nil checking
			// current test utility does not provide the stats
			if s.stats != nil {
				s.stats.BecomeLeader()
			}
		},
		updateCommittedIndex: func(ci uint64) {
			cci := s.getCommittedIndex()
			if ci > cci {
				s.setCommittedIndex(ci)
			}
		},
	}
	s.r.start(rh)

	ep := etcdProgress{
		confState: sn.Metadata.ConfState,
		snapi:     sn.Metadata.Index,
		appliedt:  sn.Metadata.Term,
		appliedi:  sn.Metadata.Index,
	}

	defer func() {
		s.wgMu.Lock() // block concurrent waitgroup adds in goAttach while stopping
		close(s.stopping)
		s.wgMu.Unlock()
		s.cancel()

		sched.Stop()

		// wait for goroutines before closing raft so wal stays open
		s.wg.Wait()

		s.SyncTicker.Stop()

		// must stop raft after scheduler; etcdserver can leak rafthttp pipelines
		// by adding a peer after raft stops the transport
		s.r.stop()

		// kv, lessor and backend can be nil if running without v3 enabled
		// or running unit tests.
		if s.lessor != nil {
			s.lessor.Stop()
		}
		if s.kv != nil {
			s.kv.Close()
		}
		if s.authStore != nil {
			s.authStore.Close()
		}
		if s.be != nil {
			s.be.Close()
		}
		if s.compactor != nil {
			s.compactor.Stop()
		}
		close(s.done)
	}()

	var expiredLeaseC <-chan []*lease.Lease
	if s.lessor != nil {
		expiredLeaseC = s.lessor.ExpiredLeasesC()
	}

	for {
		select {
		case ap := <-s.r.apply():
			f := func(context.Context) { s.applyAll(&ep, &ap) }
			sched.Schedule(f)
		case leases := <-expiredLeaseC:
			s.goAttach(func() {
				// increase throughput of expired-lease deletion through
				// parallelization; c acts as a counting semaphore bounding
				// concurrent revocations at maxPendingRevokes
				c := make(chan struct{}, maxPendingRevokes)
				for _, lease := range leases {
					select {
					case c <- struct{}{}:
					case <-s.stopping:
						return
					}
					lid := lease.ID
					s.goAttach(func() {
						_, lerr := s.LeaseRevoke(s.ctx, &pb.LeaseRevokeRequest{ID: int64(lid)})
						if lerr == nil {
							leaseExpired.Inc()
						} else {
							plog.Warningf("failed to revoke %016x (%q)", lid, lerr.Error())
						}

						<-c
					})
				}
			})
		case err := <-s.errorc:
			plog.Errorf("%s", err)
			plog.Infof("the data-dir used by this member must be removed.")
			return
		case <-getSyncC():
			if s.store.HasTTLKeys() {
				s.sync(s.Cfg.ReqTimeout())
			}
		case <-s.stop:
			return
		}
	}
}

func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
	s.applySnapshot(ep, apply)
	s.applyEntries(ep, apply)

	proposalsApplied.Set(float64(ep.appliedi))
	s.applyWait.Trigger(ep.appliedi)
	// wait for the raft routine to finish the disk writes before triggering a
	// snapshot. Otherwise the applied index might be greater than the last index
	// in raft storage, since the raft routine might be slower than the apply routine.
	<-apply.notifyc

	s.triggerSnapshot(ep)
	select {
	// snapshot requested via send()
	case m := <-s.r.msgSnapC:
		merged := s.createMergedSnapshotMessage(m, ep.appliedt, ep.appliedi, ep.confState)
		s.sendMergedSnap(merged)
	default:
	}
}

func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
	if raft.IsEmptySnap(apply.snapshot) {
		return
	}

	plog.Infof("applying snapshot at index %d...", ep.snapi)
	defer plog.Infof("finished applying incoming snapshot at index %d", ep.snapi)

	if apply.snapshot.Metadata.Index <= ep.appliedi {
		plog.Panicf("snapshot index [%d] should be greater than appliedi [%d]",
			apply.snapshot.Metadata.Index, ep.appliedi)
	}

	// wait for raftNode to persist the snapshot onto the disk
	<-apply.notifyc

	newbe, err := openSnapshotBackend(s.Cfg, s.snapshotter, apply.snapshot)
	if err != nil {
		plog.Panic(err)
	}

	// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
	// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
	if s.lessor != nil {
		plog.Info("recovering lessor...")
		s.lessor.Recover(newbe, func() lease.TxnDelete { return s.kv.Write() })
		plog.Info("finished recovering lessor")
	}

	plog.Info("restoring mvcc store...")

	if err := s.kv.Restore(newbe); err != nil {
		plog.Panicf("restore KV error: %v", err)
	}
	s.consistIndex.setConsistentIndex(s.kv.ConsistentIndex())

	plog.Info("finished restoring mvcc store")

	// Closing the old backend might block until all the txns
	// on the backend are finished.
	// We do not want to wait on closing the old backend.
	s.bemu.Lock()
	oldbe := s.be
	go func() {
		plog.Info("closing old backend...")
		defer plog.Info("finished closing old backend")

		if err := oldbe.Close(); err != nil {
			plog.Panicf("close backend error: %v", err)
		}
	}()

	s.be = newbe
	s.bemu.Unlock()

	plog.Info("recovering alarms...")
	if err := s.restoreAlarms(); err != nil {
		plog.Panicf("restore alarms error: %v", err)
	}
	plog.Info("finished recovering alarms")

	if s.authStore != nil {
		plog.Info("recovering auth store...")
		s.authStore.Recover(newbe)
		plog.Info("finished recovering auth store")
	}

	plog.Info("recovering store v2...")
	if err := s.store.Recovery(apply.snapshot.Data); err != nil {
		plog.Panicf("recover store error: %v", err)
	}
	plog.Info("finished recovering store v2")

	s.cluster.SetBackend(s.be)
	plog.Info("recovering cluster configuration...")
	s.cluster.Recover(api.UpdateCapability)
	plog.Info("finished recovering cluster configuration")

	plog.Info("removing old peers from network...")
	// recover raft transport
	s.r.transport.RemoveAllPeers()
	plog.Info("finished removing old peers from network")

	plog.Info("adding peers from new cluster configuration into network...")
	for _, m := range s.cluster.Members() {
		if m.ID == s.ID() {
			continue
		}
		s.r.transport.AddPeer(m.ID, m.PeerURLs)
	}
	plog.Info("finished adding peers from new cluster configuration into network")

	ep.appliedt = apply.snapshot.Metadata.Term
	ep.appliedi = apply.snapshot.Metadata.Index
	ep.snapi = ep.appliedi
	ep.confState = apply.snapshot.Metadata.ConfState
}

func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) {
	if len(apply.entries) == 0 {
		return
	}
	firsti := apply.entries[0].Index
	if firsti > ep.appliedi+1 {
		plog.Panicf("first index of committed entry[%d] should <= appliedi[%d] + 1", firsti, ep.appliedi)
	}
	var ents []raftpb.Entry
	if ep.appliedi+1-firsti < uint64(len(apply.entries)) {
		ents = apply.entries[ep.appliedi+1-firsti:]
	}
	if len(ents) == 0 {
		return
	}
	var shouldstop bool
	if ep.appliedt, ep.appliedi, shouldstop = s.apply(ents, &ep.confState); shouldstop {
		go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster"))
	}
}

func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
	if ep.appliedi-ep.snapi <= s.snapCount {
		return
	}

	plog.Infof("start to snapshot (applied: %d, lastsnap: %d)", ep.appliedi, ep.snapi)
	s.snapshot(ep.appliedi, ep.confState)
	ep.snapi = ep.appliedi
}

func (s *EtcdServer) isMultiNode() bool {
	return s.cluster != nil && len(s.cluster.MemberIDs()) > 1
}

func (s *EtcdServer) isLeader() bool {
	return uint64(s.ID()) == s.Lead()
}
// transferLeadership transfers the leader to the given transferee.
// TODO: maybe expose to client?
func (s *EtcdServer) transferLeadership(ctx context.Context, lead, transferee uint64) error {
	now := time.Now()
	interval := time.Duration(s.Cfg.TickMs) * time.Millisecond

	plog.Infof("%s starts leadership transfer from %s to %s", s.ID(), types.ID(lead), types.ID(transferee))
	s.r.TransferLeadership(ctx, lead, transferee)
	for s.Lead() != transferee {
		select {
		case <-ctx.Done(): // time out
			return ErrTimeoutLeaderTransfer
		case <-time.After(interval):
		}
	}

	// TODO: drain all requests, or drop all messages to the old leader

	plog.Infof("%s finished leadership transfer from %s to %s (took %v)", s.ID(), types.ID(lead), types.ID(transferee), time.Since(now))
	return nil
}

// TransferLeadership transfers the leader to the chosen transferee.
func (s *EtcdServer) TransferLeadership() error {
	if !s.isLeader() {
		plog.Printf("skipped leadership transfer for stopping non-leader member")
		return nil
	}

	if !s.isMultiNode() {
		plog.Printf("skipped leadership transfer for single member cluster")
		return nil
	}

	transferee, ok := longestConnected(s.r.transport, s.cluster.MemberIDs())
	if !ok {
		return ErrUnhealthy
	}

	tm := s.Cfg.ReqTimeout()
	ctx, cancel := context.WithTimeout(s.ctx, tm)
	err := s.transferLeadership(ctx, s.Lead(), uint64(transferee))
	cancel()
	return err
}

// HardStop stops the server without coordination with other members in the cluster.
func (s *EtcdServer) HardStop() {
	select {
	case s.stop <- struct{}{}:
	case <-s.done:
		return
	}
	<-s.done
}

// Stop stops the server gracefully, and shuts down the running goroutine.
// Stop should be called after Start(s), otherwise it will block forever.
// When stopping the leader, Stop transfers its leadership to one of its peers
// before stopping the server.
func (s *EtcdServer) Stop() {
	if err := s.TransferLeadership(); err != nil {
		plog.Warningf("%s failed to transfer leadership (%v)", s.ID(), err)
	}
	s.HardStop()
}

// ReadyNotify returns a channel that will be closed when the server
// is ready to serve client requests
func (s *EtcdServer) ReadyNotify() <-chan struct{} { return s.readych }

func (s *EtcdServer) stopWithDelay(d time.Duration, err error) {
	select {
	case <-time.After(d):
	case <-s.done:
	}
	select {
	case s.errorc <- err:
	default:
	}
}

// StopNotify returns a channel that receives an empty struct
// when the server is stopped.
func (s *EtcdServer) StopNotify() <-chan struct{} { return s.done }

func (s *EtcdServer) SelfStats() []byte { return s.stats.JSON() }

func (s *EtcdServer) LeaderStats() []byte {
	lead := atomic.LoadUint64(&s.r.lead)
	if lead != uint64(s.id) {
		return nil
	}
	return s.lstats.JSON()
}

func (s *EtcdServer) StoreStats() []byte { return s.store.JsonStats() }
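
// A note on the quorum arithmetic in mayRemoveMember below: "active - 1" is
// the worst-case number of members still reachable once the (possibly active)
// member is removed, and "1 + ((len(m)-1)/2)" is the quorum size of the
// resulting cluster of len(m)-1 members. For example, removing one of four
// active members in a 5-member cluster leaves 3 active out of 4, which still
// meets the quorum of 3; with only three active members, the removal would be
// rejected.
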
func (s *EtcdServer) checkMembershipOperationPermission(ctx context.Context) error {
	if s.authStore == nil {
		// In the context of an ordinary etcd process, s.authStore will never be nil.
		// This branch is for handling cases in server_test.go
		return nil
	}

	// Note that this permission check is done in the API layer,
	// so a TOCTOU problem can potentially arise with a schedule like this:
	// update membership with user A -> revoke root role of A -> apply membership
	// change in the state machine layer.
	// However, both membership changes and role management require the root
	// privilege, so careful operation by admins can prevent the problem.
	authInfo, err := s.AuthInfoFromCtx(ctx)
	if err != nil {
		return err
	}

	return s.AuthStore().IsAdminPermitted(authInfo)
}

func (s *EtcdServer) AddMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error) {
	if err := s.checkMembershipOperationPermission(ctx); err != nil {
		return nil, err
	}

	if s.Cfg.StrictReconfigCheck {
		// by default StrictReconfigCheck is enabled; reject new members if unhealthy
		if !s.cluster.IsReadyToAddNewMember() {
			plog.Warningf("not enough started members, rejecting member add %+v", memb)
			return nil, ErrNotEnoughStartedMembers
		}
		if !isConnectedFullySince(s.r.transport, time.Now().Add(-HealthInterval), s.ID(), s.cluster.Members()) {
			plog.Warningf("not healthy for reconfigure, rejecting member add %+v", memb)
			return nil, ErrUnhealthy
		}
	}

	// TODO: move Member to protobuf type
	b, err := json.Marshal(memb)
	if err != nil {
		return nil, err
	}
	cc := raftpb.ConfChange{
		Type:    raftpb.ConfChangeAddNode,
		NodeID:  uint64(memb.ID),
		Context: b,
	}
	return s.configure(ctx, cc)
}

func (s *EtcdServer) RemoveMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
	if err := s.checkMembershipOperationPermission(ctx); err != nil {
		return nil, err
	}

	// by default StrictReconfigCheck is enabled; reject removal if it leads to quorum loss
	if err := s.mayRemoveMember(types.ID(id)); err != nil {
		return nil, err
	}

	cc := raftpb.ConfChange{
		Type:   raftpb.ConfChangeRemoveNode,
		NodeID: id,
	}
	return s.configure(ctx, cc)
}

func (s *EtcdServer) mayRemoveMember(id types.ID) error {
	if !s.Cfg.StrictReconfigCheck {
		return nil
	}

	if !s.cluster.IsReadyToRemoveMember(uint64(id)) {
		plog.Warningf("not enough started members, rejecting remove member %s", id)
		return ErrNotEnoughStartedMembers
	}

	// a downed member is safe to remove since it's not part of the active quorum
	if t := s.r.transport.ActiveSince(id); id != s.ID() && t.IsZero() {
		return nil
	}

	// protect quorum if some members are down
	m := s.cluster.Members()
	active := numConnectedSince(s.r.transport, time.Now().Add(-HealthInterval), s.ID(), m)
	if (active - 1) < 1+((len(m)-1)/2) {
		plog.Warningf("reconfigure breaks active quorum, rejecting remove member %s", id)
		return ErrUnhealthy
	}

	return nil
}

func (s *EtcdServer) UpdateMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error) {
	b, merr := json.Marshal(memb)
	if merr != nil {
		return nil, merr
	}

	if err := s.checkMembershipOperationPermission(ctx); err != nil {
		return nil, err
	}
	cc := raftpb.ConfChange{
		Type:    raftpb.ConfChangeUpdateNode,
		NodeID:  uint64(memb.ID),
		Context: b,
	}
	return s.configure(ctx, cc)
}

// Implement the RaftTimer interface

func (s *EtcdServer) Index() uint64 { return atomic.LoadUint64(&s.r.index) }

func (s *EtcdServer) Term() uint64 { return atomic.LoadUint64(&s.r.term) }

// Lead is only for testing purposes.
// TODO: add Raft server interface to expose raft related info:
// Index, Term, Lead, Committed, Applied, LastIndex, etc.
func (s *EtcdServer) Lead() uint64 { return atomic.LoadUint64(&s.r.lead) }

func (s *EtcdServer) Leader() types.ID { return types.ID(s.Lead()) }

type confChangeResponse struct {
	membs []*membership.Member
	err   error
}

// configure sends a configuration change through consensus and
// then waits for it to be applied to the server. It
// will block until the change is performed or there is an error.
func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) ([]*membership.Member, error) {
	cc.ID = s.reqIDGen.Next()
	ch := s.w.Register(cc.ID)
	start := time.Now()
	if err := s.r.ProposeConfChange(ctx, cc); err != nil {
		s.w.Trigger(cc.ID, nil)
		return nil, err
	}
	select {
	case x := <-ch:
		if x == nil {
			plog.Panicf("configure trigger value should never be nil")
		}
		resp := x.(*confChangeResponse)
		return resp.membs, resp.err
	case <-ctx.Done():
		s.w.Trigger(cc.ID, nil) // GC wait
		return nil, s.parseProposeCtxErr(ctx.Err(), start)
	case <-s.stopping:
		return nil, ErrStopped
	}
}

// sync proposes a SYNC request and is non-blocking.
// This makes no guarantee that the request will be proposed or performed.
// The request will be canceled after the given timeout.
func (s *EtcdServer) sync(timeout time.Duration) {
	req := pb.Request{
		Method: "SYNC",
		ID:     s.reqIDGen.Next(),
		Time:   time.Now().UnixNano(),
	}
	data := pbutil.MustMarshal(&req)
	// There is no promise that the node has a leader when it makes a SYNC
	// request, so it proposes from a goroutine.
	ctx, cancel := context.WithTimeout(s.ctx, timeout)
	s.goAttach(func() {
		s.r.Propose(ctx, data)
		cancel()
	})
}
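
// As a rough illustration (the exact field names depend on the JSON tags of
// membership.Attributes), the value published below looks something like:
//
//	{"name":"infra1","clientURLs":["http://10.0.1.10:2379"]}
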
// publish registers server information into the cluster. The information
// is the JSON representation of this server's member struct, updated with the
// static clientURLs of the server.
// The function keeps attempting to register until it succeeds,
// or its server is stopped.
func (s *EtcdServer) publish(timeout time.Duration) {
	b, err := json.Marshal(s.attributes)
	if err != nil {
		plog.Panicf("json marshal error: %v", err)
		return
	}
	req := pb.Request{
		Method: "PUT",
		Path:   membership.MemberAttributesStorePath(s.id),
		Val:    string(b),
	}

	for {
		ctx, cancel := context.WithTimeout(s.ctx, timeout)
		_, err := s.Do(ctx, req)
		cancel()
		switch err {
		case nil:
			close(s.readych)
			plog.Infof("published %+v to cluster %s", s.attributes, s.cluster.ID())
			return
		case ErrStopped:
			plog.Infof("aborting publish because server is stopped")
			return
		default:
			plog.Errorf("publish error: %v", err)
		}
	}
}

func (s *EtcdServer) sendMergedSnap(merged snap.Message) {
	atomic.AddInt64(&s.inflightSnapshots, 1)

	s.r.transport.SendSnapshot(merged)
	s.goAttach(func() {
		select {
		case ok := <-merged.CloseNotify():
			// delay releasing the in-flight snapshot for another 30 seconds to
			// block log compaction.
			// If the follower still fails to catch up, it is probably just too slow
			// to catch up. We cannot avoid the snapshot cycle anyway.
			if ok {
				select {
				case <-time.After(releaseDelayAfterSnapshot):
				case <-s.stopping:
				}
			}
			atomic.AddInt64(&s.inflightSnapshots, -1)
		case <-s.stopping:
			return
		}
	})
}

// apply takes entries received from Raft (after they have been committed) and
// applies them to the current state of the EtcdServer.
// The given entries should not be empty.
func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (appliedt uint64, appliedi uint64, shouldStop bool) {
	for i := range es {
		e := es[i]
		switch e.Type {
		case raftpb.EntryNormal:
			s.applyEntryNormal(&e)
		case raftpb.EntryConfChange:
			// set the consistent index of the currently executing entry
			if e.Index > s.consistIndex.ConsistentIndex() {
				s.consistIndex.setConsistentIndex(e.Index)
			}
			var cc raftpb.ConfChange
			pbutil.MustUnmarshal(&cc, e.Data)
			removedSelf, err := s.applyConfChange(cc, confState)
			s.setAppliedIndex(e.Index)
			shouldStop = shouldStop || removedSelf
			s.w.Trigger(cc.ID, &confChangeResponse{s.cluster.Members(), err})
		default:
			plog.Panicf("entry type should be either EntryNormal or EntryConfChange")
		}
		atomic.StoreUint64(&s.r.index, e.Index)
		atomic.StoreUint64(&s.r.term, e.Term)
		appliedt = e.Term
		appliedi = e.Index
	}
	return appliedt, appliedi, shouldStop
}

// applyEntryNormal applies an EntryNormal type raftpb request to the EtcdServer
func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
	shouldApplyV3 := false
	if e.Index > s.consistIndex.ConsistentIndex() {
		// set the consistent index of the currently executing entry
		s.consistIndex.setConsistentIndex(e.Index)
		shouldApplyV3 = true
	}
	defer s.setAppliedIndex(e.Index)

	// The raft state machine may generate a noop entry on leader confirmation.
	// Skip it in advance to avoid some potential bugs in the future.
	if len(e.Data) == 0 {
		select {
		case s.forceVersionC <- struct{}{}:
		default:
		}
		// promote lessor when the local member is leader and finished
		// applying all entries from the last term.
		if s.isLeader() {
			s.lessor.Promote(s.Cfg.electionTimeout())
		}
		return
	}

	var raftReq pb.InternalRaftRequest
	if !pbutil.MaybeUnmarshal(&raftReq, e.Data) { // backward compatible
		var r pb.Request
		pbutil.MustUnmarshal(&r, e.Data)
		s.w.Trigger(r.ID, s.applyV2Request(&r))
		return
	}
	if raftReq.V2 != nil {
		req := raftReq.V2
		s.w.Trigger(req.ID, s.applyV2Request(req))
		return
	}

	// do not re-apply applied entries.
	if !shouldApplyV3 {
		return
	}

	id := raftReq.ID
	if id == 0 {
		id = raftReq.Header.ID
	}

	var ar *applyResult
	needResult := s.w.IsRegistered(id)
	if needResult || !noSideEffect(&raftReq) {
		if !needResult && raftReq.Txn != nil {
			removeNeedlessRangeReqs(raftReq.Txn)
		}
		ar = s.applyV3.Apply(&raftReq)
	}

	if ar == nil {
		return
	}

	if ar.err != ErrNoSpace || len(s.alarmStore.Get(pb.AlarmType_NOSPACE)) > 0 {
		s.w.Trigger(id, ar)
		return
	}

	plog.Errorf("applying raft message exceeded backend quota")
	s.goAttach(func() {
		a := &pb.AlarmRequest{
			MemberID: uint64(s.ID()),
			Action:   pb.AlarmRequest_ACTIVATE,
			Alarm:    pb.AlarmType_NOSPACE,
		}
		s.raftRequest(s.ctx, pb.InternalRaftRequest{Alarm: a})
		s.w.Trigger(id, ar)
	})
}

// applyConfChange applies a ConfChange to the server. It is only
// invoked with a ConfChange that has already passed through Raft
func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState) (bool, error) {
	if err := s.cluster.ValidateConfigurationChange(cc); err != nil {
		cc.NodeID = raft.None
		s.r.ApplyConfChange(cc)
		return false, err
	}
	*confState = *s.r.ApplyConfChange(cc)
	switch cc.Type {
	case raftpb.ConfChangeAddNode:
		m := new(membership.Member)
		if err := json.Unmarshal(cc.Context, m); err != nil {
			plog.Panicf("unmarshal member should never fail: %v", err)
		}
		if cc.NodeID != uint64(m.ID) {
			plog.Panicf("nodeID should always be equal to member ID")
		}
		s.cluster.AddMember(m)
		if m.ID != s.id {
			s.r.transport.AddPeer(m.ID, m.PeerURLs)
		}
	case raftpb.ConfChangeRemoveNode:
		id := types.ID(cc.NodeID)
		s.cluster.RemoveMember(id)
		if id == s.id {
			return true, nil
		}
		s.r.transport.RemovePeer(id)
	case raftpb.ConfChangeUpdateNode:
		m := new(membership.Member)
		if err := json.Unmarshal(cc.Context, m); err != nil {
			plog.Panicf("unmarshal member should never fail: %v", err)
		}
		if cc.NodeID != uint64(m.ID) {
			plog.Panicf("nodeID should always be equal to member ID")
		}
		s.cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes)
		if m.ID != s.id {
			s.r.transport.UpdatePeer(m.ID, m.PeerURLs)
		}
	}
	return false, nil
}
// TODO: non-blocking snapshot
func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
	clone := s.store.Clone()
	// commit kv to write metadata (for example: consistent index) to disk.
	// KV().commit() updates the consistent index in the backend.
	// All operations that update the consistent index must be called sequentially
	// from the applyAll function.
	// So KV().Commit() cannot run in parallel with apply. It has to be called
	// outside the goroutine created below.
	s.KV().Commit()

	s.goAttach(func() {
		d, err := clone.SaveNoCopy()
		// TODO: current store will never fail to do a snapshot
		// what should we do if the store might fail?
		if err != nil {
			plog.Panicf("store save should never fail: %v", err)
		}
		snap, err := s.r.raftStorage.CreateSnapshot(snapi, &confState, d)
		if err != nil {
			// the snapshot was done asynchronously with the progress of raft.
			// raft might have already got a newer snapshot.
			if err == raft.ErrSnapOutOfDate {
				return
			}
			plog.Panicf("unexpected create snapshot error %v", err)
		}
		// SaveSnap saves the snapshot and releases the locked wal files
		// to the snapshot index.
		if err = s.r.storage.SaveSnap(snap); err != nil {
			plog.Fatalf("save snapshot error: %v", err)
		}
		plog.Infof("saved snapshot at index %d", snap.Metadata.Index)

		// When sending a snapshot, etcd will pause compaction.
		// After a slow follower receives a snapshot, it needs all the entries
		// right after the snapshot index to catch up. If we do not pause
		// compaction, those entries might already be compacted by the time the
		// snapshot finishes sending and saving, which would trigger another
		// snapshot cycle.
		if atomic.LoadInt64(&s.inflightSnapshots) != 0 {
			plog.Infof("skip compaction since there is an inflight snapshot")
			return
		}

		// keep some in-memory log entries for slow followers.
		compacti := uint64(1)
		if snapi > numberOfCatchUpEntries {
			compacti = snapi - numberOfCatchUpEntries
		}
		err = s.r.raftStorage.Compact(compacti)
		if err != nil {
			// the compaction was done asynchronously with the progress of raft.
			// the raft log might already have been compacted.
			if err == raft.ErrCompacted {
				return
			}
			plog.Panicf("unexpected compaction error %v", err)
		}
		plog.Infof("compacted raft log at %d", compacti)
	})
}

// CutPeer drops messages to the specified peer.
func (s *EtcdServer) CutPeer(id types.ID) {
	tr, ok := s.r.transport.(*rafthttp.Transport)
	if ok {
		tr.CutPeer(id)
	}
}

// MendPeer recovers the message dropping behavior of the given peer.
func (s *EtcdServer) MendPeer(id types.ID) {
	tr, ok := s.r.transport.(*rafthttp.Transport)
	if ok {
		tr.MendPeer(id)
	}
}

func (s *EtcdServer) PauseSending() { s.r.pauseSending() }

func (s *EtcdServer) ResumeSending() { s.r.resumeSending() }

func (s *EtcdServer) ClusterVersion() *semver.Version {
	if s.cluster == nil {
		return nil
	}
	return s.cluster.Version()
}
// monitorVersions checks the member's version every monitorVersionInterval.
// It updates the cluster version if all members agree on a higher one.
// It prints out a log entry if there is a member with a higher version than
// the local version.
func (s *EtcdServer) monitorVersions() {
	for {
		select {
		case <-s.forceVersionC:
		case <-time.After(monitorVersionInterval):
		case <-s.stopping:
			return
		}

		if s.Leader() != s.ID() {
			continue
		}

		v := decideClusterVersion(getVersions(s.cluster, s.id, s.peerRt))
		if v != nil {
			// only keep major.minor version for comparison
			v = &semver.Version{
				Major: v.Major,
				Minor: v.Minor,
			}
		}

		// if the current version is nil:
		// 1. use the decided version if possible
		// 2. or use the min cluster version
		if s.cluster.Version() == nil {
			verStr := version.MinClusterVersion
			if v != nil {
				verStr = v.String()
			}
			s.goAttach(func() { s.updateClusterVersion(verStr) })
			continue
		}

		// update cluster version only if the decided version is greater than
		// the current cluster version
		if v != nil && s.cluster.Version().LessThan(*v) {
			s.goAttach(func() { s.updateClusterVersion(v.String()) })
		}
	}
}

func (s *EtcdServer) updateClusterVersion(ver string) {
	if s.cluster.Version() == nil {
		plog.Infof("setting up the initial cluster version to %s", version.Cluster(ver))
	} else {
		plog.Infof("updating the cluster version from %s to %s", version.Cluster(s.cluster.Version().String()), version.Cluster(ver))
	}
	req := pb.Request{
		Method: "PUT",
		Path:   membership.StoreClusterVersionKey(),
		Val:    ver,
	}
	ctx, cancel := context.WithTimeout(s.ctx, s.Cfg.ReqTimeout())
	_, err := s.Do(ctx, req)
	cancel()
	switch err {
	case nil:
		return
	case ErrStopped:
		plog.Infof("aborting update cluster version because server is stopped")
		return
	default:
		plog.Errorf("error updating cluster version (%v)", err)
	}
}

func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error {
	switch err {
	case context.Canceled:
		return ErrCanceled
	case context.DeadlineExceeded:
		s.leadTimeMu.RLock()
		curLeadElected := s.leadElectedTime
		s.leadTimeMu.RUnlock()
		// prevLeadLost approximates when the previous leader was lost: two
		// election timeouts before the current leader was elected. A proposal
		// that started inside that window timed out because there was no leader.
		prevLeadLost := curLeadElected.Add(-2 * time.Duration(s.Cfg.ElectionTicks) * time.Duration(s.Cfg.TickMs) * time.Millisecond)
		if start.After(prevLeadLost) && start.Before(curLeadElected) {
			return ErrTimeoutDueToLeaderFail
		}

		lead := types.ID(atomic.LoadUint64(&s.r.lead))
		switch lead {
		case types.ID(raft.None):
			// TODO: return an error specifying that this happens because the
			// cluster has no leader now
		case s.ID():
			if !isConnectedToQuorumSince(s.r.transport, start, s.ID(), s.cluster.Members()) {
				return ErrTimeoutDueToConnectionLost
			}
		default:
			if !isConnectedSince(s.r.transport, start, lead) {
				return ErrTimeoutDueToConnectionLost
			}
		}

		return ErrTimeout
	default:
		return err
	}
}

func (s *EtcdServer) KV() mvcc.ConsistentWatchableKV { return s.kv }
func (s *EtcdServer) Backend() backend.Backend {
	s.bemu.Lock()
	defer s.bemu.Unlock()
	return s.be
}

func (s *EtcdServer) AuthStore() auth.AuthStore { return s.authStore }

func (s *EtcdServer) restoreAlarms() error {
	s.applyV3 = s.newApplierV3()
	as, err := alarm.NewAlarmStore(s)
	if err != nil {
		return err
	}
	s.alarmStore = as
	if len(as.Get(pb.AlarmType_NOSPACE)) > 0 {
		s.applyV3 = newApplierV3Capped(s.applyV3)
	}
	return nil
}
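
// Note that restoreAlarms above re-wraps the applier with the capped variant
// whenever a persisted NOSPACE alarm is found, so backend-quota enforcement
// survives restarts and snapshot recovery (both NewServer and applySnapshot
// call restoreAlarms).
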
func (s *EtcdServer) getAppliedIndex() uint64 {
	return atomic.LoadUint64(&s.appliedIndex)
}

func (s *EtcdServer) setAppliedIndex(v uint64) {
	atomic.StoreUint64(&s.appliedIndex, v)
}

func (s *EtcdServer) getCommittedIndex() uint64 {
	return atomic.LoadUint64(&s.committedIndex)
}

func (s *EtcdServer) setCommittedIndex(v uint64) {
	atomic.StoreUint64(&s.committedIndex, v)
}

// goAttach creates a goroutine on a given function and tracks it using
// the etcdserver waitgroup.
func (s *EtcdServer) goAttach(f func()) {
	s.wgMu.RLock() // this blocks with ongoing close(s.stopping)
	defer s.wgMu.RUnlock()
	select {
	case <-s.stopping:
		plog.Warning("server has stopped (skipping goAttach)")
		return
	default:
	}

	// now safe to add since waitgroup wait has not started yet
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		f()
	}()
}
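
// A note on the locking in goAttach: run()'s shutdown path takes wgMu as a
// writer around close(s.stopping), while goAttach holds the read lock across
// its stopping check and wg.Add. This pairing prevents a goroutine from being
// added to the waitgroup after wg.Wait() has already begun.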