1package consul 2 3import ( 4 "context" 5 "fmt" 6 "strconv" 7 "sync" 8 "time" 9 10 "github.com/armon/go-metrics" 11 "github.com/hashicorp/consul/agent/metadata" 12 "github.com/hashicorp/consul/agent/structs" 13 "github.com/hashicorp/go-version" 14 "github.com/hashicorp/raft" 15 "github.com/hashicorp/serf/serf" 16) 17 18// AutopilotPolicy is the interface for the Autopilot mechanism 19type AutopilotPolicy interface { 20 // PromoteNonVoters defines the handling of non-voting servers 21 PromoteNonVoters(*structs.AutopilotConfig) error 22} 23 24func (s *Server) startAutopilot() { 25 s.autopilotShutdownCh = make(chan struct{}) 26 s.autopilotWaitGroup = sync.WaitGroup{} 27 s.autopilotWaitGroup.Add(1) 28 29 go s.autopilotLoop() 30} 31 32func (s *Server) stopAutopilot() { 33 close(s.autopilotShutdownCh) 34 s.autopilotWaitGroup.Wait() 35} 36 37var minAutopilotVersion = version.Must(version.NewVersion("0.8.0")) 38 39// autopilotLoop periodically looks for nonvoting servers to promote and dead servers to remove. 40func (s *Server) autopilotLoop() { 41 defer s.autopilotWaitGroup.Done() 42 43 // Monitor server health until shutdown 44 ticker := time.NewTicker(s.config.AutopilotInterval) 45 defer ticker.Stop() 46 47 for { 48 select { 49 case <-s.autopilotShutdownCh: 50 return 51 case <-ticker.C: 52 autopilotConfig, ok := s.getOrCreateAutopilotConfig() 53 if !ok { 54 continue 55 } 56 57 if err := s.autopilotPolicy.PromoteNonVoters(autopilotConfig); err != nil { 58 s.logger.Printf("[ERR] autopilot: error checking for non-voters to promote: %s", err) 59 } 60 61 if err := s.pruneDeadServers(autopilotConfig); err != nil { 62 s.logger.Printf("[ERR] autopilot: error checking for dead servers to remove: %s", err) 63 } 64 case <-s.autopilotRemoveDeadCh: 65 autopilotConfig, ok := s.getOrCreateAutopilotConfig() 66 if !ok { 67 continue 68 } 69 70 if err := s.pruneDeadServers(autopilotConfig); err != nil { 71 s.logger.Printf("[ERR] autopilot: error checking for dead servers to remove: %s", err) 72 } 73 } 74 } 75} 76 77// pruneDeadServers removes up to numPeers/2 failed servers 78func (s *Server) pruneDeadServers(autopilotConfig *structs.AutopilotConfig) error { 79 // Find any failed servers 80 var failed []string 81 staleRaftServers := make(map[string]raft.Server) 82 if autopilotConfig.CleanupDeadServers { 83 future := s.raft.GetConfiguration() 84 if err := future.Error(); err != nil { 85 return err 86 } 87 88 for _, server := range future.Configuration().Servers { 89 staleRaftServers[string(server.Address)] = server 90 } 91 92 for _, member := range s.serfLAN.Members() { 93 valid, parts := metadata.IsConsulServer(member) 94 95 if valid { 96 // Remove this server from the stale list; it has a serf entry 97 if _, ok := staleRaftServers[parts.Addr.String()]; ok { 98 delete(staleRaftServers, parts.Addr.String()) 99 } 100 101 if member.Status == serf.StatusFailed { 102 failed = append(failed, member.Name) 103 } 104 } 105 } 106 } 107 108 removalCount := len(failed) + len(staleRaftServers) 109 110 // Nothing to remove, return early 111 if removalCount == 0 { 112 return nil 113 } 114 115 peers, err := s.numPeers() 116 if err != nil { 117 return err 118 } 119 120 // Only do removals if a minority of servers will be affected 121 if removalCount < peers/2 { 122 for _, server := range failed { 123 s.logger.Printf("[INFO] autopilot: Attempting removal of failed server: %v", server) 124 go s.serfLAN.RemoveFailedNode(server) 125 } 126 127 minRaftProtocol, err := ServerMinRaftProtocol(s.serfLAN.Members()) 128 if err != nil { 129 return err 130 } 131 for _, raftServer := range staleRaftServers { 132 var future raft.Future 133 if minRaftProtocol >= 2 { 134 s.logger.Printf("[INFO] autopilot: Attempting removal of stale raft server : %v", raftServer.ID) 135 future = s.raft.RemoveServer(raftServer.ID, 0, 0) 136 } else { 137 s.logger.Printf("[INFO] autopilot: Attempting removal of stale raft server : %v", raftServer.ID) 138 future = s.raft.RemovePeer(raftServer.Address) 139 } 140 if err := future.Error(); err != nil { 141 return err 142 } 143 } 144 } else { 145 s.logger.Printf("[DEBUG] autopilot: Failed to remove dead servers: too many dead servers: %d/%d", removalCount, peers) 146 } 147 148 return nil 149} 150 151// BasicAutopilot defines a policy for promoting non-voting servers in a way 152// that maintains an odd-numbered voter count. 153type BasicAutopilot struct { 154 server *Server 155} 156 157// PromoteNonVoters promotes eligible non-voting servers to voters. 158func (b *BasicAutopilot) PromoteNonVoters(autopilotConfig *structs.AutopilotConfig) error { 159 minRaftProtocol, err := ServerMinRaftProtocol(b.server.LANMembers()) 160 if err != nil { 161 return fmt.Errorf("error getting server raft protocol versions: %s", err) 162 } 163 164 // If we don't meet the minimum version for non-voter features, bail early 165 if minRaftProtocol < 3 { 166 return nil 167 } 168 169 future := b.server.raft.GetConfiguration() 170 if err := future.Error(); err != nil { 171 return fmt.Errorf("failed to get raft configuration: %v", err) 172 } 173 174 // Find any non-voters eligible for promotion 175 var promotions []raft.Server 176 voterCount := 0 177 for _, server := range future.Configuration().Servers { 178 // If this server has been stable and passing for long enough, promote it to a voter 179 if !isVoter(server.Suffrage) { 180 health := b.server.getServerHealth(string(server.ID)) 181 if health.IsStable(time.Now(), autopilotConfig) { 182 promotions = append(promotions, server) 183 } 184 } else { 185 voterCount++ 186 } 187 } 188 189 if _, err := b.server.handlePromotions(voterCount, promotions); err != nil { 190 return err 191 } 192 193 return nil 194} 195 196func (s *Server) handlePromotions(voterCount int, promotions []raft.Server) (bool, error) { 197 if len(promotions) == 0 { 198 return false, nil 199 } 200 201 // If there's currently an even number of servers, we can promote the first server in the list 202 // to get to an odd-sized quorum 203 newServers := false 204 if voterCount%2 == 0 { 205 addFuture := s.raft.AddVoter(promotions[0].ID, promotions[0].Address, 0, 0) 206 if err := addFuture.Error(); err != nil { 207 return newServers, fmt.Errorf("failed to add raft peer: %v", err) 208 } 209 promotions = promotions[1:] 210 newServers = true 211 } 212 213 // Promote remaining servers in twos to maintain an odd quorum size 214 for i := 0; i < len(promotions)-1; i += 2 { 215 addFirst := s.raft.AddVoter(promotions[i].ID, promotions[i].Address, 0, 0) 216 if err := addFirst.Error(); err != nil { 217 return newServers, fmt.Errorf("failed to add raft peer: %v", err) 218 } 219 addSecond := s.raft.AddVoter(promotions[i+1].ID, promotions[i+1].Address, 0, 0) 220 if err := addSecond.Error(); err != nil { 221 return newServers, fmt.Errorf("failed to add raft peer: %v", err) 222 } 223 newServers = true 224 } 225 226 // If we added a new server, trigger a check to remove dead servers 227 if newServers { 228 select { 229 case s.autopilotRemoveDeadCh <- struct{}{}: 230 default: 231 } 232 } 233 234 return newServers, nil 235} 236 237// serverHealthLoop monitors the health of the servers in the cluster 238func (s *Server) serverHealthLoop() { 239 // Monitor server health until shutdown 240 ticker := time.NewTicker(s.config.ServerHealthInterval) 241 defer ticker.Stop() 242 243 for { 244 select { 245 case <-s.shutdownCh: 246 return 247 case <-ticker.C: 248 if err := s.updateClusterHealth(); err != nil { 249 s.logger.Printf("[ERR] autopilot: error updating cluster health: %s", err) 250 } 251 } 252 } 253} 254 255// updateClusterHealth fetches the Raft stats of the other servers and updates 256// s.clusterHealth based on the configured Autopilot thresholds 257func (s *Server) updateClusterHealth() error { 258 // Don't do anything if the min Raft version is too low 259 minRaftProtocol, err := ServerMinRaftProtocol(s.LANMembers()) 260 if err != nil { 261 return fmt.Errorf("error getting server raft protocol versions: %s", err) 262 } 263 if minRaftProtocol < 3 { 264 return nil 265 } 266 267 state := s.fsm.State() 268 _, autopilotConf, err := state.AutopilotConfig() 269 if err != nil { 270 return fmt.Errorf("error retrieving autopilot config: %s", err) 271 } 272 // Bail early if autopilot config hasn't been initialized yet 273 if autopilotConf == nil { 274 return nil 275 } 276 277 // Get the the serf members which are Consul servers 278 serverMap := make(map[string]*metadata.Server) 279 for _, member := range s.LANMembers() { 280 if member.Status == serf.StatusLeft { 281 continue 282 } 283 284 valid, parts := metadata.IsConsulServer(member) 285 if valid { 286 serverMap[parts.ID] = parts 287 } 288 } 289 290 future := s.raft.GetConfiguration() 291 if err := future.Error(); err != nil { 292 return fmt.Errorf("error getting Raft configuration %s", err) 293 } 294 servers := future.Configuration().Servers 295 296 // Fetch the health for each of the servers in parallel so we get as 297 // consistent of a sample as possible. We capture the leader's index 298 // here as well so it roughly lines up with the same point in time. 299 targetLastIndex := s.raft.LastIndex() 300 var fetchList []*metadata.Server 301 for _, server := range servers { 302 if parts, ok := serverMap[string(server.ID)]; ok { 303 fetchList = append(fetchList, parts) 304 } 305 } 306 d := time.Now().Add(s.config.ServerHealthInterval / 2) 307 ctx, cancel := context.WithDeadline(context.Background(), d) 308 defer cancel() 309 fetchedStats := s.statsFetcher.Fetch(ctx, fetchList) 310 311 // Build a current list of server healths 312 leader := s.raft.Leader() 313 var clusterHealth structs.OperatorHealthReply 314 voterCount := 0 315 healthyCount := 0 316 healthyVoterCount := 0 317 for _, server := range servers { 318 health := structs.ServerHealth{ 319 ID: string(server.ID), 320 Address: string(server.Address), 321 Leader: server.Address == leader, 322 LastContact: -1, 323 Voter: server.Suffrage == raft.Voter, 324 } 325 326 parts, ok := serverMap[string(server.ID)] 327 if ok { 328 health.Name = parts.Name 329 health.SerfStatus = parts.Status 330 health.Version = parts.Build.String() 331 if stats, ok := fetchedStats[string(server.ID)]; ok { 332 if err := s.updateServerHealth(&health, parts, stats, autopilotConf, targetLastIndex); err != nil { 333 s.logger.Printf("[WARN] autopilot: error updating server health: %s", err) 334 } 335 } 336 } else { 337 health.SerfStatus = serf.StatusNone 338 } 339 340 if health.Voter { 341 voterCount++ 342 } 343 if health.Healthy { 344 healthyCount++ 345 if health.Voter { 346 healthyVoterCount++ 347 } 348 } 349 350 clusterHealth.Servers = append(clusterHealth.Servers, health) 351 } 352 clusterHealth.Healthy = healthyCount == len(servers) 353 354 // If we have extra healthy voters, update FailureTolerance 355 requiredQuorum := voterCount/2 + 1 356 if healthyVoterCount > requiredQuorum { 357 clusterHealth.FailureTolerance = healthyVoterCount - requiredQuorum 358 } 359 360 // Heartbeat a metric for monitoring if we're the leader 361 if s.IsLeader() { 362 metrics.SetGauge([]string{"consul", "autopilot", "failure_tolerance"}, float32(clusterHealth.FailureTolerance)) 363 metrics.SetGauge([]string{"autopilot", "failure_tolerance"}, float32(clusterHealth.FailureTolerance)) 364 if clusterHealth.Healthy { 365 metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 1) 366 metrics.SetGauge([]string{"autopilot", "healthy"}, 1) 367 } else { 368 metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 0) 369 metrics.SetGauge([]string{"autopilot", "healthy"}, 0) 370 } 371 } 372 373 s.clusterHealthLock.Lock() 374 s.clusterHealth = clusterHealth 375 s.clusterHealthLock.Unlock() 376 377 return nil 378} 379 380// updateServerHealth computes the resulting health of the server based on its 381// fetched stats and the state of the leader. 382func (s *Server) updateServerHealth(health *structs.ServerHealth, 383 server *metadata.Server, stats *structs.ServerStats, 384 autopilotConf *structs.AutopilotConfig, targetLastIndex uint64) error { 385 386 health.LastTerm = stats.LastTerm 387 health.LastIndex = stats.LastIndex 388 389 if stats.LastContact != "never" { 390 var err error 391 health.LastContact, err = time.ParseDuration(stats.LastContact) 392 if err != nil { 393 return fmt.Errorf("error parsing last_contact duration: %s", err) 394 } 395 } 396 397 lastTerm, err := strconv.ParseUint(s.raft.Stats()["last_log_term"], 10, 64) 398 if err != nil { 399 return fmt.Errorf("error parsing last_log_term: %s", err) 400 } 401 health.Healthy = health.IsHealthy(lastTerm, targetLastIndex, autopilotConf) 402 403 // If this is a new server or the health changed, reset StableSince 404 lastHealth := s.getServerHealth(server.ID) 405 if lastHealth == nil || lastHealth.Healthy != health.Healthy { 406 health.StableSince = time.Now() 407 } else { 408 health.StableSince = lastHealth.StableSince 409 } 410 411 return nil 412} 413 414func (s *Server) getClusterHealth() structs.OperatorHealthReply { 415 s.clusterHealthLock.RLock() 416 defer s.clusterHealthLock.RUnlock() 417 return s.clusterHealth 418} 419 420func (s *Server) getServerHealth(id string) *structs.ServerHealth { 421 s.clusterHealthLock.RLock() 422 defer s.clusterHealthLock.RUnlock() 423 for _, health := range s.clusterHealth.Servers { 424 if health.ID == id { 425 return &health 426 } 427 } 428 return nil 429} 430 431func isVoter(suffrage raft.ServerSuffrage) bool { 432 switch suffrage { 433 case raft.Voter, raft.Staging: 434 return true 435 default: 436 return false 437 } 438} 439