1package autopilot 2 3// 4// The methods in this file are all mainly to provide synchronous methods 5// for Raft operations that would normally return futures. 6// 7 8import ( 9 "fmt" 10 "strconv" 11 12 "github.com/hashicorp/raft" 13) 14 15func requiredQuorum(voters int) int { 16 return (voters / 2) + 1 17} 18 19// NumVoters is a helper for calculating the number of voting peers in the 20// current raft configuration. This function ignores any autopilot state 21// and will make the calculation based on a newly retrieved Raft configuration. 22func (a *Autopilot) NumVoters() (int, error) { 23 cfg, err := a.getRaftConfiguration() 24 if err != nil { 25 return 0, err 26 } 27 28 var numVoters int 29 for _, server := range cfg.Servers { 30 if server.Suffrage == raft.Voter { 31 numVoters++ 32 } 33 } 34 35 return numVoters, nil 36} 37 38// AddServer is a helper for adding a new server to the raft configuration. 39// This may remove servers with duplicate addresses or ids first and after 40// its all done will trigger autopilot to remove dead servers if there 41// are any. Servers added by this method will start in a non-voting 42// state and later on autopilot will promote them to voting status 43// if desired by the configured promoter. If too many removals would 44// be required that would cause leadership loss then an error is returned 45// instead of performing any Raft configuration changes. 46func (a *Autopilot) AddServer(s *Server) error { 47 cfg, err := a.getRaftConfiguration() 48 if err != nil { 49 a.logger.Error("failed to get raft configuration", "error", err) 50 return err 51 } 52 53 var existingVoter bool 54 var voterRemovals []raft.ServerID 55 var nonVoterRemovals []raft.ServerID 56 var numVoters int 57 for _, server := range cfg.Servers { 58 if server.Suffrage == raft.Voter { 59 numVoters++ 60 } 61 62 if server.Address == s.Address && server.ID == s.ID { 63 // nothing to be done as the addr and ID both already match 64 return nil 65 } else if server.ID == s.ID { 66 // special case for address updates only. In this case we should be 67 // able to update the configuration without have to first remove the server 68 if server.Suffrage == raft.Voter || server.Suffrage == raft.Staging { 69 existingVoter = true 70 } 71 } else if server.Address == s.Address { 72 if server.Suffrage == raft.Voter { 73 voterRemovals = append(voterRemovals, server.ID) 74 } else { 75 nonVoterRemovals = append(nonVoterRemovals, server.ID) 76 } 77 } 78 } 79 80 requiredVoters := requiredQuorum(numVoters) 81 if len(voterRemovals) > numVoters-requiredVoters { 82 return fmt.Errorf("Preventing server addition that would require removal of too many servers and cause cluster instability") 83 } 84 85 for _, id := range voterRemovals { 86 if err := a.removeServer(id); err != nil { 87 return fmt.Errorf("error removing server %q with duplicate address %q: %w", id, s.Address, err) 88 } 89 a.logger.Info("removed server with duplicate address", "address", s.Address) 90 } 91 92 for _, id := range nonVoterRemovals { 93 if err := a.removeServer(id); err != nil { 94 return fmt.Errorf("error removing server %q with duplicate address %q: %w", id, s.Address, err) 95 } 96 a.logger.Info("removed server with duplicate address", "address", s.Address) 97 } 98 99 if existingVoter { 100 if err := a.addVoter(s.ID, s.Address); err != nil { 101 return err 102 } 103 } else { 104 if err := a.addNonVoter(s.ID, s.Address); err != nil { 105 return err 106 } 107 } 108 109 // Trigger a check to remove dead servers 110 a.RemoveDeadServers() 111 return nil 112} 113 114// RemoveServer is a helper to remove a server from Raft if it 115// exists in the latest Raft configuration 116func (a *Autopilot) RemoveServer(id raft.ServerID) error { 117 cfg, err := a.getRaftConfiguration() 118 if err != nil { 119 a.logger.Error("failed to get raft configuration", "error", err) 120 return err 121 } 122 123 // only remove servers currently in the configuration 124 for _, server := range cfg.Servers { 125 if server.ID == id { 126 return a.removeServer(server.ID) 127 } 128 } 129 130 return nil 131} 132 133// addNonVoter is a wrapper around calling the AddNonVoter method on the Raft 134// interface object provided to Autopilot 135func (a *Autopilot) addNonVoter(id raft.ServerID, addr raft.ServerAddress) error { 136 addFuture := a.raft.AddNonvoter(id, addr, 0, 0) 137 if err := addFuture.Error(); err != nil { 138 a.logger.Error("failed to add raft non-voting peer", "id", id, "address", addr, "error", err) 139 return err 140 } 141 return nil 142} 143 144// addVoter is a wrapper around calling the AddVoter method on the Raft 145// interface object provided to Autopilot 146func (a *Autopilot) addVoter(id raft.ServerID, addr raft.ServerAddress) error { 147 addFuture := a.raft.AddVoter(id, addr, 0, 0) 148 if err := addFuture.Error(); err != nil { 149 a.logger.Error("failed to add raft voting peer", "id", id, "address", addr, "error", err) 150 return err 151 } 152 return nil 153} 154 155func (a *Autopilot) demoteVoter(id raft.ServerID) error { 156 removeFuture := a.raft.DemoteVoter(id, 0, 0) 157 if err := removeFuture.Error(); err != nil { 158 a.logger.Error("failed to demote raft peer", "id", id, "error", err) 159 return err 160 } 161 return nil 162} 163 164// removeServer is a wrapper around calling the RemoveServer method on the 165// Raft interface object provided to Autopilot 166func (a *Autopilot) removeServer(id raft.ServerID) error { 167 a.logger.Debug("removing server by ID", "id", id) 168 future := a.raft.RemoveServer(id, 0, 0) 169 if err := future.Error(); err != nil { 170 a.logger.Error("failed to remove raft server", 171 "id", id, 172 "error", err, 173 ) 174 return err 175 } 176 a.logger.Info("removed server", "id", id) 177 return nil 178} 179 180// getRaftConfiguration a wrapper arond calling the GetConfiguration method 181// on the Raft interface object provided to Autopilot 182func (a *Autopilot) getRaftConfiguration() (*raft.Configuration, error) { 183 configFuture := a.raft.GetConfiguration() 184 if err := configFuture.Error(); err != nil { 185 return nil, err 186 } 187 cfg := configFuture.Configuration() 188 return &cfg, nil 189} 190 191// lastTerm will retrieve the raft stats and then pull the last term value out of it 192func (a *Autopilot) lastTerm() (uint64, error) { 193 return strconv.ParseUint(a.raft.Stats()["last_log_term"], 10, 64) 194} 195 196// leadershipTransfer will transfer leadership to the server with the specified id and address 197func (a *Autopilot) leadershipTransfer(id raft.ServerID, address raft.ServerAddress) error { 198 a.logger.Info("Transferring leadership to new server", "id", id, "address", address) 199 future := a.raft.LeadershipTransferToServer(id, address) 200 return future.Error() 201} 202