package nomad

import (
	"context"
	"crypto/tls"
	"fmt"
	"io/ioutil"
	"net"
	"net/rpc"
	"os"
	"path/filepath"
	"sort"
	"strconv"
	"sync"
	"sync/atomic"
	"time"

	"github.com/armon/go-metrics"
	"github.com/hashicorp/consul/agent/consul/autopilot"
	consulapi "github.com/hashicorp/consul/api"
	"github.com/hashicorp/consul/lib"
	log "github.com/hashicorp/go-hclog"
	multierror "github.com/hashicorp/go-multierror"
	lru "github.com/hashicorp/golang-lru"
	"github.com/hashicorp/nomad/command/agent/consul"
	"github.com/hashicorp/nomad/helper/codec"
	"github.com/hashicorp/nomad/helper/pool"
	"github.com/hashicorp/nomad/helper/stats"
	"github.com/hashicorp/nomad/helper/tlsutil"
	"github.com/hashicorp/nomad/nomad/deploymentwatcher"
	"github.com/hashicorp/nomad/nomad/drainer"
	"github.com/hashicorp/nomad/nomad/state"
	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/hashicorp/nomad/nomad/structs/config"
	"github.com/hashicorp/nomad/scheduler"
	"github.com/hashicorp/raft"
	raftboltdb "github.com/hashicorp/raft-boltdb"
	"github.com/hashicorp/serf/serf"
)

const (
	// datacenterQueryLimit sets the max number of DCs that a Nomad
	// Server will query to find bootstrap_expect servers.
	datacenterQueryLimit = 25

	// maxStaleLeadership is the maximum time we will permit this Nomad
	// Server to go without seeing a valid Raft leader.
	maxStaleLeadership = 15 * time.Second

	// peersPollInterval is used as the polling interval between attempts
	// to query Consul for Nomad Servers.
	peersPollInterval = 45 * time.Second

	// peersPollJitterFactor is used to provide a slight amount of variance
	// to the retry interval when querying Consul Servers: the random stagger
	// added to peersPollInterval is bounded by
	// peersPollInterval/peersPollJitterFactor.
	peersPollJitterFactor = 2

	// raftState is presumably the path, relative to the server's data
	// directory, where Raft state is stored.
	// NOTE(review): its use is outside this section of the file - confirm.
	raftState = "raft/"

	// serfSnapshot is the snapshot path handed to setupSerf when the Serf
	// instance is created.
	serfSnapshot = "serf/snapshot"

	// snapshotsRetained is presumably the number of Raft snapshots kept on
	// disk.
	// NOTE(review): its use is outside this section of the file - confirm.
	snapshotsRetained = 2

	// serverRPCCache controls how long we keep an idle connection open to a server
	serverRPCCache = 2 * time.Minute

	// serverMaxStreams controls how many idle streams we keep open to a server
	serverMaxStreams = 64

	// raftLogCacheSize is the maximum number of logs to cache in-memory.
	// This is used to reduce disk I/O for the recently committed entries.
	raftLogCacheSize = 512

	// raftRemoveGracePeriod is how long we wait to allow a RemovePeer
	// to replicate to gracefully leave the cluster.
	raftRemoveGracePeriod = 5 * time.Second

	// defaultConsulDiscoveryInterval is how often to poll Consul for new
	// servers if there is no leader.
	defaultConsulDiscoveryInterval time.Duration = 3 * time.Second

	// defaultConsulDiscoveryIntervalRetry is how often to poll Consul for
	// new servers if there is no leader and the last Consul query failed.
	defaultConsulDiscoveryIntervalRetry time.Duration = 9 * time.Second

	// aclCacheSize is the number of ACL objects to keep cached. ACLs have a parsing and
	// construction cost, so we keep the hot objects cached to reduce the ACL token resolution time.
	aclCacheSize = 512
)

// Server is a Nomad server which manages the job queues,
// schedulers, and notification bus for agents.
91type Server struct { 92 config *Config 93 94 logger log.InterceptLogger 95 96 // Connection pool to other Nomad servers 97 connPool *pool.ConnPool 98 99 // The raft instance is used among Nomad nodes within the 100 // region to protect operations that require strong consistency 101 leaderCh <-chan bool 102 raft *raft.Raft 103 raftLayer *RaftLayer 104 raftStore *raftboltdb.BoltStore 105 raftInmem *raft.InmemStore 106 raftTransport *raft.NetworkTransport 107 108 // autopilot is the Autopilot instance for this server. 109 autopilot *autopilot.Autopilot 110 111 // fsm is the state machine used with Raft 112 fsm *nomadFSM 113 114 // rpcListener is used to listen for incoming connections 115 rpcListener net.Listener 116 listenerCh chan struct{} 117 118 // tlsWrap is used to wrap outbound connections using TLS. It should be 119 // accessed using the lock. 120 tlsWrap tlsutil.RegionWrapper 121 tlsWrapLock sync.RWMutex 122 123 // TODO(alex,hclog): Can I move more into the handler? 124 // rpcHandler is used to serve and handle RPCs 125 *rpcHandler 126 127 // rpcServer is the static RPC server that is used by the local agent. 128 rpcServer *rpc.Server 129 130 // clientRpcAdvertise is the advertised RPC address for Nomad clients to connect 131 // to this server 132 clientRpcAdvertise net.Addr 133 134 // serverRpcAdvertise is the advertised RPC address for Nomad servers to connect 135 // to this server 136 serverRpcAdvertise net.Addr 137 138 // rpcTLS is the TLS config for incoming TLS requests 139 rpcTLS *tls.Config 140 rpcCancel context.CancelFunc 141 142 // staticEndpoints is the set of static endpoints that can be reused across 143 // all RPC connections 144 staticEndpoints endpoints 145 146 // streamingRpcs is the registry holding our streaming RPC handlers. 
147 streamingRpcs *structs.StreamingRpcRegistry 148 149 // nodeConns is the set of multiplexed node connections we have keyed by 150 // NodeID 151 nodeConns map[string][]*nodeConnState 152 nodeConnsLock sync.RWMutex 153 154 // peers is used to track the known Nomad servers. This is 155 // used for region forwarding and clustering. 156 peers map[string][]*serverParts 157 localPeers map[raft.ServerAddress]*serverParts 158 peerLock sync.RWMutex 159 160 // serf is the Serf cluster containing only Nomad 161 // servers. This is used for multi-region federation 162 // and automatic clustering within regions. 163 serf *serf.Serf 164 165 // reconcileCh is used to pass events from the serf handler 166 // into the leader manager. Mostly used to handle when servers 167 // join/leave from the region. 168 reconcileCh chan serf.Member 169 170 // used to track when the server is ready to serve consistent reads, updated atomically 171 readyForConsistentReads int32 172 173 // eventCh is used to receive events from the serf cluster 174 eventCh chan serf.Event 175 176 // BlockedEvals is used to manage evaluations that are blocked on node 177 // capacity changes. 178 blockedEvals *BlockedEvals 179 180 // deploymentWatcher is used to watch deployments and their allocations and 181 // make the required calls to continue to transition the deployment. 182 deploymentWatcher *deploymentwatcher.Watcher 183 184 // nodeDrainer is used to drain allocations from nodes. 185 nodeDrainer *drainer.NodeDrainer 186 187 // evalBroker is used to manage the in-progress evaluations 188 // that are waiting to be brokered to a sub-scheduler 189 evalBroker *EvalBroker 190 191 // periodicDispatcher is used to track and create evaluations for periodic jobs. 192 periodicDispatcher *PeriodicDispatch 193 194 // planner is used to mange the submitted allocation plans that are waiting 195 // to be accessed by the leader 196 *planner 197 198 // nodeHeartbeater is used to track expiration times of node heartbeats. 
If it 199 // detects an expired node, the node status is updated to be 'down'. 200 *nodeHeartbeater 201 202 // consulCatalog is used for discovering other Nomad Servers via Consul 203 consulCatalog consul.CatalogAPI 204 205 // vault is the client for communicating with Vault. 206 vault VaultClient 207 208 // Worker used for processing 209 workers []*Worker 210 211 // aclCache is used to maintain the parsed ACL objects 212 aclCache *lru.TwoQueueCache 213 214 // leaderAcl is the management ACL token that is valid when resolved by the 215 // current leader. 216 leaderAcl string 217 leaderAclLock sync.Mutex 218 219 // statsFetcher is used by autopilot to check the status of the other 220 // Nomad router. 221 statsFetcher *StatsFetcher 222 223 // EnterpriseState is used to fill in state for Pro/Ent builds 224 EnterpriseState 225 226 left bool 227 shutdown bool 228 shutdownLock sync.Mutex 229 230 shutdownCtx context.Context 231 shutdownCancel context.CancelFunc 232 shutdownCh <-chan struct{} 233} 234 235// Holds the RPC endpoints 236type endpoints struct { 237 Status *Status 238 Node *Node 239 Job *Job 240 Eval *Eval 241 Plan *Plan 242 Alloc *Alloc 243 Deployment *Deployment 244 Region *Region 245 Search *Search 246 Periodic *Periodic 247 System *System 248 Operator *Operator 249 ACL *ACL 250 Enterprise *EnterpriseEndpoints 251 252 // Client endpoints 253 ClientStats *ClientStats 254 FileSystem *FileSystem 255 Agent *Agent 256 ClientAllocations *ClientAllocations 257} 258 259// NewServer is used to construct a new Nomad server from the 260// configuration, potentially returning an error 261func NewServer(config *Config, consulCatalog consul.CatalogAPI) (*Server, error) { 262 // Check the protocol version 263 if err := config.CheckVersion(); err != nil { 264 return nil, err 265 } 266 267 // Create an eval broker 268 evalBroker, err := NewEvalBroker( 269 config.EvalNackTimeout, 270 config.EvalNackInitialReenqueueDelay, 271 config.EvalNackSubsequentReenqueueDelay, 272 
config.EvalDeliveryLimit) 273 if err != nil { 274 return nil, err 275 } 276 277 // Configure TLS 278 tlsConf, err := tlsutil.NewTLSConfiguration(config.TLSConfig, true, true) 279 if err != nil { 280 return nil, err 281 } 282 incomingTLS, tlsWrap, err := getTLSConf(config.TLSConfig.EnableRPC, tlsConf) 283 if err != nil { 284 return nil, err 285 } 286 287 // Create the ACL object cache 288 aclCache, err := lru.New2Q(aclCacheSize) 289 if err != nil { 290 return nil, err 291 } 292 293 // Create the logger 294 logger := config.Logger.ResetNamedIntercept("nomad") 295 296 // Create the server 297 s := &Server{ 298 config: config, 299 consulCatalog: consulCatalog, 300 connPool: pool.NewPool(logger, serverRPCCache, serverMaxStreams, tlsWrap), 301 logger: logger, 302 tlsWrap: tlsWrap, 303 rpcServer: rpc.NewServer(), 304 streamingRpcs: structs.NewStreamingRpcRegistry(), 305 nodeConns: make(map[string][]*nodeConnState), 306 peers: make(map[string][]*serverParts), 307 localPeers: make(map[raft.ServerAddress]*serverParts), 308 reconcileCh: make(chan serf.Member, 32), 309 eventCh: make(chan serf.Event, 256), 310 evalBroker: evalBroker, 311 blockedEvals: NewBlockedEvals(evalBroker, logger), 312 rpcTLS: incomingTLS, 313 aclCache: aclCache, 314 } 315 316 s.shutdownCtx, s.shutdownCancel = context.WithCancel(context.Background()) 317 s.shutdownCh = s.shutdownCtx.Done() 318 319 // Create the RPC handler 320 s.rpcHandler = newRpcHandler(s) 321 322 // Create the planner 323 planner, err := newPlanner(s) 324 if err != nil { 325 return nil, err 326 } 327 s.planner = planner 328 329 // Create the node heartbeater 330 s.nodeHeartbeater = newNodeHeartbeater(s) 331 332 // Create the periodic dispatcher for launching periodic jobs. 333 s.periodicDispatcher = NewPeriodicDispatch(s.logger, s) 334 335 // Initialize the stats fetcher that autopilot will use. 
336 s.statsFetcher = NewStatsFetcher(s.logger, s.connPool, s.config.Region) 337 338 // Setup Vault 339 if err := s.setupVaultClient(); err != nil { 340 s.Shutdown() 341 s.logger.Error("failed to setup Vault client", "error", err) 342 return nil, fmt.Errorf("Failed to setup Vault client: %v", err) 343 } 344 345 // Initialize the RPC layer 346 if err := s.setupRPC(tlsWrap); err != nil { 347 s.Shutdown() 348 s.logger.Error("failed to start RPC layer", "error", err) 349 return nil, fmt.Errorf("Failed to start RPC layer: %v", err) 350 } 351 352 // Initialize the Raft server 353 if err := s.setupRaft(); err != nil { 354 s.Shutdown() 355 s.logger.Error("failed to start Raft", "error", err) 356 return nil, fmt.Errorf("Failed to start Raft: %v", err) 357 } 358 359 // Initialize the wan Serf 360 s.serf, err = s.setupSerf(config.SerfConfig, s.eventCh, serfSnapshot) 361 if err != nil { 362 s.Shutdown() 363 s.logger.Error("failed to start serf WAN", "error", err) 364 return nil, fmt.Errorf("Failed to start serf: %v", err) 365 } 366 367 // Initialize the scheduling workers 368 if err := s.setupWorkers(); err != nil { 369 s.Shutdown() 370 s.logger.Error("failed to start workers", "error", err) 371 return nil, fmt.Errorf("Failed to start workers: %v", err) 372 } 373 374 // Setup the Consul syncer 375 if err := s.setupConsulSyncer(); err != nil { 376 s.logger.Error("failed to create server consul syncer", "error", err) 377 return nil, fmt.Errorf("failed to create server Consul syncer: %v", err) 378 } 379 380 // Setup the deployment watcher. 381 if err := s.setupDeploymentWatcher(); err != nil { 382 s.logger.Error("failed to create deployment watcher", "error", err) 383 return nil, fmt.Errorf("failed to create deployment watcher: %v", err) 384 } 385 386 // Setup the node drainer. 
387 s.setupNodeDrainer() 388 389 // Setup the enterprise state 390 if err := s.setupEnterprise(config); err != nil { 391 return nil, err 392 } 393 394 // Monitor leadership changes 395 go s.monitorLeadership() 396 397 // Start ingesting events for Serf 398 go s.serfEventHandler() 399 400 // start the RPC listener for the server 401 s.startRPCListener() 402 403 // Emit metrics for the eval broker 404 go evalBroker.EmitStats(time.Second, s.shutdownCh) 405 406 // Emit metrics for the plan queue 407 go s.planQueue.EmitStats(time.Second, s.shutdownCh) 408 409 // Emit metrics for the blocked eval tracker. 410 go s.blockedEvals.EmitStats(time.Second, s.shutdownCh) 411 412 // Emit metrics for the Vault client. 413 go s.vault.EmitStats(time.Second, s.shutdownCh) 414 415 // Emit metrics 416 go s.heartbeatStats() 417 418 // Emit raft and state store metrics 419 go s.EmitRaftStats(10*time.Second, s.shutdownCh) 420 421 // Start enterprise background workers 422 s.startEnterpriseBackground() 423 424 // Done 425 return s, nil 426} 427 428// startRPCListener starts the server's the RPC listener 429func (s *Server) startRPCListener() { 430 ctx, cancel := context.WithCancel(context.Background()) 431 s.rpcCancel = cancel 432 go s.listen(ctx) 433} 434 435// createRPCListener creates the server's RPC listener 436func (s *Server) createRPCListener() (*net.TCPListener, error) { 437 s.listenerCh = make(chan struct{}) 438 listener, err := net.ListenTCP("tcp", s.config.RPCAddr) 439 if err != nil { 440 s.logger.Error("failed to initialize TLS listener", "error", err) 441 return listener, err 442 } 443 444 s.rpcListener = listener 445 return listener, nil 446} 447 448// getTLSConf gets the server's TLS configuration based on the config supplied 449// by the operator 450func getTLSConf(enableRPC bool, tlsConf *tlsutil.Config) (*tls.Config, tlsutil.RegionWrapper, error) { 451 var tlsWrap tlsutil.RegionWrapper 452 var incomingTLS *tls.Config 453 if enableRPC { 454 tw, err := 
tlsConf.OutgoingTLSWrapper() 455 if err != nil { 456 return nil, nil, err 457 } 458 tlsWrap = tw 459 460 itls, err := tlsConf.IncomingTLSConfig() 461 if err != nil { 462 return nil, nil, err 463 } 464 incomingTLS = itls 465 } 466 return incomingTLS, tlsWrap, nil 467} 468 469// reloadTLSConnections updates a server's TLS configuration and reloads RPC 470// connections. 471func (s *Server) reloadTLSConnections(newTLSConfig *config.TLSConfig) error { 472 s.logger.Info("reloading server connections due to configuration changes") 473 474 // Check if we can reload the RPC listener 475 if s.rpcListener == nil || s.rpcCancel == nil { 476 s.logger.Warn("unable to reload configuration due to uninitialized rpc listner") 477 return fmt.Errorf("can't reload uninitialized RPC listener") 478 } 479 480 tlsConf, err := tlsutil.NewTLSConfiguration(newTLSConfig, true, true) 481 if err != nil { 482 s.logger.Error("unable to create TLS configuration", "error", err) 483 return err 484 } 485 486 incomingTLS, tlsWrap, err := getTLSConf(newTLSConfig.EnableRPC, tlsConf) 487 if err != nil { 488 s.logger.Error("unable to reset TLS context", "error", err) 489 return err 490 } 491 492 // Store the new tls wrapper. 
493 s.tlsWrapLock.Lock() 494 s.tlsWrap = tlsWrap 495 s.tlsWrapLock.Unlock() 496 497 // Keeping configuration in sync is important for other places that require 498 // access to config information, such as rpc.go, where we decide on what kind 499 // of network connections to accept depending on the server configuration 500 s.config.TLSConfig = newTLSConfig 501 502 // Kill any old listeners 503 s.rpcCancel() 504 505 s.rpcTLS = incomingTLS 506 s.connPool.ReloadTLS(tlsWrap) 507 508 if err := s.rpcListener.Close(); err != nil { 509 s.logger.Error("unable to close rpc listener", "error", err) 510 return err 511 } 512 513 // Wait for the old listener to exit 514 <-s.listenerCh 515 516 // Create the new listener with the update TLS config 517 listener, err := s.createRPCListener() 518 if err != nil { 519 listener.Close() 520 return err 521 } 522 523 // Start the new RPC listener 524 s.startRPCListener() 525 526 // Close and reload existing Raft connections 527 wrapper := tlsutil.RegionSpecificWrapper(s.config.Region, tlsWrap) 528 s.raftLayer.ReloadTLS(wrapper) 529 s.raftTransport.CloseStreams() 530 531 s.logger.Debug("finished reloading server connections") 532 return nil 533} 534 535// Shutdown is used to shutdown the server 536func (s *Server) Shutdown() error { 537 s.logger.Info("shutting down server") 538 s.shutdownLock.Lock() 539 defer s.shutdownLock.Unlock() 540 541 if s.shutdown { 542 return nil 543 } 544 545 s.shutdown = true 546 s.shutdownCancel() 547 548 if s.serf != nil { 549 s.serf.Shutdown() 550 } 551 552 if s.raft != nil { 553 s.raftTransport.Close() 554 s.raftLayer.Close() 555 future := s.raft.Shutdown() 556 if err := future.Error(); err != nil { 557 s.logger.Warn("error shutting down raft", "error", err) 558 } 559 if s.raftStore != nil { 560 s.raftStore.Close() 561 } 562 } 563 564 // Shutdown the RPC listener 565 if s.rpcListener != nil { 566 s.rpcListener.Close() 567 } 568 569 // Close the connection pool 570 s.connPool.Shutdown() 571 572 // Close the fsm 
573 if s.fsm != nil { 574 s.fsm.Close() 575 } 576 577 // Stop Vault token renewal 578 if s.vault != nil { 579 s.vault.Stop() 580 } 581 582 return nil 583} 584 585// IsShutdown checks if the server is shutdown 586func (s *Server) IsShutdown() bool { 587 select { 588 case <-s.shutdownCh: 589 return true 590 default: 591 return false 592 } 593} 594 595// Leave is used to prepare for a graceful shutdown of the server 596func (s *Server) Leave() error { 597 s.logger.Info("server starting leave") 598 s.left = true 599 600 // Check the number of known peers 601 numPeers, err := s.numPeers() 602 if err != nil { 603 s.logger.Error("failed to check raft peers during leave", "error", err) 604 return err 605 } 606 607 addr := s.raftTransport.LocalAddr() 608 609 // If we are the current leader, and we have any other peers (cluster has multiple 610 // servers), we should do a RemovePeer to safely reduce the quorum size. If we are 611 // not the leader, then we should issue our leave intention and wait to be removed 612 // for some sane period of time. 613 isLeader := s.IsLeader() 614 if isLeader && numPeers > 1 { 615 minRaftProtocol, err := s.autopilot.MinRaftProtocol() 616 if err != nil { 617 return err 618 } 619 620 if minRaftProtocol >= 2 && s.config.RaftConfig.ProtocolVersion >= 3 { 621 future := s.raft.RemoveServer(raft.ServerID(s.config.NodeID), 0, 0) 622 if err := future.Error(); err != nil { 623 s.logger.Error("failed to remove ourself as raft peer", "error", err) 624 } 625 } else { 626 future := s.raft.RemovePeer(addr) 627 if err := future.Error(); err != nil { 628 s.logger.Error("failed to remove ourself as raft peer", "error", err) 629 } 630 } 631 } 632 633 // Leave the gossip pool 634 if s.serf != nil { 635 if err := s.serf.Leave(); err != nil { 636 s.logger.Error("failed to leave Serf cluster", "error", err) 637 } 638 } 639 640 // If we were not leader, wait to be safely removed from the cluster. 
641 // We must wait to allow the raft replication to take place, otherwise 642 // an immediate shutdown could cause a loss of quorum. 643 if !isLeader { 644 left := false 645 limit := time.Now().Add(raftRemoveGracePeriod) 646 for !left && time.Now().Before(limit) { 647 // Sleep a while before we check. 648 time.Sleep(50 * time.Millisecond) 649 650 // Get the latest configuration. 651 future := s.raft.GetConfiguration() 652 if err := future.Error(); err != nil { 653 s.logger.Error("failed to get raft configuration", "error", err) 654 break 655 } 656 657 // See if we are no longer included. 658 left = true 659 for _, server := range future.Configuration().Servers { 660 if server.Address == addr { 661 left = false 662 break 663 } 664 } 665 } 666 667 // TODO (alexdadgar) With the old Raft library we used to force the 668 // peers set to empty when a graceful leave occurred. This would 669 // keep voting spam down if the server was restarted, but it was 670 // dangerous because the peers was inconsistent with the logs and 671 // snapshots, so it wasn't really safe in all cases for the server 672 // to become leader. This is now safe, but the log spam is noisy. 673 // The next new version of the library will have a "you are not a 674 // peer stop it" behavior that should address this. We will have 675 // to evaluate during the RC period if this interim situation is 676 // not too confusing for operators. 677 678 // TODO (alexdadgar) When we take a later new version of the Raft 679 // library it won't try to complete replication, so this peer 680 // may not realize that it has been removed. Need to revisit this 681 // and the warning here. 682 if !left { 683 s.logger.Warn("failed to leave raft configuration gracefully, timeout") 684 } 685 } 686 return nil 687} 688 689// Reload handles a config reload specific to server-only configuration. Not 690// all config fields can handle a reload. 
691func (s *Server) Reload(newConfig *Config) error { 692 if newConfig == nil { 693 return fmt.Errorf("Reload given a nil config") 694 } 695 696 var mErr multierror.Error 697 698 // Handle the Vault reload. Vault should never be nil but just guard. 699 if s.vault != nil { 700 if err := s.vault.SetConfig(newConfig.VaultConfig); err != nil { 701 multierror.Append(&mErr, err) 702 } 703 } 704 705 shouldReloadTLS, err := tlsutil.ShouldReloadRPCConnections(s.config.TLSConfig, newConfig.TLSConfig) 706 if err != nil { 707 s.logger.Error("error checking whether to reload TLS configuration", "error", err) 708 } 709 710 if shouldReloadTLS { 711 if err := s.reloadTLSConnections(newConfig.TLSConfig); err != nil { 712 s.logger.Error("error reloading server TLS configuration", "error", err) 713 multierror.Append(&mErr, err) 714 } 715 } 716 717 return mErr.ErrorOrNil() 718} 719 720// setupBootstrapHandler() creates the closure necessary to support a Consul 721// fallback handler. 722func (s *Server) setupBootstrapHandler() error { 723 // peersTimeout is used to indicate to the Consul Syncer that the 724 // current Nomad Server has a stale peer set. peersTimeout will time 725 // out if the Consul Syncer bootstrapFn has not observed a Raft 726 // leader in maxStaleLeadership. If peersTimeout has been triggered, 727 // the Consul Syncer will begin querying Consul for other Nomad 728 // Servers. 729 // 730 // NOTE: time.Timer is used vs time.Time in order to handle clock 731 // drift because time.Timer is implemented as a monotonic clock. 732 var peersTimeout *time.Timer = time.NewTimer(0) 733 734 // consulQueryCount is the number of times the bootstrapFn has been 735 // called, regardless of success. 736 var consulQueryCount uint64 737 738 // leadershipTimedOut is a helper method that returns true if the 739 // peersTimeout timer has expired. 
740 leadershipTimedOut := func() bool { 741 select { 742 case <-peersTimeout.C: 743 return true 744 default: 745 return false 746 } 747 } 748 749 // The bootstrapFn callback handler is used to periodically poll 750 // Consul to look up the Nomad Servers in Consul. In the event the 751 // server has been brought up without a `retry-join` configuration 752 // and this Server is partitioned from the rest of the cluster, 753 // periodically poll Consul to reattach this Server to other servers 754 // in the same region and automatically reform a quorum (assuming the 755 // correct number of servers required for quorum are present). 756 bootstrapFn := func() error { 757 // If there is a raft leader, do nothing 758 if s.raft.Leader() != "" { 759 peersTimeout.Reset(maxStaleLeadership) 760 return nil 761 } 762 763 // (ab)use serf.go's behavior of setting BootstrapExpect to 764 // zero if we have bootstrapped. If we have bootstrapped 765 bootstrapExpect := atomic.LoadInt32(&s.config.BootstrapExpect) 766 if bootstrapExpect == 0 { 767 // This Nomad Server has been bootstrapped. Rely on 768 // the peersTimeout firing as a guard to prevent 769 // aggressive querying of Consul. 770 if !leadershipTimedOut() { 771 return nil 772 } 773 } else { 774 if consulQueryCount > 0 && !leadershipTimedOut() { 775 return nil 776 } 777 778 // This Nomad Server has not been bootstrapped, reach 779 // out to Consul if our peer list is less than 780 // `bootstrap_expect`. 781 raftPeers, err := s.numPeers() 782 if err != nil { 783 peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor)) 784 return nil 785 } 786 787 // The necessary number of Nomad Servers required for 788 // quorum has been reached, we do not need to poll 789 // Consul. Let the normal timeout-based strategy 790 // take over. 
791 if raftPeers >= int(bootstrapExpect) { 792 peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor)) 793 return nil 794 } 795 } 796 consulQueryCount++ 797 798 s.logger.Debug("lost contact with Nomad quorum, falling back to Consul for server list") 799 800 dcs, err := s.consulCatalog.Datacenters() 801 if err != nil { 802 peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor)) 803 return fmt.Errorf("server.nomad: unable to query Consul datacenters: %v", err) 804 } 805 if len(dcs) > 2 { 806 // Query the local DC first, then shuffle the 807 // remaining DCs. If additional calls to bootstrapFn 808 // are necessary, this Nomad Server will eventually 809 // walk all datacenter until it finds enough hosts to 810 // form a quorum. 811 shuffleStrings(dcs[1:]) 812 dcs = dcs[0:lib.MinInt(len(dcs), datacenterQueryLimit)] 813 } 814 815 nomadServerServiceName := s.config.ConsulConfig.ServerServiceName 816 var mErr multierror.Error 817 const defaultMaxNumNomadServers = 8 818 nomadServerServices := make([]string, 0, defaultMaxNumNomadServers) 819 localNode := s.serf.Memberlist().LocalNode() 820 for _, dc := range dcs { 821 consulOpts := &consulapi.QueryOptions{ 822 AllowStale: true, 823 Datacenter: dc, 824 Near: "_agent", 825 WaitTime: consul.DefaultQueryWaitDuration, 826 } 827 consulServices, _, err := s.consulCatalog.Service(nomadServerServiceName, consul.ServiceTagSerf, consulOpts) 828 if err != nil { 829 err := fmt.Errorf("failed to query service %q in Consul datacenter %q: %v", nomadServerServiceName, dc, err) 830 s.logger.Warn("failed to query Nomad service in Consul datacenter", "service_name", nomadServerServiceName, "dc", dc, "error", err) 831 mErr.Errors = append(mErr.Errors, err) 832 continue 833 } 834 835 for _, cs := range consulServices { 836 port := strconv.FormatInt(int64(cs.ServicePort), 10) 837 addr := cs.ServiceAddress 838 if addr == "" { 839 addr = cs.Address 840 } 841 if 
localNode.Addr.String() == addr && int(localNode.Port) == cs.ServicePort { 842 continue 843 } 844 serverAddr := net.JoinHostPort(addr, port) 845 nomadServerServices = append(nomadServerServices, serverAddr) 846 } 847 } 848 849 if len(nomadServerServices) == 0 { 850 if len(mErr.Errors) > 0 { 851 peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor)) 852 return mErr.ErrorOrNil() 853 } 854 855 // Log the error and return nil so future handlers 856 // can attempt to register the `nomad` service. 857 pollInterval := peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor) 858 s.logger.Trace("no Nomad Servers advertising Nomad service in Consul datacenters", "service_name", nomadServerServiceName, "datacenters", dcs, "retry", pollInterval) 859 peersTimeout.Reset(pollInterval) 860 return nil 861 } 862 863 numServersContacted, err := s.Join(nomadServerServices) 864 if err != nil { 865 peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor)) 866 return fmt.Errorf("contacted %d Nomad Servers: %v", numServersContacted, err) 867 } 868 869 peersTimeout.Reset(maxStaleLeadership) 870 s.logger.Info("successfully contacted Nomad servers", "num_servers", numServersContacted) 871 872 return nil 873 } 874 875 // Hacky replacement for old ConsulSyncer Periodic Handler. 876 go func() { 877 lastOk := true 878 sync := time.NewTimer(0) 879 for { 880 select { 881 case <-sync.C: 882 d := defaultConsulDiscoveryInterval 883 if err := bootstrapFn(); err != nil { 884 // Only log if it worked last time 885 if lastOk { 886 lastOk = false 887 s.logger.Error("error looking up Nomad servers in Consul", "error", err) 888 } 889 d = defaultConsulDiscoveryIntervalRetry 890 } 891 sync.Reset(d) 892 case <-s.shutdownCh: 893 return 894 } 895 } 896 }() 897 return nil 898} 899 900// setupConsulSyncer creates Server-mode consul.Syncer which periodically 901// executes callbacks on a fixed interval. 
902func (s *Server) setupConsulSyncer() error { 903 if s.config.ConsulConfig.ServerAutoJoin != nil && *s.config.ConsulConfig.ServerAutoJoin { 904 if err := s.setupBootstrapHandler(); err != nil { 905 return err 906 } 907 } 908 909 return nil 910} 911 912// setupDeploymentWatcher creates a deployment watcher that consumes the RPC 913// endpoints for state information and makes transitions via Raft through a 914// shim that provides the appropriate methods. 915func (s *Server) setupDeploymentWatcher() error { 916 917 // Create the raft shim type to restrict the set of raft methods that can be 918 // made 919 raftShim := &deploymentWatcherRaftShim{ 920 apply: s.raftApply, 921 } 922 923 // Create the deployment watcher 924 s.deploymentWatcher = deploymentwatcher.NewDeploymentsWatcher( 925 s.logger, raftShim, 926 deploymentwatcher.LimitStateQueriesPerSecond, 927 deploymentwatcher.CrossDeploymentUpdateBatchDuration) 928 929 return nil 930} 931 932// setupNodeDrainer creates a node drainer which will be enabled when a server 933// becomes a leader. 934func (s *Server) setupNodeDrainer() { 935 // Create a shim around Raft requests 936 shim := drainerShim{s} 937 c := &drainer.NodeDrainerConfig{ 938 Logger: s.logger, 939 Raft: shim, 940 JobFactory: drainer.GetDrainingJobWatcher, 941 NodeFactory: drainer.GetNodeWatcherFactory(), 942 DrainDeadlineFactory: drainer.GetDeadlineNotifier, 943 StateQueriesPerSecond: drainer.LimitStateQueriesPerSecond, 944 BatchUpdateInterval: drainer.BatchUpdateInterval, 945 } 946 s.nodeDrainer = drainer.NewNodeDrainer(c) 947} 948 949// setupVaultClient is used to set up the Vault API client. 
950func (s *Server) setupVaultClient() error { 951 v, err := NewVaultClient(s.config.VaultConfig, s.logger, s.purgeVaultAccessors) 952 if err != nil { 953 return err 954 } 955 s.vault = v 956 return nil 957} 958 959// setupRPC is used to setup the RPC listener 960func (s *Server) setupRPC(tlsWrap tlsutil.RegionWrapper) error { 961 // Populate the static RPC server 962 s.setupRpcServer(s.rpcServer, nil) 963 964 listener, err := s.createRPCListener() 965 if err != nil { 966 listener.Close() 967 return err 968 } 969 970 if s.config.ClientRPCAdvertise != nil { 971 s.clientRpcAdvertise = s.config.ClientRPCAdvertise 972 } else { 973 s.clientRpcAdvertise = s.rpcListener.Addr() 974 } 975 976 // Verify that we have a usable advertise address 977 clientAddr, ok := s.clientRpcAdvertise.(*net.TCPAddr) 978 if !ok { 979 listener.Close() 980 return fmt.Errorf("Client RPC advertise address is not a TCP Address: %v", clientAddr) 981 } 982 if clientAddr.IP.IsUnspecified() { 983 listener.Close() 984 return fmt.Errorf("Client RPC advertise address is not advertisable: %v", clientAddr) 985 } 986 987 if s.config.ServerRPCAdvertise != nil { 988 s.serverRpcAdvertise = s.config.ServerRPCAdvertise 989 } else { 990 // Default to the Serf Advertise + RPC Port 991 serfIP := s.config.SerfConfig.MemberlistConfig.AdvertiseAddr 992 if serfIP == "" { 993 serfIP = s.config.SerfConfig.MemberlistConfig.BindAddr 994 } 995 996 addr := net.JoinHostPort(serfIP, fmt.Sprintf("%d", clientAddr.Port)) 997 resolved, err := net.ResolveTCPAddr("tcp", addr) 998 if err != nil { 999 return fmt.Errorf("Failed to resolve Server RPC advertise address: %v", err) 1000 } 1001 1002 s.serverRpcAdvertise = resolved 1003 } 1004 1005 // Verify that we have a usable advertise address 1006 serverAddr, ok := s.serverRpcAdvertise.(*net.TCPAddr) 1007 if !ok { 1008 return fmt.Errorf("Server RPC advertise address is not a TCP Address: %v", serverAddr) 1009 } 1010 if serverAddr.IP.IsUnspecified() { 1011 listener.Close() 1012 return 
fmt.Errorf("Server RPC advertise address is not advertisable: %v", serverAddr) 1013 } 1014 1015 wrapper := tlsutil.RegionSpecificWrapper(s.config.Region, tlsWrap) 1016 s.raftLayer = NewRaftLayer(s.serverRpcAdvertise, wrapper) 1017 return nil 1018} 1019 1020// setupRpcServer is used to populate an RPC server with endpoints 1021func (s *Server) setupRpcServer(server *rpc.Server, ctx *RPCContext) { 1022 // Add the static endpoints to the RPC server. 1023 if s.staticEndpoints.Status == nil { 1024 // Initialize the list just once 1025 s.staticEndpoints.ACL = &ACL{srv: s, logger: s.logger.Named("acl")} 1026 s.staticEndpoints.Alloc = &Alloc{srv: s, logger: s.logger.Named("alloc")} 1027 s.staticEndpoints.Eval = &Eval{srv: s, logger: s.logger.Named("eval")} 1028 s.staticEndpoints.Job = NewJobEndpoints(s) 1029 s.staticEndpoints.Node = &Node{srv: s, logger: s.logger.Named("client")} // Add but don't register 1030 s.staticEndpoints.Deployment = &Deployment{srv: s, logger: s.logger.Named("deployment")} 1031 s.staticEndpoints.Operator = &Operator{srv: s, logger: s.logger.Named("operator")} 1032 s.staticEndpoints.Periodic = &Periodic{srv: s, logger: s.logger.Named("periodic")} 1033 s.staticEndpoints.Plan = &Plan{srv: s, logger: s.logger.Named("plan")} 1034 s.staticEndpoints.Region = &Region{srv: s, logger: s.logger.Named("region")} 1035 s.staticEndpoints.Status = &Status{srv: s, logger: s.logger.Named("status")} 1036 s.staticEndpoints.System = &System{srv: s, logger: s.logger.Named("system")} 1037 s.staticEndpoints.Search = &Search{srv: s, logger: s.logger.Named("search")} 1038 s.staticEndpoints.Enterprise = NewEnterpriseEndpoints(s) 1039 1040 // Client endpoints 1041 s.staticEndpoints.ClientStats = &ClientStats{srv: s, logger: s.logger.Named("client_stats")} 1042 s.staticEndpoints.ClientAllocations = &ClientAllocations{srv: s, logger: s.logger.Named("client_allocs")} 1043 s.staticEndpoints.ClientAllocations.register() 1044 1045 // Streaming endpoints 1046 
s.staticEndpoints.FileSystem = &FileSystem{srv: s, logger: s.logger.Named("client_fs")} 1047 s.staticEndpoints.FileSystem.register() 1048 1049 s.staticEndpoints.Agent = &Agent{srv: s} 1050 s.staticEndpoints.Agent.register() 1051 } 1052 1053 // Register the static handlers 1054 server.Register(s.staticEndpoints.ACL) 1055 server.Register(s.staticEndpoints.Alloc) 1056 server.Register(s.staticEndpoints.Eval) 1057 server.Register(s.staticEndpoints.Job) 1058 server.Register(s.staticEndpoints.Deployment) 1059 server.Register(s.staticEndpoints.Operator) 1060 server.Register(s.staticEndpoints.Periodic) 1061 server.Register(s.staticEndpoints.Plan) 1062 server.Register(s.staticEndpoints.Region) 1063 server.Register(s.staticEndpoints.Status) 1064 server.Register(s.staticEndpoints.System) 1065 server.Register(s.staticEndpoints.Search) 1066 s.staticEndpoints.Enterprise.Register(server) 1067 server.Register(s.staticEndpoints.ClientStats) 1068 server.Register(s.staticEndpoints.ClientAllocations) 1069 server.Register(s.staticEndpoints.FileSystem) 1070 1071 // Create new dynamic endpoints and add them to the RPC server. 1072 node := &Node{srv: s, ctx: ctx, logger: s.logger.Named("client")} 1073 1074 // Register the dynamic endpoints 1075 server.Register(node) 1076} 1077 1078// setupRaft is used to setup and initialize Raft 1079func (s *Server) setupRaft() error { 1080 // If we have an unclean exit then attempt to close the Raft store. 
1081 defer func() { 1082 if s.raft == nil && s.raftStore != nil { 1083 if err := s.raftStore.Close(); err != nil { 1084 s.logger.Error("failed to close Raft store", "error", err) 1085 } 1086 } 1087 }() 1088 1089 // Create the FSM 1090 fsmConfig := &FSMConfig{ 1091 EvalBroker: s.evalBroker, 1092 Periodic: s.periodicDispatcher, 1093 Blocked: s.blockedEvals, 1094 Logger: s.logger, 1095 Region: s.Region(), 1096 } 1097 var err error 1098 s.fsm, err = NewFSM(fsmConfig) 1099 if err != nil { 1100 return err 1101 } 1102 1103 // Create a transport layer 1104 trans := raft.NewNetworkTransport(s.raftLayer, 3, s.config.RaftTimeout, 1105 s.config.LogOutput) 1106 s.raftTransport = trans 1107 1108 // Make sure we set the Logger. 1109 logger := s.logger.StandardLoggerIntercept(&log.StandardLoggerOptions{InferLevels: true}) 1110 s.config.RaftConfig.Logger = logger 1111 s.config.RaftConfig.LogOutput = nil 1112 1113 // Our version of Raft protocol 2 requires the LocalID to match the network 1114 // address of the transport. Raft protocol 3 uses permanent ids. 1115 s.config.RaftConfig.LocalID = raft.ServerID(trans.LocalAddr()) 1116 if s.config.RaftConfig.ProtocolVersion >= 3 { 1117 s.config.RaftConfig.LocalID = raft.ServerID(s.config.NodeID) 1118 } 1119 1120 // Build an all in-memory setup for dev mode, otherwise prepare a full 1121 // disk-based setup. 
1122 var log raft.LogStore 1123 var stable raft.StableStore 1124 var snap raft.SnapshotStore 1125 if s.config.DevMode { 1126 store := raft.NewInmemStore() 1127 s.raftInmem = store 1128 stable = store 1129 log = store 1130 snap = raft.NewDiscardSnapshotStore() 1131 1132 } else { 1133 // Create the base raft path 1134 path := filepath.Join(s.config.DataDir, raftState) 1135 if err := ensurePath(path, true); err != nil { 1136 return err 1137 } 1138 1139 // Create the BoltDB backend 1140 store, err := raftboltdb.NewBoltStore(filepath.Join(path, "raft.db")) 1141 if err != nil { 1142 return err 1143 } 1144 s.raftStore = store 1145 stable = store 1146 1147 // Wrap the store in a LogCache to improve performance 1148 cacheStore, err := raft.NewLogCache(raftLogCacheSize, store) 1149 if err != nil { 1150 store.Close() 1151 return err 1152 } 1153 log = cacheStore 1154 1155 // Create the snapshot store 1156 snapshots, err := raft.NewFileSnapshotStore(path, snapshotsRetained, s.config.LogOutput) 1157 if err != nil { 1158 if s.raftStore != nil { 1159 s.raftStore.Close() 1160 } 1161 return err 1162 } 1163 snap = snapshots 1164 1165 // For an existing cluster being upgraded to the new version of 1166 // Raft, we almost never want to run recovery based on the old 1167 // peers.json file. We create a peers.info file with a helpful 1168 // note about where peers.json went, and use that as a sentinel 1169 // to avoid ingesting the old one that first time (if we have to 1170 // create the peers.info file because it's not there, we also 1171 // blow away any existing peers.json file). 
1172 peersFile := filepath.Join(path, "peers.json") 1173 peersInfoFile := filepath.Join(path, "peers.info") 1174 if _, err := os.Stat(peersInfoFile); os.IsNotExist(err) { 1175 if err := ioutil.WriteFile(peersInfoFile, []byte(peersInfoContent), 0755); err != nil { 1176 return fmt.Errorf("failed to write peers.info file: %v", err) 1177 } 1178 1179 // Blow away the peers.json file if present, since the 1180 // peers.info sentinel wasn't there. 1181 if _, err := os.Stat(peersFile); err == nil { 1182 if err := os.Remove(peersFile); err != nil { 1183 return fmt.Errorf("failed to delete peers.json, please delete manually (see peers.info for details): %v", err) 1184 } 1185 s.logger.Info("deleted peers.json file (see peers.info for details)") 1186 } 1187 } else if _, err := os.Stat(peersFile); err == nil { 1188 s.logger.Info("found peers.json file, recovering Raft configuration...") 1189 var configuration raft.Configuration 1190 if s.config.RaftConfig.ProtocolVersion < 3 { 1191 configuration, err = raft.ReadPeersJSON(peersFile) 1192 } else { 1193 configuration, err = raft.ReadConfigJSON(peersFile) 1194 } 1195 if err != nil { 1196 return fmt.Errorf("recovery failed to parse peers.json: %v", err) 1197 } 1198 tmpFsm, err := NewFSM(fsmConfig) 1199 if err != nil { 1200 return fmt.Errorf("recovery failed to make temp FSM: %v", err) 1201 } 1202 if err := raft.RecoverCluster(s.config.RaftConfig, tmpFsm, 1203 log, stable, snap, trans, configuration); err != nil { 1204 return fmt.Errorf("recovery failed: %v", err) 1205 } 1206 if err := os.Remove(peersFile); err != nil { 1207 return fmt.Errorf("recovery failed to delete peers.json, please delete manually (see peers.info for details): %v", err) 1208 } 1209 s.logger.Info("deleted peers.json file after successful recovery") 1210 } 1211 } 1212 1213 // If we are in bootstrap or dev mode and the state is clean then we can 1214 // bootstrap now. 
1215 if s.config.Bootstrap || s.config.DevMode { 1216 hasState, err := raft.HasExistingState(log, stable, snap) 1217 if err != nil { 1218 return err 1219 } 1220 if !hasState { 1221 configuration := raft.Configuration{ 1222 Servers: []raft.Server{ 1223 { 1224 ID: s.config.RaftConfig.LocalID, 1225 Address: trans.LocalAddr(), 1226 }, 1227 }, 1228 } 1229 if err := raft.BootstrapCluster(s.config.RaftConfig, 1230 log, stable, snap, trans, configuration); err != nil { 1231 return err 1232 } 1233 } 1234 } 1235 1236 // Setup the leader channel 1237 leaderCh := make(chan bool, 1) 1238 s.config.RaftConfig.NotifyCh = leaderCh 1239 s.leaderCh = leaderCh 1240 1241 // Setup the Raft store 1242 s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm, log, stable, snap, trans) 1243 if err != nil { 1244 return err 1245 } 1246 return nil 1247} 1248 1249// setupSerf is used to setup and initialize a Serf 1250func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (*serf.Serf, error) { 1251 conf.Init() 1252 conf.NodeName = fmt.Sprintf("%s.%s", s.config.NodeName, s.config.Region) 1253 conf.Tags["role"] = "nomad" 1254 conf.Tags["region"] = s.config.Region 1255 conf.Tags["dc"] = s.config.Datacenter 1256 conf.Tags["vsn"] = fmt.Sprintf("%d", structs.ApiMajorVersion) 1257 conf.Tags["mvn"] = fmt.Sprintf("%d", structs.ApiMinorVersion) 1258 conf.Tags["build"] = s.config.Build 1259 conf.Tags["raft_vsn"] = fmt.Sprintf("%d", s.config.RaftConfig.ProtocolVersion) 1260 conf.Tags["id"] = s.config.NodeID 1261 conf.Tags["rpc_addr"] = s.clientRpcAdvertise.(*net.TCPAddr).IP.String() // Address that clients will use to RPC to servers 1262 conf.Tags["port"] = fmt.Sprintf("%d", s.serverRpcAdvertise.(*net.TCPAddr).Port) // Port servers use to RPC to one and another 1263 if s.config.Bootstrap || (s.config.DevMode && !s.config.DevDisableBootstrap) { 1264 conf.Tags["bootstrap"] = "1" 1265 } 1266 bootstrapExpect := atomic.LoadInt32(&s.config.BootstrapExpect) 1267 if bootstrapExpect != 0 { 
1268 conf.Tags["expect"] = fmt.Sprintf("%d", bootstrapExpect) 1269 } 1270 if s.config.NonVoter { 1271 conf.Tags["nonvoter"] = "1" 1272 } 1273 if s.config.RedundancyZone != "" { 1274 conf.Tags[AutopilotRZTag] = s.config.RedundancyZone 1275 } 1276 if s.config.UpgradeVersion != "" { 1277 conf.Tags[AutopilotVersionTag] = s.config.UpgradeVersion 1278 } 1279 logger := s.logger.StandardLoggerIntercept(&log.StandardLoggerOptions{InferLevels: true}) 1280 conf.MemberlistConfig.Logger = logger 1281 conf.Logger = logger 1282 conf.MemberlistConfig.LogOutput = nil 1283 conf.LogOutput = nil 1284 conf.EventCh = ch 1285 if !s.config.DevMode { 1286 conf.SnapshotPath = filepath.Join(s.config.DataDir, path) 1287 if err := ensurePath(conf.SnapshotPath, false); err != nil { 1288 return nil, err 1289 } 1290 } 1291 conf.ProtocolVersion = protocolVersionMap[s.config.ProtocolVersion] 1292 conf.RejoinAfterLeave = true 1293 // LeavePropagateDelay is used to make sure broadcasted leave intents propagate 1294 // This value was tuned using https://www.serf.io/docs/internals/simulator.html to 1295 // allow for convergence in 99.9% of nodes in a 10 node cluster 1296 conf.LeavePropagateDelay = 1 * time.Second 1297 conf.Merge = &serfMergeDelegate{} 1298 1299 // Until Nomad supports this fully, we disable automatic resolution. 1300 // When enabled, the Serf gossip may just turn off if we are the minority 1301 // node which is rather unexpected. 
1302 conf.EnableNameConflictResolution = false 1303 return serf.Create(conf) 1304} 1305 1306// setupWorkers is used to start the scheduling workers 1307func (s *Server) setupWorkers() error { 1308 // Check if all the schedulers are disabled 1309 if len(s.config.EnabledSchedulers) == 0 || s.config.NumSchedulers == 0 { 1310 s.logger.Warn("no enabled schedulers") 1311 return nil 1312 } 1313 1314 // Check if the core scheduler is not enabled 1315 foundCore := false 1316 for _, sched := range s.config.EnabledSchedulers { 1317 if sched == structs.JobTypeCore { 1318 foundCore = true 1319 continue 1320 } 1321 1322 if _, ok := scheduler.BuiltinSchedulers[sched]; !ok { 1323 return fmt.Errorf("invalid configuration: unknown scheduler %q in enabled schedulers", sched) 1324 } 1325 } 1326 if !foundCore { 1327 return fmt.Errorf("invalid configuration: %q scheduler not enabled", structs.JobTypeCore) 1328 } 1329 1330 // Start the workers 1331 for i := 0; i < s.config.NumSchedulers; i++ { 1332 if w, err := NewWorker(s); err != nil { 1333 return err 1334 } else { 1335 s.workers = append(s.workers, w) 1336 } 1337 } 1338 s.logger.Info("starting scheduling worker(s)", "num_workers", s.config.NumSchedulers, "schedulers", s.config.EnabledSchedulers) 1339 return nil 1340} 1341 1342// numPeers is used to check on the number of known peers, including the local 1343// node. 
1344func (s *Server) numPeers() (int, error) { 1345 future := s.raft.GetConfiguration() 1346 if err := future.Error(); err != nil { 1347 return 0, err 1348 } 1349 configuration := future.Configuration() 1350 return len(configuration.Servers), nil 1351} 1352 1353// IsLeader checks if this server is the cluster leader 1354func (s *Server) IsLeader() bool { 1355 return s.raft.State() == raft.Leader 1356} 1357 1358// Join is used to have Nomad join the gossip ring 1359// The target address should be another node listening on the 1360// Serf address 1361func (s *Server) Join(addrs []string) (int, error) { 1362 return s.serf.Join(addrs, true) 1363} 1364 1365// LocalMember is used to return the local node 1366func (s *Server) LocalMember() serf.Member { 1367 return s.serf.LocalMember() 1368} 1369 1370// Members is used to return the members of the serf cluster 1371func (s *Server) Members() []serf.Member { 1372 return s.serf.Members() 1373} 1374 1375// RemoveFailedNode is used to remove a failed node from the cluster 1376func (s *Server) RemoveFailedNode(node string) error { 1377 return s.serf.RemoveFailedNode(node) 1378} 1379 1380// KeyManager returns the Serf keyring manager 1381func (s *Server) KeyManager() *serf.KeyManager { 1382 return s.serf.KeyManager() 1383} 1384 1385// Encrypted determines if gossip is encrypted 1386func (s *Server) Encrypted() bool { 1387 return s.serf.EncryptionEnabled() 1388} 1389 1390// State returns the underlying state store. This should *not* 1391// be used to modify state directly. 1392func (s *Server) State() *state.StateStore { 1393 return s.fsm.State() 1394} 1395 1396// setLeaderAcl stores the given ACL token as the current leader's ACL token. 
1397func (s *Server) setLeaderAcl(token string) { 1398 s.leaderAclLock.Lock() 1399 s.leaderAcl = token 1400 s.leaderAclLock.Unlock() 1401} 1402 1403// getLeaderAcl retrieves the leader's ACL token 1404func (s *Server) getLeaderAcl() string { 1405 s.leaderAclLock.Lock() 1406 defer s.leaderAclLock.Unlock() 1407 return s.leaderAcl 1408} 1409 1410// Atomically sets a readiness state flag when leadership is obtained, to indicate that server is past its barrier write 1411func (s *Server) setConsistentReadReady() { 1412 atomic.StoreInt32(&s.readyForConsistentReads, 1) 1413} 1414 1415// Atomically reset readiness state flag on leadership revoke 1416func (s *Server) resetConsistentReadReady() { 1417 atomic.StoreInt32(&s.readyForConsistentReads, 0) 1418} 1419 1420// Returns true if this server is ready to serve consistent reads 1421func (s *Server) isReadyForConsistentReads() bool { 1422 return atomic.LoadInt32(&s.readyForConsistentReads) == 1 1423} 1424 1425// Regions returns the known regions in the cluster. 1426func (s *Server) Regions() []string { 1427 s.peerLock.RLock() 1428 defer s.peerLock.RUnlock() 1429 1430 regions := make([]string, 0, len(s.peers)) 1431 for region := range s.peers { 1432 regions = append(regions, region) 1433 } 1434 sort.Strings(regions) 1435 return regions 1436} 1437 1438// RPC is used to make a local RPC call 1439func (s *Server) RPC(method string, args interface{}, reply interface{}) error { 1440 codec := &codec.InmemCodec{ 1441 Method: method, 1442 Args: args, 1443 Reply: reply, 1444 } 1445 if err := s.rpcServer.ServeRequest(codec); err != nil { 1446 return err 1447 } 1448 return codec.Err 1449} 1450 1451// StreamingRpcHandler is used to make a streaming RPC call. 
1452func (s *Server) StreamingRpcHandler(method string) (structs.StreamingRpcHandler, error) { 1453 return s.streamingRpcs.GetHandler(method) 1454} 1455 1456// Stats is used to return statistics for debugging and insight 1457// for various sub-systems 1458func (s *Server) Stats() map[string]map[string]string { 1459 toString := func(v uint64) string { 1460 return strconv.FormatUint(v, 10) 1461 } 1462 stats := map[string]map[string]string{ 1463 "nomad": { 1464 "server": "true", 1465 "leader": fmt.Sprintf("%v", s.IsLeader()), 1466 "leader_addr": string(s.raft.Leader()), 1467 "bootstrap": fmt.Sprintf("%v", s.config.Bootstrap), 1468 "known_regions": toString(uint64(len(s.peers))), 1469 }, 1470 "raft": s.raft.Stats(), 1471 "serf": s.serf.Stats(), 1472 "runtime": stats.RuntimeStats(), 1473 "vault": s.vault.Stats(), 1474 } 1475 1476 return stats 1477} 1478 1479// EmitRaftStats is used to export metrics about raft indexes and state store snapshot index 1480func (s *Server) EmitRaftStats(period time.Duration, stopCh <-chan struct{}) { 1481 for { 1482 select { 1483 case <-time.After(period): 1484 lastIndex := s.raft.LastIndex() 1485 metrics.SetGauge([]string{"raft", "lastIndex"}, float32(lastIndex)) 1486 appliedIndex := s.raft.AppliedIndex() 1487 metrics.SetGauge([]string{"raft", "appliedIndex"}, float32(appliedIndex)) 1488 stateStoreSnapshotIndex, err := s.State().LatestIndex() 1489 if err != nil { 1490 s.logger.Warn("Unable to read snapshot index from statestore, metric will not be emitted", "error", err) 1491 } else { 1492 metrics.SetGauge([]string{"state", "snapshotIndex"}, float32(stateStoreSnapshotIndex)) 1493 } 1494 case <-stopCh: 1495 return 1496 } 1497 } 1498} 1499 1500// Region returns the region of the server 1501func (s *Server) Region() string { 1502 return s.config.Region 1503} 1504 1505// Datacenter returns the data center of the server 1506func (s *Server) Datacenter() string { 1507 return s.config.Datacenter 1508} 1509 1510// GetConfig returns the config of 
the server for testing purposes only 1511func (s *Server) GetConfig() *Config { 1512 return s.config 1513} 1514 1515// ReplicationToken returns the token used for replication. We use a method to support 1516// dynamic reloading of this value later. 1517func (s *Server) ReplicationToken() string { 1518 return s.config.ReplicationToken 1519} 1520 1521// peersInfoContent is used to help operators understand what happened to the 1522// peers.json file. This is written to a file called peers.info in the same 1523// location. 1524const peersInfoContent = ` 1525As of Nomad 0.5.5, the peers.json file is only used for recovery 1526after an outage. The format of this file depends on what the server has 1527configured for its Raft protocol version. Please see the agent configuration 1528page at https://www.consul.io/docs/agent/options.html#_raft_protocol for more 1529details about this parameter. 1530For Raft protocol version 2 and earlier, this should be formatted as a JSON 1531array containing the address and port of each Consul server in the cluster, like 1532this: 1533[ 1534 "10.1.0.1:8300", 1535 "10.1.0.2:8300", 1536 "10.1.0.3:8300" 1537] 1538For Raft protocol version 3 and later, this should be formatted as a JSON 1539array containing the node ID, address:port, and suffrage information of each 1540Consul server in the cluster, like this: 1541[ 1542 { 1543 "id": "adf4238a-882b-9ddc-4a9d-5b6758e4159e", 1544 "address": "10.1.0.1:8300", 1545 "non_voter": false 1546 }, 1547 { 1548 "id": "8b6dda82-3103-11e7-93ae-92361f002671", 1549 "address": "10.1.0.2:8300", 1550 "non_voter": false 1551 }, 1552 { 1553 "id": "97e17742-3103-11e7-93ae-92361f002671", 1554 "address": "10.1.0.3:8300", 1555 "non_voter": false 1556 } 1557] 1558The "id" field is the node ID of the server. This can be found in the logs when 1559the server starts up, or in the "node-id" file inside the server's data 1560directory. 1561The "address" field is the address and port of the server. 
1562The "non_voter" field controls whether the server is a non-voter, which is used 1563in some advanced Autopilot configurations, please see 1564https://www.nomadproject.io/guides/operations/outage.html for more information. If 1565"non_voter" is omitted it will default to false, which is typical for most 1566clusters. 1567 1568Under normal operation, the peers.json file will not be present. 1569 1570When Nomad starts for the first time, it will create this peers.info file and 1571delete any existing peers.json file so that recovery doesn't occur on the first 1572startup. 1573 1574Once this peers.info file is present, any peers.json file will be ingested at 1575startup, and will set the Raft peer configuration manually to recover from an 1576outage. It's crucial that all servers in the cluster are shut down before 1577creating the peers.json file, and that all servers receive the same 1578configuration. Once the peers.json file is successfully ingested and applied, it 1579will be deleted. 1580 1581Please see https://www.nomadproject.io/guides/outage.html for more information. 1582` 1583