1package nomad
2
3import (
4	"context"
5	"crypto/tls"
6	"fmt"
7	"io/ioutil"
8	"net"
9	"net/rpc"
10	"os"
11	"path/filepath"
12	"sort"
13	"strconv"
14	"sync"
15	"sync/atomic"
16	"time"
17
18	"github.com/armon/go-metrics"
19	"github.com/hashicorp/consul/agent/consul/autopilot"
20	consulapi "github.com/hashicorp/consul/api"
21	"github.com/hashicorp/consul/lib"
22	log "github.com/hashicorp/go-hclog"
23	multierror "github.com/hashicorp/go-multierror"
24	lru "github.com/hashicorp/golang-lru"
25	"github.com/hashicorp/nomad/command/agent/consul"
26	"github.com/hashicorp/nomad/helper/codec"
27	"github.com/hashicorp/nomad/helper/pool"
28	"github.com/hashicorp/nomad/helper/stats"
29	"github.com/hashicorp/nomad/helper/tlsutil"
30	"github.com/hashicorp/nomad/nomad/deploymentwatcher"
31	"github.com/hashicorp/nomad/nomad/drainer"
32	"github.com/hashicorp/nomad/nomad/state"
33	"github.com/hashicorp/nomad/nomad/structs"
34	"github.com/hashicorp/nomad/nomad/structs/config"
35	"github.com/hashicorp/nomad/scheduler"
36	"github.com/hashicorp/raft"
37	raftboltdb "github.com/hashicorp/raft-boltdb"
38	"github.com/hashicorp/serf/serf"
39)
40
const (
	// datacenterQueryLimit caps the number of Consul datacenters a Nomad
	// Server will query while looking for bootstrap_expect servers.
	datacenterQueryLimit = 25

	// maxStaleLeadership bounds how long this Nomad Server is allowed to go
	// without observing a valid Raft leader.
	maxStaleLeadership = 15 * time.Second

	// peersPollInterval is the base delay between attempts to query Consul
	// for Nomad Servers.
	peersPollInterval = 45 * time.Second

	// peersPollJitterFactor divides peersPollInterval to obtain the upper
	// bound of the random stagger added to the retry interval when querying
	// Consul.
	peersPollJitterFactor = 2

	// raftState and serfSnapshot name on-disk locations for Raft and Serf
	// state; snapshotsRetained is a retention count.
	// NOTE(review): raftState and snapshotsRetained are consumed outside
	// this part of the file — confirm their exact use there.
	raftState         = "raft/"
	serfSnapshot      = "serf/snapshot"
	snapshotsRetained = 2

	// serverRPCCache is how long an idle connection to another server is
	// kept open.
	serverRPCCache = 2 * time.Minute

	// serverMaxStreams is how many idle streams are kept open to a server.
	serverMaxStreams = 64

	// raftLogCacheSize is the maximum number of logs cached in memory. The
	// cache reduces disk I/O for recently committed entries.
	raftLogCacheSize = 512

	// raftRemoveGracePeriod is how long to wait for a RemovePeer to
	// replicate when gracefully leaving the cluster.
	raftRemoveGracePeriod = 5 * time.Second

	// defaultConsulDiscoveryInterval is how often Consul is polled for new
	// servers while there is no leader.
	defaultConsulDiscoveryInterval = 3 * time.Second

	// defaultConsulDiscoveryIntervalRetry is how often Consul is polled for
	// new servers while there is no leader and the last query failed.
	defaultConsulDiscoveryIntervalRetry = 9 * time.Second

	// aclCacheSize is the number of ACL objects kept cached. ACLs are costly
	// to parse and construct, so hot objects are cached to shorten ACL token
	// resolution.
	aclCacheSize = 512
)
88
// Server is a Nomad server which manages the job queues,
// schedulers, and notification bus for agents.
type Server struct {
	// config is the configuration the server was constructed with. Its
	// TLSConfig is replaced when TLS connections are reloaded.
	config *Config

	// logger is the server's named intercept logger.
	logger log.InterceptLogger

	// Connection pool to other Nomad servers
	connPool *pool.ConnPool

	// The raft instance is used among Nomad nodes within the
	// region to protect operations that require strong consistency
	leaderCh      <-chan bool
	raft          *raft.Raft
	raftLayer     *RaftLayer
	raftStore     *raftboltdb.BoltStore
	raftInmem     *raft.InmemStore
	raftTransport *raft.NetworkTransport

	// autopilot is the Autopilot instance for this server.
	autopilot *autopilot.Autopilot

	// fsm is the state machine used with Raft
	fsm *nomadFSM

	// rpcListener is used to listen for incoming connections.
	// listenerCh is recreated with every new listener; a TLS reload blocks
	// on it until the previous listener has exited.
	rpcListener net.Listener
	listenerCh  chan struct{}

	// tlsWrap is used to wrap outbound connections using TLS. It should be
	// accessed using the lock.
	tlsWrap     tlsutil.RegionWrapper
	tlsWrapLock sync.RWMutex

	// TODO(alex,hclog): Can I move more into the handler?
	// rpcHandler is used to serve and handle RPCs
	*rpcHandler

	// rpcServer is the static RPC server that is used by the local agent.
	rpcServer *rpc.Server

	// clientRpcAdvertise is the advertised RPC address for Nomad clients to connect
	// to this server
	clientRpcAdvertise net.Addr

	// serverRpcAdvertise is the advertised RPC address for Nomad servers to connect
	// to this server
	serverRpcAdvertise net.Addr

	// rpcTLS is the TLS config for incoming TLS requests.
	// rpcCancel cancels the context handed to the current RPC listener
	// goroutine.
	rpcTLS    *tls.Config
	rpcCancel context.CancelFunc

	// staticEndpoints is the set of static endpoints that can be reused across
	// all RPC connections
	staticEndpoints endpoints

	// streamingRpcs is the registry holding our streaming RPC handlers.
	streamingRpcs *structs.StreamingRpcRegistry

	// nodeConns is the set of multiplexed node connections we have keyed by
	// NodeID
	nodeConns     map[string][]*nodeConnState
	nodeConnsLock sync.RWMutex

	// peers is used to track the known Nomad servers. This is
	// used for region forwarding and clustering.
	peers      map[string][]*serverParts
	localPeers map[raft.ServerAddress]*serverParts
	peerLock   sync.RWMutex

	// serf is the Serf cluster containing only Nomad
	// servers. This is used for multi-region federation
	// and automatic clustering within regions.
	serf *serf.Serf

	// reconcileCh is used to pass events from the serf handler
	// into the leader manager. Mostly used to handle when servers
	// join/leave from the region.
	reconcileCh chan serf.Member

	// used to track when the server is ready to serve consistent reads, updated atomically
	readyForConsistentReads int32

	// eventCh is used to receive events from the serf cluster
	eventCh chan serf.Event

	// blockedEvals is used to manage evaluations that are blocked on node
	// capacity changes.
	blockedEvals *BlockedEvals

	// deploymentWatcher is used to watch deployments and their allocations and
	// make the required calls to continue to transition the deployment.
	deploymentWatcher *deploymentwatcher.Watcher

	// nodeDrainer is used to drain allocations from nodes.
	nodeDrainer *drainer.NodeDrainer

	// evalBroker is used to manage the in-progress evaluations
	// that are waiting to be brokered to a sub-scheduler
	evalBroker *EvalBroker

	// periodicDispatcher is used to track and create evaluations for periodic jobs.
	periodicDispatcher *PeriodicDispatch

	// planner is used to manage the submitted allocation plans that are waiting
	// to be accessed by the leader
	*planner

	// nodeHeartbeater is used to track expiration times of node heartbeats. If it
	// detects an expired node, the node status is updated to be 'down'.
	*nodeHeartbeater

	// consulCatalog is used for discovering other Nomad Servers via Consul
	consulCatalog consul.CatalogAPI

	// vault is the client for communicating with Vault.
	vault VaultClient

	// workers are the scheduling workers used for processing.
	workers []*Worker

	// aclCache is used to maintain the parsed ACL objects
	aclCache *lru.TwoQueueCache

	// leaderAcl is the management ACL token that is valid when resolved by the
	// current leader.
	leaderAcl     string
	leaderAclLock sync.Mutex

	// statsFetcher is used by autopilot to check the status of the other
	// Nomad servers.
	statsFetcher *StatsFetcher

	// EnterpriseState is used to fill in state for Pro/Ent builds
	EnterpriseState

	// left records that Leave has been called. shutdown records that
	// Shutdown has already run and is guarded by shutdownLock.
	left         bool
	shutdown     bool
	shutdownLock sync.Mutex

	// shutdownCtx is cancelled via shutdownCancel when the server shuts
	// down; shutdownCh is the Done channel of shutdownCtx.
	shutdownCtx    context.Context
	shutdownCancel context.CancelFunc
	shutdownCh     <-chan struct{}
}
234
// endpoints holds the RPC endpoints. The Server keeps one instance as its
// set of static endpoints that can be reused across all RPC connections.
type endpoints struct {
	// Server endpoints
	Status     *Status
	Node       *Node
	Job        *Job
	Eval       *Eval
	Plan       *Plan
	Alloc      *Alloc
	Deployment *Deployment
	Region     *Region
	Search     *Search
	Periodic   *Periodic
	System     *System
	Operator   *Operator
	ACL        *ACL
	Enterprise *EnterpriseEndpoints

	// Client endpoints
	ClientStats       *ClientStats
	FileSystem        *FileSystem
	Agent             *Agent
	ClientAllocations *ClientAllocations
}
258
259// NewServer is used to construct a new Nomad server from the
260// configuration, potentially returning an error
261func NewServer(config *Config, consulCatalog consul.CatalogAPI) (*Server, error) {
262	// Check the protocol version
263	if err := config.CheckVersion(); err != nil {
264		return nil, err
265	}
266
267	// Create an eval broker
268	evalBroker, err := NewEvalBroker(
269		config.EvalNackTimeout,
270		config.EvalNackInitialReenqueueDelay,
271		config.EvalNackSubsequentReenqueueDelay,
272		config.EvalDeliveryLimit)
273	if err != nil {
274		return nil, err
275	}
276
277	// Configure TLS
278	tlsConf, err := tlsutil.NewTLSConfiguration(config.TLSConfig, true, true)
279	if err != nil {
280		return nil, err
281	}
282	incomingTLS, tlsWrap, err := getTLSConf(config.TLSConfig.EnableRPC, tlsConf)
283	if err != nil {
284		return nil, err
285	}
286
287	// Create the ACL object cache
288	aclCache, err := lru.New2Q(aclCacheSize)
289	if err != nil {
290		return nil, err
291	}
292
293	// Create the logger
294	logger := config.Logger.ResetNamedIntercept("nomad")
295
296	// Create the server
297	s := &Server{
298		config:        config,
299		consulCatalog: consulCatalog,
300		connPool:      pool.NewPool(logger, serverRPCCache, serverMaxStreams, tlsWrap),
301		logger:        logger,
302		tlsWrap:       tlsWrap,
303		rpcServer:     rpc.NewServer(),
304		streamingRpcs: structs.NewStreamingRpcRegistry(),
305		nodeConns:     make(map[string][]*nodeConnState),
306		peers:         make(map[string][]*serverParts),
307		localPeers:    make(map[raft.ServerAddress]*serverParts),
308		reconcileCh:   make(chan serf.Member, 32),
309		eventCh:       make(chan serf.Event, 256),
310		evalBroker:    evalBroker,
311		blockedEvals:  NewBlockedEvals(evalBroker, logger),
312		rpcTLS:        incomingTLS,
313		aclCache:      aclCache,
314	}
315
316	s.shutdownCtx, s.shutdownCancel = context.WithCancel(context.Background())
317	s.shutdownCh = s.shutdownCtx.Done()
318
319	// Create the RPC handler
320	s.rpcHandler = newRpcHandler(s)
321
322	// Create the planner
323	planner, err := newPlanner(s)
324	if err != nil {
325		return nil, err
326	}
327	s.planner = planner
328
329	// Create the node heartbeater
330	s.nodeHeartbeater = newNodeHeartbeater(s)
331
332	// Create the periodic dispatcher for launching periodic jobs.
333	s.periodicDispatcher = NewPeriodicDispatch(s.logger, s)
334
335	// Initialize the stats fetcher that autopilot will use.
336	s.statsFetcher = NewStatsFetcher(s.logger, s.connPool, s.config.Region)
337
338	// Setup Vault
339	if err := s.setupVaultClient(); err != nil {
340		s.Shutdown()
341		s.logger.Error("failed to setup Vault client", "error", err)
342		return nil, fmt.Errorf("Failed to setup Vault client: %v", err)
343	}
344
345	// Initialize the RPC layer
346	if err := s.setupRPC(tlsWrap); err != nil {
347		s.Shutdown()
348		s.logger.Error("failed to start RPC layer", "error", err)
349		return nil, fmt.Errorf("Failed to start RPC layer: %v", err)
350	}
351
352	// Initialize the Raft server
353	if err := s.setupRaft(); err != nil {
354		s.Shutdown()
355		s.logger.Error("failed to start Raft", "error", err)
356		return nil, fmt.Errorf("Failed to start Raft: %v", err)
357	}
358
359	// Initialize the wan Serf
360	s.serf, err = s.setupSerf(config.SerfConfig, s.eventCh, serfSnapshot)
361	if err != nil {
362		s.Shutdown()
363		s.logger.Error("failed to start serf WAN", "error", err)
364		return nil, fmt.Errorf("Failed to start serf: %v", err)
365	}
366
367	// Initialize the scheduling workers
368	if err := s.setupWorkers(); err != nil {
369		s.Shutdown()
370		s.logger.Error("failed to start workers", "error", err)
371		return nil, fmt.Errorf("Failed to start workers: %v", err)
372	}
373
374	// Setup the Consul syncer
375	if err := s.setupConsulSyncer(); err != nil {
376		s.logger.Error("failed to create server consul syncer", "error", err)
377		return nil, fmt.Errorf("failed to create server Consul syncer: %v", err)
378	}
379
380	// Setup the deployment watcher.
381	if err := s.setupDeploymentWatcher(); err != nil {
382		s.logger.Error("failed to create deployment watcher", "error", err)
383		return nil, fmt.Errorf("failed to create deployment watcher: %v", err)
384	}
385
386	// Setup the node drainer.
387	s.setupNodeDrainer()
388
389	// Setup the enterprise state
390	if err := s.setupEnterprise(config); err != nil {
391		return nil, err
392	}
393
394	// Monitor leadership changes
395	go s.monitorLeadership()
396
397	// Start ingesting events for Serf
398	go s.serfEventHandler()
399
400	// start the RPC listener for the server
401	s.startRPCListener()
402
403	// Emit metrics for the eval broker
404	go evalBroker.EmitStats(time.Second, s.shutdownCh)
405
406	// Emit metrics for the plan queue
407	go s.planQueue.EmitStats(time.Second, s.shutdownCh)
408
409	// Emit metrics for the blocked eval tracker.
410	go s.blockedEvals.EmitStats(time.Second, s.shutdownCh)
411
412	// Emit metrics for the Vault client.
413	go s.vault.EmitStats(time.Second, s.shutdownCh)
414
415	// Emit metrics
416	go s.heartbeatStats()
417
418	// Emit raft and state store metrics
419	go s.EmitRaftStats(10*time.Second, s.shutdownCh)
420
421	// Start enterprise background workers
422	s.startEnterpriseBackground()
423
424	// Done
425	return s, nil
426}
427
428// startRPCListener starts the server's the RPC listener
429func (s *Server) startRPCListener() {
430	ctx, cancel := context.WithCancel(context.Background())
431	s.rpcCancel = cancel
432	go s.listen(ctx)
433}
434
435// createRPCListener creates the server's RPC listener
436func (s *Server) createRPCListener() (*net.TCPListener, error) {
437	s.listenerCh = make(chan struct{})
438	listener, err := net.ListenTCP("tcp", s.config.RPCAddr)
439	if err != nil {
440		s.logger.Error("failed to initialize TLS listener", "error", err)
441		return listener, err
442	}
443
444	s.rpcListener = listener
445	return listener, nil
446}
447
448// getTLSConf gets the server's TLS configuration based on the config supplied
449// by the operator
450func getTLSConf(enableRPC bool, tlsConf *tlsutil.Config) (*tls.Config, tlsutil.RegionWrapper, error) {
451	var tlsWrap tlsutil.RegionWrapper
452	var incomingTLS *tls.Config
453	if enableRPC {
454		tw, err := tlsConf.OutgoingTLSWrapper()
455		if err != nil {
456			return nil, nil, err
457		}
458		tlsWrap = tw
459
460		itls, err := tlsConf.IncomingTLSConfig()
461		if err != nil {
462			return nil, nil, err
463		}
464		incomingTLS = itls
465	}
466	return incomingTLS, tlsWrap, nil
467}
468
469// reloadTLSConnections updates a server's TLS configuration and reloads RPC
470// connections.
471func (s *Server) reloadTLSConnections(newTLSConfig *config.TLSConfig) error {
472	s.logger.Info("reloading server connections due to configuration changes")
473
474	// Check if we can reload the RPC listener
475	if s.rpcListener == nil || s.rpcCancel == nil {
476		s.logger.Warn("unable to reload configuration due to uninitialized rpc listner")
477		return fmt.Errorf("can't reload uninitialized RPC listener")
478	}
479
480	tlsConf, err := tlsutil.NewTLSConfiguration(newTLSConfig, true, true)
481	if err != nil {
482		s.logger.Error("unable to create TLS configuration", "error", err)
483		return err
484	}
485
486	incomingTLS, tlsWrap, err := getTLSConf(newTLSConfig.EnableRPC, tlsConf)
487	if err != nil {
488		s.logger.Error("unable to reset TLS context", "error", err)
489		return err
490	}
491
492	// Store the new tls wrapper.
493	s.tlsWrapLock.Lock()
494	s.tlsWrap = tlsWrap
495	s.tlsWrapLock.Unlock()
496
497	// Keeping configuration in sync is important for other places that require
498	// access to config information, such as rpc.go, where we decide on what kind
499	// of network connections to accept depending on the server configuration
500	s.config.TLSConfig = newTLSConfig
501
502	// Kill any old listeners
503	s.rpcCancel()
504
505	s.rpcTLS = incomingTLS
506	s.connPool.ReloadTLS(tlsWrap)
507
508	if err := s.rpcListener.Close(); err != nil {
509		s.logger.Error("unable to close rpc listener", "error", err)
510		return err
511	}
512
513	// Wait for the old listener to exit
514	<-s.listenerCh
515
516	// Create the new listener with the update TLS config
517	listener, err := s.createRPCListener()
518	if err != nil {
519		listener.Close()
520		return err
521	}
522
523	// Start the new RPC listener
524	s.startRPCListener()
525
526	// Close and reload existing Raft connections
527	wrapper := tlsutil.RegionSpecificWrapper(s.config.Region, tlsWrap)
528	s.raftLayer.ReloadTLS(wrapper)
529	s.raftTransport.CloseStreams()
530
531	s.logger.Debug("finished reloading server connections")
532	return nil
533}
534
535// Shutdown is used to shutdown the server
536func (s *Server) Shutdown() error {
537	s.logger.Info("shutting down server")
538	s.shutdownLock.Lock()
539	defer s.shutdownLock.Unlock()
540
541	if s.shutdown {
542		return nil
543	}
544
545	s.shutdown = true
546	s.shutdownCancel()
547
548	if s.serf != nil {
549		s.serf.Shutdown()
550	}
551
552	if s.raft != nil {
553		s.raftTransport.Close()
554		s.raftLayer.Close()
555		future := s.raft.Shutdown()
556		if err := future.Error(); err != nil {
557			s.logger.Warn("error shutting down raft", "error", err)
558		}
559		if s.raftStore != nil {
560			s.raftStore.Close()
561		}
562	}
563
564	// Shutdown the RPC listener
565	if s.rpcListener != nil {
566		s.rpcListener.Close()
567	}
568
569	// Close the connection pool
570	s.connPool.Shutdown()
571
572	// Close the fsm
573	if s.fsm != nil {
574		s.fsm.Close()
575	}
576
577	// Stop Vault token renewal
578	if s.vault != nil {
579		s.vault.Stop()
580	}
581
582	return nil
583}
584
585// IsShutdown checks if the server is shutdown
586func (s *Server) IsShutdown() bool {
587	select {
588	case <-s.shutdownCh:
589		return true
590	default:
591		return false
592	}
593}
594
595// Leave is used to prepare for a graceful shutdown of the server
596func (s *Server) Leave() error {
597	s.logger.Info("server starting leave")
598	s.left = true
599
600	// Check the number of known peers
601	numPeers, err := s.numPeers()
602	if err != nil {
603		s.logger.Error("failed to check raft peers during leave", "error", err)
604		return err
605	}
606
607	addr := s.raftTransport.LocalAddr()
608
609	// If we are the current leader, and we have any other peers (cluster has multiple
610	// servers), we should do a RemovePeer to safely reduce the quorum size. If we are
611	// not the leader, then we should issue our leave intention and wait to be removed
612	// for some sane period of time.
613	isLeader := s.IsLeader()
614	if isLeader && numPeers > 1 {
615		minRaftProtocol, err := s.autopilot.MinRaftProtocol()
616		if err != nil {
617			return err
618		}
619
620		if minRaftProtocol >= 2 && s.config.RaftConfig.ProtocolVersion >= 3 {
621			future := s.raft.RemoveServer(raft.ServerID(s.config.NodeID), 0, 0)
622			if err := future.Error(); err != nil {
623				s.logger.Error("failed to remove ourself as raft peer", "error", err)
624			}
625		} else {
626			future := s.raft.RemovePeer(addr)
627			if err := future.Error(); err != nil {
628				s.logger.Error("failed to remove ourself as raft peer", "error", err)
629			}
630		}
631	}
632
633	// Leave the gossip pool
634	if s.serf != nil {
635		if err := s.serf.Leave(); err != nil {
636			s.logger.Error("failed to leave Serf cluster", "error", err)
637		}
638	}
639
640	// If we were not leader, wait to be safely removed from the cluster.
641	// We must wait to allow the raft replication to take place, otherwise
642	// an immediate shutdown could cause a loss of quorum.
643	if !isLeader {
644		left := false
645		limit := time.Now().Add(raftRemoveGracePeriod)
646		for !left && time.Now().Before(limit) {
647			// Sleep a while before we check.
648			time.Sleep(50 * time.Millisecond)
649
650			// Get the latest configuration.
651			future := s.raft.GetConfiguration()
652			if err := future.Error(); err != nil {
653				s.logger.Error("failed to get raft configuration", "error", err)
654				break
655			}
656
657			// See if we are no longer included.
658			left = true
659			for _, server := range future.Configuration().Servers {
660				if server.Address == addr {
661					left = false
662					break
663				}
664			}
665		}
666
667		// TODO (alexdadgar) With the old Raft library we used to force the
668		// peers set to empty when a graceful leave occurred. This would
669		// keep voting spam down if the server was restarted, but it was
670		// dangerous because the peers was inconsistent with the logs and
671		// snapshots, so it wasn't really safe in all cases for the server
672		// to become leader. This is now safe, but the log spam is noisy.
673		// The next new version of the library will have a "you are not a
674		// peer stop it" behavior that should address this. We will have
675		// to evaluate during the RC period if this interim situation is
676		// not too confusing for operators.
677
678		// TODO (alexdadgar) When we take a later new version of the Raft
679		// library it won't try to complete replication, so this peer
680		// may not realize that it has been removed. Need to revisit this
681		// and the warning here.
682		if !left {
683			s.logger.Warn("failed to leave raft configuration gracefully, timeout")
684		}
685	}
686	return nil
687}
688
689// Reload handles a config reload specific to server-only configuration. Not
690// all config fields can handle a reload.
691func (s *Server) Reload(newConfig *Config) error {
692	if newConfig == nil {
693		return fmt.Errorf("Reload given a nil config")
694	}
695
696	var mErr multierror.Error
697
698	// Handle the Vault reload. Vault should never be nil but just guard.
699	if s.vault != nil {
700		if err := s.vault.SetConfig(newConfig.VaultConfig); err != nil {
701			multierror.Append(&mErr, err)
702		}
703	}
704
705	shouldReloadTLS, err := tlsutil.ShouldReloadRPCConnections(s.config.TLSConfig, newConfig.TLSConfig)
706	if err != nil {
707		s.logger.Error("error checking whether to reload TLS configuration", "error", err)
708	}
709
710	if shouldReloadTLS {
711		if err := s.reloadTLSConnections(newConfig.TLSConfig); err != nil {
712			s.logger.Error("error reloading server TLS configuration", "error", err)
713			multierror.Append(&mErr, err)
714		}
715	}
716
717	return mErr.ErrorOrNil()
718}
719
720// setupBootstrapHandler() creates the closure necessary to support a Consul
721// fallback handler.
722func (s *Server) setupBootstrapHandler() error {
723	// peersTimeout is used to indicate to the Consul Syncer that the
724	// current Nomad Server has a stale peer set.  peersTimeout will time
725	// out if the Consul Syncer bootstrapFn has not observed a Raft
726	// leader in maxStaleLeadership.  If peersTimeout has been triggered,
727	// the Consul Syncer will begin querying Consul for other Nomad
728	// Servers.
729	//
730	// NOTE: time.Timer is used vs time.Time in order to handle clock
731	// drift because time.Timer is implemented as a monotonic clock.
732	var peersTimeout *time.Timer = time.NewTimer(0)
733
734	// consulQueryCount is the number of times the bootstrapFn has been
735	// called, regardless of success.
736	var consulQueryCount uint64
737
738	// leadershipTimedOut is a helper method that returns true if the
739	// peersTimeout timer has expired.
740	leadershipTimedOut := func() bool {
741		select {
742		case <-peersTimeout.C:
743			return true
744		default:
745			return false
746		}
747	}
748
749	// The bootstrapFn callback handler is used to periodically poll
750	// Consul to look up the Nomad Servers in Consul.  In the event the
751	// server has been brought up without a `retry-join` configuration
752	// and this Server is partitioned from the rest of the cluster,
753	// periodically poll Consul to reattach this Server to other servers
754	// in the same region and automatically reform a quorum (assuming the
755	// correct number of servers required for quorum are present).
756	bootstrapFn := func() error {
757		// If there is a raft leader, do nothing
758		if s.raft.Leader() != "" {
759			peersTimeout.Reset(maxStaleLeadership)
760			return nil
761		}
762
763		// (ab)use serf.go's behavior of setting BootstrapExpect to
764		// zero if we have bootstrapped.  If we have bootstrapped
765		bootstrapExpect := atomic.LoadInt32(&s.config.BootstrapExpect)
766		if bootstrapExpect == 0 {
767			// This Nomad Server has been bootstrapped.  Rely on
768			// the peersTimeout firing as a guard to prevent
769			// aggressive querying of Consul.
770			if !leadershipTimedOut() {
771				return nil
772			}
773		} else {
774			if consulQueryCount > 0 && !leadershipTimedOut() {
775				return nil
776			}
777
778			// This Nomad Server has not been bootstrapped, reach
779			// out to Consul if our peer list is less than
780			// `bootstrap_expect`.
781			raftPeers, err := s.numPeers()
782			if err != nil {
783				peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor))
784				return nil
785			}
786
787			// The necessary number of Nomad Servers required for
788			// quorum has been reached, we do not need to poll
789			// Consul.  Let the normal timeout-based strategy
790			// take over.
791			if raftPeers >= int(bootstrapExpect) {
792				peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor))
793				return nil
794			}
795		}
796		consulQueryCount++
797
798		s.logger.Debug("lost contact with Nomad quorum, falling back to Consul for server list")
799
800		dcs, err := s.consulCatalog.Datacenters()
801		if err != nil {
802			peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor))
803			return fmt.Errorf("server.nomad: unable to query Consul datacenters: %v", err)
804		}
805		if len(dcs) > 2 {
806			// Query the local DC first, then shuffle the
807			// remaining DCs.  If additional calls to bootstrapFn
808			// are necessary, this Nomad Server will eventually
809			// walk all datacenter until it finds enough hosts to
810			// form a quorum.
811			shuffleStrings(dcs[1:])
812			dcs = dcs[0:lib.MinInt(len(dcs), datacenterQueryLimit)]
813		}
814
815		nomadServerServiceName := s.config.ConsulConfig.ServerServiceName
816		var mErr multierror.Error
817		const defaultMaxNumNomadServers = 8
818		nomadServerServices := make([]string, 0, defaultMaxNumNomadServers)
819		localNode := s.serf.Memberlist().LocalNode()
820		for _, dc := range dcs {
821			consulOpts := &consulapi.QueryOptions{
822				AllowStale: true,
823				Datacenter: dc,
824				Near:       "_agent",
825				WaitTime:   consul.DefaultQueryWaitDuration,
826			}
827			consulServices, _, err := s.consulCatalog.Service(nomadServerServiceName, consul.ServiceTagSerf, consulOpts)
828			if err != nil {
829				err := fmt.Errorf("failed to query service %q in Consul datacenter %q: %v", nomadServerServiceName, dc, err)
830				s.logger.Warn("failed to query Nomad service in Consul datacenter", "service_name", nomadServerServiceName, "dc", dc, "error", err)
831				mErr.Errors = append(mErr.Errors, err)
832				continue
833			}
834
835			for _, cs := range consulServices {
836				port := strconv.FormatInt(int64(cs.ServicePort), 10)
837				addr := cs.ServiceAddress
838				if addr == "" {
839					addr = cs.Address
840				}
841				if localNode.Addr.String() == addr && int(localNode.Port) == cs.ServicePort {
842					continue
843				}
844				serverAddr := net.JoinHostPort(addr, port)
845				nomadServerServices = append(nomadServerServices, serverAddr)
846			}
847		}
848
849		if len(nomadServerServices) == 0 {
850			if len(mErr.Errors) > 0 {
851				peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor))
852				return mErr.ErrorOrNil()
853			}
854
855			// Log the error and return nil so future handlers
856			// can attempt to register the `nomad` service.
857			pollInterval := peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor)
858			s.logger.Trace("no Nomad Servers advertising Nomad service in Consul datacenters", "service_name", nomadServerServiceName, "datacenters", dcs, "retry", pollInterval)
859			peersTimeout.Reset(pollInterval)
860			return nil
861		}
862
863		numServersContacted, err := s.Join(nomadServerServices)
864		if err != nil {
865			peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor))
866			return fmt.Errorf("contacted %d Nomad Servers: %v", numServersContacted, err)
867		}
868
869		peersTimeout.Reset(maxStaleLeadership)
870		s.logger.Info("successfully contacted Nomad servers", "num_servers", numServersContacted)
871
872		return nil
873	}
874
875	// Hacky replacement for old ConsulSyncer Periodic Handler.
876	go func() {
877		lastOk := true
878		sync := time.NewTimer(0)
879		for {
880			select {
881			case <-sync.C:
882				d := defaultConsulDiscoveryInterval
883				if err := bootstrapFn(); err != nil {
884					// Only log if it worked last time
885					if lastOk {
886						lastOk = false
887						s.logger.Error("error looking up Nomad servers in Consul", "error", err)
888					}
889					d = defaultConsulDiscoveryIntervalRetry
890				}
891				sync.Reset(d)
892			case <-s.shutdownCh:
893				return
894			}
895		}
896	}()
897	return nil
898}
899
900// setupConsulSyncer creates Server-mode consul.Syncer which periodically
901// executes callbacks on a fixed interval.
902func (s *Server) setupConsulSyncer() error {
903	if s.config.ConsulConfig.ServerAutoJoin != nil && *s.config.ConsulConfig.ServerAutoJoin {
904		if err := s.setupBootstrapHandler(); err != nil {
905			return err
906		}
907	}
908
909	return nil
910}
911
912// setupDeploymentWatcher creates a deployment watcher that consumes the RPC
913// endpoints for state information and makes transitions via Raft through a
914// shim that provides the appropriate methods.
915func (s *Server) setupDeploymentWatcher() error {
916
917	// Create the raft shim type to restrict the set of raft methods that can be
918	// made
919	raftShim := &deploymentWatcherRaftShim{
920		apply: s.raftApply,
921	}
922
923	// Create the deployment watcher
924	s.deploymentWatcher = deploymentwatcher.NewDeploymentsWatcher(
925		s.logger, raftShim,
926		deploymentwatcher.LimitStateQueriesPerSecond,
927		deploymentwatcher.CrossDeploymentUpdateBatchDuration)
928
929	return nil
930}
931
932// setupNodeDrainer creates a node drainer which will be enabled when a server
933// becomes a leader.
934func (s *Server) setupNodeDrainer() {
935	// Create a shim around Raft requests
936	shim := drainerShim{s}
937	c := &drainer.NodeDrainerConfig{
938		Logger:                s.logger,
939		Raft:                  shim,
940		JobFactory:            drainer.GetDrainingJobWatcher,
941		NodeFactory:           drainer.GetNodeWatcherFactory(),
942		DrainDeadlineFactory:  drainer.GetDeadlineNotifier,
943		StateQueriesPerSecond: drainer.LimitStateQueriesPerSecond,
944		BatchUpdateInterval:   drainer.BatchUpdateInterval,
945	}
946	s.nodeDrainer = drainer.NewNodeDrainer(c)
947}
948
949// setupVaultClient is used to set up the Vault API client.
950func (s *Server) setupVaultClient() error {
951	v, err := NewVaultClient(s.config.VaultConfig, s.logger, s.purgeVaultAccessors)
952	if err != nil {
953		return err
954	}
955	s.vault = v
956	return nil
957}
958
959// setupRPC is used to setup the RPC listener
960func (s *Server) setupRPC(tlsWrap tlsutil.RegionWrapper) error {
961	// Populate the static RPC server
962	s.setupRpcServer(s.rpcServer, nil)
963
964	listener, err := s.createRPCListener()
965	if err != nil {
966		listener.Close()
967		return err
968	}
969
970	if s.config.ClientRPCAdvertise != nil {
971		s.clientRpcAdvertise = s.config.ClientRPCAdvertise
972	} else {
973		s.clientRpcAdvertise = s.rpcListener.Addr()
974	}
975
976	// Verify that we have a usable advertise address
977	clientAddr, ok := s.clientRpcAdvertise.(*net.TCPAddr)
978	if !ok {
979		listener.Close()
980		return fmt.Errorf("Client RPC advertise address is not a TCP Address: %v", clientAddr)
981	}
982	if clientAddr.IP.IsUnspecified() {
983		listener.Close()
984		return fmt.Errorf("Client RPC advertise address is not advertisable: %v", clientAddr)
985	}
986
987	if s.config.ServerRPCAdvertise != nil {
988		s.serverRpcAdvertise = s.config.ServerRPCAdvertise
989	} else {
990		// Default to the Serf Advertise + RPC Port
991		serfIP := s.config.SerfConfig.MemberlistConfig.AdvertiseAddr
992		if serfIP == "" {
993			serfIP = s.config.SerfConfig.MemberlistConfig.BindAddr
994		}
995
996		addr := net.JoinHostPort(serfIP, fmt.Sprintf("%d", clientAddr.Port))
997		resolved, err := net.ResolveTCPAddr("tcp", addr)
998		if err != nil {
999			return fmt.Errorf("Failed to resolve Server RPC advertise address: %v", err)
1000		}
1001
1002		s.serverRpcAdvertise = resolved
1003	}
1004
1005	// Verify that we have a usable advertise address
1006	serverAddr, ok := s.serverRpcAdvertise.(*net.TCPAddr)
1007	if !ok {
1008		return fmt.Errorf("Server RPC advertise address is not a TCP Address: %v", serverAddr)
1009	}
1010	if serverAddr.IP.IsUnspecified() {
1011		listener.Close()
1012		return fmt.Errorf("Server RPC advertise address is not advertisable: %v", serverAddr)
1013	}
1014
1015	wrapper := tlsutil.RegionSpecificWrapper(s.config.Region, tlsWrap)
1016	s.raftLayer = NewRaftLayer(s.serverRpcAdvertise, wrapper)
1017	return nil
1018}
1019
1020// setupRpcServer is used to populate an RPC server with endpoints
1021func (s *Server) setupRpcServer(server *rpc.Server, ctx *RPCContext) {
1022	// Add the static endpoints to the RPC server.
1023	if s.staticEndpoints.Status == nil {
1024		// Initialize the list just once
1025		s.staticEndpoints.ACL = &ACL{srv: s, logger: s.logger.Named("acl")}
1026		s.staticEndpoints.Alloc = &Alloc{srv: s, logger: s.logger.Named("alloc")}
1027		s.staticEndpoints.Eval = &Eval{srv: s, logger: s.logger.Named("eval")}
1028		s.staticEndpoints.Job = NewJobEndpoints(s)
1029		s.staticEndpoints.Node = &Node{srv: s, logger: s.logger.Named("client")} // Add but don't register
1030		s.staticEndpoints.Deployment = &Deployment{srv: s, logger: s.logger.Named("deployment")}
1031		s.staticEndpoints.Operator = &Operator{srv: s, logger: s.logger.Named("operator")}
1032		s.staticEndpoints.Periodic = &Periodic{srv: s, logger: s.logger.Named("periodic")}
1033		s.staticEndpoints.Plan = &Plan{srv: s, logger: s.logger.Named("plan")}
1034		s.staticEndpoints.Region = &Region{srv: s, logger: s.logger.Named("region")}
1035		s.staticEndpoints.Status = &Status{srv: s, logger: s.logger.Named("status")}
1036		s.staticEndpoints.System = &System{srv: s, logger: s.logger.Named("system")}
1037		s.staticEndpoints.Search = &Search{srv: s, logger: s.logger.Named("search")}
1038		s.staticEndpoints.Enterprise = NewEnterpriseEndpoints(s)
1039
1040		// Client endpoints
1041		s.staticEndpoints.ClientStats = &ClientStats{srv: s, logger: s.logger.Named("client_stats")}
1042		s.staticEndpoints.ClientAllocations = &ClientAllocations{srv: s, logger: s.logger.Named("client_allocs")}
1043		s.staticEndpoints.ClientAllocations.register()
1044
1045		// Streaming endpoints
1046		s.staticEndpoints.FileSystem = &FileSystem{srv: s, logger: s.logger.Named("client_fs")}
1047		s.staticEndpoints.FileSystem.register()
1048
1049		s.staticEndpoints.Agent = &Agent{srv: s}
1050		s.staticEndpoints.Agent.register()
1051	}
1052
1053	// Register the static handlers
1054	server.Register(s.staticEndpoints.ACL)
1055	server.Register(s.staticEndpoints.Alloc)
1056	server.Register(s.staticEndpoints.Eval)
1057	server.Register(s.staticEndpoints.Job)
1058	server.Register(s.staticEndpoints.Deployment)
1059	server.Register(s.staticEndpoints.Operator)
1060	server.Register(s.staticEndpoints.Periodic)
1061	server.Register(s.staticEndpoints.Plan)
1062	server.Register(s.staticEndpoints.Region)
1063	server.Register(s.staticEndpoints.Status)
1064	server.Register(s.staticEndpoints.System)
1065	server.Register(s.staticEndpoints.Search)
1066	s.staticEndpoints.Enterprise.Register(server)
1067	server.Register(s.staticEndpoints.ClientStats)
1068	server.Register(s.staticEndpoints.ClientAllocations)
1069	server.Register(s.staticEndpoints.FileSystem)
1070
1071	// Create new dynamic endpoints and add them to the RPC server.
1072	node := &Node{srv: s, ctx: ctx, logger: s.logger.Named("client")}
1073
1074	// Register the dynamic endpoints
1075	server.Register(node)
1076}
1077
// setupRaft is used to setup and initialize Raft. In order, it:
//
//   - creates the FSM and the network transport,
//   - selects the log, stable and snapshot stores (a shared in-memory store
//     with discarded snapshots in dev mode; a BoltDB store wrapped in a log
//     cache plus a file snapshot store under the data directory otherwise),
//   - outside dev mode, manages the peers.info sentinel and, when a
//     peers.json file is present alongside it, recovers the Raft
//     configuration from that file,
//   - bootstraps a single-server cluster when Bootstrap or DevMode is set and
//     no Raft state exists yet,
//   - installs the leadership notification channel and starts Raft.
//
// Any failure is returned to the caller; the deferred cleanup closes the
// on-disk store if Raft was never started.
func (s *Server) setupRaft() error {
	// If we have an unclean exit then attempt to close the Raft store. An
	// unclean exit is recognised by s.raft still being nil on return while
	// s.raftStore has already been assigned.
	defer func() {
		if s.raft == nil && s.raftStore != nil {
			if err := s.raftStore.Close(); err != nil {
				s.logger.Error("failed to close Raft store", "error", err)
			}
		}
	}()

	// Create the FSM
	fsmConfig := &FSMConfig{
		EvalBroker: s.evalBroker,
		Periodic:   s.periodicDispatcher,
		Blocked:    s.blockedEvals,
		Logger:     s.logger,
		Region:     s.Region(),
	}
	var err error
	s.fsm, err = NewFSM(fsmConfig)
	if err != nil {
		return err
	}

	// Create a transport layer
	// NOTE(review): the literal 3 is presumably the transport's connection
	// pool size — confirm against raft.NewNetworkTransport.
	trans := raft.NewNetworkTransport(s.raftLayer, 3, s.config.RaftTimeout,
		s.config.LogOutput)
	s.raftTransport = trans

	// Make sure we set the Logger. LogOutput is cleared so that the logger
	// set here is the only logging destination configured on RaftConfig.
	logger := s.logger.StandardLoggerIntercept(&log.StandardLoggerOptions{InferLevels: true})
	s.config.RaftConfig.Logger = logger
	s.config.RaftConfig.LogOutput = nil

	// Our version of Raft protocol 2 requires the LocalID to match the network
	// address of the transport. Raft protocol 3 uses permanent ids.
	s.config.RaftConfig.LocalID = raft.ServerID(trans.LocalAddr())
	if s.config.RaftConfig.ProtocolVersion >= 3 {
		s.config.RaftConfig.LocalID = raft.ServerID(s.config.NodeID)
	}

	// Build an all in-memory setup for dev mode, otherwise prepare a full
	// disk-based setup.
	// NOTE: the local variable log declared here shadows the imported log
	// package (go-hclog) for the remainder of this function.
	var log raft.LogStore
	var stable raft.StableStore
	var snap raft.SnapshotStore
	if s.config.DevMode {
		// A single in-memory store serves as both the log and stable store;
		// snapshots are discarded.
		store := raft.NewInmemStore()
		s.raftInmem = store
		stable = store
		log = store
		snap = raft.NewDiscardSnapshotStore()

	} else {
		// Create the base raft path
		path := filepath.Join(s.config.DataDir, raftState)
		if err := ensurePath(path, true); err != nil {
			return err
		}

		// Create the BoltDB backend
		store, err := raftboltdb.NewBoltStore(filepath.Join(path, "raft.db"))
		if err != nil {
			return err
		}
		s.raftStore = store
		stable = store

		// Wrap the store in a LogCache to improve performance
		cacheStore, err := raft.NewLogCache(raftLogCacheSize, store)
		if err != nil {
			// s.raftStore is already set, so the deferred cleanup above will
			// attempt to close the same store again after this call.
			store.Close()
			return err
		}
		log = cacheStore

		// Create the snapshot store
		snapshots, err := raft.NewFileSnapshotStore(path, snapshotsRetained, s.config.LogOutput)
		if err != nil {
			// As above, the deferred cleanup will also attempt this close.
			if s.raftStore != nil {
				s.raftStore.Close()
			}
			return err
		}
		snap = snapshots

		// For an existing cluster being upgraded to the new version of
		// Raft, we almost never want to run recovery based on the old
		// peers.json file. We create a peers.info file with a helpful
		// note about where peers.json went, and use that as a sentinel
		// to avoid ingesting the old one that first time (if we have to
		// create the peers.info file because it's not there, we also
		// blow away any existing peers.json file).
		peersFile := filepath.Join(path, "peers.json")
		peersInfoFile := filepath.Join(path, "peers.info")
		if _, err := os.Stat(peersInfoFile); os.IsNotExist(err) {
			// NOTE(review): mode 0755 marks this plain-text file as
			// executable — confirm that is intended.
			if err := ioutil.WriteFile(peersInfoFile, []byte(peersInfoContent), 0755); err != nil {
				return fmt.Errorf("failed to write peers.info file: %v", err)
			}

			// Blow away the peers.json file if present, since the
			// peers.info sentinel wasn't there.
			if _, err := os.Stat(peersFile); err == nil {
				if err := os.Remove(peersFile); err != nil {
					return fmt.Errorf("failed to delete peers.json, please delete manually (see peers.info for details): %v", err)
				}
				s.logger.Info("deleted peers.json file (see peers.info for details)")
			}
		} else if _, err := os.Stat(peersFile); err == nil {
			// This branch runs whenever the Stat of peers.info did not report
			// "not exist" and a peers.json file can be stat'ed: perform a
			// manual recovery from it.
			s.logger.Info("found peers.json file, recovering Raft configuration...")
			var configuration raft.Configuration
			// The file format depends on the Raft protocol version, so pick
			// the matching reader.
			if s.config.RaftConfig.ProtocolVersion < 3 {
				configuration, err = raft.ReadPeersJSON(peersFile)
			} else {
				configuration, err = raft.ReadConfigJSON(peersFile)
			}
			if err != nil {
				return fmt.Errorf("recovery failed to parse peers.json: %v", err)
			}
			// Recovery is given its own FSM built from the same config,
			// rather than s.fsm.
			tmpFsm, err := NewFSM(fsmConfig)
			if err != nil {
				return fmt.Errorf("recovery failed to make temp FSM: %v", err)
			}
			if err := raft.RecoverCluster(s.config.RaftConfig, tmpFsm,
				log, stable, snap, trans, configuration); err != nil {
				return fmt.Errorf("recovery failed: %v", err)
			}
			// Remove the file so that recovery is not repeated on the next
			// start.
			if err := os.Remove(peersFile); err != nil {
				return fmt.Errorf("recovery failed to delete peers.json, please delete manually (see peers.info for details): %v", err)
			}
			s.logger.Info("deleted peers.json file after successful recovery")
		}
	}

	// If we are in bootstrap or dev mode and the state is clean then we can
	// bootstrap now.
	if s.config.Bootstrap || s.config.DevMode {
		hasState, err := raft.HasExistingState(log, stable, snap)
		if err != nil {
			return err
		}
		if !hasState {
			// The initial configuration contains only this server.
			configuration := raft.Configuration{
				Servers: []raft.Server{
					{
						ID:      s.config.RaftConfig.LocalID,
						Address: trans.LocalAddr(),
					},
				},
			}
			if err := raft.BootstrapCluster(s.config.RaftConfig,
				log, stable, snap, trans, configuration); err != nil {
				return err
			}
		}
	}

	// Setup the leader channel. The same buffered channel is handed to Raft
	// as its notification channel and kept on the server as leaderCh.
	leaderCh := make(chan bool, 1)
	s.config.RaftConfig.NotifyCh = leaderCh
	s.leaderCh = leaderCh

	// Setup the Raft store
	s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm, log, stable, snap, trans)
	if err != nil {
		return err
	}
	return nil
}
1248
1249// setupSerf is used to setup and initialize a Serf
1250func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (*serf.Serf, error) {
1251	conf.Init()
1252	conf.NodeName = fmt.Sprintf("%s.%s", s.config.NodeName, s.config.Region)
1253	conf.Tags["role"] = "nomad"
1254	conf.Tags["region"] = s.config.Region
1255	conf.Tags["dc"] = s.config.Datacenter
1256	conf.Tags["vsn"] = fmt.Sprintf("%d", structs.ApiMajorVersion)
1257	conf.Tags["mvn"] = fmt.Sprintf("%d", structs.ApiMinorVersion)
1258	conf.Tags["build"] = s.config.Build
1259	conf.Tags["raft_vsn"] = fmt.Sprintf("%d", s.config.RaftConfig.ProtocolVersion)
1260	conf.Tags["id"] = s.config.NodeID
1261	conf.Tags["rpc_addr"] = s.clientRpcAdvertise.(*net.TCPAddr).IP.String()         // Address that clients will use to RPC to servers
1262	conf.Tags["port"] = fmt.Sprintf("%d", s.serverRpcAdvertise.(*net.TCPAddr).Port) // Port servers use to RPC to one and another
1263	if s.config.Bootstrap || (s.config.DevMode && !s.config.DevDisableBootstrap) {
1264		conf.Tags["bootstrap"] = "1"
1265	}
1266	bootstrapExpect := atomic.LoadInt32(&s.config.BootstrapExpect)
1267	if bootstrapExpect != 0 {
1268		conf.Tags["expect"] = fmt.Sprintf("%d", bootstrapExpect)
1269	}
1270	if s.config.NonVoter {
1271		conf.Tags["nonvoter"] = "1"
1272	}
1273	if s.config.RedundancyZone != "" {
1274		conf.Tags[AutopilotRZTag] = s.config.RedundancyZone
1275	}
1276	if s.config.UpgradeVersion != "" {
1277		conf.Tags[AutopilotVersionTag] = s.config.UpgradeVersion
1278	}
1279	logger := s.logger.StandardLoggerIntercept(&log.StandardLoggerOptions{InferLevels: true})
1280	conf.MemberlistConfig.Logger = logger
1281	conf.Logger = logger
1282	conf.MemberlistConfig.LogOutput = nil
1283	conf.LogOutput = nil
1284	conf.EventCh = ch
1285	if !s.config.DevMode {
1286		conf.SnapshotPath = filepath.Join(s.config.DataDir, path)
1287		if err := ensurePath(conf.SnapshotPath, false); err != nil {
1288			return nil, err
1289		}
1290	}
1291	conf.ProtocolVersion = protocolVersionMap[s.config.ProtocolVersion]
1292	conf.RejoinAfterLeave = true
1293	// LeavePropagateDelay is used to make sure broadcasted leave intents propagate
1294	// This value was tuned using https://www.serf.io/docs/internals/simulator.html to
1295	// allow for convergence in 99.9% of nodes in a 10 node cluster
1296	conf.LeavePropagateDelay = 1 * time.Second
1297	conf.Merge = &serfMergeDelegate{}
1298
1299	// Until Nomad supports this fully, we disable automatic resolution.
1300	// When enabled, the Serf gossip may just turn off if we are the minority
1301	// node which is rather unexpected.
1302	conf.EnableNameConflictResolution = false
1303	return serf.Create(conf)
1304}
1305
1306// setupWorkers is used to start the scheduling workers
1307func (s *Server) setupWorkers() error {
1308	// Check if all the schedulers are disabled
1309	if len(s.config.EnabledSchedulers) == 0 || s.config.NumSchedulers == 0 {
1310		s.logger.Warn("no enabled schedulers")
1311		return nil
1312	}
1313
1314	// Check if the core scheduler is not enabled
1315	foundCore := false
1316	for _, sched := range s.config.EnabledSchedulers {
1317		if sched == structs.JobTypeCore {
1318			foundCore = true
1319			continue
1320		}
1321
1322		if _, ok := scheduler.BuiltinSchedulers[sched]; !ok {
1323			return fmt.Errorf("invalid configuration: unknown scheduler %q in enabled schedulers", sched)
1324		}
1325	}
1326	if !foundCore {
1327		return fmt.Errorf("invalid configuration: %q scheduler not enabled", structs.JobTypeCore)
1328	}
1329
1330	// Start the workers
1331	for i := 0; i < s.config.NumSchedulers; i++ {
1332		if w, err := NewWorker(s); err != nil {
1333			return err
1334		} else {
1335			s.workers = append(s.workers, w)
1336		}
1337	}
1338	s.logger.Info("starting scheduling worker(s)", "num_workers", s.config.NumSchedulers, "schedulers", s.config.EnabledSchedulers)
1339	return nil
1340}
1341
1342// numPeers is used to check on the number of known peers, including the local
1343// node.
1344func (s *Server) numPeers() (int, error) {
1345	future := s.raft.GetConfiguration()
1346	if err := future.Error(); err != nil {
1347		return 0, err
1348	}
1349	configuration := future.Configuration()
1350	return len(configuration.Servers), nil
1351}
1352
1353// IsLeader checks if this server is the cluster leader
1354func (s *Server) IsLeader() bool {
1355	return s.raft.State() == raft.Leader
1356}
1357
1358// Join is used to have Nomad join the gossip ring
1359// The target address should be another node listening on the
1360// Serf address
1361func (s *Server) Join(addrs []string) (int, error) {
1362	return s.serf.Join(addrs, true)
1363}
1364
1365// LocalMember is used to return the local node
1366func (s *Server) LocalMember() serf.Member {
1367	return s.serf.LocalMember()
1368}
1369
1370// Members is used to return the members of the serf cluster
1371func (s *Server) Members() []serf.Member {
1372	return s.serf.Members()
1373}
1374
1375// RemoveFailedNode is used to remove a failed node from the cluster
1376func (s *Server) RemoveFailedNode(node string) error {
1377	return s.serf.RemoveFailedNode(node)
1378}
1379
1380// KeyManager returns the Serf keyring manager
1381func (s *Server) KeyManager() *serf.KeyManager {
1382	return s.serf.KeyManager()
1383}
1384
1385// Encrypted determines if gossip is encrypted
1386func (s *Server) Encrypted() bool {
1387	return s.serf.EncryptionEnabled()
1388}
1389
1390// State returns the underlying state store. This should *not*
1391// be used to modify state directly.
1392func (s *Server) State() *state.StateStore {
1393	return s.fsm.State()
1394}
1395
1396// setLeaderAcl stores the given ACL token as the current leader's ACL token.
1397func (s *Server) setLeaderAcl(token string) {
1398	s.leaderAclLock.Lock()
1399	s.leaderAcl = token
1400	s.leaderAclLock.Unlock()
1401}
1402
1403// getLeaderAcl retrieves the leader's ACL token
1404func (s *Server) getLeaderAcl() string {
1405	s.leaderAclLock.Lock()
1406	defer s.leaderAclLock.Unlock()
1407	return s.leaderAcl
1408}
1409
1410// Atomically sets a readiness state flag when leadership is obtained, to indicate that server is past its barrier write
1411func (s *Server) setConsistentReadReady() {
1412	atomic.StoreInt32(&s.readyForConsistentReads, 1)
1413}
1414
1415// Atomically reset readiness state flag on leadership revoke
1416func (s *Server) resetConsistentReadReady() {
1417	atomic.StoreInt32(&s.readyForConsistentReads, 0)
1418}
1419
1420// Returns true if this server is ready to serve consistent reads
1421func (s *Server) isReadyForConsistentReads() bool {
1422	return atomic.LoadInt32(&s.readyForConsistentReads) == 1
1423}
1424
1425// Regions returns the known regions in the cluster.
1426func (s *Server) Regions() []string {
1427	s.peerLock.RLock()
1428	defer s.peerLock.RUnlock()
1429
1430	regions := make([]string, 0, len(s.peers))
1431	for region := range s.peers {
1432		regions = append(regions, region)
1433	}
1434	sort.Strings(regions)
1435	return regions
1436}
1437
1438// RPC is used to make a local RPC call
1439func (s *Server) RPC(method string, args interface{}, reply interface{}) error {
1440	codec := &codec.InmemCodec{
1441		Method: method,
1442		Args:   args,
1443		Reply:  reply,
1444	}
1445	if err := s.rpcServer.ServeRequest(codec); err != nil {
1446		return err
1447	}
1448	return codec.Err
1449}
1450
1451// StreamingRpcHandler is used to make a streaming RPC call.
1452func (s *Server) StreamingRpcHandler(method string) (structs.StreamingRpcHandler, error) {
1453	return s.streamingRpcs.GetHandler(method)
1454}
1455
1456// Stats is used to return statistics for debugging and insight
1457// for various sub-systems
1458func (s *Server) Stats() map[string]map[string]string {
1459	toString := func(v uint64) string {
1460		return strconv.FormatUint(v, 10)
1461	}
1462	stats := map[string]map[string]string{
1463		"nomad": {
1464			"server":        "true",
1465			"leader":        fmt.Sprintf("%v", s.IsLeader()),
1466			"leader_addr":   string(s.raft.Leader()),
1467			"bootstrap":     fmt.Sprintf("%v", s.config.Bootstrap),
1468			"known_regions": toString(uint64(len(s.peers))),
1469		},
1470		"raft":    s.raft.Stats(),
1471		"serf":    s.serf.Stats(),
1472		"runtime": stats.RuntimeStats(),
1473		"vault":   s.vault.Stats(),
1474	}
1475
1476	return stats
1477}
1478
1479// EmitRaftStats is used to export metrics about raft indexes and state store snapshot index
1480func (s *Server) EmitRaftStats(period time.Duration, stopCh <-chan struct{}) {
1481	for {
1482		select {
1483		case <-time.After(period):
1484			lastIndex := s.raft.LastIndex()
1485			metrics.SetGauge([]string{"raft", "lastIndex"}, float32(lastIndex))
1486			appliedIndex := s.raft.AppliedIndex()
1487			metrics.SetGauge([]string{"raft", "appliedIndex"}, float32(appliedIndex))
1488			stateStoreSnapshotIndex, err := s.State().LatestIndex()
1489			if err != nil {
1490				s.logger.Warn("Unable to read snapshot index from statestore, metric will not be emitted", "error", err)
1491			} else {
1492				metrics.SetGauge([]string{"state", "snapshotIndex"}, float32(stateStoreSnapshotIndex))
1493			}
1494		case <-stopCh:
1495			return
1496		}
1497	}
1498}
1499
1500// Region returns the region of the server
1501func (s *Server) Region() string {
1502	return s.config.Region
1503}
1504
1505// Datacenter returns the data center of the server
1506func (s *Server) Datacenter() string {
1507	return s.config.Datacenter
1508}
1509
1510// GetConfig returns the config of the server for testing purposes only
1511func (s *Server) GetConfig() *Config {
1512	return s.config
1513}
1514
1515// ReplicationToken returns the token used for replication. We use a method to support
1516// dynamic reloading of this value later.
1517func (s *Server) ReplicationToken() string {
1518	return s.config.ReplicationToken
1519}
1520
// peersInfoContent is used to help operators understand what happened to the
// peers.json file. This is written to a file called peers.info in the same
// location.
//
// The text describes Nomad servers throughout: it refers operators to the
// Nomad server configuration for the Raft protocol version and uses Nomad's
// default server RPC port (4647) in its examples.
const peersInfoContent = `
As of Nomad 0.5.5, the peers.json file is only used for recovery
after an outage. The format of this file depends on what the server has
configured for its Raft protocol version. Please see the server configuration
page at https://www.nomadproject.io/docs/configuration/server.html#raft_protocol for more
details about this parameter.
For Raft protocol version 2 and earlier, this should be formatted as a JSON
array containing the address and port of each Nomad server in the cluster, like
this:
[
  "10.1.0.1:4647",
  "10.1.0.2:4647",
  "10.1.0.3:4647"
]
For Raft protocol version 3 and later, this should be formatted as a JSON
array containing the node ID, address:port, and suffrage information of each
Nomad server in the cluster, like this:
[
  {
    "id": "adf4238a-882b-9ddc-4a9d-5b6758e4159e",
    "address": "10.1.0.1:4647",
    "non_voter": false
  },
  {
    "id": "8b6dda82-3103-11e7-93ae-92361f002671",
    "address": "10.1.0.2:4647",
    "non_voter": false
  },
  {
    "id": "97e17742-3103-11e7-93ae-92361f002671",
    "address": "10.1.0.3:4647",
    "non_voter": false
  }
]
The "id" field is the node ID of the server. This can be found in the logs when
the server starts up, or in the "node-id" file inside the server's data
directory.
The "address" field is the address and port of the server.
The "non_voter" field controls whether the server is a non-voter, which is used
in some advanced Autopilot configurations, please see
https://www.nomadproject.io/guides/operations/outage.html for more information. If
"non_voter" is omitted it will default to false, which is typical for most
clusters.

Under normal operation, the peers.json file will not be present.

When Nomad starts for the first time, it will create this peers.info file and
delete any existing peers.json file so that recovery doesn't occur on the first
startup.

Once this peers.info file is present, any peers.json file will be ingested at
startup, and will set the Raft peer configuration manually to recover from an
outage. It's crucial that all servers in the cluster are shut down before
creating the peers.json file, and that all servers receive the same
configuration. Once the peers.json file is successfully ingested and applied, it
will be deleted.

Please see https://www.nomadproject.io/guides/outage.html for more information.
`
1583