package consul

import (
	"context"
	"fmt"
	"net"
	"reflect"
	"strconv"
	"sync"
	"sync/atomic"
	"time"

	"github.com/armon/go-metrics"
	"github.com/armon/go-metrics/prometheus"
	"github.com/hashicorp/go-hclog"
	"github.com/hashicorp/go-memdb"
	"github.com/hashicorp/go-uuid"
	"github.com/hashicorp/go-version"
	"github.com/hashicorp/raft"
	"github.com/hashicorp/serf/serf"
	"golang.org/x/time/rate"

	"github.com/hashicorp/consul/acl"
	"github.com/hashicorp/consul/agent/metadata"
	"github.com/hashicorp/consul/agent/structs"
	"github.com/hashicorp/consul/api"
	"github.com/hashicorp/consul/lib"
	"github.com/hashicorp/consul/logging"
	"github.com/hashicorp/consul/types"
)

var LeaderSummaries = []prometheus.SummaryDefinition{
	{
		Name: []string{"leader", "barrier"},
		Help: "Measures the time spent waiting for the raft barrier upon gaining leadership.",
	},
	{
		Name: []string{"leader", "reconcileMember"},
		Help: "Measures the time spent updating the raft store for a single serf member's information.",
	},
	{
		Name: []string{"leader", "reapTombstones"},
		Help: "Measures the time spent clearing tombstones.",
	},
}

const (
	newLeaderEvent      = "consul:new-leader"
	barrierWriteTimeout = 2 * time.Minute
)

var (
	// caRootPruneInterval is how often we check for stale CARoots to remove.
	caRootPruneInterval = time.Hour

	// minCentralizedConfigVersion is the minimum Consul version in which centralized
	// config is supported
	minCentralizedConfigVersion = version.Must(version.NewVersion("1.5.0"))
)
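
// The version gate above relies on github.com/hashicorp/go-version semantics.
// A minimal sketch of the comparison it implies (the "1.4.9" value here is
// only an illustration, not taken from this file):
//
//	min := version.Must(version.NewVersion("1.5.0"))
//	v := version.Must(version.NewVersion("1.4.9"))
//	if v.Compare(min) >= 0 {
//		// this server meets the minimum centralized-config version
//	}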

// monitorLeadership is used to monitor if we acquire or lose our role
// as the leader in the Raft cluster. There is some work the leader is
// expected to do, so we must react to changes
func (s *Server) monitorLeadership() {
	// We use the notify channel we configured Raft with, NOT Raft's
	// leaderCh, which is only notified best-effort. Doing this ensures
	// that we get all notifications in order, which is required for
	// cleanup and to ensure we never run multiple leader loops.
	raftNotifyCh := s.raftNotifyCh

	aclModeCheckWait := aclModeCheckMinInterval
	var aclUpgradeCh <-chan time.Time
	if s.config.ACLsEnabled {
		aclUpgradeCh = time.After(aclModeCheckWait)
	}
	var weAreLeaderCh chan struct{}
	var leaderLoop sync.WaitGroup
	for {
		select {
		case isLeader := <-raftNotifyCh:
			switch {
			case isLeader:
				if weAreLeaderCh != nil {
					s.logger.Error("attempted to start the leader loop while running")
					continue
				}

				weAreLeaderCh = make(chan struct{})
				leaderLoop.Add(1)
				go func(ch chan struct{}) {
					defer leaderLoop.Done()
					s.leaderLoop(ch)
				}(weAreLeaderCh)
				s.logger.Info("cluster leadership acquired")

			default:
				if weAreLeaderCh == nil {
					s.logger.Error("attempted to stop the leader loop while not running")
					continue
				}

				s.logger.Debug("shutting down leader loop")
				close(weAreLeaderCh)
				leaderLoop.Wait()
				weAreLeaderCh = nil
				s.logger.Info("cluster leadership lost")
			}
		case <-aclUpgradeCh:
			if atomic.LoadInt32(&s.useNewACLs) == 0 {
				aclModeCheckWait = aclModeCheckWait * 2
				if aclModeCheckWait > aclModeCheckMaxInterval {
					aclModeCheckWait = aclModeCheckMaxInterval
				}
				aclUpgradeCh = time.After(aclModeCheckWait)

				if canUpgrade := s.canUpgradeToNewACLs(weAreLeaderCh != nil); canUpgrade {
					if weAreLeaderCh != nil {
						if err := s.initializeACLs(&lib.StopChannelContext{StopCh: weAreLeaderCh}, true); err != nil {
							s.logger.Error("error transitioning to using new ACLs", "error", err)
							continue
						}
					}

					s.logger.Debug("transitioning out of legacy ACL mode")
					atomic.StoreInt32(&s.useNewACLs, 1)
					s.updateACLAdvertisement()

					// setting this to nil ensures that we will never hit this case again
					aclUpgradeCh = nil
				}
			} else {
				// establishLeadership probably transitioned us
				aclUpgradeCh = nil
			}
		case <-s.shutdownCh:
			return
		}
	}
}
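
// A minimal sketch of how the notify channel used above is typically wired
// into Raft at startup; conf and notifyCh are local illustrative names, while
// raft.Config.NotifyCh is the real hashicorp/raft field:
//
//	conf := raft.DefaultConfig()
//	notifyCh := make(chan bool, 1)
//	conf.NotifyCh = notifyCh
//	// store notifyCh as s.raftNotifyCh, then:
//	// go s.monitorLeadership()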

func (s *Server) leadershipTransfer() error {
	retryCount := 3
	for i := 0; i < retryCount; i++ {
		future := s.raft.LeadershipTransfer()
		if err := future.Error(); err != nil {
			s.logger.Error("failed leadership transfer attempt, will retry",
				"attempt", i,
				"retry_limit", retryCount,
				"error", err,
			)
		} else {
			s.logger.Info("successfully transferred leadership",
				"attempt", i,
				"retry_limit", retryCount,
			)
			return nil
		}
	}
	return fmt.Errorf("failed to transfer leadership in %d attempts", retryCount)
}

// leaderLoop runs as long as we are the leader to run various
// maintenance activities
func (s *Server) leaderLoop(stopCh chan struct{}) {
	stopCtx := &lib.StopChannelContext{StopCh: stopCh}

	// Fire a user event indicating a new leader
	payload := []byte(s.config.NodeName)
	for name, segment := range s.LANSegments() {
		if err := segment.UserEvent(newLeaderEvent, payload, false); err != nil {
			s.logger.Warn("failed to broadcast new leader event on segment",
				"segment", name,
				"error", err,
			)
		}
	}

	// The reconcile channel is only used once the initial reconcile
	// has succeeded
	var reconcileCh chan serf.Member
	establishedLeader := false

RECONCILE:
	// Set up a reconciliation timer
	reconcileCh = nil
	interval := time.After(s.config.ReconcileInterval)

	// Apply a raft barrier to ensure our FSM is caught up
	start := time.Now()
	barrier := s.raft.Barrier(barrierWriteTimeout)
	if err := barrier.Error(); err != nil {
		s.logger.Error("failed to wait for barrier", "error", err)
		goto WAIT
	}
	metrics.MeasureSince([]string{"leader", "barrier"}, start)

	// Check if we need to handle initial leadership actions
	if !establishedLeader {
		if err := s.establishLeadership(stopCtx); err != nil {
			s.logger.Error("failed to establish leadership", "error", err)
			// Immediately revoke leadership since we didn't successfully
			// establish leadership.
			s.revokeLeadership()

			// Attempt to transfer leadership. If successful it is
			// time to leave the leaderLoop since this node is no
			// longer the leader. If leadershipTransfer() fails, we
			// will try to acquire leadership again after 5 seconds.
			if err := s.leadershipTransfer(); err != nil {
				s.logger.Error("failed to transfer leadership", "error", err)
				interval = time.After(5 * time.Second)
				goto WAIT
			}
			return
		}
		establishedLeader = true
		defer s.revokeLeadership()
	}

	// Reconcile any missing data
	if err := s.reconcile(); err != nil {
		s.logger.Error("failed to reconcile", "error", err)
		goto WAIT
	}

	// The initial reconcile worked, so now we can process channel
	// updates
	reconcileCh = s.reconcileCh

WAIT:
	// Poll the stop channel to give it priority so we don't waste time
	// trying to perform the other operations if we have been asked to shut
	// down.
	select {
	case <-stopCh:
		return
	default:
	}

	// Periodically reconcile as long as we are the leader,
	// or when Serf events arrive
	for {
		select {
		case <-stopCh:
			return
		case <-s.shutdownCh:
			return
		case <-interval:
			goto RECONCILE
		case member := <-reconcileCh:
			s.reconcileMember(member)
		case index := <-s.tombstoneGC.ExpireCh():
			go s.reapTombstones(index)
		case errCh := <-s.reassertLeaderCh:
			// We can get into this state when the initial
			// establishLeadership has failed as well as the
			// follow-up leadershipTransfer. Afterwards we will be
			// waiting for the interval to trigger a reconciliation
			// and can potentially end up here. There is no point in
			// reasserting because this agent was never leader in
			// the first place.
			if !establishedLeader {
				errCh <- fmt.Errorf("leadership has not been established")
				continue
			}

			// Continue to reassert only if we previously were the
			// leader, which means revokeLeadership followed by an
			// establishLeadership().
			s.revokeLeadership()
			err := s.establishLeadership(stopCtx)
			errCh <- err

			// In case establishLeadership failed, we will try to
			// transfer leadership. At this point raft thinks we are
			// the leader, but consul disagrees.
			if err != nil {
				if err := s.leadershipTransfer(); err != nil {
					// establishedLeader was true before, but it no
					// longer is: we revoked leadership above and the
					// leadership transfer also failed. That is why we
					// stay in the leaderLoop, but establishedLeader
					// now needs to be set to false.
					establishedLeader = false
					interval = time.After(5 * time.Second)
					goto WAIT
				}

				// leadershipTransfer was successful and it is
				// time to leave the leaderLoop.
				return
			}
		}
	}
}
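
// leaderLoop adapts its stop channel into a context.Context via
// lib.StopChannelContext so that context-aware helpers can observe leadership
// loss. A minimal sketch of that adapter pattern, assuming only that
// StopChannelContext implements context.Context as its use above implies:
//
//	stopCh := make(chan struct{})
//	ctx := &lib.StopChannelContext{StopCh: stopCh}
//	go func() {
//		<-ctx.Done() // fires once stopCh is closed
//	}()
//	close(stopCh)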

// establishLeadership is invoked once we become leader and are able
// to invoke an initial barrier. The barrier is used to ensure any
// previously inflight transactions have been committed and that our
// state is up-to-date.
func (s *Server) establishLeadership(ctx context.Context) error {
	start := time.Now()
	// check for the upgrade here - this helps us transition to new ACLs much
	// quicker if this is a new cluster or this is a test agent
	if canUpgrade := s.canUpgradeToNewACLs(true); canUpgrade {
		if err := s.initializeACLs(ctx, true); err != nil {
			return err
		}
		atomic.StoreInt32(&s.useNewACLs, 1)
		s.updateACLAdvertisement()
	} else if err := s.initializeACLs(ctx, false); err != nil {
		return err
	}

	// Hint the tombstone expiration timer. When we freshly establish leadership
	// we become the authoritative timer, and so we need to start the clock
	// on any pending GC events.
	s.tombstoneGC.SetEnabled(true)
	lastIndex := s.raft.LastIndex()
	s.tombstoneGC.Hint(lastIndex)

	// Set up the session timers. This is done both when starting up and when
	// a leader failover happens. Since the timers are maintained by the leader
	// node alone, effectively this means all the timers are renewed at the
	// time of failover. The TTL contract is that the session will not be expired
	// before the TTL, so expiring it later is allowable.
	//
	// This MUST be done after the initial barrier to ensure the latest Sessions
	// are available to be initialized. Otherwise initialization may use stale
	// data.
	if err := s.initializeSessionTimers(); err != nil {
		return err
	}

	if err := s.establishEnterpriseLeadership(ctx); err != nil {
		return err
	}

	s.getOrCreateAutopilotConfig()
	s.autopilot.Start(ctx)

	s.startConfigReplication(ctx)

	s.startFederationStateReplication(ctx)

	s.startFederationStateAntiEntropy(ctx)

	if err := s.startConnectLeader(ctx); err != nil {
		return err
	}

	// Attempt to bootstrap config entries. We wait until after starting the
	// Connect leader tasks so we hopefully have transitioned to supporting
	// service-intentions.
	if err := s.bootstrapConfigEntries(s.config.ConfigEntryBootstrap); err != nil {
		return err
	}

	s.setConsistentReadReady()

	s.logger.Debug("successfully established leadership", "duration", time.Since(start))
	return nil
}

// revokeLeadership is invoked once we step down as leader.
// This is used to clean up any state that may be specific to a leader.
func (s *Server) revokeLeadership() {
	// Disable the tombstone GC, since it is only useful as a leader
	s.tombstoneGC.SetEnabled(false)

	// Clear the session timers on either shutdown or step down, since we
	// are no longer responsible for session expirations.
	s.clearAllSessionTimers()

	s.revokeEnterpriseLeadership()

	s.stopFederationStateAntiEntropy()

	s.stopFederationStateReplication()

	s.stopConfigReplication()

	s.stopConnectLeader()

	s.caManager.setCAProvider(nil, nil)
	s.caManager.setState(caStateUninitialized, false)

	s.stopACLTokenReaping()

	s.stopACLUpgrade()

	s.resetConsistentReadReady()

	// Stop returns a chan and we want to block until it is closed,
	// which indicates that autopilot is actually stopped.
	<-s.autopilot.Stop()
}

// DEPRECATED (ACL-Legacy-Compat) - Remove once old ACL compatibility is removed
func (s *Server) initializeLegacyACL() error {
	if !s.config.ACLsEnabled {
		return nil
	}

	authDC := s.config.ACLDatacenter

	// Create anonymous token if missing.
	state := s.fsm.State()
	_, token, err := state.ACLTokenGetBySecret(nil, anonymousToken, nil)
	if err != nil {
		return fmt.Errorf("failed to get anonymous token: %v", err)
	}
	// Ignoring expiration times to avoid an insertion collision.
	if token == nil {
		req := structs.ACLRequest{
			Datacenter: authDC,
			Op:         structs.ACLSet,
			ACL: structs.ACL{
				ID:   anonymousToken,
				Name: "Anonymous Token",
				Type: structs.ACLTokenTypeClient,
			},
		}
		_, err := s.raftApply(structs.ACLRequestType, &req)
		if err != nil {
			return fmt.Errorf("failed to create anonymous token: %v", err)
		}
		s.logger.Info("Created the anonymous token")
	}

	// Check for configured master token.
	if master := s.config.ACLMasterToken; len(master) > 0 {
		_, token, err = state.ACLTokenGetBySecret(nil, master, nil)
		if err != nil {
			return fmt.Errorf("failed to get master token: %v", err)
		}
		// Ignoring expiration times to avoid an insertion collision.
		if token == nil {
			req := structs.ACLRequest{
				Datacenter: authDC,
				Op:         structs.ACLSet,
				ACL: structs.ACL{
					ID:   master,
					Name: "Master Token",
					Type: structs.ACLTokenTypeManagement,
				},
			}
			_, err := s.raftApply(structs.ACLRequestType, &req)
			if err != nil {
				return fmt.Errorf("failed to create master token: %v", err)
			}
			s.logger.Info("Created ACL master token from configuration")
		}
	}

	// Check to see if we need to initialize the ACL bootstrap info. This
	// needs a Consul version check since it introduces a new Raft operation
	// that'll produce an error on older servers, and it also makes a piece
	// of state in the state store that will cause problems with older
	// servers consuming snapshots, so we have to wait to create it.
	var minVersion = version.Must(version.NewVersion("0.9.1"))
	if ok, _ := ServersInDCMeetMinimumVersion(s, s.config.Datacenter, minVersion); ok {
		canBootstrap, _, err := state.CanBootstrapACLToken()
		if err != nil {
			return fmt.Errorf("failed looking for ACL bootstrap info: %v", err)
		}
		if canBootstrap {
			req := structs.ACLRequest{
				Datacenter: authDC,
				Op:         structs.ACLBootstrapInit,
			}
			resp, err := s.raftApply(structs.ACLRequestType, &req)
			if err != nil {
				return fmt.Errorf("failed to initialize ACL bootstrap: %v", err)
			}
			switch v := resp.(type) {
			case bool:
				if v {
					s.logger.Info("ACL bootstrap enabled")
				} else {
					s.logger.Info("ACL bootstrap disabled, existing management tokens found")
				}

			default:
				return fmt.Errorf("unexpected response trying to initialize ACL bootstrap: %T", v)
			}
		}
	} else {
		s.logger.Warn("Can't initialize ACL bootstrap until all servers are >= " + minVersion.String())
	}

	return nil
}

// initializeACLs is used to set up the ACLs if we are the leader
// and need to do this.
func (s *Server) initializeACLs(ctx context.Context, upgrade bool) error {
	if !s.config.ACLsEnabled {
		return nil
	}

	// Purge the cache, since it could've changed while we were not the
	// leader.
	s.acls.cache.Purge()

	// Purge the auth method validators since they could've changed while we
	// were not leader.
	s.aclAuthMethodValidators.Purge()

	// Remove any token affected by CVE-2019-8336
	if !s.InACLDatacenter() {
		_, token, err := s.fsm.State().ACLTokenGetBySecret(nil, redactedToken, nil)
		if err == nil && token != nil {
			req := structs.ACLTokenBatchDeleteRequest{
				TokenIDs: []string{token.AccessorID},
			}

			_, err := s.raftApply(structs.ACLTokenDeleteRequestType, &req)
			if err != nil {
				return fmt.Errorf("failed to remove token with a redacted secret: %v", err)
			}
		}
	}

	if s.InACLDatacenter() {
		if s.UseLegacyACLs() && !upgrade {
			s.logger.Info("initializing legacy acls")
			return s.initializeLegacyACL()
		}

		s.logger.Info("initializing acls")

		// Create/Upgrade the builtin global-management policy
		_, policy, err := s.fsm.State().ACLPolicyGetByID(nil, structs.ACLPolicyGlobalManagementID, structs.DefaultEnterpriseMeta())
		if err != nil {
			return fmt.Errorf("failed to get the builtin global-management policy: %v", err)
		}
		if policy == nil || policy.Rules != structs.ACLPolicyGlobalManagement {
			newPolicy := structs.ACLPolicy{
				ID:             structs.ACLPolicyGlobalManagementID,
				Name:           "global-management",
				Description:    "Builtin Policy that grants unlimited access",
				Rules:          structs.ACLPolicyGlobalManagement,
				Syntax:         acl.SyntaxCurrent,
				EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
			}
			if policy != nil {
				newPolicy.Name = policy.Name
				newPolicy.Description = policy.Description
			}

			newPolicy.SetHash(true)

			req := structs.ACLPolicyBatchSetRequest{
				Policies: structs.ACLPolicies{&newPolicy},
			}
			_, err := s.raftApply(structs.ACLPolicySetRequestType, &req)
			if err != nil {
				return fmt.Errorf("failed to create global-management policy: %v", err)
			}
			s.logger.Info("Created ACL 'global-management' policy")
		}

		// Check for configured master token.
		if master := s.config.ACLMasterToken; len(master) > 0 {
			state := s.fsm.State()
			if _, err := uuid.ParseUUID(master); err != nil {
				s.logger.Warn("Configuring a non-UUID master token is deprecated")
			}

			_, token, err := state.ACLTokenGetBySecret(nil, master, nil)
			if err != nil {
				return fmt.Errorf("failed to get master token: %v", err)
			}
			// Ignoring expiration times to avoid an insertion collision.
			if token == nil {
				accessor, err := lib.GenerateUUID(s.checkTokenUUID)
				if err != nil {
					return fmt.Errorf("failed to generate the accessor ID for the master token: %v", err)
				}

				token := structs.ACLToken{
					AccessorID:  accessor,
					SecretID:    master,
					Description: "Master Token",
					Policies: []structs.ACLTokenPolicyLink{
						{
							ID: structs.ACLPolicyGlobalManagementID,
						},
					},
					CreateTime: time.Now(),
					Local:      false,

					// DEPRECATED (ACL-Legacy-Compat) - only needed for compatibility
					Type:           structs.ACLTokenTypeManagement,
					EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
				}

				token.SetHash(true)

				done := false
				if canBootstrap, _, err := state.CanBootstrapACLToken(); err == nil && canBootstrap {
					req := structs.ACLTokenBootstrapRequest{
						Token:      token,
						ResetIndex: 0,
					}
					if _, err := s.raftApply(structs.ACLBootstrapRequestType, &req); err == nil {
						s.logger.Info("Bootstrapped ACL master token from configuration")
						done = true
					} else {
						if err.Error() != structs.ACLBootstrapNotAllowedErr.Error() &&
							err.Error() != structs.ACLBootstrapInvalidResetIndexErr.Error() {
							return fmt.Errorf("failed to bootstrap master token: %v", err)
						}
					}
				}

				if !done {
					// Either we didn't attempt the bootstrap, or the
					// bootstrap request failed; set the token directly.
					req := structs.ACLTokenBatchSetRequest{
						Tokens: structs.ACLTokens{&token},
						CAS:    false,
					}
					if _, err := s.raftApply(structs.ACLTokenSetRequestType, &req); err != nil {
						return fmt.Errorf("failed to create master token: %v", err)
					}

					s.logger.Info("Created ACL master token from configuration")
				}
			}
		}

		state := s.fsm.State()
		_, token, err := state.ACLTokenGetBySecret(nil, structs.ACLTokenAnonymousID, nil)
		if err != nil {
			return fmt.Errorf("failed to get anonymous token: %v", err)
		}
		// Ignoring expiration times to avoid an insertion collision.
		if token == nil {
			// DEPRECATED (ACL-Legacy-Compat) - Don't need to query for previous "anonymous" token
			// check for a legacy token that needs an upgrade
			_, legacyToken, err := state.ACLTokenGetBySecret(nil, anonymousToken, nil)
			if err != nil {
				return fmt.Errorf("failed to get anonymous token: %v", err)
			}
			// Ignoring expiration times to avoid an insertion collision.

			// The token upgrade routine will take care of upgrading the
			// token if a legacy version exists.
			if legacyToken == nil {
				token = &structs.ACLToken{
					AccessorID:     structs.ACLTokenAnonymousID,
					SecretID:       anonymousToken,
					Description:    "Anonymous Token",
					CreateTime:     time.Now(),
					EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
				}
				token.SetHash(true)

				req := structs.ACLTokenBatchSetRequest{
					Tokens: structs.ACLTokens{token},
					CAS:    false,
				}
				_, err := s.raftApply(structs.ACLTokenSetRequestType, &req)
				if err != nil {
					return fmt.Errorf("failed to create anonymous token: %v", err)
				}
				s.logger.Info("Created ACL anonymous token from configuration")
			}
		}
		// Launch the upgrade goroutine to generate accessors for everything.
		s.startACLUpgrade(ctx)
	} else {
		if s.UseLegacyACLs() && !upgrade {
			if s.IsACLReplicationEnabled() {
				s.startLegacyACLReplication(ctx)
			}
			// Return early as we don't want to start new ACL replication
			// or ACL token reaping as these are new ACL features.
			return nil
		}

		if upgrade {
			s.stopACLReplication()
		}

		// ACL replication is now mandatory
		s.startACLReplication(ctx)
	}

	s.startACLTokenReaping(ctx)

	return nil
}
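
// initializeACLs repeatedly applies a read-then-write "create if missing"
// pattern: read the current value from the FSM state store and only issue a
// Raft apply when the object is absent. A minimal sketch of that pattern
// (getFromState, reqType, and buildRequest are hypothetical placeholders):
//
//	_, existing, err := getFromState()
//	if err != nil {
//		return err
//	}
//	if existing == nil {
//		if _, err := s.raftApply(reqType, buildRequest()); err != nil {
//			return err
//		}
//	}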

// legacyACLTokenUpgrade is only intended to be run as a managed goroutine;
// it blocks until the context passed in indicates that it should exit.
func (s *Server) legacyACLTokenUpgrade(ctx context.Context) error {
	limiter := rate.NewLimiter(aclUpgradeRateLimit, int(aclUpgradeRateLimit))
	for {
		if err := limiter.Wait(ctx); err != nil {
			return err
		}

		// actually run the upgrade here
		state := s.fsm.State()
		tokens, waitCh, err := state.ACLTokenListUpgradeable(aclUpgradeBatchSize)
		if err != nil {
			s.logger.Warn("encountered an error while searching for tokens without accessor ids", "error", err)
		}
		// No need to check expiration time here, as that only exists for v2 tokens.

		if len(tokens) == 0 {
			ws := memdb.NewWatchSet()
			ws.Add(state.AbandonCh())
			ws.Add(waitCh)
			ws.Add(ctx.Done())

			// wait for more tokens to need upgrading or the aclUpgradeCh to be closed
			ws.Watch(nil)
			continue
		}

		var newTokens structs.ACLTokens
		for _, token := range tokens {
			// This should be entirely unnecessary but is just a small safeguard against changing accessor IDs
			if token.AccessorID != "" {
				continue
			}

			newToken := *token
			if token.SecretID == anonymousToken {
				newToken.AccessorID = structs.ACLTokenAnonymousID
			} else {
				accessor, err := lib.GenerateUUID(s.checkTokenUUID)
				if err != nil {
					s.logger.Warn("failed to generate accessor during token auto-upgrade", "error", err)
					continue
				}
				newToken.AccessorID = accessor
			}

			// Assign the global-management policy to legacy management tokens
			if len(newToken.Policies) == 0 &&
				len(newToken.ServiceIdentities) == 0 &&
				len(newToken.NodeIdentities) == 0 &&
				len(newToken.Roles) == 0 &&
				newToken.Type == structs.ACLTokenTypeManagement {
				newToken.Policies = append(newToken.Policies, structs.ACLTokenPolicyLink{ID: structs.ACLPolicyGlobalManagementID})
			}

			// Need to copy these as we are going to do a CAS operation.
			newToken.CreateIndex = token.CreateIndex
			newToken.ModifyIndex = token.ModifyIndex

			newToken.SetHash(true)

			newTokens = append(newTokens, &newToken)
		}

		req := &structs.ACLTokenBatchSetRequest{Tokens: newTokens, CAS: true}

		_, err = s.raftApply(structs.ACLTokenSetRequestType, req)
		if err != nil {
			s.logger.Error("failed to apply acl token upgrade batch", "error", err)
		}
	}
}
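
// The block above uses go-memdb's WatchSet to sleep until relevant state
// changes. A minimal standalone sketch of that blocking-watch pattern
// (notifyCh is a hypothetical channel that fires when data changes):
//
//	ws := memdb.NewWatchSet()
//	ws.Add(notifyCh)
//	// Watch blocks until a watched channel fires; passing a non-nil
//	// timeout channel makes it return true on timeout instead.
//	timedOut := ws.Watch(time.After(10 * time.Second))
//	_ = timedOut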

func (s *Server) startACLUpgrade(ctx context.Context) {
	if s.config.PrimaryDatacenter != s.config.Datacenter {
		// token upgrades should only run in the primary
		return
	}

	s.leaderRoutineManager.Start(ctx, aclUpgradeRoutineName, s.legacyACLTokenUpgrade)
}

func (s *Server) stopACLUpgrade() {
	s.leaderRoutineManager.Stop(aclUpgradeRoutineName)
}

// runLegacyACLReplication is only intended to be run as a managed goroutine;
// it blocks until the context passed in indicates that it should exit.
func (s *Server) runLegacyACLReplication(ctx context.Context) error {
	var lastRemoteIndex uint64
	legacyACLLogger := s.aclReplicationLogger(logging.Legacy)
	limiter := rate.NewLimiter(rate.Limit(s.config.ACLReplicationRate), s.config.ACLReplicationBurst)

	for {
		if err := limiter.Wait(ctx); err != nil {
			return err
		}

		if s.tokens.ReplicationToken() == "" {
			continue
		}

		index, exit, err := s.replicateLegacyACLs(ctx, legacyACLLogger, lastRemoteIndex)
		if exit {
			return nil
		}

		if err != nil {
			metrics.SetGauge([]string{"leader", "replication", "acl-legacy", "status"},
				0,
			)
			lastRemoteIndex = 0
			s.updateACLReplicationStatusError()
			legacyACLLogger.Warn("Legacy ACL replication error (will retry if still leader)", "error", err)
		} else {
			metrics.SetGauge([]string{"leader", "replication", "acl-legacy", "status"},
				1,
			)
			metrics.SetGauge([]string{"leader", "replication", "acl-legacy", "index"},
				float32(index),
			)
			lastRemoteIndex = index
			s.updateACLReplicationStatusIndex(structs.ACLReplicateLegacy, index)
			legacyACLLogger.Debug("Legacy ACL replication completed through remote index", "index", index)
		}
	}
}
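
// The replication loops above pace themselves with golang.org/x/time/rate.
// A minimal sketch of that limiter usage (the 2/sec rate and burst values
// are illustrative, not taken from the server config):
//
//	limiter := rate.NewLimiter(rate.Limit(2), 2)
//	for i := 0; i < 10; i++ {
//		if err := limiter.Wait(context.Background()); err != nil {
//			return err // the context was canceled
//		}
//		// at most ~2 iterations per second run here
//	}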

func (s *Server) startLegacyACLReplication(ctx context.Context) {
	if s.InACLDatacenter() {
		return
	}

	// Unlike some other leader routines, this initializes some extra state;
	// therefore we want to prevent re-initialization if things are already
	// running.
	if s.leaderRoutineManager.IsRunning(legacyACLReplicationRoutineName) {
		return
	}

	s.initReplicationStatus()

	s.leaderRoutineManager.Start(ctx, legacyACLReplicationRoutineName, s.runLegacyACLReplication)
	s.logger.Info("started legacy ACL replication")
	s.updateACLReplicationStatusRunning(structs.ACLReplicateLegacy)
}

func (s *Server) startACLReplication(ctx context.Context) {
	if s.InACLDatacenter() {
		return
	}

	// Unlike some other leader routines, this initializes some extra state;
	// therefore we want to prevent re-initialization if things are already
	// running.
	if s.leaderRoutineManager.IsRunning(aclPolicyReplicationRoutineName) {
		return
	}

	s.initReplicationStatus()
	s.leaderRoutineManager.Start(ctx, aclPolicyReplicationRoutineName, s.runACLPolicyReplicator)
	s.leaderRoutineManager.Start(ctx, aclRoleReplicationRoutineName, s.runACLRoleReplicator)

	if s.config.ACLTokenReplication {
		s.leaderRoutineManager.Start(ctx, aclTokenReplicationRoutineName, s.runACLTokenReplicator)
		s.updateACLReplicationStatusRunning(structs.ACLReplicateTokens)
	} else {
		s.updateACLReplicationStatusRunning(structs.ACLReplicatePolicies)
	}
}

type replicateFunc func(ctx context.Context, logger hclog.Logger, lastRemoteIndex uint64) (uint64, bool, error)

// runACLPolicyReplicator is only intended to be run as a managed goroutine;
// it blocks until the context passed in indicates that it should exit.
func (s *Server) runACLPolicyReplicator(ctx context.Context) error {
	policyLogger := s.aclReplicationLogger(structs.ACLReplicatePolicies.SingularNoun())
	policyLogger.Info("started ACL Policy replication")
	return s.runACLReplicator(ctx, policyLogger, structs.ACLReplicatePolicies, s.replicateACLPolicies, "acl-policies")
}

// runACLRoleReplicator is only intended to be run as a managed goroutine;
// it blocks until the context passed in indicates that it should exit.
func (s *Server) runACLRoleReplicator(ctx context.Context) error {
	roleLogger := s.aclReplicationLogger(structs.ACLReplicateRoles.SingularNoun())
	roleLogger.Info("started ACL Role replication")
	return s.runACLReplicator(ctx, roleLogger, structs.ACLReplicateRoles, s.replicateACLRoles, "acl-roles")
}

// runACLTokenReplicator is only intended to be run as a managed goroutine;
// it blocks until the context passed in indicates that it should exit.
func (s *Server) runACLTokenReplicator(ctx context.Context) error {
	tokenLogger := s.aclReplicationLogger(structs.ACLReplicateTokens.SingularNoun())
	tokenLogger.Info("started ACL Token replication")
	return s.runACLReplicator(ctx, tokenLogger, structs.ACLReplicateTokens, s.replicateACLTokens, "acl-tokens")
}

// runACLReplicator is only intended to be run as a managed goroutine;
// it blocks until the context passed in indicates that it should exit.
func (s *Server) runACLReplicator(
	ctx context.Context,
	logger hclog.Logger,
	replicationType structs.ACLReplicationType,
	replicateFunc replicateFunc,
	metricName string,
) error {
	var failedAttempts uint
	limiter := rate.NewLimiter(rate.Limit(s.config.ACLReplicationRate), s.config.ACLReplicationBurst)

	var lastRemoteIndex uint64
	for {
		if err := limiter.Wait(ctx); err != nil {
			return err
		}

		if s.tokens.ReplicationToken() == "" {
			continue
		}

		index, exit, err := replicateFunc(ctx, logger, lastRemoteIndex)
		if exit {
			return nil
		}

		if err != nil {
			metrics.SetGauge([]string{"leader", "replication", metricName, "status"},
				0,
			)
			lastRemoteIndex = 0
			s.updateACLReplicationStatusError()
			logger.Warn("ACL replication error (will retry if still leader)",
				"error", err,
			)
			if (1 << failedAttempts) < aclReplicationMaxRetryBackoff {
				failedAttempts++
			}

			select {
			case <-ctx.Done():
				return nil
			case <-time.After((1 << failedAttempts) * time.Second):
				// do nothing
			}
		} else {
			metrics.SetGauge([]string{"leader", "replication", metricName, "status"},
				1,
			)
			metrics.SetGauge([]string{"leader", "replication", metricName, "index"},
				float32(index),
			)
			lastRemoteIndex = index
			s.updateACLReplicationStatusIndex(replicationType, index)
			logger.Debug("ACL replication completed through remote index",
				"index", index,
			)
			failedAttempts = 0
		}
	}
}
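
// On failure, runACLReplicator sleeps for (1 << failedAttempts) seconds and
// grows the exponent only while the resulting delay stays under the cap. A
// minimal standalone sketch of that capped exponential backoff (doWork and
// the 64-second cap are illustrative, not from this file):
//
//	var attempts uint
//	const maxBackoffSeconds = 64
//	for {
//		if err := doWork(); err == nil {
//			attempts = 0
//			continue
//		}
//		if (1 << attempts) < maxBackoffSeconds {
//			attempts++
//		}
//		time.Sleep((1 << attempts) * time.Second)
//	}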

func (s *Server) aclReplicationLogger(singularNoun string) hclog.Logger {
	return s.loggers.
		Named(logging.Replication).
		Named(logging.ACL).
		Named(singularNoun)
}

func (s *Server) stopACLReplication() {
	// these will be no-ops when not started
	s.leaderRoutineManager.Stop(legacyACLReplicationRoutineName)
	s.leaderRoutineManager.Stop(aclPolicyReplicationRoutineName)
	s.leaderRoutineManager.Stop(aclRoleReplicationRoutineName)
	s.leaderRoutineManager.Stop(aclTokenReplicationRoutineName)
}

func (s *Server) startConfigReplication(ctx context.Context) {
	if s.config.PrimaryDatacenter == "" || s.config.PrimaryDatacenter == s.config.Datacenter {
		// replication shouldn't run in the primary DC
		return
	}

	s.leaderRoutineManager.Start(ctx, configReplicationRoutineName, s.configReplicator.Run)
}

func (s *Server) stopConfigReplication() {
	// will be a no-op when not started
	s.leaderRoutineManager.Stop(configReplicationRoutineName)
}

func (s *Server) startFederationStateReplication(ctx context.Context) {
	if s.config.PrimaryDatacenter == "" || s.config.PrimaryDatacenter == s.config.Datacenter {
		// replication shouldn't run in the primary DC
		return
	}

	if s.gatewayLocator != nil {
		s.gatewayLocator.SetUseReplicationSignal(true)
		s.gatewayLocator.SetLastFederationStateReplicationError(nil, false)
	}

	s.leaderRoutineManager.Start(ctx, federationStateReplicationRoutineName, s.federationStateReplicator.Run)
}

func (s *Server) stopFederationStateReplication() {
	// will be a no-op when not started
	s.leaderRoutineManager.Stop(federationStateReplicationRoutineName)

	if s.gatewayLocator != nil {
		s.gatewayLocator.SetUseReplicationSignal(false)
		s.gatewayLocator.SetLastFederationStateReplicationError(nil, false)
	}
}

// getOrCreateAutopilotConfig is used to get the autopilot config, initializing it if necessary
func (s *Server) getOrCreateAutopilotConfig() *structs.AutopilotConfig {
	logger := s.loggers.Named(logging.Autopilot)
	state := s.fsm.State()
	_, config, err := state.AutopilotConfig()
	if err != nil {
		logger.Error("failed to get config", "error", err)
		return nil
	}
	if config != nil {
		return config
	}

	config = s.config.AutopilotConfig
	req := structs.AutopilotSetConfigRequest{Config: *config}
	if _, err = s.raftApply(structs.AutopilotRequestType, req); err != nil {
		logger.Error("failed to initialize config", "error", err)
		return nil
	}

	return config
}

func (s *Server) bootstrapConfigEntries(entries []structs.ConfigEntry) error {
	if s.config.PrimaryDatacenter != "" && s.config.PrimaryDatacenter != s.config.Datacenter {
		// only bootstrap in the primary datacenter
		return nil
	}

	if len(entries) < 1 {
		// nothing to initialize
		return nil
	}

	if ok, _ := ServersInDCMeetMinimumVersion(s, s.config.Datacenter, minCentralizedConfigVersion); !ok {
		s.loggers.
			Named(logging.CentralConfig).
			Warn("config: can't initialize until all servers are >= " + minCentralizedConfigVersion.String())
		return nil
	}

	state := s.fsm.State()

	// Do some quick preflight checks to see if someone is doing something
	// that's not allowed at this time:
	//
	// - Trying to upgrade from an older pre-1.9.0 version of consul with
	// intentions AND trying to bootstrap a service-intentions config entry
	// at the same time.
	//
	// - Trying to insert service-intentions config entries when connect is
	// disabled.

	usingConfigEntries, err := s.fsm.State().AreIntentionsInConfigEntries()
	if err != nil {
		return fmt.Errorf("Failed to determine if we are migrating intentions yet: %v", err)
	}

	if !usingConfigEntries || !s.config.ConnectEnabled {
		for _, entry := range entries {
			if entry.GetKind() == structs.ServiceIntentions {
				if !s.config.ConnectEnabled {
					return fmt.Errorf("Refusing to apply configuration entry %q / %q because Connect must be enabled to bootstrap intentions",
						entry.GetKind(), entry.GetName())
				}
				if !usingConfigEntries {
					return fmt.Errorf("Refusing to apply configuration entry %q / %q because intentions are still being migrated to config entries",
						entry.GetKind(), entry.GetName())
				}
			}
		}
	}

	for _, entry := range entries {
		// avoid a round trip through Raft if we know the CAS is going to fail
		_, existing, err := state.ConfigEntry(nil, entry.GetKind(), entry.GetName(), entry.GetEnterpriseMeta())
		if err != nil {
			return fmt.Errorf("Failed to determine whether the configuration for %q / %q already exists: %v", entry.GetKind(), entry.GetName(), err)
		}

		if existing == nil {
			// ensure the ModifyIndex is set to 0 for the CAS request
			entry.GetRaftIndex().ModifyIndex = 0

			req := structs.ConfigEntryRequest{
				Op:         structs.ConfigEntryUpsertCAS,
				Datacenter: s.config.Datacenter,
				Entry:      entry,
			}

			_, err := s.raftApply(structs.ConfigEntryRequestType, &req)
			if err != nil {
				return fmt.Errorf("Failed to apply configuration entry %q / %q: %v", entry.GetKind(), entry.GetName(), err)
			}
		}
	}
	return nil
}
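
// bootstrapConfigEntries relies on check-and-set semantics: setting
// ModifyIndex to 0 on a ConfigEntryUpsertCAS request means "create only if
// absent", so a restarting leader cannot clobber an entry a user has since
// modified. A minimal sketch of issuing one such request (the "web"
// service-defaults entry and "dc1" are illustrative):
//
//	entry := &structs.ServiceConfigEntry{
//		Kind: structs.ServiceDefaults,
//		Name: "web",
//	}
//	entry.GetRaftIndex().ModifyIndex = 0 // CAS: only create if missing
//	req := structs.ConfigEntryRequest{
//		Op:         structs.ConfigEntryUpsertCAS,
//		Datacenter: "dc1",
//		Entry:      entry,
//	}
//	// _, err := s.raftApply(structs.ConfigEntryRequestType, &req)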

// reconcileReaped is used to reconcile nodes that have failed and been reaped
// from Serf but remain in the catalog. This is done by looking for unknown
// nodes with serfHealth checks registered. We generate a "reap" event to
// cause the node to be cleaned up.
func (s *Server) reconcileReaped(known map[string]struct{}) error {
	state := s.fsm.State()
	_, checks, err := state.ChecksInState(nil, api.HealthAny, structs.DefaultEnterpriseMeta())
	if err != nil {
		return err
	}
	for _, check := range checks {
		// Ignore any non serf checks
		if check.CheckID != structs.SerfCheckID {
			continue
		}

		// Check if this node is "known" by serf
		if _, ok := known[check.Node]; ok {
			continue
		}

		// Get the node services, look for ConsulServiceID
		_, services, err := state.NodeServices(nil, check.Node, structs.DefaultEnterpriseMeta())
		if err != nil {
			return err
		}
		serverPort := 0
		serverAddr := ""
		serverID := ""

	CHECKS:
		for _, service := range services.Services {
			if service.ID == structs.ConsulServiceID {
				_, node, err := state.GetNode(check.Node)
				if err != nil {
					s.logger.Error("Unable to look up node with name", "name", check.Node, "error", err)
					continue CHECKS
				}

				serverAddr = node.Address
				serverPort = service.Port
				lookupAddr := net.JoinHostPort(serverAddr, strconv.Itoa(serverPort))
				svr := s.serverLookup.Server(raft.ServerAddress(lookupAddr))
				if svr != nil {
					serverID = svr.ID
				}
				break
			}
		}

		// Create a fake member
		member := serf.Member{
			Name: check.Node,
			Tags: map[string]string{
				"dc":   s.config.Datacenter,
				"role": "node",
			},
		}

		// Create the appropriate tags if this was a server node
		if serverPort > 0 {
			member.Tags["role"] = "consul"
			member.Tags["port"] = strconv.FormatUint(uint64(serverPort), 10)
			member.Tags["id"] = serverID
			member.Addr = net.ParseIP(serverAddr)
		}

		// Attempt to reap this member
		if err := s.handleReapMember(member); err != nil {
			return err
		}
	}
	return nil
}

// reconcileMember is used to do an async reconcile of a single
// serf member
func (s *Server) reconcileMember(member serf.Member) error {
	// Check if this is a member we should handle
	if !s.shouldHandleMember(member) {
		s.logger.Warn("skipping reconcile of node", "member", member)
		return nil
	}
	defer metrics.MeasureSince([]string{"leader", "reconcileMember"}, time.Now())
	var err error
	switch member.Status {
	case serf.StatusAlive:
		err = s.handleAliveMember(member)
	case serf.StatusFailed:
		err = s.handleFailedMember(member)
	case serf.StatusLeft:
		err = s.handleLeftMember(member)
	case StatusReap:
		err = s.handleReapMember(member)
	}
	if err != nil {
		s.logger.Error("failed to reconcile member",
			"member", member,
			"error", err,
		)

		// Permission denied should not bubble up
		if acl.IsErrPermissionDenied(err) {
			return nil
		}
	}
	return nil
}

// shouldHandleMember checks if this is a Consul pool member
func (s *Server) shouldHandleMember(member serf.Member) bool {
	if valid, dc := isConsulNode(member); valid && dc == s.config.Datacenter {
		return true
	}
	if valid, parts := metadata.IsConsulServer(member); valid &&
		parts.Segment == "" &&
		parts.Datacenter == s.config.Datacenter {
		return true
	}
	return false
}

// handleAliveMember is used to ensure the node
// is registered, with a passing health check.
func (s *Server) handleAliveMember(member serf.Member) error {
	// Register the consul service if this member is a server
	var service *structs.NodeService
	if valid, parts := metadata.IsConsulServer(member); valid {
		service = &structs.NodeService{
			ID:      structs.ConsulServiceID,
			Service: structs.ConsulServiceName,
			Port:    parts.Port,
			Weights: &structs.Weights{
				Passing: 1,
				Warning: 1,
			},
			Meta: map[string]string{
				// DEPRECATED - remove nonvoter in favor of read_replica in a future version of consul
				"non_voter":             strconv.FormatBool(member.Tags["nonvoter"] == "1"),
				"read_replica":          strconv.FormatBool(member.Tags["read_replica"] == "1"),
				"raft_version":          strconv.Itoa(parts.RaftVersion),
				"serf_protocol_current": strconv.FormatUint(uint64(member.ProtocolCur), 10),
				"serf_protocol_min":     strconv.FormatUint(uint64(member.ProtocolMin), 10),
				"serf_protocol_max":     strconv.FormatUint(uint64(member.ProtocolMax), 10),
				"version":               parts.Build.String(),
			},
		}

		// Attempt to join the consul server
		if err := s.joinConsulServer(member, parts); err != nil {
			return err
		}
	}

	// Check if the node exists
	state := s.fsm.State()
	_, node, err := state.GetNode(member.Name)
	if err != nil {
		return err
	}
	if node != nil && node.Address == member.Addr.String() {
		// Check if the associated service is available
		if service != nil {
			match := false
			_, services, err := state.NodeServices(nil, member.Name, structs.DefaultEnterpriseMeta())
			if err != nil {
				return err
			}
			if services != nil {
				for id, serv := range services.Services {
					if id == service.ID {
						// If the metadata differs, be sure to update it
						match = reflect.DeepEqual(serv.Meta, service.Meta)
					}
				}
			}
			if !match {
				goto AFTER_CHECK
			}
		}

		// Check if the serfCheck is in the passing state
		_, checks, err := state.NodeChecks(nil, member.Name, structs.DefaultEnterpriseMeta())
		if err != nil {
			return err
		}
		for _, check := range checks {
			if check.CheckID == structs.SerfCheckID && check.Status == api.HealthPassing {
				return nil
			}
		}
	}
AFTER_CHECK:
	s.logger.Info("member joined, marking health alive", "member", member.Name)

	// Register with the catalog.
	req := structs.RegisterRequest{
		Datacenter: s.config.Datacenter,
		Node:       member.Name,
		ID:         types.NodeID(member.Tags["id"]),
		Address:    member.Addr.String(),
		Service:    service,
		Check: &structs.HealthCheck{
			Node:    member.Name,
			CheckID: structs.SerfCheckID,
			Name:    structs.SerfCheckName,
			Status:  api.HealthPassing,
			Output:  structs.SerfCheckAliveOutput,
		},
	}
	if node != nil {
		req.TaggedAddresses = node.TaggedAddresses
		req.NodeMeta = node.Meta
	}

	_, err = s.raftApply(structs.RegisterRequestType, &req)
	return err
}

// handleFailedMember is used to mark the node's status
// as being critical, along with all checks as unknown.
func (s *Server) handleFailedMember(member serf.Member) error {
	// Check if the node exists
	state := s.fsm.State()
	_, node, err := state.GetNode(member.Name)
	if err != nil {
		return err
	}

	if node == nil {
		s.logger.Info("ignoring failed event for member because it does not exist in the catalog", "member", member.Name)
		return nil
	}

	if node.Address == member.Addr.String() {
		// Check if the serfCheck is in the critical state
		_, checks, err := state.NodeChecks(nil, member.Name, structs.DefaultEnterpriseMeta())
		if err != nil {
			return err
		}
		for _, check := range checks {
			if check.CheckID == structs.SerfCheckID && check.Status == api.HealthCritical {
				return nil
			}
		}
	}
	s.logger.Info("member failed, marking health critical", "member", member.Name)

	// Register with the catalog
	req := structs.RegisterRequest{
		Datacenter: s.config.Datacenter,
		Node:       member.Name,
		ID:         types.NodeID(member.Tags["id"]),
		Address:    member.Addr.String(),
		Check: &structs.HealthCheck{
			Node:    member.Name,
			CheckID: structs.SerfCheckID,
			Name:    structs.SerfCheckName,
			Status:  api.HealthCritical,
			Output:  structs.SerfCheckFailedOutput,
		},

		// If there's existing information about the node, do not
		// clobber it.
		SkipNodeUpdate: true,
	}
	_, err = s.raftApply(structs.RegisterRequestType, &req)
	return err
}

// handleLeftMember is used to handle members that gracefully
// left. They are deregistered if necessary.
func (s *Server) handleLeftMember(member serf.Member) error {
	return s.handleDeregisterMember("left", member)
}

// handleReapMember is used to handle members that have been
// reaped after a prolonged failure. They are deregistered.
func (s *Server) handleReapMember(member serf.Member) error {
	return s.handleDeregisterMember("reaped", member)
}

// handleDeregisterMember is used to deregister a member for a given reason
func (s *Server) handleDeregisterMember(reason string, member serf.Member) error {
	// Do not deregister ourselves. This can only happen if the current leader
	// is leaving. Instead, we should allow a follower to take over and
	// deregister us later.
	if member.Name == s.config.NodeName {
		s.logger.Warn("deregistering self should be done by follower", "name", s.config.NodeName)
		return nil
	}

	// Remove from Raft peers if this was a server
	if valid, _ := metadata.IsConsulServer(member); valid {
		if err := s.removeConsulServer(member); err != nil {
			return err
		}
	}

	// Check if the node does not exist
	state := s.fsm.State()
	_, node, err := state.GetNode(member.Name)
	if err != nil {
		return err
	}
	if node == nil {
		return nil
	}

	// Deregister the node
	s.logger.Info("deregistering member", "member", member.Name, "reason", reason)
	req := structs.DeregisterRequest{
		Datacenter: s.config.Datacenter,
		Node:       member.Name,
	}
	_, err = s.raftApply(structs.DeregisterRequestType, &req)
	return err
}

// joinConsulServer is used to try to join another consul server
func (s *Server) joinConsulServer(m serf.Member, parts *metadata.Server) error {
	// Check for the possibility of multiple bootstrap nodes
	if parts.Bootstrap {
		members := s.serfLAN.Members()
		for _, member := range members {
			valid, p := metadata.IsConsulServer(member)
			if valid && member.Name != m.Name && p.Bootstrap {
				s.logger.Error("Two nodes are in bootstrap mode. Only one node should be in bootstrap mode, not adding Raft peer.",
					"node_to_add", m.Name,
					"other", member.Name,
				)
				return nil
			}
		}
	}

	// We used to do a check here and prevent adding the server if the cluster size was too small (1 or 2 servers) as a means
	// of preventing the case where we may remove ourselves and cause a loss of leadership. The Autopilot AddServer function
	// will now handle simple address updates better and so long as the address doesn't conflict with another node
	// it will not require a removal but will instead just update the address. If it would require a removal of other nodes
	// due to conflicts then the logic regarding cluster sizes will kick in and prevent doing anything dangerous that could
	// cause loss of leadership.

	// get the autopilot library's view of this server from the serf member
	apServer, err := s.autopilotServer(m)
	if err != nil {
		return err
	}

	// now ask autopilot to add it
	return s.autopilot.AddServer(apServer)
}

// removeConsulServer is used to try to remove a consul server that has left
func (s *Server) removeConsulServer(m serf.Member) error {
	server, err := s.autopilotServer(m)
	if err != nil || server == nil {
		return err
	}

	return s.autopilot.RemoveServer(server.ID)
}

// reapTombstones is invoked by the current leader to manage garbage
// collection of tombstones. When a key is deleted, we trigger a tombstone
// GC clock. Once the expiration is reached, this routine is invoked
// to clear all tombstones before this index. This must be replicated
// through Raft to ensure consistency. We do this outside the leader loop
// to avoid blocking.
func (s *Server) reapTombstones(index uint64) {
	defer metrics.MeasureSince([]string{"leader", "reapTombstones"}, time.Now())
	req := structs.TombstoneRequest{
		Datacenter: s.config.Datacenter,
		Op:         structs.TombstoneReap,
		ReapIndex:  index,
	}
	_, err := s.raftApply(structs.TombstoneRequestType, &req)
	if err != nil {
		s.logger.Error("failed to reap tombstones up to index",
			"index", index,
			"error", err,
		)
	}
}
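
// Putting the tombstone GC pieces in this file together: establishLeadership
// enables the GC and hints the clock, the leaderLoop listens on ExpireCh, and
// reapTombstones issues the Raft apply. A minimal sketch of that pipeline
// using only the calls shown above:
//
//	s.tombstoneGC.SetEnabled(true)
//	s.tombstoneGC.Hint(s.raft.LastIndex())
//	for index := range s.tombstoneGC.ExpireCh() {
//		go s.reapTombstones(index)
//	}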

func (s *Server) setDatacenterSupportsFederationStates() {
	atomic.StoreInt32(&s.dcSupportsFederationStates, 1)
}

func (s *Server) DatacenterSupportsFederationStates() bool {
	if atomic.LoadInt32(&s.dcSupportsFederationStates) != 0 {
		return true
	}

	state := serversFederationStatesInfo{
		supported: true,
		found:     false,
	}

	// if we are in a secondary, check if they are supported in the primary dc
	if s.config.PrimaryDatacenter != s.config.Datacenter {
		s.router.CheckServers(s.config.PrimaryDatacenter, state.update)

		if !state.supported || !state.found {
			s.logger.Debug("federation states are not enabled in the primary dc")
			return false
		}
	}

	// check the servers in the local DC
	s.router.CheckServers(s.config.Datacenter, state.update)

	if state.supported && state.found {
		s.setDatacenterSupportsFederationStates()
		return true
	}

	s.logger.Debug("federation states are not enabled in this datacenter", "datacenter", s.config.Datacenter)
	return false
}

type serversFederationStatesInfo struct {
	// supported indicates whether every processed server supports federation states
	supported bool

	// found indicates that at least one server was processed
	found bool
}

func (s *serversFederationStatesInfo) update(srv *metadata.Server) bool {
	if srv.Status != serf.StatusAlive && srv.Status != serf.StatusFailed {
		// Servers that have left (or are otherwise inactive) are treated
		// as meeting the version requirement regardless.
		return true
	}

	// mark that we processed at least one server
	s.found = true

	if supported, ok := srv.FeatureFlags["fs"]; ok && supported == 1 {
		return true
	}

	// mark that at least one server does not support federation states
	s.supported = false

	// prevent continuing server evaluation
	return false
}
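
// router.CheckServers folds a predicate over the known servers in a
// datacenter, stopping as soon as the callback returns false. A minimal
// sketch of that usage with the accumulator type above (the "dc1" value is
// illustrative):
//
//	info := serversFederationStatesInfo{supported: true}
//	s.router.CheckServers("dc1", info.update)
//	if info.supported && info.found {
//		// every live server in dc1 advertises the "fs" feature flag
//	}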

func (s *Server) setDatacenterSupportsIntentionsAsConfigEntries() {
	atomic.StoreInt32(&s.dcSupportsIntentionsAsConfigEntries, 1)
}

func (s *Server) DatacenterSupportsIntentionsAsConfigEntries() bool {
	if atomic.LoadInt32(&s.dcSupportsIntentionsAsConfigEntries) != 0 {
		return true
	}

	state := serversIntentionsAsConfigEntriesInfo{
		supported: true,
		found:     false,
	}

	// if we are in a secondary, check if they are supported in the primary dc
	if s.config.PrimaryDatacenter != s.config.Datacenter {
		s.router.CheckServers(s.config.PrimaryDatacenter, state.update)

		if !state.supported || !state.found {
			s.logger.Debug("intentions have not been migrated to config entries in the primary dc yet")
			return false
		}
	}

	// check the servers in the local DC
	s.router.CheckServers(s.config.Datacenter, state.update)

	if state.supported && state.found {
		s.setDatacenterSupportsIntentionsAsConfigEntries()
		return true
	}

	s.logger.Debug("intentions cannot be migrated to config entries in this datacenter", "datacenter", s.config.Datacenter)
	return false
}

type serversIntentionsAsConfigEntriesInfo struct {
	// supported indicates whether every processed server supports intentions as config entries
	supported bool

	// found indicates that at least one server was processed
	found bool
}

func (s *serversIntentionsAsConfigEntriesInfo) update(srv *metadata.Server) bool {
	if srv.Status != serf.StatusAlive && srv.Status != serf.StatusFailed {
		// Servers that have left (or are otherwise inactive) are treated
		// as meeting the version requirement regardless.
		return true
	}

	// mark that we processed at least one server
	s.found = true

	if supported, ok := srv.FeatureFlags["si"]; ok && supported == 1 {
		return true
	}

	// mark that at least one server does not support service-intentions
	s.supported = false

	// prevent continuing server evaluation
	return false
}