package consul

import (
	"context"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"net"
	"net/rpc"
	"os"
	"path/filepath"
	"reflect"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	metrics "github.com/armon/go-metrics"
	"github.com/hashicorp/consul/acl"
	ca "github.com/hashicorp/consul/agent/connect/ca"
	"github.com/hashicorp/consul/agent/consul/authmethod"
	"github.com/hashicorp/consul/agent/consul/authmethod/ssoauth"
	"github.com/hashicorp/consul/agent/consul/autopilot"
	"github.com/hashicorp/consul/agent/consul/fsm"
	"github.com/hashicorp/consul/agent/consul/state"
	"github.com/hashicorp/consul/agent/metadata"
	"github.com/hashicorp/consul/agent/pool"
	"github.com/hashicorp/consul/agent/router"
	"github.com/hashicorp/consul/agent/structs"
	"github.com/hashicorp/consul/agent/token"
	"github.com/hashicorp/consul/lib"
	"github.com/hashicorp/consul/logging"
	"github.com/hashicorp/consul/tlsutil"
	"github.com/hashicorp/consul/types"
	connlimit "github.com/hashicorp/go-connlimit"
	"github.com/hashicorp/go-hclog"
	"github.com/hashicorp/go-memdb"
	"github.com/hashicorp/memberlist"
	"github.com/hashicorp/raft"
	raftboltdb "github.com/hashicorp/raft-boltdb"
	"github.com/hashicorp/serf/serf"
	"golang.org/x/time/rate"
)

// These are the protocol versions that Consul can _understand_. These are
// Consul-level protocol versions that are used to configure the Serf
// protocol versions.
const (
	DefaultRPCProtocol = 2

	ProtocolVersionMin uint8 = 2

	// Version 3 added support for network coordinates but we kept the
	// default protocol version at 2 to ease the transition to this new
	// feature. A Consul agent speaking version 2 of the protocol will
	// attempt to send its coordinates to a server that understands version
	// 3 or greater.
	ProtocolVersion2Compatible = 2

	ProtocolVersionMax = 3
)

const (
	serfLANSnapshot   = "serf/local.snapshot"
	serfWANSnapshot   = "serf/remote.snapshot"
	raftState         = "raft/"
	snapshotsRetained = 2

	// serverRPCCache controls how long we keep an idle connection
	// open to a server
	serverRPCCache = 2 * time.Minute

	// serverMaxStreams controls how many idle streams we keep
	// open to a server
	serverMaxStreams = 64

	// raftLogCacheSize is the maximum number of logs to cache in-memory.
	// This is used to reduce disk I/O for the recently committed entries.
	raftLogCacheSize = 512

	// raftRemoveGracePeriod is how long we wait to allow a RemovePeer
	// to replicate to gracefully leave the cluster.
	raftRemoveGracePeriod = 5 * time.Second

	// serfEventChSize is the size of the buffered channel to get Serf
	// events. If this is exhausted we will block Serf and Memberlist.
	serfEventChSize = 2048

	// reconcileChSize is the size of the buffered channel for reconcile
	// updates from Serf with the Catalog. If this is exhausted we will drop
	// updates, and wait for a periodic reconcile.
	reconcileChSize = 256
)

const (
	legacyACLReplicationRoutineName       = "legacy ACL replication"
	aclPolicyReplicationRoutineName       = "ACL policy replication"
	aclRoleReplicationRoutineName         = "ACL role replication"
	aclTokenReplicationRoutineName        = "ACL token replication"
	aclTokenReapingRoutineName            = "acl token reaping"
	aclUpgradeRoutineName                 = "legacy ACL token upgrade"
	caRootPruningRoutineName              = "CA root pruning"
	configReplicationRoutineName          = "config entry replication"
	federationStateReplicationRoutineName = "federation state replication"
	federationStateAntiEntropyRoutineName = "federation state anti-entropy"
	federationStatePruningRoutineName     = "federation state pruning"
	intentionReplicationRoutineName       = "intention replication"
	secondaryCARootWatchRoutineName       = "secondary CA roots watch"
	secondaryCertRenewWatchRoutineName    = "secondary cert renew watch"
)

var (
	ErrWANFederationDisabled = fmt.Errorf("WAN Federation is disabled")
)

// Server is the Consul server which manages service discovery,
// health checking, DC forwarding, Raft, and multiple Serf pools.
type Server struct {
	// queriesBlocking is a counter that we increment and decrement atomically
	// in RPC calls to provide telemetry on how many blocking queries are
	// running. We interact with queriesBlocking atomically; do not move it
	// without ensuring it is correctly 64-bit aligned in the struct layout.
	queriesBlocking uint64

	// aclConfig is the configuration for the ACL system
	aclConfig *acl.Config

	// acls is used to resolve tokens to effective policies
	acls *ACLResolver

	aclAuthMethodValidators authmethod.Cache

	// DEPRECATED (ACL-Legacy-Compat) - only needed while we support both
	// useNewACLs is used to determine whether we can use new ACLs or not
	useNewACLs int32

	// autopilot is the Autopilot instance for this server.
	autopilot *autopilot.Autopilot

	// autopilotWaitGroup is used to block until Autopilot shuts down.
	autopilotWaitGroup sync.WaitGroup

	// caProviderReconfigurationLock guards the provider reconfiguration.
	caProviderReconfigurationLock sync.Mutex
	// caProvider is the current CA provider in use for Connect. This is
	// only non-nil when we are the leader.
	caProvider ca.Provider
	// caProviderRoot is the CARoot that was stored along with the active
	// ca.Provider. It's only updated in lock-step with the caProvider. This
	// prevents races between state updates to active roots and the fetch of
	// the provider instance.
	caProviderRoot *structs.CARoot
	caProviderLock sync.RWMutex

	// caLeafLimiter is the rate limiter to use when signing leaf certificates
	caLeafLimiter connectSignRateLimiter

	// config is the Consul configuration
	config *Config

	// configReplicator is used to manage the leader's replication routines for
	// centralized config
	configReplicator *Replicator

	// federationStateReplicator is used to manage the leader's replication
	// routines for federation states
	federationStateReplicator *Replicator

	// dcSupportsFederationStates is used to determine whether we can
	// replicate federation states or not. All servers in the local
	// DC must be on a version of Consul supporting federation states
	// before this will get enabled.
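	// Since it is an int32 it can be read and updated atomically at runtime,
	// in the same spirit as the useNewACLs flag above.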
	dcSupportsFederationStates int32

	// tokens holds ACL tokens initially from the configuration, but can
	// be updated at runtime, so should always be used instead of going to
	// the configuration directly.
	tokens *token.Store

	// connPool is the connection pool to other consul servers
	connPool *pool.ConnPool

	// eventChLAN is used to receive events from the
	// serf cluster in the datacenter
	eventChLAN chan serf.Event

	// eventChWAN is used to receive events from the
	// serf cluster that spans datacenters
	eventChWAN chan serf.Event

	// fsm is the state machine used with Raft to provide
	// strong consistency.
	fsm *fsm.FSM

	// Logger uses the provided LogOutput
	logger  hclog.InterceptLogger
	loggers *loggerStore

	// The raft instance is used among Consul nodes within the DC to protect
	// operations that require strong consistency.
	raft          *raft.Raft
	raftLayer     *RaftLayer
	raftStore     *raftboltdb.BoltStore
	raftTransport *raft.NetworkTransport
	raftInmem     *raft.InmemStore

	// raftNotifyCh is set up by setupRaft() and ensures that we get reliable leader
	// transition notifications from the Raft layer.
	raftNotifyCh <-chan bool

	// reconcileCh is used to pass events from the serf handler
	// into the leader manager, so that the strong state can be
	// updated
	reconcileCh chan serf.Member

	// readyForConsistentReads is used to track when the leader server is
	// ready to serve consistent reads, after it has applied its initial
	// barrier. This is updated atomically.
	readyForConsistentReads int32

	// leaveCh is used to signal that the server is leaving the cluster
	// and trying to shed its RPC traffic onto other Consul servers. This
	// is only ever closed.
	leaveCh chan struct{}

	// router is used to map out Consul servers in the WAN and in Consul
	// Enterprise user-defined areas.
	router *router.Router

	// rpcLimiter is used to rate limit the total number of RPCs initiated
	// from an agent.
	rpcLimiter atomic.Value

	// rpcConnLimiter limits the number of RPC connections from a single source IP
	rpcConnLimiter connlimit.Limiter

	// Listener is used to listen for incoming connections
	Listener  net.Listener
	rpcServer *rpc.Server

	// insecureRPCServer is an RPC server that is configured with
	// IncomingInsecureRPCConfig to allow clients to call AutoEncrypt.Sign
	// to request client certificates. At this point a client doesn't have
	// a client cert and thus cannot present it. This is the only RPC
	// endpoint that is available at the time of writing.
	insecureRPCServer *rpc.Server

	// tlsConfigurator holds the agent configuration relevant to TLS and
	// configures everything related to it.
	tlsConfigurator *tlsutil.Configurator

	// serfLAN is the Serf cluster maintained inside the DC
	// which contains all the DC nodes
	serfLAN *serf.Serf

	// segmentLAN maps segment names to their Serf cluster
	segmentLAN map[string]*serf.Serf

	// serfWAN is the Serf cluster maintained between DC's
	// which SHOULD only consist of Consul servers
	serfWAN                *serf.Serf
	memberlistTransportWAN memberlist.IngestionAwareTransport
	gatewayLocator         *GatewayLocator

	// serverLookup tracks server consuls in the local datacenter.
	// Used to do leader forwarding and provide fast lookup by server id and address
	serverLookup *ServerLookup

	// floodLock controls access to floodCh.
	floodLock sync.RWMutex
	floodCh   []chan struct{}

	// sessionTimers track the expiration time of each Session that has
	// a TTL. On expiration, a SessionDestroy event will occur, and
	// destroy the session via standard session destroy processing
	sessionTimers *SessionTimers

	// statsFetcher is used by autopilot to check the status of the other
	// Consul servers.
	statsFetcher *StatsFetcher

	// reassertLeaderCh is used to signal the leader loop should re-run
	// leadership actions after a snapshot restore.
	reassertLeaderCh chan chan error

	// tombstoneGC is used to track the pending GC invocations
	// for the KV tombstones
	tombstoneGC *state.TombstoneGC

	// aclReplicationStatus (and its associated lock) provide information
	// about the health of the ACL replication goroutine.
	aclReplicationStatus     structs.ACLReplicationStatus
	aclReplicationStatusLock sync.RWMutex

	// shutdown and the associated members here are used in orchestrating
	// a clean shutdown. The shutdownCh is never written to, only closed to
	// indicate a shutdown has been initiated.
	shutdown     bool
	shutdownCh   chan struct{}
	shutdownLock sync.Mutex

	// State for whether this datacenter is acting as a secondary CA.
	actingSecondaryCA   bool
	actingSecondaryLock sync.RWMutex

	// Manager to handle starting/stopping go routines when establishing/revoking raft leadership
	leaderRoutineManager *LeaderRoutineManager

	// embedded struct to hold all the enterprise specific data
	EnterpriseServer
}

// NewServer is only used to help set up a server for testing. Normal code
// exercises NewServerLogger.
func NewServer(config *Config) (*Server, error) {
	c, err := tlsutil.NewConfigurator(config.ToTLSUtilConfig(), nil)
	if err != nil {
		return nil, err
	}
	return NewServerLogger(config, nil, new(token.Store), c)
}

// NewServerLogger is used to construct a new Consul server from the
// configuration, potentially returning an error
func NewServerLogger(config *Config, logger hclog.InterceptLogger, tokens *token.Store, tlsConfigurator *tlsutil.Configurator) (*Server, error) {
	return NewServerWithOptions(config,
		WithLogger(logger),
		WithTokenStore(tokens),
		WithTLSConfigurator(tlsConfigurator))
}

// NewServerWithOptions is used to construct a new Consul server from the
// configuration and extra options, potentially returning an error
func NewServerWithOptions(config *Config, options ...ConsulOption) (*Server, error) {
	flat := flattenConsulOptions(options)

	logger := flat.logger
	tokens := flat.tokens
	tlsConfigurator := flat.tlsConfigurator
	connPool := flat.connPool

	// Check the protocol version.
	if err := config.CheckProtocolVersion(); err != nil {
		return nil, err
	}

	// Check for a data directory.
	if config.DataDir == "" && !config.DevMode {
		return nil, fmt.Errorf("Config must provide a DataDir")
	}

	// Sanity check the ACLs.
	if err := config.CheckACL(); err != nil {
		return nil, err
	}

	// Ensure we have a log output and create a logger.
	if config.LogOutput == nil {
		config.LogOutput = os.Stderr
	}

	if logger == nil {
		logger = hclog.NewInterceptLogger(&hclog.LoggerOptions{
			Level:  hclog.Debug,
			Output: config.LogOutput,
		})
	}

	// Check if TLS is enabled
	if config.CAFile != "" || config.CAPath != "" {
		config.UseTLS = true
	}

	// Set the primary DC if it wasn't set.
	if config.PrimaryDatacenter == "" {
		if config.ACLDatacenter != "" {
			config.PrimaryDatacenter = config.ACLDatacenter
		} else {
			config.PrimaryDatacenter = config.Datacenter
		}
	}

	if config.PrimaryDatacenter != "" {
		config.ACLDatacenter = config.PrimaryDatacenter
	}

	// Create the tombstone GC.
	gc, err := state.NewTombstoneGC(config.TombstoneTTL, config.TombstoneTTLGranularity)
	if err != nil {
		return nil, err
	}

	// Create the shutdown channel - this is closed but never written to.
	shutdownCh := make(chan struct{})

	if connPool == nil {
		connPool = &pool.ConnPool{
			Server:          true,
			SrcAddr:         config.RPCSrcAddr,
			LogOutput:       config.LogOutput,
			MaxTime:         serverRPCCache,
			MaxStreams:      serverMaxStreams,
			TLSConfigurator: tlsConfigurator,
			Datacenter:      config.Datacenter,
		}
	}

	serverLogger := logger.NamedIntercept(logging.ConsulServer)
	loggers := newLoggerStore(serverLogger)
	// Create server.
	s := &Server{
		config:                  config,
		tokens:                  tokens,
		connPool:                connPool,
		eventChLAN:              make(chan serf.Event, serfEventChSize),
		eventChWAN:              make(chan serf.Event, serfEventChSize),
		logger:                  serverLogger,
		loggers:                 loggers,
		leaveCh:                 make(chan struct{}),
		reconcileCh:             make(chan serf.Member, reconcileChSize),
		router:                  router.NewRouter(serverLogger, config.Datacenter, fmt.Sprintf("%s.%s", config.NodeName, config.Datacenter)),
		rpcServer:               rpc.NewServer(),
		insecureRPCServer:       rpc.NewServer(),
		tlsConfigurator:         tlsConfigurator,
		reassertLeaderCh:        make(chan chan error),
		segmentLAN:              make(map[string]*serf.Serf, len(config.Segments)),
		sessionTimers:           NewSessionTimers(),
		tombstoneGC:             gc,
		serverLookup:            NewServerLookup(),
		shutdownCh:              shutdownCh,
		leaderRoutineManager:    NewLeaderRoutineManager(logger),
		aclAuthMethodValidators: authmethod.NewCache(),
	}

	if s.config.ConnectMeshGatewayWANFederationEnabled {
		s.gatewayLocator = NewGatewayLocator(
			s.logger,
			s,
			s.config.Datacenter,
			s.config.PrimaryDatacenter,
		)
		s.connPool.GatewayResolver = s.gatewayLocator.PickGateway
	}

	// Initialize enterprise specific server functionality
	if err := s.initEnterprise(); err != nil {
		s.Shutdown()
		return nil, err
	}

	s.rpcLimiter.Store(rate.NewLimiter(config.RPCRate, config.RPCMaxBurst))

	configReplicatorConfig := ReplicatorConfig{
		Name:     logging.ConfigEntry,
		Delegate: &FunctionReplicator{ReplicateFn: s.replicateConfig},
		Rate:     s.config.ConfigReplicationRate,
		Burst:    s.config.ConfigReplicationBurst,
		Logger:   s.logger,
	}
	s.configReplicator, err = NewReplicator(&configReplicatorConfig)
	if err != nil {
		s.Shutdown()
		return nil, err
	}

	federationStateReplicatorConfig := ReplicatorConfig{
		Name: logging.FederationState,
		Delegate: &IndexReplicator{
			Delegate: &FederationStateReplicator{
				srv:            s,
				gatewayLocator: s.gatewayLocator,
			},
			Logger: s.logger,
		},
		Rate: s.config.FederationStateReplicationRate,
		Burst:            s.config.FederationStateReplicationBurst,
		Logger:           logger,
		SuppressErrorLog: isErrFederationStatesNotSupported,
	}
	s.federationStateReplicator, err = NewReplicator(&federationStateReplicatorConfig)
	if err != nil {
		s.Shutdown()
		return nil, err
	}

	// Initialize the stats fetcher that autopilot will use.
	s.statsFetcher = NewStatsFetcher(logger, s.connPool, s.config.Datacenter)

	s.aclConfig = newACLConfig(logger)
	s.useNewACLs = 0
	aclConfig := ACLResolverConfig{
		Config:      config,
		Delegate:    s,
		CacheConfig: serverACLCacheConfig,
		AutoDisable: false,
		Logger:      logger,
		ACLConfig:   s.aclConfig,
	}
	// Initialize the ACL resolver.
	if s.acls, err = NewACLResolver(&aclConfig); err != nil {
		s.Shutdown()
		return nil, fmt.Errorf("Failed to create ACL resolver: %v", err)
	}

	// Initialize the RPC layer.
	if err := s.setupRPC(); err != nil {
		s.Shutdown()
		return nil, fmt.Errorf("Failed to start RPC layer: %v", err)
	}

	// Initialize any extra RPC listeners for segments.
	segmentListeners, err := s.setupSegmentRPC()
	if err != nil {
		s.Shutdown()
		return nil, fmt.Errorf("Failed to start segment RPC layer: %v", err)
	}

	// Initialize the Raft server.
	if err := s.setupRaft(); err != nil {
		s.Shutdown()
		return nil, fmt.Errorf("Failed to start Raft: %v", err)
	}

	if s.config.ConnectEnabled && (s.config.AutoEncryptAllowTLS || s.config.AutoConfigAuthzEnabled) {
		go s.connectCARootsMonitor(&lib.StopChannelContext{StopCh: s.shutdownCh})
	}

	if s.gatewayLocator != nil {
		go s.gatewayLocator.Run(&lib.StopChannelContext{StopCh: s.shutdownCh})
	}

	// Serf and dynamic bind ports
	//
	// The LAN serf cluster announces the port of the WAN serf cluster
	// which creates a race when the WAN cluster is supposed to bind to
	// a dynamic port (port 0). The current memberlist implementation will
	// update the bind port in the configuration after the memberlist is
	// created, so we can pull it out from there reliably, even though it's
	// a little gross to be reading the updated config.

	// Initialize the WAN Serf if enabled
	serfBindPortWAN := -1
	if config.SerfWANConfig != nil {
		serfBindPortWAN = config.SerfWANConfig.MemberlistConfig.BindPort
		s.serfWAN, err = s.setupSerf(config.SerfWANConfig, s.eventChWAN, serfWANSnapshot, true, serfBindPortWAN, "", s.Listener)
		if err != nil {
			s.Shutdown()
			return nil, fmt.Errorf("Failed to start WAN Serf: %v", err)
		}

		// This is always a *memberlist.NetTransport or something which wraps
		// it which satisfies this interface.
		s.memberlistTransportWAN = config.SerfWANConfig.MemberlistConfig.Transport.(memberlist.IngestionAwareTransport)

		// See big comment above why we are doing this.
		if serfBindPortWAN == 0 {
			serfBindPortWAN = config.SerfWANConfig.MemberlistConfig.BindPort
			if serfBindPortWAN == 0 {
				return nil, fmt.Errorf("Failed to get dynamic bind port for WAN Serf")
			}
			s.logger.Info("Serf WAN TCP bound", "port", serfBindPortWAN)
		}
	}

	// Initialize the LAN segments before the default LAN Serf so we have
	// updated port information to publish there.
	if err := s.setupSegments(config, serfBindPortWAN, segmentListeners); err != nil {
		s.Shutdown()
		return nil, fmt.Errorf("Failed to setup network segments: %v", err)
	}

	// Initialize the LAN Serf for the default network segment.
	s.serfLAN, err = s.setupSerf(config.SerfLANConfig, s.eventChLAN, serfLANSnapshot, false, serfBindPortWAN, "", s.Listener)
	if err != nil {
		s.Shutdown()
		return nil, fmt.Errorf("Failed to start LAN Serf: %v", err)
	}
	go s.lanEventHandler()

	// Start the flooders after the LAN event handler is wired up.
	s.floodSegments(config)

	// Add a "static route" to the WAN Serf and hook it up to Serf events.
	if s.serfWAN != nil {
		if err := s.router.AddArea(types.AreaWAN, s.serfWAN, s.connPool); err != nil {
			s.Shutdown()
			return nil, fmt.Errorf("Failed to add WAN serf route: %v", err)
		}
		go router.HandleSerfEvents(s.logger, s.router, types.AreaWAN, s.serfWAN.ShutdownCh(), s.eventChWAN)

		// Fire up the LAN <-> WAN join flooder.
		addrFn := func(s *metadata.Server) (string, error) {
			if s.WanJoinPort == 0 {
				return "", fmt.Errorf("no wan join port for server: %s", s.Addr.String())
			}
			addr, _, err := net.SplitHostPort(s.Addr.String())
			if err != nil {
				return "", err
			}
			return fmt.Sprintf("%s:%d", addr, s.WanJoinPort), nil
		}
		go s.Flood(addrFn, s.serfWAN)
	}

	// Start enterprise specific functionality
	if err := s.startEnterprise(); err != nil {
		s.Shutdown()
		return nil, err
	}

	// Initialize Autopilot. This must happen before starting leadership monitoring
	// as establishing leadership could attempt to use autopilot and cause a panic.
	s.initAutopilot(config)

	// Start monitoring leadership. This must happen after Serf is set up
	// since it can fire events when leadership is obtained.
	go s.monitorLeadership()

	// Start listening for RPC requests.
	go s.listen(s.Listener)

	// Start listeners for any segments with separate RPC listeners.
	for _, listener := range segmentListeners {
		go s.listen(listener)
	}

	// Start the metrics handlers.
	go s.updateMetrics()

	return s, nil
}

func (s *Server) connectCARootsMonitor(ctx context.Context) {
	for {
		ws := memdb.NewWatchSet()
		state := s.fsm.State()
		ws.Add(state.AbandonCh())
		_, cas, err := state.CARoots(ws)
		if err != nil {
			s.logger.Error("Failed to watch AutoEncrypt CARoot", "error", err)
			return
		}
		caPems := []string{}
		for _, ca := range cas {
			caPems = append(caPems, ca.RootCert)
		}
		if err := s.tlsConfigurator.UpdateAutoTLSCA(caPems); err != nil {
			s.logger.Error("Failed to update AutoEncrypt CAPems", "error", err)
		}

		if err := ws.WatchCtx(ctx); err == context.Canceled {
			s.logger.Info("shutting down Connect CA roots monitor")
			return
		}
	}
}

// setupRaft is used to setup and initialize Raft
func (s *Server) setupRaft() error {
	// If we have an unclean exit then attempt to close the Raft store.
	defer func() {
		if s.raft == nil && s.raftStore != nil {
			if err := s.raftStore.Close(); err != nil {
				s.logger.Error("failed to close Raft store", "error", err)
			}
		}
	}()

	// Create the FSM.
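	// The FSM is the finite state machine that applies committed Raft log
	// entries to the in-memory state store; Raft snapshots are also captured
	// from it.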
	var err error
	s.fsm, err = fsm.New(s.tombstoneGC, s.logger)
	if err != nil {
		return err
	}

	// ServerAddressProvider needs server IDs to work correctly, which is only
	// supported in protocol version 3 or higher.
	var serverAddressProvider raft.ServerAddressProvider = nil
	if s.config.RaftConfig.ProtocolVersion >= 3 {
		serverAddressProvider = s.serverLookup
	}

	// Create a transport layer.
	transConfig := &raft.NetworkTransportConfig{
		Stream:                s.raftLayer,
		MaxPool:               3,
		Timeout:               10 * time.Second,
		ServerAddressProvider: serverAddressProvider,
		Logger:                s.loggers.Named(logging.Raft),
	}

	trans := raft.NewNetworkTransportWithConfig(transConfig)
	s.raftTransport = trans
	s.config.RaftConfig.Logger = s.loggers.Named(logging.Raft)

	// Versions of the Raft protocol below 3 require the LocalID to match the network
	// address of the transport.
	s.config.RaftConfig.LocalID = raft.ServerID(trans.LocalAddr())
	if s.config.RaftConfig.ProtocolVersion >= 3 {
		s.config.RaftConfig.LocalID = raft.ServerID(s.config.NodeID)
	}

	// Build an all in-memory setup for dev mode, otherwise prepare a full
	// disk-based setup.
	var log raft.LogStore
	var stable raft.StableStore
	var snap raft.SnapshotStore
	if s.config.DevMode {
		store := raft.NewInmemStore()
		s.raftInmem = store
		stable = store
		log = store
		snap = raft.NewInmemSnapshotStore()
	} else {
		// Create the base raft path.
		path := filepath.Join(s.config.DataDir, raftState)
		if err := lib.EnsurePath(path, true); err != nil {
			return err
		}

		// Create the backend raft store for logs and stable storage.
		store, err := raftboltdb.NewBoltStore(filepath.Join(path, "raft.db"))
		if err != nil {
			return err
		}
		s.raftStore = store
		stable = store

		// Wrap the store in a LogCache to improve performance.
		cacheStore, err := raft.NewLogCache(raftLogCacheSize, store)
		if err != nil {
			return err
		}
		log = cacheStore

		// Create the snapshot store.
		snapshots, err := raft.NewFileSnapshotStore(path, snapshotsRetained, s.config.LogOutput)
		if err != nil {
			return err
		}
		snap = snapshots

		// For an existing cluster being upgraded to the new version of
		// Raft, we almost never want to run recovery based on the old
		// peers.json file. We create a peers.info file with a helpful
		// note about where peers.json went, and use that as a sentinel
		// to avoid ingesting the old one that first time (if we have to
		// create the peers.info file because it's not there, we also
		// blow away any existing peers.json file).
		peersFile := filepath.Join(path, "peers.json")
		peersInfoFile := filepath.Join(path, "peers.info")
		if _, err := os.Stat(peersInfoFile); os.IsNotExist(err) {
			if err := ioutil.WriteFile(peersInfoFile, []byte(peersInfoContent), 0755); err != nil {
				return fmt.Errorf("failed to write peers.info file: %v", err)
			}

			// Blow away the peers.json file if present, since the
			// peers.info sentinel wasn't there.
			if _, err := os.Stat(peersFile); err == nil {
				if err := os.Remove(peersFile); err != nil {
					return fmt.Errorf("failed to delete peers.json, please delete manually (see peers.info for details): %v", err)
				}
				s.logger.Info("deleted peers.json file (see peers.info for details)")
			}
		} else if _, err := os.Stat(peersFile); err == nil {
			s.logger.Info("found peers.json file, recovering Raft configuration...")

			var configuration raft.Configuration
			if s.config.RaftConfig.ProtocolVersion < 3 {
				configuration, err = raft.ReadPeersJSON(peersFile)
			} else {
				configuration, err = raft.ReadConfigJSON(peersFile)
			}
			if err != nil {
				return fmt.Errorf("recovery failed to parse peers.json: %v", err)
			}

			tmpFsm, err := fsm.New(s.tombstoneGC, s.logger)
			if err != nil {
				return fmt.Errorf("recovery failed to make temp FSM: %v", err)
			}
			if err := raft.RecoverCluster(s.config.RaftConfig, tmpFsm,
				log, stable, snap, trans, configuration); err != nil {
				return fmt.Errorf("recovery failed: %v", err)
			}

			if err := os.Remove(peersFile); err != nil {
				return fmt.Errorf("recovery failed to delete peers.json, please delete manually (see peers.info for details): %v", err)
			}
			s.logger.Info("deleted peers.json file after successful recovery")
		}
	}

	// If we are in bootstrap or dev mode and the state is clean then we can
	// bootstrap now.
	if s.config.Bootstrap || s.config.DevMode {
		hasState, err := raft.HasExistingState(log, stable, snap)
		if err != nil {
			return err
		}
		if !hasState {
			configuration := raft.Configuration{
				Servers: []raft.Server{
					raft.Server{
						ID:      s.config.RaftConfig.LocalID,
						Address: trans.LocalAddr(),
					},
				},
			}
			if err := raft.BootstrapCluster(s.config.RaftConfig,
				log, stable, snap, trans, configuration); err != nil {
				return err
			}
		}
	}

	// Set up a channel for reliable leader notifications.
	raftNotifyCh := make(chan bool, 10)
	s.config.RaftConfig.NotifyCh = raftNotifyCh
	s.raftNotifyCh = raftNotifyCh

	// Setup the Raft store.
	s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm.ChunkingFSM(), log, stable, snap, trans)
	if err != nil {
		return err
	}
	return nil
}

// factory is a function that returns an RPC endpoint bound to the given
// server.
type factory func(s *Server) interface{}

// endpoints is a list of registered RPC endpoint factories.
var endpoints []factory

// registerEndpoint registers a new RPC endpoint factory.
func registerEndpoint(fn factory) {
	endpoints = append(endpoints, fn)
}

// setupRPC is used to setup the RPC listener
func (s *Server) setupRPC() error {
	s.rpcConnLimiter.SetConfig(connlimit.Config{
		MaxConnsPerClientIP: s.config.RPCMaxConnsPerClient,
	})

	for _, fn := range endpoints {
		s.rpcServer.Register(fn(s))
	}

	// Only register AutoEncrypt on the insecure RPC server. Insecure only
	// means that verify incoming is turned off even though it might have
	// been configured.
	s.insecureRPCServer.Register(&AutoEncrypt{srv: s})

	// Setup the AutoConfig JWT Authorizer
	var authz AutoConfigAuthorizer
	if s.config.AutoConfigAuthzEnabled {
		// create the auto config authorizer from the JWT authmethod
		validator, err := ssoauth.NewValidator(s.logger, &s.config.AutoConfigAuthzAuthMethod)
		if err != nil {
			return fmt.Errorf("Failed to initialize JWT Auto Config Authorizer: %w", err)
		}

		authz = &jwtAuthorizer{
			validator:       validator,
			allowReuse:      s.config.AutoConfigAuthzAllowReuse,
			claimAssertions: s.config.AutoConfigAuthzClaimAssertions,
		}
	} else {
		// This authorizer always returns that the endpoint is disabled
		authz = &disabledAuthorizer{}
	}
	// now register with the insecure RPC server
	s.insecureRPCServer.Register(NewAutoConfig(s.config, s.tlsConfigurator, s, authz))

	ln, err := net.ListenTCP("tcp", s.config.RPCAddr)
	if err != nil {
		return err
	}
	s.Listener = ln

	if s.config.NotifyListen != nil {
		s.config.NotifyListen()
	}
	// todo(fs): we should probably guard this
	if s.config.RPCAdvertise == nil {
		s.config.RPCAdvertise = ln.Addr().(*net.TCPAddr)
	}

	// Verify that we have a usable advertise address
	if s.config.RPCAdvertise.IP.IsUnspecified() {
		ln.Close()
		return fmt.Errorf("RPC advertise address is not advertisable: %v", s.config.RPCAdvertise)
	}

	// TODO (hans) switch NewRaftLayer to tlsConfigurator

	// Provide a DC specific wrapper. Raft replication is only
	// ever done in the same datacenter, so we can provide it as a constant.
	wrapper := tlsutil.SpecificDC(s.config.Datacenter, s.tlsConfigurator.OutgoingRPCWrapper())

	// Define a callback for determining whether to wrap a connection with TLS
	tlsFunc := func(address raft.ServerAddress) bool {
		// raft only talks to its own datacenter
		return s.tlsConfigurator.UseTLS(s.config.Datacenter)
	}
	s.raftLayer = NewRaftLayer(s.config.RPCSrcAddr, s.config.RPCAdvertise, wrapper, tlsFunc)
	return nil
}

// Shutdown is used to shutdown the server
func (s *Server) Shutdown() error {
	s.logger.Info("shutting down server")
	s.shutdownLock.Lock()
	defer s.shutdownLock.Unlock()

	if s.shutdown {
		return nil
	}

	s.shutdown = true
	close(s.shutdownCh)

	// ensure that any leader routines still running get canceled
	if s.leaderRoutineManager != nil {
		s.leaderRoutineManager.StopAll()
	}

	if s.serfLAN != nil {
		s.serfLAN.Shutdown()
	}

	if s.serfWAN != nil {
		s.serfWAN.Shutdown()
		if err := s.router.RemoveArea(types.AreaWAN); err != nil {
			s.logger.Warn("error removing WAN area", "error", err)
		}
	}
	s.router.Shutdown()

	if s.raft != nil {
		s.raftTransport.Close()
		s.raftLayer.Close()
		future := s.raft.Shutdown()
		if err := future.Error(); err != nil {
			s.logger.Warn("error shutting down raft", "error", err)
		}
		if s.raftStore != nil {
			s.raftStore.Close()
		}
	}

	if s.Listener != nil {
		s.Listener.Close()
	}

	// Close the connection pool
	if s.connPool != nil {
		s.connPool.Shutdown()
	}

	if s.acls != nil {
		s.acls.Close()
	}

	if s.config.NotifyShutdown != nil {
		s.config.NotifyShutdown()
	}

	return nil
}

// Leave is used to prepare for a graceful shutdown of the server
func (s *Server) Leave() error {
	s.logger.Info("server starting leave")

	// Check the number of known peers
	numPeers, err := s.numPeers()
	if err != nil {
		s.logger.Error("failed to check raft peers", "error", err)
		return err
	}

	addr := s.raftTransport.LocalAddr()

	// If we are the current leader, and we have any other peers (cluster has multiple
	// servers), we should do a RemoveServer/RemovePeer to safely reduce the quorum size.
	// If we are not the leader, then we should issue our leave intention and wait to be
	// removed for some sane period of time.
	isLeader := s.IsLeader()
	if isLeader && numPeers > 1 {
		minRaftProtocol, err := s.autopilot.MinRaftProtocol()
		if err != nil {
			return err
		}

		if minRaftProtocol >= 2 && s.config.RaftConfig.ProtocolVersion >= 3 {
			future := s.raft.RemoveServer(raft.ServerID(s.config.NodeID), 0, 0)
			if err := future.Error(); err != nil {
				s.logger.Error("failed to remove ourself as raft peer", "error", err)
			}
		} else {
			future := s.raft.RemovePeer(addr)
			if err := future.Error(); err != nil {
				s.logger.Error("failed to remove ourself as raft peer", "error", err)
			}
		}
	}

	// Leave the WAN pool
	if s.serfWAN != nil {
		if err := s.serfWAN.Leave(); err != nil {
			s.logger.Error("failed to leave WAN Serf cluster", "error", err)
		}
	}

	// Leave the LAN pool
	if s.serfLAN != nil {
		if err := s.serfLAN.Leave(); err != nil {
			s.logger.Error("failed to leave LAN Serf cluster", "error", err)
		}
	}

	// Leave everything enterprise related as well
	s.handleEnterpriseLeave()

	// Start refusing RPCs now that we've left the LAN pool. It's important
	// to do this *after* we've left the LAN pool so that clients will know
	// to shift onto another server if they perform a retry. We also wake up
	// all queries in the RPC retry state.
	s.logger.Info("Waiting to drain RPC traffic", "drain_time", s.config.LeaveDrainTime)
	close(s.leaveCh)
	time.Sleep(s.config.LeaveDrainTime)

	// If we were not leader, wait to be safely removed from the cluster. We
	// must wait to allow the raft replication to take place, otherwise an
	// immediate shutdown could cause a loss of quorum.
	if !isLeader {
		left := false
		limit := time.Now().Add(raftRemoveGracePeriod)
		for !left && time.Now().Before(limit) {
			// Sleep a while before we check.
			time.Sleep(50 * time.Millisecond)

			// Get the latest configuration.
			future := s.raft.GetConfiguration()
			if err := future.Error(); err != nil {
				s.logger.Error("failed to get raft configuration", "error", err)
				break
			}

			// See if we are no longer included.
			left = true
			for _, server := range future.Configuration().Servers {
				if server.Address == addr {
					left = false
					break
				}
			}
		}

		// TODO (slackpad) With the old Raft library we used to force the
		// peers set to empty when a graceful leave occurred. This would
		// keep voting spam down if the server was restarted, but it was
		// dangerous because the peers was inconsistent with the logs and
		// snapshots, so it wasn't really safe in all cases for the server
		// to become leader. This is now safe, but the log spam is noisy.
		// The next new version of the library will have a "you are not a
		// peer stop it" behavior that should address this.
		// We will have to evaluate during the RC period if this interim
		// situation is not too confusing for operators.

		// TODO (slackpad) When we take a later new version of the Raft
		// library it won't try to complete replication, so this peer
		// may not realize that it has been removed. Need to revisit this
		// and the warning here.
		if !left {
			s.logger.Warn("failed to leave raft configuration gracefully, timeout")
		}
	}

	return nil
}

// numPeers is used to check on the number of known peers, including potentially
// the local node. We count only voters, since others can't actually become
// leader, so aren't considered peers.
func (s *Server) numPeers() (int, error) {
	future := s.raft.GetConfiguration()
	if err := future.Error(); err != nil {
		return 0, err
	}

	return autopilot.NumPeers(future.Configuration()), nil
}

// JoinLAN is used to have Consul join the inner-DC pool.
// The target address should be another node inside the DC
// listening on the Serf LAN address.
func (s *Server) JoinLAN(addrs []string) (int, error) {
	return s.serfLAN.Join(addrs, true)
}

// JoinWAN is used to have Consul join the cross-WAN Consul ring.
// The target address should be another node listening on the
// Serf WAN address.
func (s *Server) JoinWAN(addrs []string) (int, error) {
	if s.serfWAN == nil {
		return 0, ErrWANFederationDisabled
	}
	return s.serfWAN.Join(addrs, true)
}

// PrimaryMeshGatewayAddressesReadyCh returns a channel that will be closed
// when federation state replication ships back at least one primary mesh
// gateway (not via fallback config).
func (s *Server) PrimaryMeshGatewayAddressesReadyCh() <-chan struct{} {
	if s.gatewayLocator == nil {
		return nil
	}
	return s.gatewayLocator.PrimaryMeshGatewayAddressesReadyCh()
}

// PickRandomMeshGatewaySuitableForDialing is a convenience function used for writing tests.
func (s *Server) PickRandomMeshGatewaySuitableForDialing(dc string) string {
	if s.gatewayLocator == nil {
		return ""
	}
	return s.gatewayLocator.PickGateway(dc)
}

// RefreshPrimaryGatewayFallbackAddresses is used to update the list of current
// fallback addresses for locating mesh gateways in the primary datacenter.
func (s *Server) RefreshPrimaryGatewayFallbackAddresses(addrs []string) {
	if s.gatewayLocator != nil {
		s.gatewayLocator.RefreshPrimaryGatewayFallbackAddresses(addrs)
	}
}

// PrimaryGatewayFallbackAddresses returns the current set of discovered
// fallback addresses for the mesh gateways in the primary datacenter.
func (s *Server) PrimaryGatewayFallbackAddresses() []string {
	if s.gatewayLocator == nil {
		return nil
	}
	return s.gatewayLocator.PrimaryGatewayFallbackAddresses()
}

// LocalMember is used to return the local node
func (s *Server) LocalMember() serf.Member {
	return s.serfLAN.LocalMember()
}

// LANMembers is used to return the members of the LAN cluster
func (s *Server) LANMembers() []serf.Member {
	return s.serfLAN.Members()
}

// WANMembers is used to return the members of the WAN cluster
func (s *Server) WANMembers() []serf.Member {
	if s.serfWAN == nil {
		return nil
	}
	return s.serfWAN.Members()
}

// RemoveFailedNode is used to remove a failed node from the cluster
func (s *Server) RemoveFailedNode(node string, prune bool) error {
	var removeFn func(*serf.Serf, string) error
	if prune {
		removeFn = (*serf.Serf).RemoveFailedNodePrune
	} else {
		removeFn = (*serf.Serf).RemoveFailedNode
	}

	if err := removeFn(s.serfLAN, node); err != nil {
		return err
	}
	// The Serf WAN pool stores members as node.datacenter
	// so the dc is appended if not present
	if !strings.HasSuffix(node, "."+s.config.Datacenter) {
		node = node + "." + s.config.Datacenter
	}
	if s.serfWAN != nil {
		if err := removeFn(s.serfWAN, node); err != nil {
			return err
		}
	}
	return nil
}

// IsLeader checks if this server is the cluster leader
func (s *Server) IsLeader() bool {
	return s.raft.State() == raft.Leader
}

// LeaderLastContact returns the time of last contact by a leader.
// This only makes sense if we are currently a follower.
func (s *Server) LeaderLastContact() time.Time {
	return s.raft.LastContact()
}

// KeyManagerLAN returns the LAN Serf keyring manager
func (s *Server) KeyManagerLAN() *serf.KeyManager {
	return s.serfLAN.KeyManager()
}

// KeyManagerWAN returns the WAN Serf keyring manager
func (s *Server) KeyManagerWAN() *serf.KeyManager {
	return s.serfWAN.KeyManager()
}

// LANSegments returns a map of LAN segments by name
func (s *Server) LANSegments() map[string]*serf.Serf {
	segments := make(map[string]*serf.Serf, len(s.segmentLAN)+1)
	segments[""] = s.serfLAN
	for name, segment := range s.segmentLAN {
		segments[name] = segment
	}

	return segments
}

// inmemCodec is used to do an RPC call without going over a network
type inmemCodec struct {
	method string
	args   interface{}
	reply  interface{}
	err    error
}

func (i *inmemCodec) ReadRequestHeader(req *rpc.Request) error {
	req.ServiceMethod = i.method
	return nil
}

func (i *inmemCodec) ReadRequestBody(args interface{}) error {
	sourceValue := reflect.Indirect(reflect.Indirect(reflect.ValueOf(i.args)))
	dst := reflect.Indirect(reflect.Indirect(reflect.ValueOf(args)))
	dst.Set(sourceValue)
	return nil
}

func (i *inmemCodec) WriteResponse(resp *rpc.Response, reply interface{}) error {
	if resp.Error != "" {
		i.err = errors.New(resp.Error)
		return nil
	}
	sourceValue := reflect.Indirect(reflect.Indirect(reflect.ValueOf(reply)))
	dst := reflect.Indirect(reflect.Indirect(reflect.ValueOf(i.reply)))
	dst.Set(sourceValue)
	return nil
}

func (i *inmemCodec) Close() error {
	return nil
}

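// To illustrate how the in-memory codec is used: a local call through
// Server.RPC below looks roughly like the following, where the args and reply
// types are whatever the named endpoint expects (the catalog endpoint is just
// one example):
//
//	var out structs.IndexedNodes
//	args := structs.DCSpecificRequest{Datacenter: "dc1"}
//	err := s.RPC("Catalog.ListNodes", &args, &out)
//
// ServeRequest reads args through ReadRequestBody and copies the reply back
// through WriteResponse, so no network connection is involved.
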
// RPC is used to make a local RPC call
func (s *Server) RPC(method string, args interface{}, reply interface{}) error {
	codec := &inmemCodec{
		method: method,
		args:   args,
		reply:  reply,
	}

	// Enforce the RPC limit.
	//
	// "client" metric path because the internal client API is calling to the
	// internal server API. It's odd that the same request directed to a server is
	// recorded differently. On the other hand this possibly masks the difference
	// between regular client requests that traverse the network and these which
	// don't (unless forwarded). This still seems most sane.
	metrics.IncrCounter([]string{"client", "rpc"}, 1)
	if !s.rpcLimiter.Load().(*rate.Limiter).Allow() {
		metrics.IncrCounter([]string{"client", "rpc", "exceeded"}, 1)
		return structs.ErrRPCRateExceeded
	}
	if err := s.rpcServer.ServeRequest(codec); err != nil {
		return err
	}
	return codec.err
}

// SnapshotRPC dispatches the given snapshot request, reading from the streaming
// input and writing to the streaming output depending on the operation.
func (s *Server) SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io.Writer,
	replyFn structs.SnapshotReplyFn) error {

	// Enforce the RPC limit.
	//
	// "client" metric path because the internal client API is calling to the
	// internal server API. It's odd that the same request directed to a server is
	// recorded differently. On the other hand this possibly masks the difference
	// between regular client requests that traverse the network and these which
	// don't (unless forwarded). This still seems most sane.
	metrics.IncrCounter([]string{"client", "rpc"}, 1)
	if !s.rpcLimiter.Load().(*rate.Limiter).Allow() {
		metrics.IncrCounter([]string{"client", "rpc", "exceeded"}, 1)
		return structs.ErrRPCRateExceeded
	}

	// Perform the operation.
	var reply structs.SnapshotResponse
	snap, err := s.dispatchSnapshotRequest(args, in, &reply)
	if err != nil {
		return err
	}
	defer func() {
		if err := snap.Close(); err != nil {
			s.logger.Error("Failed to close snapshot", "error", err)
		}
	}()

	// Let the caller peek at the reply.
	if replyFn != nil {
		if err := replyFn(&reply); err != nil {
			return err
		}
	}

	// Stream the snapshot.
	if out != nil {
		if _, err := io.Copy(out, snap); err != nil {
			return fmt.Errorf("failed to stream snapshot: %v", err)
		}
	}
	return nil
}

// RegisterEndpoint is used to substitute an endpoint for testing.
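// For example, a test might stub out the catalog endpoint with a hand-written
// fake (mockCatalog here is hypothetical):
//
//	s.RegisterEndpoint("Catalog", &mockCatalog{})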
func (s *Server) RegisterEndpoint(name string, handler interface{}) error {
	s.logger.Warn("endpoint injected; this should only be used for testing")
	return s.rpcServer.RegisterName(name, handler)
}

// Stats is used to return statistics for debugging and insight
// for various sub-systems
func (s *Server) Stats() map[string]map[string]string {
	toString := func(v uint64) string {
		return strconv.FormatUint(v, 10)
	}
	numKnownDCs := len(s.router.GetDatacenters())
	stats := map[string]map[string]string{
		"consul": map[string]string{
			"server":            "true",
			"leader":            fmt.Sprintf("%v", s.IsLeader()),
			"leader_addr":       string(s.raft.Leader()),
			"bootstrap":         fmt.Sprintf("%v", s.config.Bootstrap),
			"known_datacenters": toString(uint64(numKnownDCs)),
		},
		"raft":     s.raft.Stats(),
		"serf_lan": s.serfLAN.Stats(),
		"runtime":  runtimeStats(),
	}

	if s.ACLsEnabled() {
		if s.UseLegacyACLs() {
			stats["consul"]["acl"] = "legacy"
		} else {
			stats["consul"]["acl"] = "enabled"
		}
	} else {
		stats["consul"]["acl"] = "disabled"
	}

	if s.serfWAN != nil {
		stats["serf_wan"] = s.serfWAN.Stats()
	}

	for outerKey, outerValue := range s.enterpriseStats() {
		if _, ok := stats[outerKey]; ok {
			for innerKey, innerValue := range outerValue {
				stats[outerKey][innerKey] = innerValue
			}
		} else {
			stats[outerKey] = outerValue
		}
	}

	return stats
}

// GetLANCoordinate returns the coordinate of the server in the LAN gossip pool.
func (s *Server) GetLANCoordinate() (lib.CoordinateSet, error) {
	lan, err := s.serfLAN.GetCoordinate()
	if err != nil {
		return nil, err
	}

	cs := lib.CoordinateSet{"": lan}
	for name, segment := range s.segmentLAN {
		c, err := segment.GetCoordinate()
		if err != nil {
			return nil, err
		}
		cs[name] = c
	}
	return cs, nil
}

// ReloadConfig is used to have the Server do an online reload of
// relevant configuration information
func (s *Server) ReloadConfig(config *Config) error {
	s.rpcLimiter.Store(rate.NewLimiter(config.RPCRate, config.RPCMaxBurst))
	s.rpcConnLimiter.SetConfig(connlimit.Config{
		MaxConnsPerClientIP: config.RPCMaxConnsPerClient,
	})

	if s.IsLeader() {
		// only bootstrap the config entries if we are the leader
		// this will error if we lose leadership while bootstrapping here.
		return s.bootstrapConfigEntries(config.ConfigEntryBootstrap)
	}

	return nil
}

// Atomically sets a readiness state flag when leadership is obtained, to
// indicate that the server is past its barrier write
func (s *Server) setConsistentReadReady() {
	atomic.StoreInt32(&s.readyForConsistentReads, 1)
}

// Atomically reset readiness state flag on leadership revoke
func (s *Server) resetConsistentReadReady() {
	atomic.StoreInt32(&s.readyForConsistentReads, 0)
}

// Returns true if this server is ready to serve consistent reads
func (s *Server) isReadyForConsistentReads() bool {
	return atomic.LoadInt32(&s.readyForConsistentReads) == 1
}

func (s *Server) intentionReplicationEnabled() bool {
	return s.config.ConnectEnabled && s.config.Datacenter != s.config.PrimaryDatacenter
}

// CreateACLToken will create an ACL token from the given template
func (s *Server) CreateACLToken(template *structs.ACLToken) (*structs.ACLToken, error) {
	// We have to require local tokens or else it would require having these
	// servers use a token with acl:write to make a token create RPC to the
	// servers in the primary DC.
	if !s.LocalTokensEnabled() {
		return nil, fmt.Errorf("Agent Auto Configuration requires local token usage to be enabled in this datacenter: %s", s.config.Datacenter)
	}

	newToken := *template

	// generate the accessor id
	if newToken.AccessorID == "" {
		accessor, err := lib.GenerateUUID(s.checkTokenUUID)
		if err != nil {
			return nil, err
		}

		newToken.AccessorID = accessor
	}

	// generate the secret id
	if newToken.SecretID == "" {
		secret, err := lib.GenerateUUID(s.checkTokenUUID)
		if err != nil {
			return nil, err
		}

		newToken.SecretID = secret
	}

	newToken.CreateTime = time.Now()

	req := structs.ACLTokenBatchSetRequest{
		Tokens: structs.ACLTokens{&newToken},
		CAS:    false,
	}

	// perform the request to mint the new token
	if _, err := s.raftApplyMsgpack(structs.ACLTokenSetRequestType, &req); err != nil {
		return nil, err
	}

	// return the full token definition from the FSM
	_, token, err := s.fsm.State().ACLTokenGetByAccessor(nil, newToken.AccessorID, &newToken.EnterpriseMeta)
	return token, err
}

// DatacenterJoinAddresses will return all the strings suitable for usage in
// retry join operations to connect to the LAN or LAN segment gossip pool.
func (s *Server) DatacenterJoinAddresses(segment string) ([]string, error) {
	members, err := s.LANSegmentMembers(segment)
	if err != nil {
		return nil, fmt.Errorf("Failed to retrieve members for segment %s - %w", segment, err)
	}

	var joinAddrs []string
	for _, m := range members {
		if ok, _ := metadata.IsConsulServer(m); ok {
			serfAddr := net.TCPAddr{IP: m.Addr, Port: int(m.Port)}
			joinAddrs = append(joinAddrs, serfAddr.String())
		}
	}

	return joinAddrs, nil
}

// peersInfoContent is used to help operators understand what happened to the
// peers.json file. This is written to a file called peers.info in the same
// location.
const peersInfoContent = `
As of Consul 0.7.0, the peers.json file is only used for recovery
after an outage. The format of this file depends on what the server has
configured for its Raft protocol version.
Please see the agent configuration
page at https://www.consul.io/docs/agent/options.html#_raft_protocol for more
details about this parameter.

For Raft protocol version 2 and earlier, this should be formatted as a JSON
array containing the address and port of each Consul server in the cluster,
like this:

[
  "10.1.0.1:8300",
  "10.1.0.2:8300",
  "10.1.0.3:8300"
]

For Raft protocol version 3 and later, this should be formatted as a JSON
array containing the node ID, address:port, and suffrage information of each
Consul server in the cluster, like this:

[
  {
    "id": "adf4238a-882b-9ddc-4a9d-5b6758e4159e",
    "address": "10.1.0.1:8300",
    "non_voter": false
  },
  {
    "id": "8b6dda82-3103-11e7-93ae-92361f002671",
    "address": "10.1.0.2:8300",
    "non_voter": false
  },
  {
    "id": "97e17742-3103-11e7-93ae-92361f002671",
    "address": "10.1.0.3:8300",
    "non_voter": false
  }
]

The "id" field is the node ID of the server. This can be found in the logs when
the server starts up, or in the "node-id" file inside the server's data
directory.

The "address" field is the address and port of the server.

The "non_voter" field controls whether the server is a non-voter, which is used
in some advanced Autopilot configurations; please see
https://www.consul.io/docs/guides/autopilot.html for more information. If
"non_voter" is omitted it will default to false, which is typical for most
clusters.

Under normal operation, the peers.json file will not be present.

When Consul starts for the first time, it will create this peers.info file and
delete any existing peers.json file so that recovery doesn't occur on the first
startup.

Once this peers.info file is present, any peers.json file will be ingested at
startup, and will set the Raft peer configuration manually to recover from an
outage. It's crucial that all servers in the cluster are shut down before
creating the peers.json file, and that all servers receive the same
configuration. Once the peers.json file is successfully ingested and applied, it
will be deleted.

Please see https://www.consul.io/docs/guides/outage.html for more information.
`