1// Package servers provides a Manager interface for Manager managed 2// metadata.Server objects. The servers package manages servers from a Consul 3// client's perspective (i.e. a list of servers that a client talks with for 4// RPCs). The servers package does not provide any API guarantees and should 5// be called only by `hashicorp/consul`. 6package router 7 8import ( 9 "math/rand" 10 "net" 11 "sync" 12 "sync/atomic" 13 "time" 14 15 "github.com/hashicorp/go-hclog" 16 17 "github.com/hashicorp/consul/agent/metadata" 18 "github.com/hashicorp/consul/logging" 19) 20 21// ManagerSerfCluster is an interface wrapper around Serf in order to make this 22// easier to unit test. 23type ManagerSerfCluster interface { 24 NumNodes() int 25} 26 27// Pinger is an interface wrapping client.ConnPool to prevent a cyclic import 28// dependency. 29type Pinger interface { 30 Ping(dc, nodeName string, addr net.Addr) (bool, error) 31} 32 33// serverList is a local copy of the struct used to maintain the list of 34// Consul servers used by Manager. 35// 36// NOTE(sean@): We are explicitly relying on the fact that serverList will 37// be copied onto the stack. Please keep this structure light. 38type serverList struct { 39 // servers tracks the locally known servers. List membership is 40 // maintained by Serf. 41 servers []*metadata.Server 42} 43 44type Manager struct { 45 // listValue manages the atomic load/store of a Manager's serverList 46 listValue atomic.Value 47 listLock sync.Mutex 48 49 // rebalanceTimer controls the duration of the rebalance interval 50 rebalanceTimer *time.Timer 51 52 // shutdownCh is a copy of the channel in consul.Client 53 shutdownCh chan struct{} 54 55 logger hclog.Logger 56 57 // clusterInfo is used to estimate the approximate number of nodes in 58 // a cluster and limit the rate at which it rebalances server 59 // connections. ManagerSerfCluster is an interface that wraps serf. 60 clusterInfo ManagerSerfCluster 61 62 // connPoolPinger is used to test the health of a server in the 63 // connection pool. Pinger is an interface that wraps 64 // client.ConnPool. 65 connPoolPinger Pinger 66 67 rebalancer Rebalancer 68 69 // serverName has the name of the managers's server. This is used to 70 // short-circuit pinging to itself. 71 serverName string 72 73 // notifyFailedBarrier is acts as a barrier to prevent queuing behind 74 // serverListLog and acts as a TryLock(). 75 notifyFailedBarrier int32 76 77 // offline is used to indicate that there are no servers, or that all 78 // known servers have failed the ping test. 79 offline int32 80} 81 82// AddServer takes out an internal write lock and adds a new server. If the 83// server is not known, appends the server to the list. The new server will 84// begin seeing use after the rebalance timer fires or enough servers fail 85// organically. If the server is already known, merge the new server 86// details. 87func (m *Manager) AddServer(s *metadata.Server) { 88 m.listLock.Lock() 89 defer m.listLock.Unlock() 90 l := m.getServerList() 91 92 // Check if this server is known 93 found := false 94 for idx, existing := range l.servers { 95 if existing.Name == s.Name { 96 newServers := make([]*metadata.Server, len(l.servers)) 97 copy(newServers, l.servers) 98 99 // Overwrite the existing server details in order to 100 // possibly update metadata (e.g. server version) 101 newServers[idx] = s 102 103 l.servers = newServers 104 found = true 105 break 106 } 107 } 108 109 // Add to the list if not known 110 if !found { 111 newServers := make([]*metadata.Server, len(l.servers), len(l.servers)+1) 112 copy(newServers, l.servers) 113 newServers = append(newServers, s) 114 l.servers = newServers 115 } 116 117 // Assume we are no longer offline since we've just seen a new server. 118 atomic.StoreInt32(&m.offline, 0) 119 120 // Start using this list of servers. 121 m.saveServerList(l) 122} 123 124// UpdateTLS updates the TLS setting for the servers in this manager 125func (m *Manager) UpdateTLS(useTLS bool) { 126 m.listLock.Lock() 127 defer m.listLock.Unlock() 128 129 list := m.getServerList() 130 for _, server := range list.servers { 131 server.UseTLS = useTLS 132 } 133 m.saveServerList(list) 134} 135 136// cycleServers returns a new list of servers that has dequeued the first 137// server and enqueued it at the end of the list. cycleServers assumes the 138// caller is holding the listLock. cycleServer does not test or ping 139// the next server inline. cycleServer may be called when the environment 140// has just entered an unhealthy situation and blocking on a server test is 141// less desirable than just returning the next server in the firing line. If 142// the next server fails, it will fail fast enough and cycleServer will be 143// called again. 144func (l *serverList) cycleServer() (servers []*metadata.Server) { 145 numServers := len(l.servers) 146 if numServers < 2 { 147 return servers // No action required 148 } 149 150 newServers := make([]*metadata.Server, 0, numServers) 151 newServers = append(newServers, l.servers[1:]...) 152 newServers = append(newServers, l.servers[0]) 153 154 return newServers 155} 156 157// removeServerByKey performs an inline removal of the first matching server 158func (l *serverList) removeServerByKey(targetKey *metadata.Key) { 159 for i, s := range l.servers { 160 if targetKey.Equal(s.Key()) { 161 copy(l.servers[i:], l.servers[i+1:]) 162 l.servers[len(l.servers)-1] = nil 163 l.servers = l.servers[:len(l.servers)-1] 164 return 165 } 166 } 167} 168 169// shuffleServers shuffles the server list in place 170func (l *serverList) shuffleServers() { 171 for i := len(l.servers) - 1; i > 0; i-- { 172 j := rand.Int31n(int32(i + 1)) 173 l.servers[i], l.servers[j] = l.servers[j], l.servers[i] 174 } 175} 176 177// IsOffline checks to see if all the known servers have failed their ping 178// test during the last rebalance. 179func (m *Manager) IsOffline() bool { 180 offline := atomic.LoadInt32(&m.offline) 181 return offline == 1 182} 183 184// FindServer takes out an internal "read lock" and searches through the list 185// of servers to find a "healthy" server. If the server is actually 186// unhealthy, we rely on Serf to detect this and remove the node from the 187// server list. If the server at the front of the list has failed or fails 188// during an RPC call, it is rotated to the end of the list. If there are no 189// servers available, return nil. 190func (m *Manager) FindServer() *metadata.Server { 191 l := m.getServerList() 192 numServers := len(l.servers) 193 if numServers == 0 { 194 m.logger.Warn("No servers available") 195 return nil 196 } 197 198 // Return whatever is at the front of the list because it is 199 // assumed to be the oldest in the server list (unless - 200 // hypothetically - the server list was rotated right after a 201 // server was added). 202 return l.servers[0] 203} 204 205func (m *Manager) checkServers(fn func(srv *metadata.Server) bool) bool { 206 if m == nil { 207 return true 208 } 209 210 for _, srv := range m.getServerList().servers { 211 if !fn(srv) { 212 return false 213 } 214 } 215 return true 216} 217 218func (m *Manager) CheckServers(fn func(srv *metadata.Server) bool) { 219 _ = m.checkServers(fn) 220} 221 222// getServerList is a convenience method which hides the locking semantics 223// of atomic.Value from the caller. 224func (m *Manager) getServerList() serverList { 225 if m == nil { 226 return serverList{} 227 } 228 return m.listValue.Load().(serverList) 229} 230 231// saveServerList is a convenience method which hides the locking semantics 232// of atomic.Value from the caller. 233func (m *Manager) saveServerList(l serverList) { 234 m.listValue.Store(l) 235} 236 237// New is the only way to safely create a new Manager struct. 238func New(logger hclog.Logger, shutdownCh chan struct{}, clusterInfo ManagerSerfCluster, connPoolPinger Pinger, serverName string, rb Rebalancer) (m *Manager) { 239 if logger == nil { 240 logger = hclog.New(&hclog.LoggerOptions{}) 241 } 242 243 m = new(Manager) 244 m.logger = logger.Named(logging.Manager) 245 m.clusterInfo = clusterInfo // can't pass *consul.Client: import cycle 246 m.connPoolPinger = connPoolPinger // can't pass *consul.ConnPool: import cycle 247 m.rebalanceTimer = time.NewTimer(delayer.MinDelay) 248 m.shutdownCh = shutdownCh 249 m.rebalancer = rb 250 m.serverName = serverName 251 atomic.StoreInt32(&m.offline, 1) 252 253 l := serverList{} 254 l.servers = make([]*metadata.Server, 0) 255 m.saveServerList(l) 256 return m 257} 258 259// NotifyFailedServer marks the passed in server as "failed" by rotating it 260// to the end of the server list. 261func (m *Manager) NotifyFailedServer(s *metadata.Server) { 262 l := m.getServerList() 263 264 // If the server being failed is not the first server on the list, 265 // this is a noop. If, however, the server is failed and first on 266 // the list, acquire the lock, retest, and take the penalty of moving 267 // the server to the end of the list. 268 269 // Only rotate the server list when there is more than one server 270 if len(l.servers) > 1 && l.servers[0].Name == s.Name && 271 // Use atomic.CAS to emulate a TryLock(). 272 atomic.CompareAndSwapInt32(&m.notifyFailedBarrier, 0, 1) { 273 defer atomic.StoreInt32(&m.notifyFailedBarrier, 0) 274 275 // Grab a lock, retest, and take the hit of cycling the first 276 // server to the end. 277 m.listLock.Lock() 278 defer m.listLock.Unlock() 279 l = m.getServerList() 280 281 if len(l.servers) > 1 && l.servers[0].Name == s.Name { 282 l.servers = l.cycleServer() 283 m.saveServerList(l) 284 m.logger.Debug("cycled away from server", "server", s.String()) 285 } 286 } 287} 288 289// NumServers takes out an internal "read lock" and returns the number of 290// servers. numServers includes both healthy and unhealthy servers. 291func (m *Manager) NumServers() int { 292 l := m.getServerList() 293 return len(l.servers) 294} 295 296func (m *Manager) healthyServer(server *metadata.Server) bool { 297 // Check to see if the manager is trying to ping itself. This 298 // is a small optimization to avoid performing an unnecessary 299 // RPC call. 300 // If this is true, we know there are healthy servers for this 301 // manager and we don't need to continue. 302 if m.serverName != "" && server.Name == m.serverName { 303 return true 304 } 305 if ok, err := m.connPoolPinger.Ping(server.Datacenter, server.ShortName, server.Addr); !ok { 306 m.logger.Debug("pinging server failed", 307 "server", server.String(), 308 "error", err, 309 ) 310 return false 311 } 312 return true 313} 314 315// RebalanceServers shuffles the list of servers on this metadata. The server 316// at the front of the list is selected for the next RPC. RPC calls that 317// fail for a particular server are rotated to the end of the list. This 318// method reshuffles the list periodically in order to redistribute work 319// across all known consul servers (i.e. guarantee that the order of servers 320// in the server list is not positively correlated with the age of a server 321// in the Consul cluster). Periodically shuffling the server list prevents 322// long-lived clients from fixating on long-lived servers. 323// 324// Unhealthy servers are removed when serf notices the server has been 325// deregistered. Before the newly shuffled server list is saved, the new 326// remote endpoint is tested to ensure its responsive. 327func (m *Manager) RebalanceServers() { 328 // Obtain a copy of the current serverList 329 l := m.getServerList() 330 331 // Shuffle servers so we have a chance of picking a new one. 332 l.shuffleServers() 333 334 // Iterate through the shuffled server list to find an assumed 335 // healthy server. NOTE: Do not iterate on the list directly because 336 // this loop mutates the server list in-place. 337 var foundHealthyServer bool 338 for i := 0; i < len(l.servers); i++ { 339 // Always test the first server. Failed servers are cycled 340 // while Serf detects the node has failed. 341 if m.healthyServer(l.servers[0]) { 342 foundHealthyServer = true 343 break 344 } 345 l.servers = l.cycleServer() 346 } 347 348 // If no healthy servers were found, sleep and wait for Serf to make 349 // the world a happy place again. Update the offline status. 350 if foundHealthyServer { 351 atomic.StoreInt32(&m.offline, 0) 352 } else { 353 atomic.StoreInt32(&m.offline, 1) 354 m.logger.Debug("No healthy servers during rebalance, aborting") 355 return 356 } 357 358 // Verify that all servers are present 359 if m.reconcileServerList(&l) { 360 m.logger.Debug("Rebalanced servers, new active server", 361 "number_of_servers", len(l.servers), 362 "active_server", l.servers[0].String(), 363 ) 364 } 365 // else { 366 // reconcileServerList failed because Serf removed the server 367 // that was at the front of the list that had successfully 368 // been Ping'ed. Between the Ping and reconcile, a Serf 369 // event had shown up removing the node. 370 // 371 // Instead of doing any heroics, "freeze in place" and 372 // continue to use the existing connection until the next 373 // rebalance occurs. 374 // } 375} 376 377// reconcileServerList returns true when the first server in serverList 378// exists in the receiver's serverList. If true, the merged serverList is 379// stored as the receiver's serverList. Returns false if the first server 380// does not exist in the list (i.e. was removed by Serf during a 381// PingConsulServer() call. Newly added servers are appended to the list and 382// other missing servers are removed from the list. 383func (m *Manager) reconcileServerList(l *serverList) bool { 384 m.listLock.Lock() 385 defer m.listLock.Unlock() 386 387 // newServerCfg is a serverList that has been kept up to date with 388 // Serf node join and node leave events. 389 newServerCfg := m.getServerList() 390 391 // If Serf has removed all nodes, or there is no selected server 392 // (zero nodes in serverList), abort early. 393 if len(newServerCfg.servers) == 0 || len(l.servers) == 0 { 394 return false 395 } 396 397 type targetServer struct { 398 server *metadata.Server 399 400 // 'b' == both 401 // 'o' == original 402 // 'n' == new 403 state byte 404 } 405 mergedList := make(map[metadata.Key]*targetServer, len(l.servers)) 406 for _, s := range l.servers { 407 mergedList[*s.Key()] = &targetServer{server: s, state: 'o'} 408 } 409 for _, s := range newServerCfg.servers { 410 k := s.Key() 411 _, found := mergedList[*k] 412 if found { 413 mergedList[*k].state = 'b' 414 } else { 415 mergedList[*k] = &targetServer{server: s, state: 'n'} 416 } 417 } 418 419 // Ensure the selected server has not been removed by Serf 420 selectedServerKey := l.servers[0].Key() 421 if v, found := mergedList[*selectedServerKey]; found && v.state == 'o' { 422 return false 423 } 424 425 // Append any new servers and remove any old servers 426 for k, v := range mergedList { 427 switch v.state { 428 case 'b': 429 // Do nothing, server exists in both 430 case 'o': 431 // Server has been removed 432 l.removeServerByKey(&k) 433 case 'n': 434 // Server added 435 l.servers = append(l.servers, v.server) 436 default: 437 panic("unknown merge list state") 438 } 439 } 440 441 m.saveServerList(*l) 442 return true 443} 444 445// RemoveServer takes out an internal write lock and removes a server from 446// the server list. 447func (m *Manager) RemoveServer(s *metadata.Server) { 448 m.listLock.Lock() 449 defer m.listLock.Unlock() 450 l := m.getServerList() 451 452 // Remove the server if known 453 for i := range l.servers { 454 if l.servers[i].Name == s.Name { 455 newServers := make([]*metadata.Server, 0, len(l.servers)-1) 456 newServers = append(newServers, l.servers[:i]...) 457 newServers = append(newServers, l.servers[i+1:]...) 458 l.servers = newServers 459 460 m.saveServerList(l) 461 return 462 } 463 } 464} 465 466// ResetRebalanceTimer resets the rebalance timer. This method exists for 467// testing and should not be used directly. 468func (m *Manager) ResetRebalanceTimer() { 469 m.listLock.Lock() 470 defer m.listLock.Unlock() 471 m.rebalanceTimer.Reset(delayer.MinDelay) 472} 473 474// Run periodically shuffles the list of servers to evenly distribute load. 475// Run exits when shutdownCh is closed. 476// 477// When a server fails it is moved to the end of the list, and new servers are 478// appended to the end of the list. Run ensures that load is distributed evenly 479// to all servers by randomly shuffling the list. 480func (m *Manager) Run() { 481 for { 482 select { 483 case <-m.rebalanceTimer.C: 484 m.rebalancer() 485 m.RebalanceServers() 486 delay := delayer.Delay(len(m.getServerList().servers), m.clusterInfo.NumNodes()) 487 m.rebalanceTimer.Reset(delay) 488 489 case <-m.shutdownCh: 490 m.logger.Info("shutting down") 491 return 492 } 493 } 494} 495 496// delayer is used to calculate the time to wait between calls to rebalance the 497// servers. Rebalancing is necessary to ensure that load is balanced evenly 498// across all the servers. 499// 500// The values used by delayer must balance perfectly distributed server load 501// against the overhead of a client reconnecting to a server. Rebalancing on 502// every request would cause a lot of unnecessary load as clients reconnect, 503// where as never rebalancing would lead to situations where one or two servers 504// handle a lot more requests than others. 505// 506// These values result in a minimum delay of 120-180s. Once the number of 507// nodes/server exceeds 11520, the value will be determined by multiplying the 508// node/server ratio by 15.625ms. 509var delayer = rebalanceDelayer{ 510 MinDelay: 2 * time.Minute, 511 MaxJitter: time.Minute, 512 // Once the number of nodes/server exceeds 11520 this value is used to 513 // increase the delay between rebalances to set a limit on the number of 514 // reconnections per server in a given time frame. 515 // 516 // A higher value comes at the cost of increased recovery time after a 517 // partition. 518 // 519 // For example, in a 100,000 node consul cluster with 5 servers, it will 520 // take ~5min for all clients to rebalance their connections. If 521 // 99,995 agents are in the minority talking to only one server, it 522 // will take ~26min for all clients to rebalance. A 10K cluster in 523 // the same scenario will take ~2.6min to rebalance. 524 DelayPerNode: 15*time.Millisecond + 625*time.Microsecond, 525} 526 527type rebalanceDelayer struct { 528 // MinDelay that may be returned by Delay 529 MinDelay time.Duration 530 // MaxJitter to add to MinDelay to ensure there is some randomness in the 531 // delay. 532 MaxJitter time.Duration 533 // DelayPerNode is the duration to add to each node when calculating delay. 534 // The value is divided by the number of servers to arrive at the final 535 // delay value. 536 DelayPerNode time.Duration 537} 538 539func (d *rebalanceDelayer) Delay(servers int, nodes int) time.Duration { 540 min := d.MinDelay + time.Duration(rand.Int63n(int64(d.MaxJitter))) 541 if servers == 0 { 542 return min 543 } 544 545 delay := time.Duration(float64(nodes) * float64(d.DelayPerNode) / float64(servers)) 546 if delay < min { 547 return min 548 } 549 return delay 550} 551