package consul

import (
	"context"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"net"
	"net/rpc"
	"os"
	"path/filepath"
	"reflect"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	metrics "github.com/armon/go-metrics"
	"github.com/hashicorp/consul/acl"
	ca "github.com/hashicorp/consul/agent/connect/ca"
	"github.com/hashicorp/consul/agent/consul/authmethod"
	"github.com/hashicorp/consul/agent/consul/authmethod/ssoauth"
	"github.com/hashicorp/consul/agent/consul/autopilot"
	"github.com/hashicorp/consul/agent/consul/fsm"
	"github.com/hashicorp/consul/agent/consul/state"
	"github.com/hashicorp/consul/agent/metadata"
	"github.com/hashicorp/consul/agent/pool"
	"github.com/hashicorp/consul/agent/router"
	"github.com/hashicorp/consul/agent/structs"
	"github.com/hashicorp/consul/agent/token"
	"github.com/hashicorp/consul/lib"
	"github.com/hashicorp/consul/logging"
	"github.com/hashicorp/consul/tlsutil"
	"github.com/hashicorp/consul/types"
	connlimit "github.com/hashicorp/go-connlimit"
	"github.com/hashicorp/go-hclog"
	"github.com/hashicorp/go-memdb"
	"github.com/hashicorp/memberlist"
	"github.com/hashicorp/raft"
	raftboltdb "github.com/hashicorp/raft-boltdb"
	"github.com/hashicorp/serf/serf"
	"golang.org/x/time/rate"
)

// These are the protocol versions that Consul can _understand_. These are
// Consul-level protocol versions that are used to configure the Serf
// protocol versions.
const (
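	// DefaultRPCProtocol is the Consul protocol version used when none is
	// configured explicitly.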
	DefaultRPCProtocol = 2

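	// ProtocolVersionMin is the minimum Consul protocol version this agent
	// can understand.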
	ProtocolVersionMin uint8 = 2

	// Version 3 added support for network coordinates but we kept the
	// default protocol version at 2 to ease the transition to this new
	// feature. A Consul agent speaking version 2 of the protocol will
	// attempt to send its coordinates to a server that understands version
	// 3 or greater.
	ProtocolVersion2Compatible = 2

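	// ProtocolVersionMax is the maximum Consul protocol version this agent
	// can understand.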
	ProtocolVersionMax = 3
)

const (
	serfLANSnapshot   = "serf/local.snapshot"
	serfWANSnapshot   = "serf/remote.snapshot"
	raftState         = "raft/"
	snapshotsRetained = 2

	// serverRPCCache controls how long we keep an idle connection
	// open to a server
	serverRPCCache = 2 * time.Minute

	// serverMaxStreams controls how many idle streams we keep
	// open to a server
	serverMaxStreams = 64

	// raftLogCacheSize is the maximum number of logs to cache in-memory.
	// This is used to reduce disk I/O for the recently committed entries.
	raftLogCacheSize = 512

	// raftRemoveGracePeriod is how long we wait to allow a RemovePeer
	// to replicate to gracefully leave the cluster.
	raftRemoveGracePeriod = 5 * time.Second

	// serfEventChSize is the size of the buffered channel to get Serf
	// events. If this is exhausted we will block Serf and Memberlist.
	serfEventChSize = 2048

	// reconcileChSize is the size of the buffered channel for reconcile
	// updates from Serf with the Catalog. If this is exhausted we will drop
	// updates, and wait for a periodic reconcile.
	reconcileChSize = 256
)

const (
	legacyACLReplicationRoutineName       = "legacy ACL replication"
	aclPolicyReplicationRoutineName       = "ACL policy replication"
	aclRoleReplicationRoutineName         = "ACL role replication"
	aclTokenReplicationRoutineName        = "ACL token replication"
	aclTokenReapingRoutineName            = "acl token reaping"
	aclUpgradeRoutineName                 = "legacy ACL token upgrade"
	caRootPruningRoutineName              = "CA root pruning"
	configReplicationRoutineName          = "config entry replication"
	federationStateReplicationRoutineName = "federation state replication"
	federationStateAntiEntropyRoutineName = "federation state anti-entropy"
	federationStatePruningRoutineName     = "federation state pruning"
	intentionReplicationRoutineName       = "intention replication"
	secondaryCARootWatchRoutineName       = "secondary CA roots watch"
	secondaryCertRenewWatchRoutineName    = "secondary cert renew watch"
)

var (
	ErrWANFederationDisabled = fmt.Errorf("WAN Federation is disabled")
)

// Server is the Consul server which manages service discovery,
// health checking, DC forwarding, Raft, and multiple Serf pools.
type Server struct {
	// queriesBlocking is a counter that we incr and decr atomically in
	// rpc calls to provide telemetry on how many blocking queries are running.
	// We interact with queriesBlocking atomically, do not move without ensuring it is
	// correctly 64-bit aligned in the struct layout
	queriesBlocking uint64

	// aclConfig is the configuration for the ACL system
	aclConfig *acl.Config

	// acls is used to resolve tokens to effective policies
	acls *ACLResolver

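	// aclAuthMethodValidators caches the validators for the configured ACL
	// auth methods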
	aclAuthMethodValidators authmethod.Cache

	// DEPRECATED (ACL-Legacy-Compat) - only needed while we support both
	// useNewACLs is used to determine whether we can use new ACLs or not
	useNewACLs int32

	// autopilot is the Autopilot instance for this server.
	autopilot *autopilot.Autopilot

	// autopilotWaitGroup is used to block until Autopilot shuts down.
	autopilotWaitGroup sync.WaitGroup

	// caProviderReconfigurationLock guards the provider reconfiguration.
	caProviderReconfigurationLock sync.Mutex
	// caProvider is the current CA provider in use for Connect. This is
	// only non-nil when we are the leader.
	caProvider ca.Provider
	// caProviderRoot is the CARoot that was stored along with the ca.Provider
	// active. It's only updated in lock-step with the caProvider. This prevents
	// races between state updates to active roots and the fetch of the provider
	// instance.
	caProviderRoot *structs.CARoot
	caProviderLock sync.RWMutex

	// rate limiter to use when signing leaf certificates
	caLeafLimiter connectSignRateLimiter

	// Consul configuration
	config *Config

	// configReplicator is used to manage the leader's replication routines for
	// centralized config
	configReplicator *Replicator

	// federationStateReplicator is used to manage the leader's replication routines for
	// federation states
	federationStateReplicator *Replicator

	// dcSupportsFederationStates is used to determine whether we can
	// replicate federation states or not. All servers in the local
	// DC must be on a version of Consul supporting federation states
	// before this will get enabled.
	dcSupportsFederationStates int32

	// tokens holds ACL tokens initially from the configuration, but can
	// be updated at runtime, so should always be used instead of going to
	// the configuration directly.
	tokens *token.Store

	// Connection pool to other consul servers
	connPool *pool.ConnPool

	// eventChLAN is used to receive events from the
	// serf cluster in the datacenter
	eventChLAN chan serf.Event

	// eventChWAN is used to receive events from the
	// serf cluster that spans datacenters
	eventChWAN chan serf.Event

	// fsm is the state machine used with Raft to provide
	// strong consistency.
	fsm *fsm.FSM

	// Logger uses the provided LogOutput
	logger  hclog.InterceptLogger
	loggers *loggerStore

	// The raft instance is used among Consul nodes within the DC to protect
	// operations that require strong consistency.
	raft          *raft.Raft
	raftLayer     *RaftLayer
	raftStore     *raftboltdb.BoltStore
	raftTransport *raft.NetworkTransport
	raftInmem     *raft.InmemStore

	// raftNotifyCh is set up by setupRaft() and ensures that we get reliable leader
	// transition notifications from the Raft layer.
	raftNotifyCh <-chan bool

	// reconcileCh is used to pass events from the serf handler
	// into the leader manager, so that the strong state can be
	// updated
	reconcileCh chan serf.Member

	// readyForConsistentReads is used to track when the leader server is
	// ready to serve consistent reads, after it has applied its initial
	// barrier. This is updated atomically.
	readyForConsistentReads int32

	// leaveCh is used to signal that the server is leaving the cluster
	// and trying to shed its RPC traffic onto other Consul servers. This
	// is only ever closed.
	leaveCh chan struct{}

	// router is used to map out Consul servers in the WAN and in Consul
	// Enterprise user-defined areas.
	router *router.Router

	// rpcLimiter is used to rate limit the total number of RPCs initiated
	// from an agent.
	rpcLimiter atomic.Value

	// rpcConnLimiter limits the number of RPC connections from a single source IP
	rpcConnLimiter connlimit.Limiter

	// Listener is used to listen for incoming connections
	Listener  net.Listener
	rpcServer *rpc.Server

	// insecureRPCServer is an RPC server that is configured with
	// IncomingInsecureRPCConfig to allow clients to call AutoEncrypt.Sign
	// to request client certificates. At this point a client doesn't have
	// a client cert and thus cannot present it. This is the only RPC
	// Endpoint that is available at the time of writing.
	insecureRPCServer *rpc.Server

	// tlsConfigurator holds the agent configuration relevant to TLS and
	// configures everything related to it.
	tlsConfigurator *tlsutil.Configurator

	// serfLAN is the Serf cluster maintained inside the DC
	// which contains all the DC nodes
	serfLAN *serf.Serf

	// segmentLAN maps segment names to their Serf cluster
	segmentLAN map[string]*serf.Serf

	// serfWAN is the Serf cluster maintained between DCs
	// which SHOULD only consist of Consul servers
	serfWAN                *serf.Serf
	memberlistTransportWAN memberlist.IngestionAwareTransport
	gatewayLocator         *GatewayLocator

	// serverLookup tracks the Consul servers in the local datacenter.
	// Used to do leader forwarding and provide fast lookup by server id and address
	serverLookup *ServerLookup

	// floodLock controls access to floodCh.
	floodLock sync.RWMutex
	floodCh   []chan struct{}

	// sessionTimers track the expiration time of each Session that has
	// a TTL. On expiration, a SessionDestroy event will occur, and
	// destroy the session via standard session destroy processing
	sessionTimers *SessionTimers

	// statsFetcher is used by autopilot to check the status of the other
	// Consul servers.
	statsFetcher *StatsFetcher

	// reassertLeaderCh is used to signal the leader loop should re-run
	// leadership actions after a snapshot restore.
	reassertLeaderCh chan chan error

	// tombstoneGC is used to track the pending GC invocations
	// for the KV tombstones
	tombstoneGC *state.TombstoneGC

	// aclReplicationStatus (and its associated lock) provide information
	// about the health of the ACL replication goroutine.
	aclReplicationStatus     structs.ACLReplicationStatus
	aclReplicationStatusLock sync.RWMutex

	// shutdown and the associated members here are used in orchestrating
	// a clean shutdown. The shutdownCh is never written to, only closed to
	// indicate a shutdown has been initiated.
	shutdown     bool
	shutdownCh   chan struct{}
	shutdownLock sync.Mutex

	// State for whether this datacenter is acting as a secondary CA.
	actingSecondaryCA   bool
	actingSecondaryLock sync.RWMutex

	// Manager to handle starting/stopping go routines when establishing/revoking raft leadership
	leaderRoutineManager *LeaderRoutineManager

	// embedded struct to hold all the enterprise specific data
	EnterpriseServer
}

// NewServer is only used to help set up a server for testing. Normal code
// exercises NewServerLogger.
func NewServer(config *Config) (*Server, error) {
	c, err := tlsutil.NewConfigurator(config.ToTLSUtilConfig(), nil)
	if err != nil {
		return nil, err
	}
	return NewServerLogger(config, nil, new(token.Store), c)
}

// NewServerLogger is used to construct a new Consul server from the
// configuration, potentially returning an error
func NewServerLogger(config *Config, logger hclog.InterceptLogger, tokens *token.Store, tlsConfigurator *tlsutil.Configurator) (*Server, error) {
	return NewServerWithOptions(config,
		WithLogger(logger),
		WithTokenStore(tokens),
		WithTLSConfigurator(tlsConfigurator))
}

// NewServerWithOptions is used to construct a new Consul server from the configuration
// and extra options, potentially returning an error
func NewServerWithOptions(config *Config, options ...ConsulOption) (*Server, error) {
	flat := flattenConsulOptions(options)

	logger := flat.logger
	tokens := flat.tokens
	tlsConfigurator := flat.tlsConfigurator
	connPool := flat.connPool

	// Check the protocol version.
	if err := config.CheckProtocolVersion(); err != nil {
		return nil, err
	}

	// Check for a data directory.
	if config.DataDir == "" && !config.DevMode {
		return nil, fmt.Errorf("Config must provide a DataDir")
	}

	// Sanity check the ACLs.
	if err := config.CheckACL(); err != nil {
		return nil, err
	}

	// Ensure we have a log output and create a logger.
	if config.LogOutput == nil {
		config.LogOutput = os.Stderr
	}

	if logger == nil {
		logger = hclog.NewInterceptLogger(&hclog.LoggerOptions{
			Level:  hclog.Debug,
			Output: config.LogOutput,
		})
	}

	// Check if TLS is enabled
	if config.CAFile != "" || config.CAPath != "" {
		config.UseTLS = true
	}

	// Set the primary DC if it wasn't set.
	if config.PrimaryDatacenter == "" {
		if config.ACLDatacenter != "" {
			config.PrimaryDatacenter = config.ACLDatacenter
		} else {
			config.PrimaryDatacenter = config.Datacenter
		}
	}

	if config.PrimaryDatacenter != "" {
		config.ACLDatacenter = config.PrimaryDatacenter
	}

	// Create the tombstone GC.
	gc, err := state.NewTombstoneGC(config.TombstoneTTL, config.TombstoneTTLGranularity)
	if err != nil {
		return nil, err
	}

	// Create the shutdown channel - this is closed but never written to.
	shutdownCh := make(chan struct{})

	if connPool == nil {
		connPool = &pool.ConnPool{
			Server:          true,
			SrcAddr:         config.RPCSrcAddr,
			LogOutput:       config.LogOutput,
			MaxTime:         serverRPCCache,
			MaxStreams:      serverMaxStreams,
			TLSConfigurator: tlsConfigurator,
			Datacenter:      config.Datacenter,
		}
	}

	serverLogger := logger.NamedIntercept(logging.ConsulServer)
	loggers := newLoggerStore(serverLogger)
	// Create server.
	s := &Server{
		config:                  config,
		tokens:                  tokens,
		connPool:                connPool,
		eventChLAN:              make(chan serf.Event, serfEventChSize),
		eventChWAN:              make(chan serf.Event, serfEventChSize),
		logger:                  serverLogger,
		loggers:                 loggers,
		leaveCh:                 make(chan struct{}),
		reconcileCh:             make(chan serf.Member, reconcileChSize),
		router:                  router.NewRouter(serverLogger, config.Datacenter, fmt.Sprintf("%s.%s", config.NodeName, config.Datacenter)),
		rpcServer:               rpc.NewServer(),
		insecureRPCServer:       rpc.NewServer(),
		tlsConfigurator:         tlsConfigurator,
		reassertLeaderCh:        make(chan chan error),
		segmentLAN:              make(map[string]*serf.Serf, len(config.Segments)),
		sessionTimers:           NewSessionTimers(),
		tombstoneGC:             gc,
		serverLookup:            NewServerLookup(),
		shutdownCh:              shutdownCh,
		leaderRoutineManager:    NewLeaderRoutineManager(logger),
		aclAuthMethodValidators: authmethod.NewCache(),
	}

	if s.config.ConnectMeshGatewayWANFederationEnabled {
		s.gatewayLocator = NewGatewayLocator(
			s.logger,
			s,
			s.config.Datacenter,
			s.config.PrimaryDatacenter,
		)
		s.connPool.GatewayResolver = s.gatewayLocator.PickGateway
	}

	// Initialize enterprise specific server functionality
	if err := s.initEnterprise(); err != nil {
		s.Shutdown()
		return nil, err
	}

	s.rpcLimiter.Store(rate.NewLimiter(config.RPCRate, config.RPCMaxBurst))

	configReplicatorConfig := ReplicatorConfig{
		Name:     logging.ConfigEntry,
		Delegate: &FunctionReplicator{ReplicateFn: s.replicateConfig},
		Rate:     s.config.ConfigReplicationRate,
		Burst:    s.config.ConfigReplicationBurst,
		Logger:   s.logger,
	}
	s.configReplicator, err = NewReplicator(&configReplicatorConfig)
	if err != nil {
		s.Shutdown()
		return nil, err
	}

	federationStateReplicatorConfig := ReplicatorConfig{
		Name: logging.FederationState,
		Delegate: &IndexReplicator{
			Delegate: &FederationStateReplicator{
				srv:            s,
				gatewayLocator: s.gatewayLocator,
			},
			Logger: s.logger,
		},
		Rate:             s.config.FederationStateReplicationRate,
		Burst:            s.config.FederationStateReplicationBurst,
		Logger:           logger,
		SuppressErrorLog: isErrFederationStatesNotSupported,
	}
	s.federationStateReplicator, err = NewReplicator(&federationStateReplicatorConfig)
	if err != nil {
		s.Shutdown()
		return nil, err
	}

	// Initialize the stats fetcher that autopilot will use.
	s.statsFetcher = NewStatsFetcher(logger, s.connPool, s.config.Datacenter)

	s.aclConfig = newACLConfig(logger)
	s.useNewACLs = 0
	aclConfig := ACLResolverConfig{
		Config:      config,
		Delegate:    s,
		CacheConfig: serverACLCacheConfig,
		AutoDisable: false,
		Logger:      logger,
		ACLConfig:   s.aclConfig,
	}
	// Initialize the ACL resolver.
	if s.acls, err = NewACLResolver(&aclConfig); err != nil {
		s.Shutdown()
		return nil, fmt.Errorf("Failed to create ACL resolver: %v", err)
	}

	// Initialize the RPC layer.
	if err := s.setupRPC(); err != nil {
		s.Shutdown()
		return nil, fmt.Errorf("Failed to start RPC layer: %v", err)
	}

	// Initialize any extra RPC listeners for segments.
	segmentListeners, err := s.setupSegmentRPC()
	if err != nil {
		s.Shutdown()
		return nil, fmt.Errorf("Failed to start segment RPC layer: %v", err)
	}

	// Initialize the Raft server.
	if err := s.setupRaft(); err != nil {
		s.Shutdown()
		return nil, fmt.Errorf("Failed to start Raft: %v", err)
	}

	if s.config.ConnectEnabled && (s.config.AutoEncryptAllowTLS || s.config.AutoConfigAuthzEnabled) {
		go s.connectCARootsMonitor(&lib.StopChannelContext{StopCh: s.shutdownCh})
	}

	if s.gatewayLocator != nil {
		go s.gatewayLocator.Run(&lib.StopChannelContext{StopCh: s.shutdownCh})
	}

	// Serf and dynamic bind ports
	//
	// The LAN serf cluster announces the port of the WAN serf cluster
	// which creates a race when the WAN cluster is supposed to bind to
	// a dynamic port (port 0). The current memberlist implementation will
	// update the bind port in the configuration after the memberlist is
	// created, so we can pull it out from there reliably, even though it's
	// a little gross to be reading the updated config.

	// Initialize the WAN Serf if enabled
	serfBindPortWAN := -1
	if config.SerfWANConfig != nil {
		serfBindPortWAN = config.SerfWANConfig.MemberlistConfig.BindPort
		s.serfWAN, err = s.setupSerf(config.SerfWANConfig, s.eventChWAN, serfWANSnapshot, true, serfBindPortWAN, "", s.Listener)
		if err != nil {
			s.Shutdown()
			return nil, fmt.Errorf("Failed to start WAN Serf: %v", err)
		}

		// This is always a *memberlist.NetTransport or something which
		// wraps it that satisfies this interface.
		s.memberlistTransportWAN = config.SerfWANConfig.MemberlistConfig.Transport.(memberlist.IngestionAwareTransport)

		// See big comment above why we are doing this.
		if serfBindPortWAN == 0 {
			serfBindPortWAN = config.SerfWANConfig.MemberlistConfig.BindPort
			if serfBindPortWAN == 0 {
				return nil, fmt.Errorf("Failed to get dynamic bind port for WAN Serf")
			}
			s.logger.Info("Serf WAN TCP bound", "port", serfBindPortWAN)
		}
	}

	// Initialize the LAN segments before the default LAN Serf so we have
	// updated port information to publish there.
	if err := s.setupSegments(config, serfBindPortWAN, segmentListeners); err != nil {
		s.Shutdown()
		return nil, fmt.Errorf("Failed to setup network segments: %v", err)
	}

	// Initialize the LAN Serf for the default network segment.
	s.serfLAN, err = s.setupSerf(config.SerfLANConfig, s.eventChLAN, serfLANSnapshot, false, serfBindPortWAN, "", s.Listener)
	if err != nil {
		s.Shutdown()
		return nil, fmt.Errorf("Failed to start LAN Serf: %v", err)
	}
	go s.lanEventHandler()

	// Start the flooders after the LAN event handler is wired up.
	s.floodSegments(config)

	// Add a "static route" to the WAN Serf and hook it up to Serf events.
	if s.serfWAN != nil {
		if err := s.router.AddArea(types.AreaWAN, s.serfWAN, s.connPool); err != nil {
			s.Shutdown()
			return nil, fmt.Errorf("Failed to add WAN serf route: %v", err)
		}
		go router.HandleSerfEvents(s.logger, s.router, types.AreaWAN, s.serfWAN.ShutdownCh(), s.eventChWAN)

		// Fire up the LAN <-> WAN join flooder.
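		// addrFn derives the WAN join address for a server from its LAN IP
		// and advertised WAN join port.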
		addrFn := func(s *metadata.Server) (string, error) {
			if s.WanJoinPort == 0 {
				return "", fmt.Errorf("no wan join port for server: %s", s.Addr.String())
			}
			addr, _, err := net.SplitHostPort(s.Addr.String())
			if err != nil {
				return "", err
			}
			return fmt.Sprintf("%s:%d", addr, s.WanJoinPort), nil
		}
		go s.Flood(addrFn, s.serfWAN)
	}

	// Start enterprise specific functionality
	if err := s.startEnterprise(); err != nil {
		s.Shutdown()
		return nil, err
	}

	// Initialize Autopilot. This must happen before starting leadership monitoring
	// as establishing leadership could attempt to use autopilot and cause a panic.
	s.initAutopilot(config)

	// Start monitoring leadership. This must happen after Serf is set up
	// since it can fire events when leadership is obtained.
	go s.monitorLeadership()

	// Start listening for RPC requests.
	go s.listen(s.Listener)

	// Start listeners for any segments with separate RPC listeners.
	for _, listener := range segmentListeners {
		go s.listen(listener)
	}

	// Start the metrics handlers.
	go s.updateMetrics()

	return s, nil
}

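// connectCARootsMonitor watches the state store for changes to the Connect CA
// roots and pushes the updated root certificates into the TLS configurator so
// that auto-encrypt/auto-config clients are served the current CA bundle.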
func (s *Server) connectCARootsMonitor(ctx context.Context) {
	for {
		ws := memdb.NewWatchSet()
		state := s.fsm.State()
		ws.Add(state.AbandonCh())
		_, cas, err := state.CARoots(ws)
		if err != nil {
			s.logger.Error("Failed to watch AutoEncrypt CARoot", "error", err)
			return
		}
		caPems := []string{}
		for _, ca := range cas {
			caPems = append(caPems, ca.RootCert)
		}
		if err := s.tlsConfigurator.UpdateAutoTLSCA(caPems); err != nil {
			s.logger.Error("Failed to update AutoEncrypt CAPems", "error", err)
		}

		if err := ws.WatchCtx(ctx); err == context.Canceled {
			s.logger.Info("shutting down Connect CA roots monitor")
			return
		}
	}
}

// setupRaft is used to set up and initialize Raft
func (s *Server) setupRaft() error {
	// If we have an unclean exit then attempt to close the Raft store.
	defer func() {
		if s.raft == nil && s.raftStore != nil {
			if err := s.raftStore.Close(); err != nil {
				s.logger.Error("failed to close Raft store", "error", err)
			}
		}
	}()

	// Create the FSM.
	var err error
	s.fsm, err = fsm.New(s.tombstoneGC, s.logger)
	if err != nil {
		return err
	}

	// ServerAddressProvider needs server IDs to work correctly, which is only
	// supported in Raft protocol version 3 or higher.
	var serverAddressProvider raft.ServerAddressProvider = nil
	if s.config.RaftConfig.ProtocolVersion >= 3 {
		serverAddressProvider = s.serverLookup
	}

	// Create a transport layer.
	transConfig := &raft.NetworkTransportConfig{
		Stream:                s.raftLayer,
		MaxPool:               3,
		Timeout:               10 * time.Second,
		ServerAddressProvider: serverAddressProvider,
		Logger:                s.loggers.Named(logging.Raft),
	}

	trans := raft.NewNetworkTransportWithConfig(transConfig)
	s.raftTransport = trans
	s.config.RaftConfig.Logger = s.loggers.Named(logging.Raft)

	// Versions of the Raft protocol below 3 require the LocalID to match the network
	// address of the transport.
	s.config.RaftConfig.LocalID = raft.ServerID(trans.LocalAddr())
	if s.config.RaftConfig.ProtocolVersion >= 3 {
		s.config.RaftConfig.LocalID = raft.ServerID(s.config.NodeID)
	}

	// Build an all in-memory setup for dev mode, otherwise prepare a full
	// disk-based setup.
	var log raft.LogStore
	var stable raft.StableStore
	var snap raft.SnapshotStore
	if s.config.DevMode {
		store := raft.NewInmemStore()
		s.raftInmem = store
		stable = store
		log = store
		snap = raft.NewInmemSnapshotStore()
	} else {
		// Create the base raft path.
		path := filepath.Join(s.config.DataDir, raftState)
		if err := lib.EnsurePath(path, true); err != nil {
			return err
		}

		// Create the backend raft store for logs and stable storage.
		store, err := raftboltdb.NewBoltStore(filepath.Join(path, "raft.db"))
		if err != nil {
			return err
		}
		s.raftStore = store
		stable = store

		// Wrap the store in a LogCache to improve performance.
		cacheStore, err := raft.NewLogCache(raftLogCacheSize, store)
		if err != nil {
			return err
		}
		log = cacheStore

		// Create the snapshot store.
		snapshots, err := raft.NewFileSnapshotStore(path, snapshotsRetained, s.config.LogOutput)
		if err != nil {
			return err
		}
		snap = snapshots

		// For an existing cluster being upgraded to the new version of
		// Raft, we almost never want to run recovery based on the old
		// peers.json file. We create a peers.info file with a helpful
		// note about where peers.json went, and use that as a sentinel
		// to avoid ingesting the old one that first time (if we have to
		// create the peers.info file because it's not there, we also
		// blow away any existing peers.json file).
		peersFile := filepath.Join(path, "peers.json")
		peersInfoFile := filepath.Join(path, "peers.info")
		if _, err := os.Stat(peersInfoFile); os.IsNotExist(err) {
			if err := ioutil.WriteFile(peersInfoFile, []byte(peersInfoContent), 0755); err != nil {
				return fmt.Errorf("failed to write peers.info file: %v", err)
			}

			// Blow away the peers.json file if present, since the
			// peers.info sentinel wasn't there.
			if _, err := os.Stat(peersFile); err == nil {
				if err := os.Remove(peersFile); err != nil {
					return fmt.Errorf("failed to delete peers.json, please delete manually (see peers.info for details): %v", err)
				}
				s.logger.Info("deleted peers.json file (see peers.info for details)")
			}
		} else if _, err := os.Stat(peersFile); err == nil {
			s.logger.Info("found peers.json file, recovering Raft configuration...")

			var configuration raft.Configuration
			if s.config.RaftConfig.ProtocolVersion < 3 {
				configuration, err = raft.ReadPeersJSON(peersFile)
			} else {
				configuration, err = raft.ReadConfigJSON(peersFile)
			}
			if err != nil {
				return fmt.Errorf("recovery failed to parse peers.json: %v", err)
			}

			tmpFsm, err := fsm.New(s.tombstoneGC, s.logger)
			if err != nil {
				return fmt.Errorf("recovery failed to make temp FSM: %v", err)
			}
			if err := raft.RecoverCluster(s.config.RaftConfig, tmpFsm,
				log, stable, snap, trans, configuration); err != nil {
				return fmt.Errorf("recovery failed: %v", err)
			}

			if err := os.Remove(peersFile); err != nil {
				return fmt.Errorf("recovery failed to delete peers.json, please delete manually (see peers.info for details): %v", err)
			}
			s.logger.Info("deleted peers.json file after successful recovery")
		}
	}

	// If we are in bootstrap or dev mode and the state is clean then we can
	// bootstrap now.
	if s.config.Bootstrap || s.config.DevMode {
		hasState, err := raft.HasExistingState(log, stable, snap)
		if err != nil {
			return err
		}
		if !hasState {
			configuration := raft.Configuration{
				Servers: []raft.Server{
					raft.Server{
						ID:      s.config.RaftConfig.LocalID,
						Address: trans.LocalAddr(),
					},
				},
			}
			if err := raft.BootstrapCluster(s.config.RaftConfig,
				log, stable, snap, trans, configuration); err != nil {
				return err
			}
		}
	}

	// Set up a channel for reliable leader notifications.
	raftNotifyCh := make(chan bool, 10)
	s.config.RaftConfig.NotifyCh = raftNotifyCh
	s.raftNotifyCh = raftNotifyCh

	// Setup the Raft store.
	s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm.ChunkingFSM(), log, stable, snap, trans)
	if err != nil {
		return err
	}
	return nil
}

// factory is a function that returns an RPC endpoint bound to the given
// server.
type factory func(s *Server) interface{}

// endpoints is a list of registered RPC endpoint factories.
var endpoints []factory

// registerEndpoint registers a new RPC endpoint factory.
func registerEndpoint(fn factory) {
	endpoints = append(endpoints, fn)
}

// setupRPC is used to set up the RPC listener
func (s *Server) setupRPC() error {
	s.rpcConnLimiter.SetConfig(connlimit.Config{
		MaxConnsPerClientIP: s.config.RPCMaxConnsPerClient,
	})

	for _, fn := range endpoints {
		s.rpcServer.Register(fn(s))
	}

	// Only register AutoEncrypt on the insecure RPC server. Insecure only
	// means that verify incoming is turned off even though it might have
	// been configured.
	s.insecureRPCServer.Register(&AutoEncrypt{srv: s})

	// Setup the AutoConfig JWT Authorizer
	var authz AutoConfigAuthorizer
	if s.config.AutoConfigAuthzEnabled {
		// create the auto config authorizer from the JWT authmethod
		validator, err := ssoauth.NewValidator(s.logger, &s.config.AutoConfigAuthzAuthMethod)
		if err != nil {
			return fmt.Errorf("Failed to initialize JWT Auto Config Authorizer: %w", err)
		}

		authz = &jwtAuthorizer{
			validator:       validator,
			allowReuse:      s.config.AutoConfigAuthzAllowReuse,
			claimAssertions: s.config.AutoConfigAuthzClaimAssertions,
		}
	} else {
		// This authorizer always returns that the endpoint is disabled
		authz = &disabledAuthorizer{}
	}
	// now register with the insecure RPC server
	s.insecureRPCServer.Register(NewAutoConfig(s.config, s.tlsConfigurator, s, authz))

	ln, err := net.ListenTCP("tcp", s.config.RPCAddr)
	if err != nil {
		return err
	}
	s.Listener = ln

	if s.config.NotifyListen != nil {
		s.config.NotifyListen()
	}
	// todo(fs): we should probably guard this
	if s.config.RPCAdvertise == nil {
		s.config.RPCAdvertise = ln.Addr().(*net.TCPAddr)
	}

	// Verify that we have a usable advertise address
	if s.config.RPCAdvertise.IP.IsUnspecified() {
		ln.Close()
		return fmt.Errorf("RPC advertise address is not advertisable: %v", s.config.RPCAdvertise)
	}

	// TODO (hans) switch NewRaftLayer to tlsConfigurator

	// Provide a DC specific wrapper. Raft replication is only
	// ever done in the same datacenter, so we can provide it as a constant.
	wrapper := tlsutil.SpecificDC(s.config.Datacenter, s.tlsConfigurator.OutgoingRPCWrapper())

	// Define a callback for determining whether to wrap a connection with TLS
	tlsFunc := func(address raft.ServerAddress) bool {
		// raft only talks to its own datacenter
		return s.tlsConfigurator.UseTLS(s.config.Datacenter)
	}
	s.raftLayer = NewRaftLayer(s.config.RPCSrcAddr, s.config.RPCAdvertise, wrapper, tlsFunc)
	return nil
}

// Shutdown is used to shut down the server
func (s *Server) Shutdown() error {
	s.logger.Info("shutting down server")
	s.shutdownLock.Lock()
	defer s.shutdownLock.Unlock()

	if s.shutdown {
		return nil
	}

	s.shutdown = true
	close(s.shutdownCh)

	// ensure that any leader routines still running get canceled
	if s.leaderRoutineManager != nil {
		s.leaderRoutineManager.StopAll()
	}

	if s.serfLAN != nil {
		s.serfLAN.Shutdown()
	}

	if s.serfWAN != nil {
		s.serfWAN.Shutdown()
		if err := s.router.RemoveArea(types.AreaWAN); err != nil {
			s.logger.Warn("error removing WAN area", "error", err)
		}
	}
	s.router.Shutdown()

	if s.raft != nil {
		s.raftTransport.Close()
		s.raftLayer.Close()
		future := s.raft.Shutdown()
		if err := future.Error(); err != nil {
			s.logger.Warn("error shutting down raft", "error", err)
		}
		if s.raftStore != nil {
			s.raftStore.Close()
		}
	}

	if s.Listener != nil {
		s.Listener.Close()
	}

	// Close the connection pool
	if s.connPool != nil {
		s.connPool.Shutdown()
	}

	if s.acls != nil {
		s.acls.Close()
	}

	if s.config.NotifyShutdown != nil {
		s.config.NotifyShutdown()
	}

	return nil
}

// Leave is used to prepare for a graceful shutdown of the server
func (s *Server) Leave() error {
	s.logger.Info("server starting leave")

	// Check the number of known peers
	numPeers, err := s.numPeers()
	if err != nil {
		s.logger.Error("failed to check raft peers", "error", err)
		return err
	}

	addr := s.raftTransport.LocalAddr()

	// If we are the current leader, and we have any other peers (cluster has multiple
	// servers), we should do a RemoveServer/RemovePeer to safely reduce the quorum size.
	// If we are not the leader, then we should issue our leave intention and wait to be
	// removed for some sane period of time.
	isLeader := s.IsLeader()
	if isLeader && numPeers > 1 {
		minRaftProtocol, err := s.autopilot.MinRaftProtocol()
		if err != nil {
			return err
		}

		if minRaftProtocol >= 2 && s.config.RaftConfig.ProtocolVersion >= 3 {
			future := s.raft.RemoveServer(raft.ServerID(s.config.NodeID), 0, 0)
			if err := future.Error(); err != nil {
				s.logger.Error("failed to remove ourself as raft peer", "error", err)
			}
		} else {
			future := s.raft.RemovePeer(addr)
			if err := future.Error(); err != nil {
				s.logger.Error("failed to remove ourself as raft peer", "error", err)
			}
		}
	}

	// Leave the WAN pool
	if s.serfWAN != nil {
		if err := s.serfWAN.Leave(); err != nil {
			s.logger.Error("failed to leave WAN Serf cluster", "error", err)
		}
	}

	// Leave the LAN pool
	if s.serfLAN != nil {
		if err := s.serfLAN.Leave(); err != nil {
			s.logger.Error("failed to leave LAN Serf cluster", "error", err)
		}
	}

	// Leave everything enterprise related as well
	s.handleEnterpriseLeave()

	// Start refusing RPCs now that we've left the LAN pool. It's important
	// to do this *after* we've left the LAN pool so that clients will know
	// to shift onto another server if they perform a retry. We also wake up
	// all queries in the RPC retry state.
	s.logger.Info("Waiting to drain RPC traffic", "drain_time", s.config.LeaveDrainTime)
	close(s.leaveCh)
	time.Sleep(s.config.LeaveDrainTime)

	// If we were not leader, wait to be safely removed from the cluster. We
	// must wait to allow the raft replication to take place, otherwise an
	// immediate shutdown could cause a loss of quorum.
	if !isLeader {
		left := false
		limit := time.Now().Add(raftRemoveGracePeriod)
		for !left && time.Now().Before(limit) {
			// Sleep a while before we check.
			time.Sleep(50 * time.Millisecond)

			// Get the latest configuration.
			future := s.raft.GetConfiguration()
			if err := future.Error(); err != nil {
				s.logger.Error("failed to get raft configuration", "error", err)
				break
			}

			// See if we are no longer included.
			left = true
			for _, server := range future.Configuration().Servers {
				if server.Address == addr {
					left = false
					break
				}
			}
		}

		// TODO (slackpad) With the old Raft library we used to force the
		// peers set to empty when a graceful leave occurred. This would
		// keep voting spam down if the server was restarted, but it was
		// dangerous because the peers was inconsistent with the logs and
		// snapshots, so it wasn't really safe in all cases for the server
		// to become leader. This is now safe, but the log spam is noisy.
		// The next new version of the library will have a "you are not a
		// peer stop it" behavior that should address this. We will have
		// to evaluate during the RC period if this interim situation is
		// not too confusing for operators.

		// TODO (slackpad) When we take a later new version of the Raft
		// library it won't try to complete replication, so this peer
		// may not realize that it has been removed. Need to revisit this
		// and the warning here.
		if !left {
			s.logger.Warn("failed to leave raft configuration gracefully, timeout")
		}
	}

	return nil
}

// numPeers is used to check on the number of known peers, including potentially
// the local node. We count only voters, since others can't actually become
// leader, so aren't considered peers.
func (s *Server) numPeers() (int, error) {
	future := s.raft.GetConfiguration()
	if err := future.Error(); err != nil {
		return 0, err
	}

	return autopilot.NumPeers(future.Configuration()), nil
}

// JoinLAN is used to have Consul join the inner-DC pool
// The target address should be another node inside the DC
// listening on the Serf LAN address
func (s *Server) JoinLAN(addrs []string) (int, error) {
	return s.serfLAN.Join(addrs, true)
}

// JoinWAN is used to have Consul join the cross-WAN Consul ring
// The target address should be another node listening on the
// Serf WAN address
func (s *Server) JoinWAN(addrs []string) (int, error) {
	if s.serfWAN == nil {
		return 0, ErrWANFederationDisabled
	}
	return s.serfWAN.Join(addrs, true)
}

// PrimaryMeshGatewayAddressesReadyCh returns a channel that will be closed
// when federation state replication ships back at least one primary mesh
// gateway (not via fallback config).
func (s *Server) PrimaryMeshGatewayAddressesReadyCh() <-chan struct{} {
	if s.gatewayLocator == nil {
		return nil
	}
	return s.gatewayLocator.PrimaryMeshGatewayAddressesReadyCh()
}

// PickRandomMeshGatewaySuitableForDialing is a convenience function used for writing tests.
func (s *Server) PickRandomMeshGatewaySuitableForDialing(dc string) string {
	if s.gatewayLocator == nil {
		return ""
	}
	return s.gatewayLocator.PickGateway(dc)
}

// RefreshPrimaryGatewayFallbackAddresses is used to update the list of current
// fallback addresses for locating mesh gateways in the primary datacenter.
func (s *Server) RefreshPrimaryGatewayFallbackAddresses(addrs []string) {
	if s.gatewayLocator != nil {
		s.gatewayLocator.RefreshPrimaryGatewayFallbackAddresses(addrs)
	}
}

// PrimaryGatewayFallbackAddresses returns the current set of discovered
// fallback addresses for the mesh gateways in the primary datacenter.
func (s *Server) PrimaryGatewayFallbackAddresses() []string {
	if s.gatewayLocator == nil {
		return nil
	}
	return s.gatewayLocator.PrimaryGatewayFallbackAddresses()
}

// LocalMember is used to return the local node
func (s *Server) LocalMember() serf.Member {
	return s.serfLAN.LocalMember()
}

// LANMembers is used to return the members of the LAN cluster
func (s *Server) LANMembers() []serf.Member {
	return s.serfLAN.Members()
}

// WANMembers is used to return the members of the WAN cluster
func (s *Server) WANMembers() []serf.Member {
	if s.serfWAN == nil {
		return nil
	}
	return s.serfWAN.Members()
}

// RemoveFailedNode is used to remove a failed node from the cluster
func (s *Server) RemoveFailedNode(node string, prune bool) error {
	var removeFn func(*serf.Serf, string) error
	if prune {
		removeFn = (*serf.Serf).RemoveFailedNodePrune
	} else {
		removeFn = (*serf.Serf).RemoveFailedNode
	}

	if err := removeFn(s.serfLAN, node); err != nil {
		return err
	}
	// The Serf WAN pool stores members as node.datacenter
	// so the dc is appended if not present
	if !strings.HasSuffix(node, "."+s.config.Datacenter) {
		node = node + "." + s.config.Datacenter
	}
	if s.serfWAN != nil {
		if err := removeFn(s.serfWAN, node); err != nil {
			return err
		}
	}
	return nil
}

// IsLeader checks if this server is the cluster leader
func (s *Server) IsLeader() bool {
	return s.raft.State() == raft.Leader
}

// LeaderLastContact returns the time of last contact by a leader.
// This only makes sense if we are currently a follower.
func (s *Server) LeaderLastContact() time.Time {
	return s.raft.LastContact()
}

// KeyManagerLAN returns the LAN Serf keyring manager
func (s *Server) KeyManagerLAN() *serf.KeyManager {
	return s.serfLAN.KeyManager()
}

// KeyManagerWAN returns the WAN Serf keyring manager
func (s *Server) KeyManagerWAN() *serf.KeyManager {
	return s.serfWAN.KeyManager()
}

// LANSegments returns a map of LAN segments by name
func (s *Server) LANSegments() map[string]*serf.Serf {
	segments := make(map[string]*serf.Serf, len(s.segmentLAN)+1)
	segments[""] = s.serfLAN
	for name, segment := range s.segmentLAN {
		segments[name] = segment
	}

	return segments
}

// inmemCodec is used to do an RPC call without going over a network
type inmemCodec struct {
	method string
	args   interface{}
	reply  interface{}
	err    error
}

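// ReadRequestHeader stamps the preset method name onto the request; an
// in-memory call has no wire header to parse.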
func (i *inmemCodec) ReadRequestHeader(req *rpc.Request) error {
	req.ServiceMethod = i.method
	return nil
}

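// ReadRequestBody copies the stored args value into the endpoint's argument
// struct via reflection.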
func (i *inmemCodec) ReadRequestBody(args interface{}) error {
	sourceValue := reflect.Indirect(reflect.Indirect(reflect.ValueOf(i.args)))
	dst := reflect.Indirect(reflect.Indirect(reflect.ValueOf(args)))
	dst.Set(sourceValue)
	return nil
}

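// WriteResponse records any RPC error and copies the endpoint's reply back
// into the caller's reply value.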
func (i *inmemCodec) WriteResponse(resp *rpc.Response, reply interface{}) error {
	if resp.Error != "" {
		i.err = errors.New(resp.Error)
		return nil
	}
	sourceValue := reflect.Indirect(reflect.Indirect(reflect.ValueOf(reply)))
	dst := reflect.Indirect(reflect.Indirect(reflect.ValueOf(i.reply)))
	dst.Set(sourceValue)
	return nil
}

func (i *inmemCodec) Close() error {
	return nil
}

// RPC is used to make a local RPC call
func (s *Server) RPC(method string, args interface{}, reply interface{}) error {
	codec := &inmemCodec{
		method: method,
		args:   args,
		reply:  reply,
	}

	// Enforce the RPC limit.
	//
	// "client" metric path because the internal client API is calling to the
	// internal server API. It's odd that the same request directed to a server is
	// recorded differently. On the other hand this possibly masks the difference
	// between regular client requests that traverse the network and these which
	// don't (unless forwarded). This still seems most sane.
	metrics.IncrCounter([]string{"client", "rpc"}, 1)
	if !s.rpcLimiter.Load().(*rate.Limiter).Allow() {
		metrics.IncrCounter([]string{"client", "rpc", "exceeded"}, 1)
		return structs.ErrRPCRateExceeded
	}
	if err := s.rpcServer.ServeRequest(codec); err != nil {
		return err
	}
	return codec.err
}

// SnapshotRPC dispatches the given snapshot request, reading from the streaming
// input and writing to the streaming output depending on the operation.
func (s *Server) SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io.Writer,
	replyFn structs.SnapshotReplyFn) error {

	// Enforce the RPC limit.
	//
	// "client" metric path because the internal client API is calling to the
	// internal server API. It's odd that the same request directed to a server is
	// recorded differently. On the other hand this possibly masks the difference
	// between regular client requests that traverse the network and these which
	// don't (unless forwarded). This still seems most sane.
	metrics.IncrCounter([]string{"client", "rpc"}, 1)
	if !s.rpcLimiter.Load().(*rate.Limiter).Allow() {
		metrics.IncrCounter([]string{"client", "rpc", "exceeded"}, 1)
		return structs.ErrRPCRateExceeded
	}

	// Perform the operation.
	var reply structs.SnapshotResponse
	snap, err := s.dispatchSnapshotRequest(args, in, &reply)
	if err != nil {
		return err
	}
	defer func() {
		if err := snap.Close(); err != nil {
			s.logger.Error("Failed to close snapshot", "error", err)
		}
	}()

	// Let the caller peek at the reply.
	if replyFn != nil {
		if err := replyFn(&reply); err != nil {
			return err
		}
	}

	// Stream the snapshot.
	if out != nil {
		if _, err := io.Copy(out, snap); err != nil {
			return fmt.Errorf("failed to stream snapshot: %v", err)
		}
	}
	return nil
}

// RegisterEndpoint is used to substitute an endpoint for testing.
func (s *Server) RegisterEndpoint(name string, handler interface{}) error {
	s.logger.Warn("endpoint injected; this should only be used for testing")
	return s.rpcServer.RegisterName(name, handler)
}

// Stats is used to return statistics for debugging and insight
// for various sub-systems
func (s *Server) Stats() map[string]map[string]string {
	toString := func(v uint64) string {
		return strconv.FormatUint(v, 10)
	}
	numKnownDCs := len(s.router.GetDatacenters())
	stats := map[string]map[string]string{
		"consul": map[string]string{
			"server":            "true",
			"leader":            fmt.Sprintf("%v", s.IsLeader()),
			"leader_addr":       string(s.raft.Leader()),
			"bootstrap":         fmt.Sprintf("%v", s.config.Bootstrap),
			"known_datacenters": toString(uint64(numKnownDCs)),
		},
		"raft":     s.raft.Stats(),
		"serf_lan": s.serfLAN.Stats(),
		"runtime":  runtimeStats(),
	}

	if s.ACLsEnabled() {
		if s.UseLegacyACLs() {
			stats["consul"]["acl"] = "legacy"
		} else {
			stats["consul"]["acl"] = "enabled"
		}
	} else {
		stats["consul"]["acl"] = "disabled"
	}

	if s.serfWAN != nil {
		stats["serf_wan"] = s.serfWAN.Stats()
	}

	for outerKey, outerValue := range s.enterpriseStats() {
		if _, ok := stats[outerKey]; ok {
			for innerKey, innerValue := range outerValue {
				stats[outerKey][innerKey] = innerValue
			}
		} else {
			stats[outerKey] = outerValue
		}
	}

	return stats
}

// GetLANCoordinate returns the coordinate of the server in the LAN gossip pool.
func (s *Server) GetLANCoordinate() (lib.CoordinateSet, error) {
	lan, err := s.serfLAN.GetCoordinate()
	if err != nil {
		return nil, err
	}

	cs := lib.CoordinateSet{"": lan}
	for name, segment := range s.segmentLAN {
		c, err := segment.GetCoordinate()
		if err != nil {
			return nil, err
		}
		cs[name] = c
	}
	return cs, nil
}

// ReloadConfig is used to have the Server do an online reload of
// relevant configuration information
func (s *Server) ReloadConfig(config *Config) error {
	s.rpcLimiter.Store(rate.NewLimiter(config.RPCRate, config.RPCMaxBurst))
	s.rpcConnLimiter.SetConfig(connlimit.Config{
		MaxConnsPerClientIP: config.RPCMaxConnsPerClient,
	})

	if s.IsLeader() {
		// only bootstrap the config entries if we are the leader
		// this will error if we lose leadership while bootstrapping here.
		return s.bootstrapConfigEntries(config.ConfigEntryBootstrap)
	}

	return nil
}

// Atomically sets a readiness state flag when leadership is obtained, to indicate that the server is past its barrier write
func (s *Server) setConsistentReadReady() {
	atomic.StoreInt32(&s.readyForConsistentReads, 1)
}

// Atomically reset readiness state flag on leadership revoke
func (s *Server) resetConsistentReadReady() {
	atomic.StoreInt32(&s.readyForConsistentReads, 0)
}

// Returns true if this server is ready to serve consistent reads
func (s *Server) isReadyForConsistentReads() bool {
	return atomic.LoadInt32(&s.readyForConsistentReads) == 1
}

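// intentionReplicationEnabled returns true if Connect is enabled and this is
// not the primary datacenter, in which case intentions are replicated here
// from the primary.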
func (s *Server) intentionReplicationEnabled() bool {
	return s.config.ConnectEnabled && s.config.Datacenter != s.config.PrimaryDatacenter
}

// CreateACLToken will create an ACL token from the given template
func (s *Server) CreateACLToken(template *structs.ACLToken) (*structs.ACLToken, error) {
	// we have to require local tokens or else it would require having these servers use a token with acl:write to make a
	// token create RPC to the servers in the primary DC.
	if !s.LocalTokensEnabled() {
		return nil, fmt.Errorf("Agent Auto Configuration requires local token usage to be enabled in this datacenter: %s", s.config.Datacenter)
	}

	newToken := *template

	// generate the accessor id
	if newToken.AccessorID == "" {
		accessor, err := lib.GenerateUUID(s.checkTokenUUID)
		if err != nil {
			return nil, err
		}

		newToken.AccessorID = accessor
	}

	// generate the secret id
	if newToken.SecretID == "" {
		secret, err := lib.GenerateUUID(s.checkTokenUUID)
		if err != nil {
			return nil, err
		}

		newToken.SecretID = secret
	}

	newToken.CreateTime = time.Now()

	req := structs.ACLTokenBatchSetRequest{
		Tokens: structs.ACLTokens{&newToken},
		CAS:    false,
	}

	// perform the request to mint the new token
	if _, err := s.raftApplyMsgpack(structs.ACLTokenSetRequestType, &req); err != nil {
		return nil, err
	}

	// return the full token definition from the FSM
	_, token, err := s.fsm.State().ACLTokenGetByAccessor(nil, newToken.AccessorID, &newToken.EnterpriseMeta)
	return token, err
}

// DatacenterJoinAddresses will return all the strings suitable for usage in
// retry join operations to connect to the LAN or LAN segment gossip pool.
func (s *Server) DatacenterJoinAddresses(segment string) ([]string, error) {
	members, err := s.LANSegmentMembers(segment)
	if err != nil {
		return nil, fmt.Errorf("Failed to retrieve members for segment %s - %w", segment, err)
	}

	var joinAddrs []string
	for _, m := range members {
		if ok, _ := metadata.IsConsulServer(m); ok {
			serfAddr := net.TCPAddr{IP: m.Addr, Port: int(m.Port)}
			joinAddrs = append(joinAddrs, serfAddr.String())
		}
	}

	return joinAddrs, nil
}

// peersInfoContent is used to help operators understand what happened to the
// peers.json file. This is written to a file called peers.info in the same
// location.
const peersInfoContent = `
As of Consul 0.7.0, the peers.json file is only used for recovery
after an outage. The format of this file depends on what the server has
configured for its Raft protocol version. Please see the agent configuration
page at https://www.consul.io/docs/agent/options.html#_raft_protocol for more
details about this parameter.

For Raft protocol version 2 and earlier, this should be formatted as a JSON
array containing the address and port of each Consul server in the cluster, like
this:

[
  "10.1.0.1:8300",
  "10.1.0.2:8300",
  "10.1.0.3:8300"
]

For Raft protocol version 3 and later, this should be formatted as a JSON
array containing the node ID, address:port, and suffrage information of each
Consul server in the cluster, like this:

[
  {
    "id": "adf4238a-882b-9ddc-4a9d-5b6758e4159e",
    "address": "10.1.0.1:8300",
    "non_voter": false
  },
  {
    "id": "8b6dda82-3103-11e7-93ae-92361f002671",
    "address": "10.1.0.2:8300",
    "non_voter": false
  },
  {
    "id": "97e17742-3103-11e7-93ae-92361f002671",
    "address": "10.1.0.3:8300",
    "non_voter": false
  }
]

The "id" field is the node ID of the server. This can be found in the logs when
the server starts up, or in the "node-id" file inside the server's data
directory.

The "address" field is the address and port of the server.

The "non_voter" field controls whether the server is a non-voter, which is used
in some advanced Autopilot configurations, please see
https://www.consul.io/docs/guides/autopilot.html for more information. If
"non_voter" is omitted it will default to false, which is typical for most
clusters.

Under normal operation, the peers.json file will not be present.

When Consul starts for the first time, it will create this peers.info file and
delete any existing peers.json file so that recovery doesn't occur on the first
startup.

Once this peers.info file is present, any peers.json file will be ingested at
startup, and will set the Raft peer configuration manually to recover from an
outage. It's crucial that all servers in the cluster are shut down before
creating the peers.json file, and that all servers receive the same
configuration. Once the peers.json file is successfully ingested and applied, it
will be deleted.

Please see https://www.consul.io/docs/guides/outage.html for more information.
`