1package autopilot
2
3//
4// The methods in this file are all mainly to provide synchronous methods
5// for Raft operations that would normally return futures.
6//
7
8import (
9	"fmt"
10	"strconv"
11
12	"github.com/hashicorp/raft"
13)
14
15func requiredQuorum(voters int) int {
16	return (voters / 2) + 1
17}
18
19// NumVoters is a helper for calculating the number of voting peers in the
20// current raft configuration. This function ignores any autopilot state
21// and will make the calculation based on a newly retrieved Raft configuration.
22func (a *Autopilot) NumVoters() (int, error) {
23	cfg, err := a.getRaftConfiguration()
24	if err != nil {
25		return 0, err
26	}
27
28	var numVoters int
29	for _, server := range cfg.Servers {
30		if server.Suffrage == raft.Voter {
31			numVoters++
32		}
33	}
34
35	return numVoters, nil
36}
37
38// AddServer is a helper for adding a new server to the raft configuration.
39// This may remove servers with duplicate addresses or ids first and after
40// its all done will trigger autopilot to remove dead servers if there
41// are any. Servers added by this method will start in a non-voting
42// state and later on autopilot will promote them to voting status
43// if desired by the configured promoter. If too many removals would
44// be required that would cause leadership loss then an error is returned
45// instead of performing any Raft configuration changes.
46func (a *Autopilot) AddServer(s *Server) error {
47	cfg, err := a.getRaftConfiguration()
48	if err != nil {
49		a.logger.Error("failed to get raft configuration", "error", err)
50		return err
51	}
52
53	var existingVoter bool
54	var voterRemovals []raft.ServerID
55	var nonVoterRemovals []raft.ServerID
56	var numVoters int
57	for _, server := range cfg.Servers {
58		if server.Suffrage == raft.Voter {
59			numVoters++
60		}
61
62		if server.Address == s.Address && server.ID == s.ID {
63			// nothing to be done as the addr and ID both already match
64			return nil
65		} else if server.ID == s.ID {
66			// special case for address updates only. In this case we should be
67			// able to update the configuration without have to first remove the server
68			if server.Suffrage == raft.Voter || server.Suffrage == raft.Staging {
69				existingVoter = true
70			}
71		} else if server.Address == s.Address {
72			if server.Suffrage == raft.Voter {
73				voterRemovals = append(voterRemovals, server.ID)
74			} else {
75				nonVoterRemovals = append(nonVoterRemovals, server.ID)
76			}
77		}
78	}
79
80	requiredVoters := requiredQuorum(numVoters)
81	if len(voterRemovals) > numVoters-requiredVoters {
82		return fmt.Errorf("Preventing server addition that would require removal of too many servers and cause cluster instability")
83	}
84
85	for _, id := range voterRemovals {
86		if err := a.removeServer(id); err != nil {
87			return fmt.Errorf("error removing server %q with duplicate address %q: %w", id, s.Address, err)
88		}
89		a.logger.Info("removed server with duplicate address", "address", s.Address)
90	}
91
92	for _, id := range nonVoterRemovals {
93		if err := a.removeServer(id); err != nil {
94			return fmt.Errorf("error removing server %q with duplicate address %q: %w", id, s.Address, err)
95		}
96		a.logger.Info("removed server with duplicate address", "address", s.Address)
97	}
98
99	if existingVoter {
100		if err := a.addVoter(s.ID, s.Address); err != nil {
101			return err
102		}
103	} else {
104		if err := a.addNonVoter(s.ID, s.Address); err != nil {
105			return err
106		}
107	}
108
109	// Trigger a check to remove dead servers
110	a.RemoveDeadServers()
111	return nil
112}
113
114// RemoveServer is a helper to remove a server from Raft if it
115// exists in the latest Raft configuration
116func (a *Autopilot) RemoveServer(id raft.ServerID) error {
117	cfg, err := a.getRaftConfiguration()
118	if err != nil {
119		a.logger.Error("failed to get raft configuration", "error", err)
120		return err
121	}
122
123	// only remove servers currently in the configuration
124	for _, server := range cfg.Servers {
125		if server.ID == id {
126			return a.removeServer(server.ID)
127		}
128	}
129
130	return nil
131}
132
133// addNonVoter is a wrapper around calling the AddNonVoter method on the Raft
134// interface object provided to Autopilot
135func (a *Autopilot) addNonVoter(id raft.ServerID, addr raft.ServerAddress) error {
136	addFuture := a.raft.AddNonvoter(id, addr, 0, 0)
137	if err := addFuture.Error(); err != nil {
138		a.logger.Error("failed to add raft non-voting peer", "id", id, "address", addr, "error", err)
139		return err
140	}
141	return nil
142}
143
144// addVoter is a wrapper around calling the AddVoter method on the Raft
145// interface object provided to Autopilot
146func (a *Autopilot) addVoter(id raft.ServerID, addr raft.ServerAddress) error {
147	addFuture := a.raft.AddVoter(id, addr, 0, 0)
148	if err := addFuture.Error(); err != nil {
149		a.logger.Error("failed to add raft voting peer", "id", id, "address", addr, "error", err)
150		return err
151	}
152	return nil
153}
154
155func (a *Autopilot) demoteVoter(id raft.ServerID) error {
156	removeFuture := a.raft.DemoteVoter(id, 0, 0)
157	if err := removeFuture.Error(); err != nil {
158		a.logger.Error("failed to demote raft peer", "id", id, "error", err)
159		return err
160	}
161	return nil
162}
163
164// removeServer is a wrapper around calling the RemoveServer method on the
165// Raft interface object provided to Autopilot
166func (a *Autopilot) removeServer(id raft.ServerID) error {
167	a.logger.Debug("removing server by ID", "id", id)
168	future := a.raft.RemoveServer(id, 0, 0)
169	if err := future.Error(); err != nil {
170		a.logger.Error("failed to remove raft server",
171			"id", id,
172			"error", err,
173		)
174		return err
175	}
176	a.logger.Info("removed server", "id", id)
177	return nil
178}
179
180// getRaftConfiguration a wrapper arond calling the GetConfiguration method
181// on the Raft interface object provided to Autopilot
182func (a *Autopilot) getRaftConfiguration() (*raft.Configuration, error) {
183	configFuture := a.raft.GetConfiguration()
184	if err := configFuture.Error(); err != nil {
185		return nil, err
186	}
187	cfg := configFuture.Configuration()
188	return &cfg, nil
189}
190
191// lastTerm will retrieve the raft stats and then pull the last term value out of it
192func (a *Autopilot) lastTerm() (uint64, error) {
193	return strconv.ParseUint(a.raft.Stats()["last_log_term"], 10, 64)
194}
195
196// leadershipTransfer will transfer leadership to the server with the specified id and address
197func (a *Autopilot) leadershipTransfer(id raft.ServerID, address raft.ServerAddress) error {
198	a.logger.Info("Transferring leadership to new server", "id", id, "address", address)
199	future := a.raft.LeadershipTransferToServer(id, address)
200	return future.Error()
201}
202