// Package router provides a Manager interface for managing metadata.Server
// objects. It manages servers from a Consul client's perspective (i.e. a
// list of servers that a client talks with for RPCs). This package does not
// provide any API guarantees and should be called only by `hashicorp/consul`.
package router

import (
	"math/rand"
	"net"
	"sync"
	"sync/atomic"
	"time"

	"github.com/hashicorp/consul/agent/metadata"
	"github.com/hashicorp/consul/lib"
	"github.com/hashicorp/consul/logging"
	"github.com/hashicorp/go-hclog"
)

const (
	// clientRPCJitterFraction determines the amount of jitter added to
	// clientRPCMinReuseDuration before a connection is expired and a new
	// connection is established in order to rebalance load across consul
	// servers. The cluster-wide number of connections per second from
	// rebalancing is applied after this jitter to ensure the CPU impact
	// is always finite. See newRebalanceConnsPerSecPerServer's comment
	// for additional commentary.
	//
	// For example, in a 10K consul cluster with 5x servers, this default
	// averages out to ~13 new connections from rebalancing per server
	// per second (each connection is reused for 120s to 180s).
	clientRPCJitterFraction = 2

	// clientRPCMinReuseDuration controls the minimum amount of time RPC
	// queries are sent over an established connection to a single server.
	clientRPCMinReuseDuration = 120 * time.Second

	// newRebalanceConnsPerSecPerServer limits the number of new
	// connections a server receives per second for connection
	// rebalancing. This limit caps the load caused by continual
	// rebalancing efforts when a cluster is in equilibrium. A lower value
	// comes at the cost of increased recovery time after a partition.
	// This parameter begins to take effect when there are more than ~48K
	// clients querying 5x servers, or at lower server counts when there
	// is a partition.
	//
	// For example, in a 100K consul cluster with 5x servers, it will
	// take ~5min for all servers to rebalance their connections. If
	// 99,995 agents are in the minority talking to only one server, it
	// will take ~26min for all servers to rebalance. A 10K cluster in
	// the same scenario will take ~2.6min to rebalance.
	newRebalanceConnsPerSecPerServer = 64
)

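// rebalanceRateExample is an illustrative sketch, not used by this package,
// that reproduces the "~13 new connections per server per second" figure
// quoted above. Both arguments are hypothetical cluster sizes supplied by
// the caller, not package defaults.
func rebalanceRateExample(numClients, numServers int) float64 {
	minReuse := clientRPCMinReuseDuration                    // 120s
	maxReuse := minReuse + minReuse/clientRPCJitterFraction  // 180s
	avgReuse := (minReuse + maxReuse) / 2                    // ~150s average connection lifetime
	// Each client opens a fresh connection once per reuse window, so the
	// per-server rate is clients-per-server divided by the average window.
	// rebalanceRateExample(10000, 5) ≈ 13.3.
	return float64(numClients) / float64(numServers) / avgReuse.Seconds()
}
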
// ManagerSerfCluster is an interface wrapper around Serf in order to make this
// easier to unit test.
type ManagerSerfCluster interface {
	NumNodes() int
}

// Pinger is an interface wrapping client.ConnPool to prevent a cyclic import
// dependency.
type Pinger interface {
	Ping(dc string, addr net.Addr, version int, useTLS bool) (bool, error)
}

// serverList is a local copy of the struct used to maintain the list of
// Consul servers used by Manager.
//
// NOTE(sean@): We are explicitly relying on the fact that serverList will
// be copied onto the stack. Please keep this structure light.
type serverList struct {
	// servers tracks the locally known servers. List membership is
	// maintained by Serf.
	servers []*metadata.Server
}

// Manager maintains the list of Consul servers used by a client for RPCs
// and periodically rebalances connections across them.
type Manager struct {
	// listValue manages the atomic load/store of a Manager's serverList
	listValue atomic.Value
	listLock  sync.Mutex

	// rebalanceTimer controls the duration of the rebalance interval
	rebalanceTimer *time.Timer

	// shutdownCh is a copy of the channel in consul.Client
	shutdownCh chan struct{}

	logger hclog.Logger

	// clusterInfo is used to estimate the approximate number of nodes in
	// a cluster and limit the rate at which it rebalances server
	// connections. ManagerSerfCluster is an interface that wraps serf.
	clusterInfo ManagerSerfCluster

	// connPoolPinger is used to test the health of a server in the
	// connection pool. Pinger is an interface that wraps
	// client.ConnPool.
	connPoolPinger Pinger

	// notifyFailedBarrier acts as a barrier to prevent queuing behind
	// listLock and acts as a TryLock().
	notifyFailedBarrier int32

	// offline is used to indicate that there are no servers, or that all
	// known servers have failed the ping test.
	offline int32
}

// AddServer takes out an internal write lock and adds a new server. If the
// server is not known, it is appended to the list. The new server will
// begin seeing use after the rebalance timer fires or enough servers fail
// organically. If the server is already known, its existing entry is
// overwritten with the new server's details.
func (m *Manager) AddServer(s *metadata.Server) {
	m.listLock.Lock()
	defer m.listLock.Unlock()
	l := m.getServerList()

	// Check if this server is known
	found := false
	for idx, existing := range l.servers {
		if existing.Name == s.Name {
			newServers := make([]*metadata.Server, len(l.servers))
			copy(newServers, l.servers)

			// Overwrite the existing server details in order to
			// possibly update metadata (e.g. server version)
			newServers[idx] = s

			l.servers = newServers
			found = true
			break
		}
	}

	// Add to the list if not known
	if !found {
		newServers := make([]*metadata.Server, len(l.servers), len(l.servers)+1)
		copy(newServers, l.servers)
		newServers = append(newServers, s)
		l.servers = newServers
	}

	// Assume we are no longer offline since we've just seen a new server.
	atomic.StoreInt32(&m.offline, 0)

	// Start using this list of servers.
	m.saveServerList(l)
}

// UpdateTLS updates the TLS setting for the servers in this manager.
func (m *Manager) UpdateTLS(useTLS bool) {
	m.listLock.Lock()
	defer m.listLock.Unlock()

	list := m.getServerList()
	for _, server := range list.servers {
		server.UseTLS = useTLS
	}
	m.saveServerList(list)
}

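// staticPinger is a minimal sketch of a Pinger implementation of the kind a
// unit test might supply to New. It is illustrative only; in production the
// Pinger is Consul's client.ConnPool, which is not importable here because
// of the import cycle noted on the Pinger interface above.
type staticPinger struct {
	// healthy is the canned answer returned for every Ping call.
	healthy bool
}

// Ping satisfies the Pinger interface by returning the canned health value
// and ignoring the target server entirely.
func (p staticPinger) Ping(dc string, addr net.Addr, version int, useTLS bool) (bool, error) {
	return p.healthy, nil
}
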
// cycleServer returns a new list of servers that has dequeued the first
// server and enqueued it at the end of the list. cycleServer assumes the
// caller is holding the listLock. cycleServer does not test or ping
// the next server inline. cycleServer may be called when the environment
// has just entered an unhealthy situation and blocking on a server test is
// less desirable than just returning the next server in the firing line. If
// the next server fails, it will fail fast enough and cycleServer will be
// called again.
func (l *serverList) cycleServer() (servers []*metadata.Server) {
	numServers := len(l.servers)
	if numServers < 2 {
		return servers // No action required
	}

	newServers := make([]*metadata.Server, 0, numServers)
	newServers = append(newServers, l.servers[1:]...)
	newServers = append(newServers, l.servers[0])

	return newServers
}

// removeServerByKey performs an inline removal of the first matching server
// from the list.
func (l *serverList) removeServerByKey(targetKey *metadata.Key) {
	for i, s := range l.servers {
		if targetKey.Equal(s.Key()) {
			copy(l.servers[i:], l.servers[i+1:])
			l.servers[len(l.servers)-1] = nil
			l.servers = l.servers[:len(l.servers)-1]
			return
		}
	}
}

// shuffleServers shuffles the server list in place using a Fisher-Yates
// shuffle.
func (l *serverList) shuffleServers() {
	for i := len(l.servers) - 1; i > 0; i-- {
		j := rand.Int31n(int32(i + 1))
		l.servers[i], l.servers[j] = l.servers[j], l.servers[i]
	}
}

// IsOffline checks to see if all the known servers have failed their ping
// test during the last rebalance.
func (m *Manager) IsOffline() bool {
	offline := atomic.LoadInt32(&m.offline)
	return offline == 1
}

// FindServer takes out an internal "read lock" and searches through the list
// of servers to find a "healthy" server. If the server is actually
// unhealthy, we rely on Serf to detect this and remove the node from the
// server list. If the server at the front of the list has failed or fails
// during an RPC call, it is rotated to the end of the list. If there are no
// servers available, return nil.
func (m *Manager) FindServer() *metadata.Server {
	l := m.getServerList()
	numServers := len(l.servers)
	if numServers == 0 {
		m.logger.Warn("No servers available")
		return nil
	}

	// Return whatever is at the front of the list because it is
	// assumed to be the oldest in the server list (unless -
	// hypothetically - the server list was rotated right after a
	// server was added).
	return l.servers[0]
}

// checkServers applies fn to every known server and stops at the first
// server for which fn returns false. It reports whether fn returned true
// for all servers.
func (m *Manager) checkServers(fn func(srv *metadata.Server) bool) bool {
	for _, srv := range m.getServerList().servers {
		if !fn(srv) {
			return false
		}
	}
	return true
}

// CheckServers applies fn to every known server, stopping early if fn
// returns false.
func (m *Manager) CheckServers(fn func(srv *metadata.Server) bool) {
	_ = m.checkServers(fn)
}

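// collectServerAddrs is an illustrative sketch, not used by this package, of
// how a caller can treat CheckServers as a read-only for-each over the known
// servers; returning false from the callback would stop the iteration early.
func collectServerAddrs(m *Manager) []string {
	var addrs []string
	m.CheckServers(func(srv *metadata.Server) bool {
		// Addr is the server's RPC address as maintained by Serf.
		addrs = append(addrs, srv.Addr.String())
		return true // keep iterating over the remaining servers
	})
	return addrs
}
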
// getServerList is a convenience method which hides the locking semantics
// of atomic.Value from the caller.
func (m *Manager) getServerList() serverList {
	return m.listValue.Load().(serverList)
}

// saveServerList is a convenience method which hides the locking semantics
// of atomic.Value from the caller.
func (m *Manager) saveServerList(l serverList) {
	m.listValue.Store(l)
}

// New is the only way to safely create a new Manager struct.
func New(logger hclog.Logger, shutdownCh chan struct{}, clusterInfo ManagerSerfCluster, connPoolPinger Pinger) (m *Manager) {
	if logger == nil {
		logger = hclog.New(&hclog.LoggerOptions{})
	}

	m = new(Manager)
	m.logger = logger.Named(logging.Manager)
	m.clusterInfo = clusterInfo       // can't pass *consul.Client: import cycle
	m.connPoolPinger = connPoolPinger // can't pass *consul.ConnPool: import cycle
	m.rebalanceTimer = time.NewTimer(clientRPCMinReuseDuration)
	m.shutdownCh = shutdownCh
	atomic.StoreInt32(&m.offline, 1)

	l := serverList{}
	l.servers = make([]*metadata.Server, 0)
	m.saveServerList(l)
	return m
}

// NotifyFailedServer marks the passed in server as "failed" by rotating it
// to the end of the server list.
func (m *Manager) NotifyFailedServer(s *metadata.Server) {
	l := m.getServerList()

	// If the server being failed is not the first server on the list,
	// this is a noop. If, however, the server is failed and first on
	// the list, acquire the lock, retest, and take the penalty of moving
	// the server to the end of the list.

	// Only rotate the server list when there is more than one server
	if len(l.servers) > 1 && l.servers[0].Name == s.Name &&
		// Use atomic.CAS to emulate a TryLock().
		atomic.CompareAndSwapInt32(&m.notifyFailedBarrier, 0, 1) {
		defer atomic.StoreInt32(&m.notifyFailedBarrier, 0)

		// Grab a lock, retest, and take the hit of cycling the first
		// server to the end.
		m.listLock.Lock()
		defer m.listLock.Unlock()
		l = m.getServerList()

		if len(l.servers) > 1 && l.servers[0].Name == s.Name {
			l.servers = l.cycleServer()
			m.saveServerList(l)
			m.logger.Debug("cycled away from server", "server", s.String())
		}
	}
}

// NumServers takes out an internal "read lock" and returns the number of
// servers. The count includes both healthy and unhealthy servers.
func (m *Manager) NumServers() int {
	l := m.getServerList()
	return len(l.servers)
}

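// exampleManagerLifecycle is an illustrative sketch, not used by this
// package, of the intended lifecycle: construct a Manager, run its rebalance
// loop in the background, register a server learned from Serf, and pick one
// for the next RPC. The arguments stand in for values the Consul client
// would normally supply; the RPC itself is elided.
func exampleManagerLifecycle(logger hclog.Logger, shutdownCh chan struct{},
	cluster ManagerSerfCluster, pinger Pinger, srv *metadata.Server) *metadata.Server {

	m := New(logger, shutdownCh, cluster, pinger)
	go m.Start() // rebalances periodically until shutdownCh is closed

	// Servers are normally added and removed in response to Serf member events.
	m.AddServer(srv)

	// Pick the server at the front of the list for the next RPC.
	s := m.FindServer()
	if s == nil {
		return nil // no servers known yet
	}

	// ... perform the RPC; if it fails, report it so the list rotates:
	// m.NotifyFailedServer(s)
	return s
}
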
// RebalanceServers shuffles the list of servers held by this Manager. The
// server at the front of the list is selected for the next RPC. Servers
// whose RPC calls fail are rotated to the end of the list. This method
// reshuffles the list periodically in order to redistribute work across all
// known consul servers (i.e. guarantee that the order of servers in the
// server list is not positively correlated with the age of a server in the
// Consul cluster). Periodically shuffling the server list prevents
// long-lived clients from fixating on long-lived servers.
//
// Unhealthy servers are removed when serf notices the server has been
// deregistered. Before the newly shuffled server list is saved, the new
// remote endpoint is tested to ensure it is responsive.
func (m *Manager) RebalanceServers() {
	// Obtain a copy of the current serverList
	l := m.getServerList()

	// Shuffle servers so we have a chance of picking a new one.
	l.shuffleServers()

	// Iterate through the shuffled server list to find an assumed
	// healthy server. NOTE: Do not iterate on the list directly because
	// this loop mutates the server list in-place.
	var foundHealthyServer bool
	for i := 0; i < len(l.servers); i++ {
		// Always test the first server. Failed servers are cycled
		// to the end of the list until Serf detects the node has
		// failed and removes it.
		srv := l.servers[0]

		ok, err := m.connPoolPinger.Ping(srv.Datacenter, srv.Addr, srv.Version, srv.UseTLS)
		if ok {
			foundHealthyServer = true
			break
		}
		m.logger.Debug("pinging server failed",
			"server", srv.String(),
			"error", err,
		)
		l.servers = l.cycleServer()
	}

	// If no healthy servers were found, sleep and wait for Serf to make
	// the world a happy place again. Update the offline status.
	if foundHealthyServer {
		atomic.StoreInt32(&m.offline, 0)
	} else {
		atomic.StoreInt32(&m.offline, 1)
		m.logger.Debug("No healthy servers during rebalance, aborting")
		return
	}

	// Verify that all servers are present
	if m.reconcileServerList(&l) {
		m.logger.Debug("Rebalanced servers, new active server",
			"number_of_servers", len(l.servers),
			"active_server", l.servers[0].String(),
		)
	} else {
		// reconcileServerList failed because Serf removed the server
		// at the front of the list that had just been successfully
		// Ping'ed. Between the Ping and the reconcile, a Serf event
		// showed up removing the node.
		//
		// Instead of doing any heroics, "freeze in place" and
		// continue to use the existing connection until the next
		// rebalance occurs.
	}
}

// reconcileServerList returns true when the first server in serverList l
// exists in the Manager's Serf-maintained serverList. If true, the merged
// serverList is stored as the Manager's serverList. Returns false if the
// first server does not exist in the list (i.e. it was removed by Serf
// during the ping test). Newly added servers are appended to the list and
// other missing servers are removed from the list.
func (m *Manager) reconcileServerList(l *serverList) bool {
	m.listLock.Lock()
	defer m.listLock.Unlock()

	// newServerCfg is a serverList that has been kept up to date with
	// Serf node join and node leave events.
	newServerCfg := m.getServerList()

	// If Serf has removed all nodes, or there is no selected server
	// (zero nodes in serverList), abort early.
	if len(newServerCfg.servers) == 0 || len(l.servers) == 0 {
		return false
	}

	type targetServer struct {
		server *metadata.Server

		// 'b' == both
		// 'o' == original
		// 'n' == new
		state byte
	}
	mergedList := make(map[metadata.Key]*targetServer, len(l.servers))
	for _, s := range l.servers {
		mergedList[*s.Key()] = &targetServer{server: s, state: 'o'}
	}
	for _, s := range newServerCfg.servers {
		k := s.Key()
		_, found := mergedList[*k]
		if found {
			mergedList[*k].state = 'b'
		} else {
			mergedList[*k] = &targetServer{server: s, state: 'n'}
		}
	}

	// Ensure the selected server has not been removed by Serf
	selectedServerKey := l.servers[0].Key()
	if v, found := mergedList[*selectedServerKey]; found && v.state == 'o' {
		return false
	}

	// Append any new servers and remove any old servers
	for k, v := range mergedList {
		switch v.state {
		case 'b':
			// Do nothing, server exists in both
		case 'o':
			// Server has been removed
			l.removeServerByKey(&k)
		case 'n':
			// Server added
			l.servers = append(l.servers, v.server)
		default:
			panic("unknown merge list state")
		}
	}

	m.saveServerList(*l)
	return true
}

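// A worked example of reconcileServerList's merge, using server names only
// (illustrative; these names are placeholders, not real servers):
//
//	pinged copy l:         [s1, s2, s3]   s1 has just passed its ping
//	Serf-maintained list:  [s2, s3, s4]   s1 left and s4 joined mid-ping
//
//	merge states: s1 -> 'o' (only in l), s2 and s3 -> 'b' (both), s4 -> 'n' (new)
//
// Because the selected server s1 ends up in state 'o', reconcileServerList
// returns false and the shuffled list is discarded; the Serf-maintained list
// (which already contains s4) remains in effect until the next rebalance.
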
// RemoveServer takes out an internal write lock and removes a server from
// the server list.
func (m *Manager) RemoveServer(s *metadata.Server) {
	m.listLock.Lock()
	defer m.listLock.Unlock()
	l := m.getServerList()

	// Remove the server if known
	for i := range l.servers {
		if l.servers[i].Name == s.Name {
			newServers := make([]*metadata.Server, 0, len(l.servers)-1)
			newServers = append(newServers, l.servers[:i]...)
			newServers = append(newServers, l.servers[i+1:]...)
			l.servers = newServers

			m.saveServerList(l)
			return
		}
	}
}

// refreshServerRebalanceTimer is only called once m.rebalanceTimer expires.
func (m *Manager) refreshServerRebalanceTimer() time.Duration {
	l := m.getServerList()
	numServers := len(l.servers)
	// Limit this connection's life based on the size (and health) of the
	// cluster. Never rebalance a connection more frequently than
	// connReuseLowWatermarkDuration, and make sure we never exceed
	// clusterWideRebalanceConnsPerSec operations/s across numLANMembers.
	clusterWideRebalanceConnsPerSec := float64(numServers * newRebalanceConnsPerSecPerServer)
	connReuseLowWatermarkDuration := clientRPCMinReuseDuration + lib.RandomStagger(clientRPCMinReuseDuration/clientRPCJitterFraction)
	numLANMembers := m.clusterInfo.NumNodes()
	connRebalanceTimeout := lib.RateScaledInterval(clusterWideRebalanceConnsPerSec, connReuseLowWatermarkDuration, numLANMembers)

	m.rebalanceTimer.Reset(connRebalanceTimeout)
	return connRebalanceTimeout
}

// ResetRebalanceTimer resets the rebalance timer. This method exists for
// testing and should not be used directly.
func (m *Manager) ResetRebalanceTimer() {
	m.listLock.Lock()
	defer m.listLock.Unlock()
	m.rebalanceTimer.Reset(clientRPCMinReuseDuration)
}

// Start is used to start and manage the task of automatically shuffling and
// rebalancing the list of Consul servers. This maintenance only happens
// periodically based on the expiration of the timer. Failed servers are
// automatically cycled to the end of the list. New servers are appended to
// the list. The order of the server list must be shuffled periodically to
// distribute load across all known and available Consul servers.
func (m *Manager) Start() {
	for {
		select {
		case <-m.rebalanceTimer.C:
			m.RebalanceServers()
			m.refreshServerRebalanceTimer()

		case <-m.shutdownCh:
			m.logger.Info("shutting down")
			return
		}
	}
}

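// rebalanceIntervalExample is an illustrative sketch, not used by this
// package, that mirrors refreshServerRebalanceTimer's arithmetic for a
// hypothetical cluster without touching a Manager. It can be used to
// sanity-check the figures quoted in the constants at the top of this file:
// for 5 servers and 100,000 LAN members the result is roughly 5 minutes
// (100,000 nodes / 320 new connections per second).
func rebalanceIntervalExample(numServers, numLANMembers int) time.Duration {
	clusterWideRebalanceConnsPerSec := float64(numServers * newRebalanceConnsPerSecPerServer)
	connReuseLowWatermarkDuration := clientRPCMinReuseDuration +
		lib.RandomStagger(clientRPCMinReuseDuration/clientRPCJitterFraction)
	return lib.RateScaledInterval(clusterWideRebalanceConnsPerSec, connReuseLowWatermarkDuration, numLANMembers)
}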