package consul

import (
	"context"
	"fmt"
	"net"
	"reflect"
	"strconv"
	"sync"
	"sync/atomic"
	"time"

	"github.com/armon/go-metrics"
	"github.com/armon/go-metrics/prometheus"
	"github.com/hashicorp/go-hclog"
	"github.com/hashicorp/go-memdb"
	"github.com/hashicorp/go-uuid"
	"github.com/hashicorp/go-version"
	"github.com/hashicorp/raft"
	"github.com/hashicorp/serf/serf"
	"golang.org/x/time/rate"

	"github.com/hashicorp/consul/acl"
	"github.com/hashicorp/consul/agent/metadata"
	"github.com/hashicorp/consul/agent/structs"
	"github.com/hashicorp/consul/api"
	"github.com/hashicorp/consul/lib"
	"github.com/hashicorp/consul/logging"
	"github.com/hashicorp/consul/types"
)

var LeaderSummaries = []prometheus.SummaryDefinition{
	{
		Name: []string{"leader", "barrier"},
		Help: "Measures the time spent waiting for the raft barrier upon gaining leadership.",
	},
	{
		Name: []string{"leader", "reconcileMember"},
		Help: "Measures the time spent updating the raft store for a single serf member's information.",
	},
	{
		Name: []string{"leader", "reapTombstones"},
		Help: "Measures the time spent clearing tombstones.",
	},
}

const (
	newLeaderEvent      = "consul:new-leader"
	barrierWriteTimeout = 2 * time.Minute
)

var (
	// caRootPruneInterval is how often we check for stale CARoots to remove.
	caRootPruneInterval = time.Hour

	// minCentralizedConfigVersion is the minimum Consul version in which centralized
	// config is supported
	minCentralizedConfigVersion = version.Must(version.NewVersion("1.5.0"))
)

// monitorLeadership is used to monitor if we acquire or lose our role
// as the leader in the Raft cluster. There is some work the leader is
// expected to do, so we must react to changes
func (s *Server) monitorLeadership() {
	// We use the notify channel we configured Raft with, NOT Raft's
	// leaderCh, which is only notified best-effort. Doing this ensures
	// that we get all notifications in order, which is required for
	// cleanup and to ensure we never run multiple leader loops.
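	//
	// A hedged sketch of the wiring, which happens during server setup
	// rather than in this file: conceptually the same buffered channel is
	// both handed to Raft and stored on the Server, e.g.
	//
	//	notifyCh := make(chan bool, 10)
	//	raftConf.NotifyCh = notifyCh
	//	s.raftNotifyCh = notifyCh
	//
	// so leadership flips arrive on raftNotifyCh in order.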
	raftNotifyCh := s.raftNotifyCh

	aclModeCheckWait := aclModeCheckMinInterval
	var aclUpgradeCh <-chan time.Time
	if s.config.ACLsEnabled {
		aclUpgradeCh = time.After(aclModeCheckWait)
	}
	var weAreLeaderCh chan struct{}
	var leaderLoop sync.WaitGroup
	for {
		select {
		case isLeader := <-raftNotifyCh:
			switch {
			case isLeader:
				if weAreLeaderCh != nil {
					s.logger.Error("attempted to start the leader loop while running")
					continue
				}

				weAreLeaderCh = make(chan struct{})
				leaderLoop.Add(1)
				go func(ch chan struct{}) {
					defer leaderLoop.Done()
					s.leaderLoop(ch)
				}(weAreLeaderCh)
				s.logger.Info("cluster leadership acquired")

			default:
				if weAreLeaderCh == nil {
					s.logger.Error("attempted to stop the leader loop while not running")
					continue
				}

				s.logger.Debug("shutting down leader loop")
				close(weAreLeaderCh)
				leaderLoop.Wait()
				weAreLeaderCh = nil
				s.logger.Info("cluster leadership lost")
			}
		case <-aclUpgradeCh:
			if atomic.LoadInt32(&s.useNewACLs) == 0 {
				aclModeCheckWait = aclModeCheckWait * 2
				if aclModeCheckWait > aclModeCheckMaxInterval {
					aclModeCheckWait = aclModeCheckMaxInterval
				}
				aclUpgradeCh = time.After(aclModeCheckWait)

				if canUpgrade := s.canUpgradeToNewACLs(weAreLeaderCh != nil); canUpgrade {
					if weAreLeaderCh != nil {
						if err := s.initializeACLs(&lib.StopChannelContext{StopCh: weAreLeaderCh}, true); err != nil {
							s.logger.Error("error transitioning to using new ACLs", "error", err)
							continue
						}
					}

					s.logger.Debug("transitioning out of legacy ACL mode")
					atomic.StoreInt32(&s.useNewACLs, 1)
					s.updateACLAdvertisement()

					// setting this to nil ensures that we will never hit this case again
					aclUpgradeCh = nil
				}
			} else {
				// establishLeadership probably transitioned us
				aclUpgradeCh = nil
			}
		case <-s.shutdownCh:
			return
		}
	}
}

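// leadershipTransfer attempts to hand Raft leadership to another server,
// retrying up to retryCount times before giving up.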
func (s *Server) leadershipTransfer() error {
	retryCount := 3
	for i := 0; i < retryCount; i++ {
		future := s.raft.LeadershipTransfer()
		if err := future.Error(); err != nil {
			s.logger.Error("failed to transfer leadership attempt, will retry",
				"attempt", i,
				"retry_limit", retryCount,
				"error", err,
			)
		} else {
			s.logger.Info("successfully transferred leadership",
				"attempt", i,
				"retry_limit", retryCount,
			)
			return nil
		}
	}
	return fmt.Errorf("failed to transfer leadership in %d attempts", retryCount)
}

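// A rough summary of the goto-based control flow inside leaderLoop:
//
//	RECONCILE: raft barrier -> establishLeadership (first pass only)
//	           -> reconcile -> fall through to WAIT
//	WAIT:      select on stop/shutdown, the reconcile interval (back to
//	           RECONCILE), serf member updates, tombstone expiry, and
//	           leadership reassertion requests
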
// leaderLoop runs as long as we are the leader to run various
// maintenance activities
func (s *Server) leaderLoop(stopCh chan struct{}) {
	stopCtx := &lib.StopChannelContext{StopCh: stopCh}

	// Fire a user event indicating a new leader
	payload := []byte(s.config.NodeName)
	for name, segment := range s.LANSegments() {
		if err := segment.UserEvent(newLeaderEvent, payload, false); err != nil {
			s.logger.Warn("failed to broadcast new leader event on segment",
				"segment", name,
				"error", err,
			)
		}
	}

	// Reconcile channel is only used once initial reconcile
	// has succeeded
	var reconcileCh chan serf.Member
	establishedLeader := false

RECONCILE:
	// Setup a reconciliation timer
	reconcileCh = nil
	interval := time.After(s.config.ReconcileInterval)

	// Apply a raft barrier to ensure our FSM is caught up
	start := time.Now()
	barrier := s.raft.Barrier(barrierWriteTimeout)
	if err := barrier.Error(); err != nil {
		s.logger.Error("failed to wait for barrier", "error", err)
		goto WAIT
	}
	metrics.MeasureSince([]string{"leader", "barrier"}, start)

	// Check if we need to handle initial leadership actions
	if !establishedLeader {
		if err := s.establishLeadership(stopCtx); err != nil {
			s.logger.Error("failed to establish leadership", "error", err)
			// Immediately revoke leadership since we didn't successfully
			// establish leadership.
			s.revokeLeadership()

			// Attempt to transfer leadership. If successful, it is time to
			// leave the leaderLoop since this node is no longer the leader.
			// If leadershipTransfer() fails, we will try to acquire
			// leadership again after 5 seconds.
			if err := s.leadershipTransfer(); err != nil {
				s.logger.Error("failed to transfer leadership", "error", err)
				interval = time.After(5 * time.Second)
				goto WAIT
			}
			return
		}
		establishedLeader = true
		defer s.revokeLeadership()
	}

	// Reconcile any missing data
	if err := s.reconcile(); err != nil {
		s.logger.Error("failed to reconcile", "error", err)
		goto WAIT
	}

	// Initial reconcile worked, now we can process the channel
	// updates
	reconcileCh = s.reconcileCh

WAIT:
	// Poll the stop channel to give it priority so we don't waste time
	// trying to perform the other operations if we have been asked to shut
	// down.
	select {
	case <-stopCh:
		return
	default:
	}

	// Periodically reconcile as long as we are the leader,
	// or when Serf events arrive
	for {
		select {
		case <-stopCh:
			return
		case <-s.shutdownCh:
			return
		case <-interval:
			goto RECONCILE
		case member := <-reconcileCh:
			s.reconcileMember(member)
		case index := <-s.tombstoneGC.ExpireCh():
			go s.reapTombstones(index)
		case errCh := <-s.reassertLeaderCh:
			// we can get into this state when the initial
			// establishLeadership has failed as well as the follow
			// up leadershipTransfer. Afterwards we will be waiting
			// for the interval to trigger a reconciliation and can
			// potentially end up here. There is no point in
			// reasserting because this agent was never leader in the
			// first place.
			if !establishedLeader {
				errCh <- fmt.Errorf("leadership has not been established")
				continue
			}

			// continue to reassert only if we previously were the
			// leader, which means revokeLeadership followed by an
			// establishLeadership().
			s.revokeLeadership()
			err := s.establishLeadership(stopCtx)
			errCh <- err

			// in case establishLeadership failed, we will try to
			// transfer leadership. At this time raft thinks we are
			// the leader, but consul disagrees.
			if err != nil {
				if err := s.leadershipTransfer(); err != nil {
					// establishedLeader was true before, but it no
					// longer is, since we revoked leadership and the
					// leadership transfer also failed. That is why
					// this node stays in the leaderLoop, but
					// establishedLeader must be reset to false.
					establishedLeader = false
					interval = time.After(5 * time.Second)
					goto WAIT
				}

				// leadershipTransfer was successful and it is
				// time to leave the leaderLoop.
				return
			}
		}
	}
}

// establishLeadership is invoked once we become leader and are able
// to invoke an initial barrier. The barrier is used to ensure any
// previously inflight transactions have been committed and that our
// state is up-to-date.
func (s *Server) establishLeadership(ctx context.Context) error {
	start := time.Now()
	// check for the upgrade here - this helps us transition to new ACLs much
	// quicker if this is a new cluster or this is a test agent
	if canUpgrade := s.canUpgradeToNewACLs(true); canUpgrade {
		if err := s.initializeACLs(ctx, true); err != nil {
			return err
		}
		atomic.StoreInt32(&s.useNewACLs, 1)
		s.updateACLAdvertisement()
	} else if err := s.initializeACLs(ctx, false); err != nil {
		return err
	}

	// Hint the tombstone expiration timer. When we freshly establish leadership
	// we become the authoritative timer, and so we need to start the clock
	// on any pending GC events.
	s.tombstoneGC.SetEnabled(true)
	lastIndex := s.raft.LastIndex()
	s.tombstoneGC.Hint(lastIndex)

	// Setup the session timers. This is done both when starting up and when
	// a leader failover happens. Since the timers are maintained by the
	// leader node alone, this effectively means all the timers are renewed
	// at the time of failover. The TTL contract is that the session will not
	// be expired before the TTL, so expiring it later is allowable.
	//
	// This MUST be done after the initial barrier to ensure the latest Sessions
	// are available to be initialized. Otherwise initialization may use stale
	// data.
	if err := s.initializeSessionTimers(); err != nil {
		return err
	}

	if err := s.establishEnterpriseLeadership(ctx); err != nil {
		return err
	}

	s.getOrCreateAutopilotConfig()
	s.autopilot.Start(ctx)

	s.startConfigReplication(ctx)

	s.startFederationStateReplication(ctx)

	s.startFederationStateAntiEntropy(ctx)

	if err := s.startConnectLeader(ctx); err != nil {
		return err
	}

	// Attempt to bootstrap config entries. We wait until after starting the
	// Connect leader tasks so we hopefully have transitioned to supporting
	// service-intentions.
	if err := s.bootstrapConfigEntries(s.config.ConfigEntryBootstrap); err != nil {
		return err
	}

	s.setConsistentReadReady()

	s.logger.Debug("successfully established leadership", "duration", time.Since(start))
	return nil
}

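// Note that revokeLeadership is effectively the mirror image of
// establishLeadership: everything started above has a matching stop below,
// in roughly reverse order.
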
// revokeLeadership is invoked once we step down as leader.
// This is used to cleanup any state that may be specific to a leader.
func (s *Server) revokeLeadership() {
	// Disable the tombstone GC, since it is only useful as a leader
	s.tombstoneGC.SetEnabled(false)

	// Clear the session timers on either shutdown or step down, since we
	// are no longer responsible for session expirations.
	s.clearAllSessionTimers()

	s.revokeEnterpriseLeadership()

	s.stopFederationStateAntiEntropy()

	s.stopFederationStateReplication()

	s.stopConfigReplication()

	s.stopConnectLeader()

	s.caManager.setCAProvider(nil, nil)
	s.caManager.setState(caStateUninitialized, false)

	s.stopACLTokenReaping()

	s.stopACLUpgrade()

	s.resetConsistentReadReady()

	// Stop returns a chan and we want to block until it is closed
	// which indicates that autopilot is actually stopped.
	<-s.autopilot.Stop()
}

// DEPRECATED (ACL-Legacy-Compat) - Remove once old ACL compatibility is removed
func (s *Server) initializeLegacyACL() error {
	if !s.config.ACLsEnabled {
		return nil
	}

	authDC := s.config.ACLDatacenter

	// Create anonymous token if missing.
	state := s.fsm.State()
	_, token, err := state.ACLTokenGetBySecret(nil, anonymousToken, nil)
	if err != nil {
		return fmt.Errorf("failed to get anonymous token: %v", err)
	}
	// Ignoring expiration times to avoid an insertion collision.
	if token == nil {
		req := structs.ACLRequest{
			Datacenter: authDC,
			Op:         structs.ACLSet,
			ACL: structs.ACL{
				ID:   anonymousToken,
				Name: "Anonymous Token",
				Type: structs.ACLTokenTypeClient,
			},
		}
		_, err := s.raftApply(structs.ACLRequestType, &req)
		if err != nil {
			return fmt.Errorf("failed to create anonymous token: %v", err)
		}
		s.logger.Info("Created the anonymous token")
	}

	// Check for configured master token.
	if master := s.config.ACLMasterToken; len(master) > 0 {
		_, token, err = state.ACLTokenGetBySecret(nil, master, nil)
		if err != nil {
			return fmt.Errorf("failed to get master token: %v", err)
		}
		// Ignoring expiration times to avoid an insertion collision.
		if token == nil {
			req := structs.ACLRequest{
				Datacenter: authDC,
				Op:         structs.ACLSet,
				ACL: structs.ACL{
					ID:   master,
					Name: "Master Token",
					Type: structs.ACLTokenTypeManagement,
				},
			}
			_, err := s.raftApply(structs.ACLRequestType, &req)
			if err != nil {
				return fmt.Errorf("failed to create master token: %v", err)
			}
			s.logger.Info("Created ACL master token from configuration")
		}
	}

	// Check to see if we need to initialize the ACL bootstrap info. This
	// needs a Consul version check since it introduces a new Raft operation
	// that'll produce an error on older servers, and it also makes a piece
	// of state in the state store that will cause problems with older
	// servers consuming snapshots, so we have to wait to create it.
	var minVersion = version.Must(version.NewVersion("0.9.1"))
	if ok, _ := ServersInDCMeetMinimumVersion(s, s.config.Datacenter, minVersion); ok {
		canBootstrap, _, err := state.CanBootstrapACLToken()
		if err != nil {
			return fmt.Errorf("failed looking for ACL bootstrap info: %v", err)
		}
		if canBootstrap {
			req := structs.ACLRequest{
				Datacenter: authDC,
				Op:         structs.ACLBootstrapInit,
			}
			resp, err := s.raftApply(structs.ACLRequestType, &req)
			if err != nil {
				return fmt.Errorf("failed to initialize ACL bootstrap: %v", err)
			}
			switch v := resp.(type) {
			case bool:
				if v {
					s.logger.Info("ACL bootstrap enabled")
				} else {
					s.logger.Info("ACL bootstrap disabled, existing management tokens found")
				}

			default:
				return fmt.Errorf("unexpected response trying to initialize ACL bootstrap: %T", v)
			}
		}
	} else {
		s.logger.Warn("Can't initialize ACL bootstrap until all servers are >= " + minVersion.String())
	}

	return nil
}

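// A rough map of the flow implemented by initializeACLs below (a summary of
// the function that follows, not additional behavior):
//
//	initializeACLs
//	├── purge the ACL cache and auth method validators
//	├── outside the ACL datacenter: delete any token affected by CVE-2019-8336
//	├── in the ACL datacenter: legacy init, or global-management policy +
//	│   master/anonymous token setup + the token upgrade goroutine
//	└── outside the ACL datacenter: legacy or new-style ACL replication
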
// initializeACLs is used to setup the ACLs if we are the leader
// and need to do this.
func (s *Server) initializeACLs(ctx context.Context, upgrade bool) error {
	if !s.config.ACLsEnabled {
		return nil
	}

	// Purge the cache, since it could've changed while we were not the
	// leader.
	s.acls.cache.Purge()

	// Purge the auth method validators since they could've changed while we
	// were not leader.
	s.aclAuthMethodValidators.Purge()

	// Remove any token affected by CVE-2019-8336
	if !s.InACLDatacenter() {
		_, token, err := s.fsm.State().ACLTokenGetBySecret(nil, redactedToken, nil)
		if err == nil && token != nil {
			req := structs.ACLTokenBatchDeleteRequest{
				TokenIDs: []string{token.AccessorID},
			}

			_, err := s.raftApply(structs.ACLTokenDeleteRequestType, &req)
			if err != nil {
				return fmt.Errorf("failed to remove token with a redacted secret: %v", err)
			}
		}
	}

	if s.InACLDatacenter() {
		if s.UseLegacyACLs() && !upgrade {
			s.logger.Info("initializing legacy acls")
			return s.initializeLegacyACL()
		}

		s.logger.Info("initializing acls")

		// Create/Upgrade the builtin global-management policy
		_, policy, err := s.fsm.State().ACLPolicyGetByID(nil, structs.ACLPolicyGlobalManagementID, structs.DefaultEnterpriseMeta())
		if err != nil {
			return fmt.Errorf("failed to get the builtin global-management policy: %v", err)
		}
		if policy == nil || policy.Rules != structs.ACLPolicyGlobalManagement {
			newPolicy := structs.ACLPolicy{
				ID:             structs.ACLPolicyGlobalManagementID,
				Name:           "global-management",
				Description:    "Builtin Policy that grants unlimited access",
				Rules:          structs.ACLPolicyGlobalManagement,
				Syntax:         acl.SyntaxCurrent,
				EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
			}
			if policy != nil {
				newPolicy.Name = policy.Name
				newPolicy.Description = policy.Description
			}

			newPolicy.SetHash(true)

			req := structs.ACLPolicyBatchSetRequest{
				Policies: structs.ACLPolicies{&newPolicy},
			}
			_, err := s.raftApply(structs.ACLPolicySetRequestType, &req)
			if err != nil {
				return fmt.Errorf("failed to create global-management policy: %v", err)
			}
			s.logger.Info("Created ACL 'global-management' policy")
		}

		// Check for configured master token.
		if master := s.config.ACLMasterToken; len(master) > 0 {
			state := s.fsm.State()
			if _, err := uuid.ParseUUID(master); err != nil {
				s.logger.Warn("Configuring a non-UUID master token is deprecated")
			}

			_, token, err := state.ACLTokenGetBySecret(nil, master, nil)
			if err != nil {
				return fmt.Errorf("failed to get master token: %v", err)
			}
			// Ignoring expiration times to avoid an insertion collision.
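			// Two write paths follow: if the cluster has never bootstrapped
			// ACLs, the token goes through the one-time bootstrap request;
			// otherwise (or if that attempt is rejected) it falls back to a
			// plain, non-CAS token set.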
			if token == nil {
				accessor, err := lib.GenerateUUID(s.checkTokenUUID)
				if err != nil {
					return fmt.Errorf("failed to generate the accessor ID for the master token: %v", err)
				}

				token := structs.ACLToken{
					AccessorID:  accessor,
					SecretID:    master,
					Description: "Master Token",
					Policies: []structs.ACLTokenPolicyLink{
						{
							ID: structs.ACLPolicyGlobalManagementID,
						},
					},
					CreateTime: time.Now(),
					Local:      false,

					// DEPRECATED (ACL-Legacy-Compat) - only needed for compatibility
					Type:           structs.ACLTokenTypeManagement,
					EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
				}

				token.SetHash(true)

				done := false
				if canBootstrap, _, err := state.CanBootstrapACLToken(); err == nil && canBootstrap {
					req := structs.ACLTokenBootstrapRequest{
						Token:      token,
						ResetIndex: 0,
					}
					if _, err := s.raftApply(structs.ACLBootstrapRequestType, &req); err == nil {
						s.logger.Info("Bootstrapped ACL master token from configuration")
						done = true
					} else {
						if err.Error() != structs.ACLBootstrapNotAllowedErr.Error() &&
							err.Error() != structs.ACLBootstrapInvalidResetIndexErr.Error() {
							return fmt.Errorf("failed to bootstrap master token: %v", err)
						}
					}
				}

				if !done {
					// either we didn't attempt the bootstrap or writing the
					// token via the bootstrap request failed.
					req := structs.ACLTokenBatchSetRequest{
						Tokens: structs.ACLTokens{&token},
						CAS:    false,
					}
					if _, err := s.raftApply(structs.ACLTokenSetRequestType, &req); err != nil {
						return fmt.Errorf("failed to create master token: %v", err)
					}

					s.logger.Info("Created ACL master token from configuration")
				}
			}
		}

		state := s.fsm.State()
		_, token, err := state.ACLTokenGetBySecret(nil, structs.ACLTokenAnonymousID, nil)
		if err != nil {
			return fmt.Errorf("failed to get anonymous token: %v", err)
		}
		// Ignoring expiration times to avoid an insertion collision.
		if token == nil {
			// DEPRECATED (ACL-Legacy-Compat) - Don't need to query for previous "anonymous" token
			// check for legacy token that needs an upgrade
			_, legacyToken, err := state.ACLTokenGetBySecret(nil, anonymousToken, nil)
			if err != nil {
				return fmt.Errorf("failed to get anonymous token: %v", err)
			}
			// Ignoring expiration times to avoid an insertion collision.

			// the token upgrade routine will take care of upgrading the token if a legacy version exists
			if legacyToken == nil {
				token = &structs.ACLToken{
					AccessorID:     structs.ACLTokenAnonymousID,
					SecretID:       anonymousToken,
					Description:    "Anonymous Token",
					CreateTime:     time.Now(),
					EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
				}
				token.SetHash(true)

				req := structs.ACLTokenBatchSetRequest{
					Tokens: structs.ACLTokens{token},
					CAS:    false,
				}
				_, err := s.raftApply(structs.ACLTokenSetRequestType, &req)
				if err != nil {
					return fmt.Errorf("failed to create anonymous token: %v", err)
				}
				s.logger.Info("Created ACL anonymous token from configuration")
			}
		}
		// launch the upgrade go routine to generate accessors for everything
		s.startACLUpgrade(ctx)
	} else {
		if s.UseLegacyACLs() && !upgrade {
			if s.IsACLReplicationEnabled() {
				s.startLegacyACLReplication(ctx)
			}
			// return early as we don't want to start new ACL replication
			// or ACL token reaping as these are new ACL features.
			return nil
		}

		if upgrade {
			s.stopACLReplication()
		}

		// ACL replication is now mandatory
		s.startACLReplication(ctx)
	}

	s.startACLTokenReaping(ctx)

	return nil
}

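// When legacyACLTokenUpgrade (below) finds nothing to upgrade, it parks on a
// memdb.WatchSet; Watch(nil) returns once any watched channel fires, i.e. the
// state store was abandoned, more upgradeable tokens appeared, or ctx is done.
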
// This function is only intended to be run as a managed go routine; it will
// block until the context passed in indicates that it should exit.
func (s *Server) legacyACLTokenUpgrade(ctx context.Context) error {
	limiter := rate.NewLimiter(aclUpgradeRateLimit, int(aclUpgradeRateLimit))
	for {
		if err := limiter.Wait(ctx); err != nil {
			return err
		}

		// actually run the upgrade here
		state := s.fsm.State()
		tokens, waitCh, err := state.ACLTokenListUpgradeable(aclUpgradeBatchSize)
		if err != nil {
			s.logger.Warn("encountered an error while searching for tokens without accessor ids", "error", err)
		}
		// No need to check expiration time here, as that only exists for v2 tokens.

		if len(tokens) == 0 {
			ws := memdb.NewWatchSet()
			ws.Add(state.AbandonCh())
			ws.Add(waitCh)
			ws.Add(ctx.Done())

			// wait for more tokens to need upgrading or for shutdown/abandon
			ws.Watch(nil)
			continue
		}

		var newTokens structs.ACLTokens
		for _, token := range tokens {
			// This should be entirely unnecessary but is just a small safeguard against changing accessor IDs
			if token.AccessorID != "" {
				continue
			}

			newToken := *token
			if token.SecretID == anonymousToken {
				newToken.AccessorID = structs.ACLTokenAnonymousID
			} else {
				accessor, err := lib.GenerateUUID(s.checkTokenUUID)
				if err != nil {
					s.logger.Warn("failed to generate accessor during token auto-upgrade", "error", err)
					continue
				}
				newToken.AccessorID = accessor
			}

			// Assign the global-management policy to legacy management tokens
			if len(newToken.Policies) == 0 &&
				len(newToken.ServiceIdentities) == 0 &&
				len(newToken.NodeIdentities) == 0 &&
				len(newToken.Roles) == 0 &&
				newToken.Type == structs.ACLTokenTypeManagement {
				newToken.Policies = append(newToken.Policies, structs.ACLTokenPolicyLink{ID: structs.ACLPolicyGlobalManagementID})
			}

			// need to copy these as we are going to do a CAS operation.
			newToken.CreateIndex = token.CreateIndex
			newToken.ModifyIndex = token.ModifyIndex

			newToken.SetHash(true)

			newTokens = append(newTokens, &newToken)
		}

		req := &structs.ACLTokenBatchSetRequest{Tokens: newTokens, CAS: true}

		_, err = s.raftApply(structs.ACLTokenSetRequestType, req)
		if err != nil {
			s.logger.Error("failed to apply acl token upgrade batch", "error", err)
		}
	}
}

func (s *Server) startACLUpgrade(ctx context.Context) {
	if s.config.PrimaryDatacenter != s.config.Datacenter {
		// token upgrades should only run in the primary
		return
	}

	s.leaderRoutineManager.Start(ctx, aclUpgradeRoutineName, s.legacyACLTokenUpgrade)
}

func (s *Server) stopACLUpgrade() {
	s.leaderRoutineManager.Stop(aclUpgradeRoutineName)
}

// This function is only intended to be run as a managed go routine; it will
// block until the context passed in indicates that it should exit.
func (s *Server) runLegacyACLReplication(ctx context.Context) error {
	var lastRemoteIndex uint64
	legacyACLLogger := s.aclReplicationLogger(logging.Legacy)
	limiter := rate.NewLimiter(rate.Limit(s.config.ACLReplicationRate), s.config.ACLReplicationBurst)

	for {
		if err := limiter.Wait(ctx); err != nil {
			return err
		}

		if s.tokens.ReplicationToken() == "" {
			continue
		}

		index, exit, err := s.replicateLegacyACLs(ctx, legacyACLLogger, lastRemoteIndex)
		if exit {
			return nil
		}

		if err != nil {
			metrics.SetGauge([]string{"leader", "replication", "acl-legacy", "status"},
				0,
			)
			lastRemoteIndex = 0
			s.updateACLReplicationStatusError()
			legacyACLLogger.Warn("Legacy ACL replication error (will retry if still leader)", "error", err)
		} else {
			metrics.SetGauge([]string{"leader", "replication", "acl-legacy", "status"},
				1,
			)
			metrics.SetGauge([]string{"leader", "replication", "acl-legacy", "index"},
				float32(index),
			)
			lastRemoteIndex = index
			s.updateACLReplicationStatusIndex(structs.ACLReplicateLegacy, index)
			legacyACLLogger.Debug("Legacy ACL replication completed through remote index", "index", index)
		}
	}
}

func (s *Server) startLegacyACLReplication(ctx context.Context) {
	if s.InACLDatacenter() {
		return
	}

	// Unlike some other leader routines, this initializes some extra state,
	// so we want to prevent re-initialization if things are already running.
	if s.leaderRoutineManager.IsRunning(legacyACLReplicationRoutineName) {
		return
	}

	s.initReplicationStatus()

	s.leaderRoutineManager.Start(ctx, legacyACLReplicationRoutineName, s.runLegacyACLReplication)
	s.logger.Info("started legacy ACL replication")
	s.updateACLReplicationStatusRunning(structs.ACLReplicateLegacy)
}

func (s *Server) startACLReplication(ctx context.Context) {
	if s.InACLDatacenter() {
		return
	}

	// Unlike some other leader routines, this initializes some extra state,
	// so we want to prevent re-initialization if things are already running.
	if s.leaderRoutineManager.IsRunning(aclPolicyReplicationRoutineName) {
		return
	}

	s.initReplicationStatus()
	s.leaderRoutineManager.Start(ctx, aclPolicyReplicationRoutineName, s.runACLPolicyReplicator)
	s.leaderRoutineManager.Start(ctx, aclRoleReplicationRoutineName, s.runACLRoleReplicator)

	if s.config.ACLTokenReplication {
		s.leaderRoutineManager.Start(ctx, aclTokenReplicationRoutineName, s.runACLTokenReplicator)
		s.updateACLReplicationStatusRunning(structs.ACLReplicateTokens)
	} else {
		s.updateACLReplicationStatusRunning(structs.ACLReplicatePolicies)
	}
}

type replicateFunc func(ctx context.Context, logger hclog.Logger, lastRemoteIndex uint64) (uint64, bool, error)

// This function is only intended to be run as a managed go routine; it will
// block until the context passed in indicates that it should exit.
func (s *Server) runACLPolicyReplicator(ctx context.Context) error {
	policyLogger := s.aclReplicationLogger(structs.ACLReplicatePolicies.SingularNoun())
	policyLogger.Info("started ACL Policy replication")
	return s.runACLReplicator(ctx, policyLogger, structs.ACLReplicatePolicies, s.replicateACLPolicies, "acl-policies")
}

// This function is only intended to be run as a managed go routine; it will
// block until the context passed in indicates that it should exit.
func (s *Server) runACLRoleReplicator(ctx context.Context) error {
	roleLogger := s.aclReplicationLogger(structs.ACLReplicateRoles.SingularNoun())
	roleLogger.Info("started ACL Role replication")
	return s.runACLReplicator(ctx, roleLogger, structs.ACLReplicateRoles, s.replicateACLRoles, "acl-roles")
}

// This function is only intended to be run as a managed go routine; it will
// block until the context passed in indicates that it should exit.
func (s *Server) runACLTokenReplicator(ctx context.Context) error {
	tokenLogger := s.aclReplicationLogger(structs.ACLReplicateTokens.SingularNoun())
	tokenLogger.Info("started ACL Token replication")
	return s.runACLReplicator(ctx, tokenLogger, structs.ACLReplicateTokens, s.replicateACLTokens, "acl-tokens")
}

// This function is only intended to be run as a managed go routine; it will
// block until the context passed in indicates that it should exit.
func (s *Server) runACLReplicator(
	ctx context.Context,
	logger hclog.Logger,
	replicationType structs.ACLReplicationType,
	replicateFunc replicateFunc,
	metricName string,
) error {
	var failedAttempts uint
	limiter := rate.NewLimiter(rate.Limit(s.config.ACLReplicationRate), s.config.ACLReplicationBurst)

	var lastRemoteIndex uint64
	for {
		if err := limiter.Wait(ctx); err != nil {
			return err
		}

		if s.tokens.ReplicationToken() == "" {
			continue
		}

		index, exit, err := replicateFunc(ctx, logger, lastRemoteIndex)
		if exit {
			return nil
		}

		if err != nil {
			metrics.SetGauge([]string{"leader", "replication", metricName, "status"},
				0,
			)
			lastRemoteIndex = 0
			s.updateACLReplicationStatusError()
			logger.Warn("ACL replication error (will retry if still leader)",
				"error", err,
			)
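			// Worked example of the backoff below: failedAttempts of 1, 2,
			// 3, ... yields waits of 2s, 4s, 8s, ..., and the increment
			// stops once 1<<failedAttempts reaches
			// aclReplicationMaxRetryBackoff (defined elsewhere in this
			// package).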
			if (1 << failedAttempts) < aclReplicationMaxRetryBackoff {
				failedAttempts++
			}

			select {
			case <-ctx.Done():
				return nil
			case <-time.After((1 << failedAttempts) * time.Second):
				// do nothing
			}
		} else {
			metrics.SetGauge([]string{"leader", "replication", metricName, "status"},
				1,
			)
			metrics.SetGauge([]string{"leader", "replication", metricName, "index"},
				float32(index),
			)
			lastRemoteIndex = index
			s.updateACLReplicationStatusIndex(replicationType, index)
			logger.Debug("ACL replication completed through remote index",
				"index", index,
			)
			failedAttempts = 0
		}
	}
}

func (s *Server) aclReplicationLogger(singularNoun string) hclog.Logger {
	return s.loggers.
		Named(logging.Replication).
		Named(logging.ACL).
		Named(singularNoun)
}

func (s *Server) stopACLReplication() {
	// these will be no-ops when not started
	s.leaderRoutineManager.Stop(legacyACLReplicationRoutineName)
	s.leaderRoutineManager.Stop(aclPolicyReplicationRoutineName)
	s.leaderRoutineManager.Stop(aclRoleReplicationRoutineName)
	s.leaderRoutineManager.Stop(aclTokenReplicationRoutineName)
}

func (s *Server) startConfigReplication(ctx context.Context) {
	if s.config.PrimaryDatacenter == "" || s.config.PrimaryDatacenter == s.config.Datacenter {
		// replication shouldn't run in the primary DC
		return
	}

	s.leaderRoutineManager.Start(ctx, configReplicationRoutineName, s.configReplicator.Run)
}

func (s *Server) stopConfigReplication() {
	// will be a no-op when not started
	s.leaderRoutineManager.Stop(configReplicationRoutineName)
}

func (s *Server) startFederationStateReplication(ctx context.Context) {
	if s.config.PrimaryDatacenter == "" || s.config.PrimaryDatacenter == s.config.Datacenter {
		// replication shouldn't run in the primary DC
		return
	}

	if s.gatewayLocator != nil {
		s.gatewayLocator.SetUseReplicationSignal(true)
		s.gatewayLocator.SetLastFederationStateReplicationError(nil, false)
	}

	s.leaderRoutineManager.Start(ctx, federationStateReplicationRoutineName, s.federationStateReplicator.Run)
}

func (s *Server) stopFederationStateReplication() {
	// will be a no-op when not started
	s.leaderRoutineManager.Stop(federationStateReplicationRoutineName)

	if s.gatewayLocator != nil {
		s.gatewayLocator.SetUseReplicationSignal(false)
		s.gatewayLocator.SetLastFederationStateReplicationError(nil, false)
	}
}

// getOrCreateAutopilotConfig is used to get the autopilot config, initializing it if necessary
func (s *Server) getOrCreateAutopilotConfig() *structs.AutopilotConfig {
	logger := s.loggers.Named(logging.Autopilot)
	state := s.fsm.State()
	_, config, err := state.AutopilotConfig()
	if err != nil {
		logger.Error("failed to get config", "error", err)
		return nil
	}
	if config != nil {
		return config
	}

	config = s.config.AutopilotConfig
	req := structs.AutopilotSetConfigRequest{Config: *config}
	if _, err = s.raftApply(structs.AutopilotRequestType, req); err != nil {
		logger.Error("failed to initialize config", "error", err)
		return nil
	}

	return config
}

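// For reference, the entries handed to bootstrapConfigEntries come from the
// agent's config_entries bootstrap configuration. A hedged HCL sketch (the
// kind/name/protocol values are illustrative, not required):
//
//	config_entries {
//	  bootstrap {
//	    kind     = "service-defaults"
//	    name     = "web"
//	    protocol = "http"
//	  }
//	}
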
func (s *Server) bootstrapConfigEntries(entries []structs.ConfigEntry) error {
	if s.config.PrimaryDatacenter != "" && s.config.PrimaryDatacenter != s.config.Datacenter {
		// only bootstrap in the primary datacenter
		return nil
	}

	if len(entries) < 1 {
		// nothing to initialize
		return nil
	}

	if ok, _ := ServersInDCMeetMinimumVersion(s, s.config.Datacenter, minCentralizedConfigVersion); !ok {
		s.loggers.
			Named(logging.CentralConfig).
			Warn("config: can't initialize until all servers >= " + minCentralizedConfigVersion.String())
		return nil
	}

	state := s.fsm.State()

	// Do some quick preflight checks to see if someone is doing something
	// that's not allowed at this time:
	//
	// - Trying to upgrade from an older, pre-1.9.0 version of Consul that
	//   has intentions while also trying to bootstrap a service-intentions
	//   config entry at the same time.
	//
	// - Trying to insert service-intentions config entries when connect is
	//   disabled.

	usingConfigEntries, err := s.fsm.State().AreIntentionsInConfigEntries()
	if err != nil {
		return fmt.Errorf("Failed to determine if we are migrating intentions yet: %v", err)
	}

	if !usingConfigEntries || !s.config.ConnectEnabled {
		for _, entry := range entries {
			if entry.GetKind() == structs.ServiceIntentions {
				if !s.config.ConnectEnabled {
					return fmt.Errorf("Refusing to apply configuration entry %q / %q because Connect must be enabled to bootstrap intentions",
						entry.GetKind(), entry.GetName())
				}
				if !usingConfigEntries {
					return fmt.Errorf("Refusing to apply configuration entry %q / %q because intentions are still being migrated to config entries",
						entry.GetKind(), entry.GetName())
				}
			}
		}
	}

	for _, entry := range entries {
		// avoid a round trip through Raft if we know the CAS is going to fail
		_, existing, err := state.ConfigEntry(nil, entry.GetKind(), entry.GetName(), entry.GetEnterpriseMeta())
		if err != nil {
			return fmt.Errorf("Failed to determine whether the configuration for %q / %q already exists: %v", entry.GetKind(), entry.GetName(), err)
		}

		if existing == nil {
			// ensure the ModifyIndex is set to 0 for the CAS request
			entry.GetRaftIndex().ModifyIndex = 0

			req := structs.ConfigEntryRequest{
				Op:         structs.ConfigEntryUpsertCAS,
				Datacenter: s.config.Datacenter,
				Entry:      entry,
			}

			_, err := s.raftApply(structs.ConfigEntryRequestType, &req)
			if err != nil {
				return fmt.Errorf("Failed to apply configuration entry %q / %q: %v", entry.GetKind(), entry.GetName(), err)
			}
		}
	}
	return nil
}

// reconcileReaped is used to reconcile nodes that have failed and been reaped
// from Serf but remain in the catalog. This is done by looking for unknown
// nodes with serfHealth checks registered. We generate a "reap" event to
// cause the node to be cleaned up.
func (s *Server) reconcileReaped(known map[string]struct{}) error {
	state := s.fsm.State()
	_, checks, err := state.ChecksInState(nil, api.HealthAny, structs.DefaultEnterpriseMeta())
	if err != nil {
		return err
	}
	for _, check := range checks {
		// Ignore any non-serf checks
		if check.CheckID != structs.SerfCheckID {
			continue
		}

		// Check if this node is "known" by serf
		if _, ok := known[check.Node]; ok {
			continue
		}

		// Get the node services, look for ConsulServiceID
		_, services, err := state.NodeServices(nil, check.Node, structs.DefaultEnterpriseMeta())
		if err != nil {
			return err
		}
		serverPort := 0
		serverAddr := ""
		serverID := ""

	CHECKS:
		for _, service := range services.Services {
			if service.ID == structs.ConsulServiceID {
				_, node, err := state.GetNode(check.Node)
				if err != nil {
					s.logger.Error("Unable to look up node with name", "name", check.Node, "error", err)
					continue CHECKS
				}

				serverAddr = node.Address
				serverPort = service.Port
				lookupAddr := net.JoinHostPort(serverAddr, strconv.Itoa(serverPort))
				svr := s.serverLookup.Server(raft.ServerAddress(lookupAddr))
				if svr != nil {
					serverID = svr.ID
				}
				break
			}
		}

		// Create a fake member
		member := serf.Member{
			Name: check.Node,
			Tags: map[string]string{
				"dc":   s.config.Datacenter,
				"role": "node",
			},
		}

		// Create the appropriate tags if this was a server node
		if serverPort > 0 {
			member.Tags["role"] = "consul"
			member.Tags["port"] = strconv.FormatUint(uint64(serverPort), 10)
			member.Tags["id"] = serverID
			member.Addr = net.ParseIP(serverAddr)
		}

		// Attempt to reap this member
		if err := s.handleReapMember(member); err != nil {
			return err
		}
	}
	return nil
}

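// reconcileMember (below) maps a member's serf status to a catalog action:
//
//	serf.StatusAlive  -> handleAliveMember  (register node/service, serfHealth passing)
//	serf.StatusFailed -> handleFailedMember (mark serfHealth critical)
//	serf.StatusLeft   -> handleLeftMember   (deregister the node)
//	StatusReap        -> handleReapMember   (deregister the node)
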
// reconcileMember is used to do an async reconcile of a single
// serf member
func (s *Server) reconcileMember(member serf.Member) error {
	// Check if this is a member we should handle
	if !s.shouldHandleMember(member) {
		s.logger.Warn("skipping reconcile of node", "member", member)
		return nil
	}
	defer metrics.MeasureSince([]string{"leader", "reconcileMember"}, time.Now())
	var err error
	switch member.Status {
	case serf.StatusAlive:
		err = s.handleAliveMember(member)
	case serf.StatusFailed:
		err = s.handleFailedMember(member)
	case serf.StatusLeft:
		err = s.handleLeftMember(member)
	case StatusReap:
		err = s.handleReapMember(member)
	}
	if err != nil {
		s.logger.Error("failed to reconcile member",
			"member", member,
			"error", err,
		)

		// Permission denied should not bubble up
		if acl.IsErrPermissionDenied(err) {
			return nil
		}
	}
	return nil
}

// shouldHandleMember checks if this is a Consul pool member
func (s *Server) shouldHandleMember(member serf.Member) bool {
	if valid, dc := isConsulNode(member); valid && dc == s.config.Datacenter {
		return true
	}
	if valid, parts := metadata.IsConsulServer(member); valid &&
		parts.Segment == "" &&
		parts.Datacenter == s.config.Datacenter {
		return true
	}
	return false
}

// handleAliveMember is used to ensure the node
// is registered, with a passing health check.
func (s *Server) handleAliveMember(member serf.Member) error {
	// Register consul service if a server
	var service *structs.NodeService
	if valid, parts := metadata.IsConsulServer(member); valid {
		service = &structs.NodeService{
			ID:      structs.ConsulServiceID,
			Service: structs.ConsulServiceName,
			Port:    parts.Port,
			Weights: &structs.Weights{
				Passing: 1,
				Warning: 1,
			},
			Meta: map[string]string{
				// DEPRECATED - remove nonvoter in favor of read_replica in a future version of consul
				"non_voter":             strconv.FormatBool(member.Tags["nonvoter"] == "1"),
				"read_replica":          strconv.FormatBool(member.Tags["read_replica"] == "1"),
				"raft_version":          strconv.Itoa(parts.RaftVersion),
				"serf_protocol_current": strconv.FormatUint(uint64(member.ProtocolCur), 10),
				"serf_protocol_min":     strconv.FormatUint(uint64(member.ProtocolMin), 10),
				"serf_protocol_max":     strconv.FormatUint(uint64(member.ProtocolMax), 10),
				"version":               parts.Build.String(),
			},
		}

		// Attempt to join the consul server
		if err := s.joinConsulServer(member, parts); err != nil {
			return err
		}
	}

	// Check if the node exists
	state := s.fsm.State()
	_, node, err := state.GetNode(member.Name)
	if err != nil {
		return err
	}
	if node != nil && node.Address == member.Addr.String() {
		// Check if the associated service is available
		if service != nil {
			match := false
			_, services, err := state.NodeServices(nil, member.Name, structs.DefaultEnterpriseMeta())
			if err != nil {
				return err
			}
			if services != nil {
				for id, serv := range services.Services {
					if id == service.ID {
						// If the service metadata differs, be sure to update it
						match = reflect.DeepEqual(serv.Meta, service.Meta)
					}
				}
			}
			if !match {
				goto AFTER_CHECK
			}
		}

		// Check if the serfCheck is in the passing state
		_, checks, err := state.NodeChecks(nil, member.Name, structs.DefaultEnterpriseMeta())
		if err != nil {
			return err
		}
		for _, check := range checks {
			if check.CheckID == structs.SerfCheckID && check.Status == api.HealthPassing {
				return nil
			}
		}
	}
AFTER_CHECK:
	s.logger.Info("member joined, marking health alive", "member", member.Name)

	// Register with the catalog.
	req := structs.RegisterRequest{
		Datacenter: s.config.Datacenter,
		Node:       member.Name,
		ID:         types.NodeID(member.Tags["id"]),
		Address:    member.Addr.String(),
		Service:    service,
		Check: &structs.HealthCheck{
			Node:    member.Name,
			CheckID: structs.SerfCheckID,
			Name:    structs.SerfCheckName,
			Status:  api.HealthPassing,
			Output:  structs.SerfCheckAliveOutput,
		},
	}
	if node != nil {
		req.TaggedAddresses = node.TaggedAddresses
		req.NodeMeta = node.Meta
	}

	_, err = s.raftApply(structs.RegisterRequestType, &req)
	return err
}

// handleFailedMember is used to mark the node's status
// as being critical, along with all checks as unknown.
func (s *Server) handleFailedMember(member serf.Member) error {
	// Check if the node exists
	state := s.fsm.State()
	_, node, err := state.GetNode(member.Name)
	if err != nil {
		return err
	}

	if node == nil {
		s.logger.Info("ignoring failed event for member because it does not exist in the catalog", "member", member.Name)
		return nil
	}

	if node.Address == member.Addr.String() {
		// Check if the serfCheck is in the critical state
		_, checks, err := state.NodeChecks(nil, member.Name, structs.DefaultEnterpriseMeta())
		if err != nil {
			return err
		}
		for _, check := range checks {
			if check.CheckID == structs.SerfCheckID && check.Status == api.HealthCritical {
				return nil
			}
		}
	}
	s.logger.Info("member failed, marking health critical", "member", member.Name)

	// Register with the catalog
	req := structs.RegisterRequest{
		Datacenter: s.config.Datacenter,
		Node:       member.Name,
		ID:         types.NodeID(member.Tags["id"]),
		Address:    member.Addr.String(),
		Check: &structs.HealthCheck{
			Node:    member.Name,
			CheckID: structs.SerfCheckID,
			Name:    structs.SerfCheckName,
			Status:  api.HealthCritical,
			Output:  structs.SerfCheckFailedOutput,
		},

		// If there's existing information about the node, do not
		// clobber it.
		SkipNodeUpdate: true,
	}
	_, err = s.raftApply(structs.RegisterRequestType, &req)
	return err
}

// handleLeftMember is used to handle members that gracefully
// left. They are deregistered if necessary.
func (s *Server) handleLeftMember(member serf.Member) error {
	return s.handleDeregisterMember("left", member)
}

// handleReapMember is used to handle members that have been
// reaped after a prolonged failure. They are deregistered.
func (s *Server) handleReapMember(member serf.Member) error {
	return s.handleDeregisterMember("reaped", member)
}

// handleDeregisterMember is used to deregister a member for a given reason
func (s *Server) handleDeregisterMember(reason string, member serf.Member) error {
	// Do not deregister ourselves. This can only happen if the current leader
	// is leaving. Instead, we should allow a follower to take over and
	// deregister us later.
	if member.Name == s.config.NodeName {
		s.logger.Warn("deregistering self should be done by follower", "name", s.config.NodeName)
		return nil
	}

	// Remove from Raft peers if this was a server
	if valid, _ := metadata.IsConsulServer(member); valid {
		if err := s.removeConsulServer(member); err != nil {
			return err
		}
	}

	// Check if the node still exists
	state := s.fsm.State()
	_, node, err := state.GetNode(member.Name)
	if err != nil {
		return err
	}
	if node == nil {
		return nil
	}

	// Deregister the node
	s.logger.Info("deregistering member", "member", member.Name, "reason", reason)
	req := structs.DeregisterRequest{
		Datacenter: s.config.Datacenter,
		Node:       member.Name,
	}
	_, err = s.raftApply(structs.DeregisterRequestType, &req)
	return err
}

// joinConsulServer is used to try to join another consul server
func (s *Server) joinConsulServer(m serf.Member, parts *metadata.Server) error {
	// Check for possibility of multiple bootstrap nodes
	if parts.Bootstrap {
		members := s.serfLAN.Members()
		for _, member := range members {
			valid, p := metadata.IsConsulServer(member)
			if valid && member.Name != m.Name && p.Bootstrap {
				s.logger.Error("Two nodes are in bootstrap mode. Only one node should be in bootstrap mode, not adding Raft peer.",
					"node_to_add", m.Name,
					"other", member.Name,
				)
				return nil
			}
		}
	}

	// We used to do a check here and prevent adding the server if the cluster size was too small (1 or 2 servers) as a means
	// of preventing the case where we may remove ourselves and cause a loss of leadership. The Autopilot AddServer function
	// will now handle simple address updates better and so long as the address doesn't conflict with another node
	// it will not require a removal but will instead just update the address. If it would require a removal of other nodes
	// due to conflicts then the logic regarding cluster sizes will kick in and prevent doing anything dangerous that could
	// cause loss of leadership.

	// get the autopilot library version of a server from the serf member
	apServer, err := s.autopilotServer(m)
	if err != nil {
		return err
	}

	// now ask autopilot to add it
	return s.autopilot.AddServer(apServer)
}

// removeConsulServer is used to try to remove a consul server that has left
func (s *Server) removeConsulServer(m serf.Member) error {
	server, err := s.autopilotServer(m)
	if err != nil || server == nil {
		return err
	}

	return s.autopilot.RemoveServer(server.ID)
}

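// Tombstone GC lifecycle as wired in this file (a summary, not new behavior):
//
//	establishLeadership: tombstoneGC.SetEnabled(true); tombstoneGC.Hint(lastIndex)
//	leaderLoop:          index := <-tombstoneGC.ExpireCh(); go reapTombstones(index)
//	revokeLeadership:    tombstoneGC.SetEnabled(false)
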
// reapTombstones is invoked by the current leader to manage garbage
// collection of tombstones. When a key is deleted, we trigger a tombstone
// GC clock. Once the expiration is reached, this routine is invoked
// to clear all tombstones before this index. This must be replicated
// through Raft to ensure consistency. We do this outside the leader loop
// to avoid blocking.
func (s *Server) reapTombstones(index uint64) {
	defer metrics.MeasureSince([]string{"leader", "reapTombstones"}, time.Now())
	req := structs.TombstoneRequest{
		Datacenter: s.config.Datacenter,
		Op:         structs.TombstoneReap,
		ReapIndex:  index,
	}
	_, err := s.raftApply(structs.TombstoneRequestType, &req)
	if err != nil {
		s.logger.Error("failed to reap tombstones up to index",
			"index", index,
			"error", err,
		)
	}
}

func (s *Server) setDatacenterSupportsFederationStates() {
	atomic.StoreInt32(&s.dcSupportsFederationStates, 1)
}

func (s *Server) DatacenterSupportsFederationStates() bool {
	if atomic.LoadInt32(&s.dcSupportsFederationStates) != 0 {
		return true
	}

	state := serversFederationStatesInfo{
		supported: true,
		found:     false,
	}

	// if we are in a secondary, check if they are supported in the primary dc
	if s.config.PrimaryDatacenter != s.config.Datacenter {
		s.router.CheckServers(s.config.PrimaryDatacenter, state.update)

		if !state.supported || !state.found {
			s.logger.Debug("federation states are not enabled in the primary dc")
			return false
		}
	}

	// check the servers in the local DC
	s.router.CheckServers(s.config.Datacenter, state.update)

	if state.supported && state.found {
		s.setDatacenterSupportsFederationStates()
		return true
	}

	s.logger.Debug("federation states are not enabled in this datacenter", "datacenter", s.config.Datacenter)
	return false
}

type serversFederationStatesInfo struct {
	// supported indicates whether every processed server supports federation states
	supported bool

	// found indicates that at least one server was processed
	found bool
}

func (s *serversFederationStatesInfo) update(srv *metadata.Server) bool {
	if srv.Status != serf.StatusAlive && srv.Status != serf.StatusFailed {
		// the server has left (or is otherwise inactive), so we treat it
		// as meeting the version requirement regardless
		return true
	}

	// mark that we processed at least one server
	s.found = true

	if supported, ok := srv.FeatureFlags["fs"]; ok && supported == 1 {
		return true
	}

	// mark that at least one server does not support federation states
	s.supported = false

	// prevent continuing server evaluation
	return false
}

func (s *Server) setDatacenterSupportsIntentionsAsConfigEntries() {
	atomic.StoreInt32(&s.dcSupportsIntentionsAsConfigEntries, 1)
}

func (s *Server) DatacenterSupportsIntentionsAsConfigEntries() bool {
	if atomic.LoadInt32(&s.dcSupportsIntentionsAsConfigEntries) != 0 {
		return true
	}

	state := serversIntentionsAsConfigEntriesInfo{
		supported: true,
		found:     false,
	}

	// if we are in a secondary, check if they are supported in the primary dc
	if s.config.PrimaryDatacenter != s.config.Datacenter {
		s.router.CheckServers(s.config.PrimaryDatacenter, state.update)

		if !state.supported || !state.found {
			s.logger.Debug("intentions have not been migrated to config entries in the primary dc yet")
			return false
		}
	}

	// check the servers in the local DC
	s.router.CheckServers(s.config.Datacenter, state.update)

	if state.supported && state.found {
		s.setDatacenterSupportsIntentionsAsConfigEntries()
		return true
	}

	s.logger.Debug("intentions cannot be migrated to config entries in this datacenter", "datacenter", s.config.Datacenter)
	return false
}

type serversIntentionsAsConfigEntriesInfo struct {
	// supported indicates whether every processed server supports intentions as config entries
	supported bool

	// found indicates that at least one server was processed
	found bool
}

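// update is the callback handed to router.CheckServers above; the router
// invokes it for each known server until it returns false. After a sweep,
// supported && found means at least one server was seen and every alive or
// failed server advertised the "si" feature flag.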
func (s *serversIntentionsAsConfigEntriesInfo) update(srv *metadata.Server) bool {
	if srv.Status != serf.StatusAlive && srv.Status != serf.StatusFailed {
		// the server has left (or is otherwise inactive), so we treat it
		// as meeting the version requirement regardless
		return true
	}

	// mark that we processed at least one server
	s.found = true

	if supported, ok := srv.FeatureFlags["si"]; ok && supported == 1 {
		return true
	}

	// mark that at least one server does not support service-intentions
	s.supported = false

	// prevent continuing server evaluation
	return false
}