package consul

import (
	"context"
	"fmt"
	"strconv"
	"sync"
	"time"

	"github.com/armon/go-metrics"
	"github.com/hashicorp/consul/agent/metadata"
	"github.com/hashicorp/consul/agent/structs"
	"github.com/hashicorp/go-version"
	"github.com/hashicorp/raft"
	"github.com/hashicorp/serf/serf"
)

// AutopilotPolicy is the interface for the Autopilot mechanism
type AutopilotPolicy interface {
	// PromoteNonVoters defines the handling of non-voting servers
	PromoteNonVoters(*structs.AutopilotConfig) error
}

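// startAutopilot sets up the shutdown channel and wait group used to manage
// the autopilot loop, then starts the loop in its own goroutine.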
func (s *Server) startAutopilot() {
	s.autopilotShutdownCh = make(chan struct{})
	s.autopilotWaitGroup = sync.WaitGroup{}
	s.autopilotWaitGroup.Add(1)

	go s.autopilotLoop()
}

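// stopAutopilot signals the autopilot loop to exit and blocks until it has
// finished.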
func (s *Server) stopAutopilot() {
	close(s.autopilotShutdownCh)
	s.autopilotWaitGroup.Wait()
}

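// minAutopilotVersion is the lowest Consul server version that supports
// Autopilot features.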
var minAutopilotVersion = version.Must(version.NewVersion("0.8.0"))

// autopilotLoop periodically looks for non-voting servers to promote and dead
// servers to remove.
func (s *Server) autopilotLoop() {
	defer s.autopilotWaitGroup.Done()

	// Run the promotion and pruning checks until shutdown
	ticker := time.NewTicker(s.config.AutopilotInterval)
	defer ticker.Stop()

	for {
		select {
		case <-s.autopilotShutdownCh:
			return
		case <-ticker.C:
			autopilotConfig, ok := s.getOrCreateAutopilotConfig()
			if !ok {
				continue
			}

			if err := s.autopilotPolicy.PromoteNonVoters(autopilotConfig); err != nil {
				s.logger.Printf("[ERR] autopilot: error checking for non-voters to promote: %s", err)
			}

			if err := s.pruneDeadServers(autopilotConfig); err != nil {
				s.logger.Printf("[ERR] autopilot: error checking for dead servers to remove: %s", err)
			}
		case <-s.autopilotRemoveDeadCh:
			// Check for dead servers outside the normal interval; this is
			// signaled after a promotion adds new voters.
			autopilotConfig, ok := s.getOrCreateAutopilotConfig()
			if !ok {
				continue
			}

			if err := s.pruneDeadServers(autopilotConfig); err != nil {
				s.logger.Printf("[ERR] autopilot: error checking for dead servers to remove: %s", err)
			}
		}
	}
}

// pruneDeadServers removes failed Serf members and stale Raft servers, but
// only when fewer than half of the peers would be removed.
func (s *Server) pruneDeadServers(autopilotConfig *structs.AutopilotConfig) error {
	// Find any failed servers
	var failed []string
	staleRaftServers := make(map[string]raft.Server)
	if autopilotConfig.CleanupDeadServers {
		future := s.raft.GetConfiguration()
		if err := future.Error(); err != nil {
			return err
		}

		for _, server := range future.Configuration().Servers {
			staleRaftServers[string(server.Address)] = server
		}

		for _, member := range s.serfLAN.Members() {
			valid, parts := metadata.IsConsulServer(member)

			if valid {
				// Remove this server from the stale list; it has a serf entry
				if _, ok := staleRaftServers[parts.Addr.String()]; ok {
					delete(staleRaftServers, parts.Addr.String())
				}

				if member.Status == serf.StatusFailed {
					failed = append(failed, member.Name)
				}
			}
		}
	}

	removalCount := len(failed) + len(staleRaftServers)

	// Nothing to remove, return early
	if removalCount == 0 {
		return nil
	}

	peers, err := s.numPeers()
	if err != nil {
		return err
	}

	// Only do removals if a minority of servers will be affected
	if removalCount < peers/2 {
		for _, server := range failed {
			s.logger.Printf("[INFO] autopilot: Attempting removal of failed server: %v", server)
			go s.serfLAN.RemoveFailedNode(server)
		}

		minRaftProtocol, err := ServerMinRaftProtocol(s.serfLAN.Members())
		if err != nil {
			return err
		}
		for _, raftServer := range staleRaftServers {
			s.logger.Printf("[INFO] autopilot: Attempting removal of stale raft server: %v", raftServer.ID)
			var future raft.Future
			if minRaftProtocol >= 2 {
				future = s.raft.RemoveServer(raftServer.ID, 0, 0)
			} else {
				future = s.raft.RemovePeer(raftServer.Address)
			}
			if err := future.Error(); err != nil {
				return err
			}
		}
	} else {
		s.logger.Printf("[DEBUG] autopilot: Failed to remove dead servers: too many dead servers: %d/%d", removalCount, peers)
	}

	return nil
}

// BasicAutopilot defines a policy for promoting non-voting servers in a way
// that maintains an odd-numbered voter count.
type BasicAutopilot struct {
	server *Server
}

// PromoteNonVoters promotes eligible non-voting servers to voters.
func (b *BasicAutopilot) PromoteNonVoters(autopilotConfig *structs.AutopilotConfig) error {
	minRaftProtocol, err := ServerMinRaftProtocol(b.server.LANMembers())
	if err != nil {
		return fmt.Errorf("error getting server raft protocol versions: %s", err)
	}

	// If we don't meet the minimum Raft protocol version for non-voter
	// features, bail early
	if minRaftProtocol < 3 {
		return nil
	}

	future := b.server.raft.GetConfiguration()
	if err := future.Error(); err != nil {
		return fmt.Errorf("failed to get raft configuration: %v", err)
	}

	// Find any non-voters eligible for promotion
	var promotions []raft.Server
	voterCount := 0
	for _, server := range future.Configuration().Servers {
		// If this server has been stable and passing for long enough, promote it to a voter
		if !isVoter(server.Suffrage) {
			health := b.server.getServerHealth(string(server.ID))
			if health.IsStable(time.Now(), autopilotConfig) {
				promotions = append(promotions, server)
			}
		} else {
			voterCount++
		}
	}

	if _, err := b.server.handlePromotions(voterCount, promotions); err != nil {
		return err
	}

	return nil
}

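// handlePromotions adds the given servers as Raft voters, keeping the voter
// count odd, and reports whether any servers were promoted.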
func (s *Server) handlePromotions(voterCount int, promotions []raft.Server) (bool, error) {
	if len(promotions) == 0 {
		return false, nil
	}

	// If there's currently an even number of servers, we can promote the first server in the list
	// to get to an odd-sized quorum
	newServers := false
	if voterCount%2 == 0 {
		addFuture := s.raft.AddVoter(promotions[0].ID, promotions[0].Address, 0, 0)
		if err := addFuture.Error(); err != nil {
			return newServers, fmt.Errorf("failed to add raft peer: %v", err)
		}
		promotions = promotions[1:]
		newServers = true
	}

	// Promote remaining servers in twos to maintain an odd quorum size
	for i := 0; i < len(promotions)-1; i += 2 {
		addFirst := s.raft.AddVoter(promotions[i].ID, promotions[i].Address, 0, 0)
		if err := addFirst.Error(); err != nil {
			return newServers, fmt.Errorf("failed to add raft peer: %v", err)
		}
		addSecond := s.raft.AddVoter(promotions[i+1].ID, promotions[i+1].Address, 0, 0)
		if err := addSecond.Error(); err != nil {
			return newServers, fmt.Errorf("failed to add raft peer: %v", err)
		}
		newServers = true
	}

	// If we added a new server, trigger a check to remove dead servers
	if newServers {
		select {
		case s.autopilotRemoveDeadCh <- struct{}{}:
		default:
		}
	}

	return newServers, nil
}

// serverHealthLoop monitors the health of the servers in the cluster
func (s *Server) serverHealthLoop() {
	// Monitor server health until shutdown
	ticker := time.NewTicker(s.config.ServerHealthInterval)
	defer ticker.Stop()

	for {
		select {
		case <-s.shutdownCh:
			return
		case <-ticker.C:
			if err := s.updateClusterHealth(); err != nil {
				s.logger.Printf("[ERR] autopilot: error updating cluster health: %s", err)
			}
		}
	}
}

// updateClusterHealth fetches the Raft stats of the other servers and updates
// s.clusterHealth based on the configured Autopilot thresholds
func (s *Server) updateClusterHealth() error {
	// Don't do anything if the min Raft version is too low
	minRaftProtocol, err := ServerMinRaftProtocol(s.LANMembers())
	if err != nil {
		return fmt.Errorf("error getting server raft protocol versions: %s", err)
	}
	if minRaftProtocol < 3 {
		return nil
	}

	state := s.fsm.State()
	_, autopilotConf, err := state.AutopilotConfig()
	if err != nil {
		return fmt.Errorf("error retrieving autopilot config: %s", err)
	}
	// Bail early if autopilot config hasn't been initialized yet
	if autopilotConf == nil {
		return nil
	}

	// Get the Serf members which are Consul servers
	serverMap := make(map[string]*metadata.Server)
	for _, member := range s.LANMembers() {
		if member.Status == serf.StatusLeft {
			continue
		}

		valid, parts := metadata.IsConsulServer(member)
		if valid {
			serverMap[parts.ID] = parts
		}
	}

	future := s.raft.GetConfiguration()
	if err := future.Error(); err != nil {
		return fmt.Errorf("error getting Raft configuration: %s", err)
	}
	servers := future.Configuration().Servers

	// Fetch the health for each of the servers in parallel so we get as
	// consistent of a sample as possible. We capture the leader's index
	// here as well so it roughly lines up with the same point in time.
	targetLastIndex := s.raft.LastIndex()
	var fetchList []*metadata.Server
	for _, server := range servers {
		if parts, ok := serverMap[string(server.ID)]; ok {
			fetchList = append(fetchList, parts)
		}
	}
	d := time.Now().Add(s.config.ServerHealthInterval / 2)
	ctx, cancel := context.WithDeadline(context.Background(), d)
	defer cancel()
	fetchedStats := s.statsFetcher.Fetch(ctx, fetchList)

	// Build a current list of server healths
	leader := s.raft.Leader()
	var clusterHealth structs.OperatorHealthReply
	voterCount := 0
	healthyCount := 0
	healthyVoterCount := 0
	for _, server := range servers {
		health := structs.ServerHealth{
			ID:          string(server.ID),
			Address:     string(server.Address),
			Leader:      server.Address == leader,
			LastContact: -1,
			Voter:       server.Suffrage == raft.Voter,
		}

		parts, ok := serverMap[string(server.ID)]
		if ok {
			health.Name = parts.Name
			health.SerfStatus = parts.Status
			health.Version = parts.Build.String()
			if stats, ok := fetchedStats[string(server.ID)]; ok {
				if err := s.updateServerHealth(&health, parts, stats, autopilotConf, targetLastIndex); err != nil {
					s.logger.Printf("[WARN] autopilot: error updating server health: %s", err)
				}
			}
		} else {
			health.SerfStatus = serf.StatusNone
		}

		if health.Voter {
			voterCount++
		}
		if health.Healthy {
			healthyCount++
			if health.Voter {
				healthyVoterCount++
			}
		}

		clusterHealth.Servers = append(clusterHealth.Servers, health)
	}
	clusterHealth.Healthy = healthyCount == len(servers)

	// If we have extra healthy voters, update FailureTolerance
	requiredQuorum := voterCount/2 + 1
	if healthyVoterCount > requiredQuorum {
		clusterHealth.FailureTolerance = healthyVoterCount - requiredQuorum
	}

	// Heartbeat a metric for monitoring if we're the leader
	if s.IsLeader() {
		metrics.SetGauge([]string{"consul", "autopilot", "failure_tolerance"}, float32(clusterHealth.FailureTolerance))
		metrics.SetGauge([]string{"autopilot", "failure_tolerance"}, float32(clusterHealth.FailureTolerance))
		if clusterHealth.Healthy {
			metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 1)
			metrics.SetGauge([]string{"autopilot", "healthy"}, 1)
		} else {
			metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 0)
			metrics.SetGauge([]string{"autopilot", "healthy"}, 0)
		}
	}

	s.clusterHealthLock.Lock()
	s.clusterHealth = clusterHealth
	s.clusterHealthLock.Unlock()

	return nil
}

// updateServerHealth computes the resulting health of the server based on its
// fetched stats and the state of the leader.
func (s *Server) updateServerHealth(health *structs.ServerHealth,
	server *metadata.Server, stats *structs.ServerStats,
	autopilotConf *structs.AutopilotConfig, targetLastIndex uint64) error {

	health.LastTerm = stats.LastTerm
	health.LastIndex = stats.LastIndex

	if stats.LastContact != "never" {
		var err error
		health.LastContact, err = time.ParseDuration(stats.LastContact)
		if err != nil {
			return fmt.Errorf("error parsing last_contact duration: %s", err)
		}
	}

	lastTerm, err := strconv.ParseUint(s.raft.Stats()["last_log_term"], 10, 64)
	if err != nil {
		return fmt.Errorf("error parsing last_log_term: %s", err)
	}
	health.Healthy = health.IsHealthy(lastTerm, targetLastIndex, autopilotConf)

	// If this is a new server or the health changed, reset StableSince
	lastHealth := s.getServerHealth(server.ID)
	if lastHealth == nil || lastHealth.Healthy != health.Healthy {
		health.StableSince = time.Now()
	} else {
		health.StableSince = lastHealth.StableSince
	}

	return nil
}

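// getClusterHealth returns the most recently computed cluster health summary.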
func (s *Server) getClusterHealth() structs.OperatorHealthReply {
	s.clusterHealthLock.RLock()
	defer s.clusterHealthLock.RUnlock()
	return s.clusterHealth
}

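// getServerHealth returns the health entry for the server with the given Raft
// ID, or nil if no entry is known for that ID.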
func (s *Server) getServerHealth(id string) *structs.ServerHealth {
	s.clusterHealthLock.RLock()
	defer s.clusterHealthLock.RUnlock()
	for _, health := range s.clusterHealth.Servers {
		if health.ID == id {
			return &health
		}
	}
	return nil
}

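// isVoter reports whether the given Raft suffrage represents a current or
// soon-to-be voter (Voter or Staging).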
func isVoter(suffrage raft.ServerSuffrage) bool {
	switch suffrage {
	case raft.Voter, raft.Staging:
		return true
	default:
		return false
	}
}