// Package router provides a Manager for managing metadata.Server objects.
// The router package manages servers from a Consul client's perspective
// (i.e. the list of servers that a client talks to for RPCs).  The package
// does not provide any API guarantees and should be consumed only by
// `hashicorp/consul`.
package router

import (
	"math/rand"
	"net"
	"sync"
	"sync/atomic"
	"time"

	"github.com/hashicorp/go-hclog"

	"github.com/hashicorp/consul/agent/metadata"
	"github.com/hashicorp/consul/logging"
)

// ManagerSerfCluster is an interface wrapper around Serf in order to make this
// easier to unit test.
type ManagerSerfCluster interface {
	NumNodes() int
}

// Pinger is an interface wrapping client.ConnPool to prevent a cyclic import
// dependency.
type Pinger interface {
	Ping(dc, nodeName string, addr net.Addr) (bool, error)
}
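
// Because Pinger is a small interface, callers other than client.ConnPool
// (tests in particular) can satisfy it with a trivial stub.  A minimal
// sketch, assuming a fakePinger defined in a test file:
//
//	type fakePinger struct{ ok bool }
//
//	func (p fakePinger) Ping(dc, nodeName string, addr net.Addr) (bool, error) {
//		// Report a canned health result instead of issuing a real RPC.
//		return p.ok, nil
//	}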

// serverList is a local copy of the struct used to maintain the list of
// Consul servers used by Manager.
//
// NOTE(sean@): We are explicitly relying on the fact that serverList will
// be copied onto the stack.  Please keep this structure light.
type serverList struct {
	// servers tracks the locally known servers.  List membership is
	// maintained by Serf.
	servers []*metadata.Server
}

type Manager struct {
	// listValue manages the atomic load/store of a Manager's serverList
	listValue atomic.Value
	listLock  sync.Mutex

	// rebalanceTimer controls the duration of the rebalance interval
	rebalanceTimer *time.Timer

	// shutdownCh is a copy of the channel in consul.Client
	shutdownCh chan struct{}

	logger hclog.Logger

	// clusterInfo is used to estimate the approximate number of nodes in
	// a cluster and limit the rate at which it rebalances server
	// connections.  ManagerSerfCluster is an interface that wraps serf.
	clusterInfo ManagerSerfCluster

	// connPoolPinger is used to test the health of a server in the
	// connection pool.  Pinger is an interface that wraps
	// client.ConnPool.
	connPoolPinger Pinger

	rebalancer Rebalancer

	// serverName has the name of the manager's own server. This is used to
	// short-circuit pinging itself.
	serverName string

	// notifyFailedBarrier acts as a barrier to prevent queuing behind
	// listLock and acts as a TryLock().
	notifyFailedBarrier int32

	// offline is used to indicate that there are no servers, or that all
	// known servers have failed the ping test.
	offline int32
}

// AddServer takes out an internal write lock and adds a new server.  If the
// server is not already known, it is appended to the list.  The new server
// will begin seeing use after the rebalance timer fires or enough servers
// fail organically.  If the server is already known, its details are merged
// with the existing entry.
func (m *Manager) AddServer(s *metadata.Server) {
	m.listLock.Lock()
	defer m.listLock.Unlock()
	l := m.getServerList()

	// Check if this server is known
	found := false
	for idx, existing := range l.servers {
		if existing.Name == s.Name {
			newServers := make([]*metadata.Server, len(l.servers))
			copy(newServers, l.servers)

			// Overwrite the existing server details in order to
			// possibly update metadata (e.g. server version)
			newServers[idx] = s

			l.servers = newServers
			found = true
			break
		}
	}

	// Add to the list if not known
	if !found {
		newServers := make([]*metadata.Server, len(l.servers), len(l.servers)+1)
		copy(newServers, l.servers)
		newServers = append(newServers, s)
		l.servers = newServers
	}

	// Assume we are no longer offline since we've just seen a new server.
	atomic.StoreInt32(&m.offline, 0)

	// Start using this list of servers.
	m.saveServerList(l)
}

// UpdateTLS updates the TLS setting for the servers in this manager
func (m *Manager) UpdateTLS(useTLS bool) {
	m.listLock.Lock()
	defer m.listLock.Unlock()

	list := m.getServerList()
	for _, server := range list.servers {
		server.UseTLS = useTLS
	}
	m.saveServerList(list)
}

// cycleServer returns a new list of servers that has dequeued the first
// server and enqueued it at the end of the list.  cycleServer assumes the
// caller is holding the listLock.  cycleServer does not test or ping
// the next server inline.  cycleServer may be called when the environment
// has just entered an unhealthy situation and blocking on a server test is
// less desirable than just returning the next server in the firing line.  If
// the next server fails, it will fail fast enough and cycleServer will be
// called again.
func (l *serverList) cycleServer() (servers []*metadata.Server) {
	numServers := len(l.servers)
	if numServers < 2 {
		return servers // No action required
	}

	newServers := make([]*metadata.Server, 0, numServers)
	newServers = append(newServers, l.servers[1:]...)
	newServers = append(newServers, l.servers[0])

	return newServers
}
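
// For illustration, with three servers the rotation performed by cycleServer
// looks like this (a sketch; s1, s2, s3 stand in for *metadata.Server values):
//
//	l := serverList{servers: []*metadata.Server{s1, s2, s3}}
//	l.servers = l.cycleServer()
//	// l.servers is now [s2, s3, s1]; the original backing slice is untouched.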

// removeServerByKey performs an inline removal of the first matching server
func (l *serverList) removeServerByKey(targetKey *metadata.Key) {
	for i, s := range l.servers {
		if targetKey.Equal(s.Key()) {
			copy(l.servers[i:], l.servers[i+1:])
			l.servers[len(l.servers)-1] = nil
			l.servers = l.servers[:len(l.servers)-1]
			return
		}
	}
}

// shuffleServers shuffles the server list in place using a Fisher-Yates
// shuffle.
func (l *serverList) shuffleServers() {
	for i := len(l.servers) - 1; i > 0; i-- {
		j := rand.Int31n(int32(i + 1))
		l.servers[i], l.servers[j] = l.servers[j], l.servers[i]
	}
}

// IsOffline checks to see if all the known servers have failed their ping
// test during the last rebalance.
func (m *Manager) IsOffline() bool {
	offline := atomic.LoadInt32(&m.offline)
	return offline == 1
}

// FindServer takes out an internal "read lock" and searches through the list
// of servers to find a "healthy" server.  If the server is actually
// unhealthy, we rely on Serf to detect this and remove the node from the
// server list.  If the server at the front of the list has failed or fails
// during an RPC call, it is rotated to the end of the list.  If there are no
// servers available, return nil.
func (m *Manager) FindServer() *metadata.Server {
	l := m.getServerList()
	numServers := len(l.servers)
	if numServers == 0 {
		m.logger.Warn("No servers available")
		return nil
	}

	// Return whatever is at the front of the list because it is
	// assumed to be the oldest in the server list (unless,
	// hypothetically, the server list was rotated right after a
	// server was added).
	return l.servers[0]
}
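
// A minimal sketch of the intended call pattern from an RPC path (the doRPC
// helper is hypothetical and stands in for the caller's forwarding logic):
//
//	server := m.FindServer()
//	if server == nil {
//		return errors.New("no known consul servers")
//	}
//	if err := doRPC(server); err != nil {
//		// Rotate the failed server to the back of the list so the
//		// next attempt picks a different one.
//		m.NotifyFailedServer(server)
//		return err
//	}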

// checkServers runs fn on each known server, stopping at the first server
// for which fn returns false.  It returns true only if fn returned true for
// every server (or if the Manager is nil).
func (m *Manager) checkServers(fn func(srv *metadata.Server) bool) bool {
	if m == nil {
		return true
	}

	for _, srv := range m.getServerList().servers {
		if !fn(srv) {
			return false
		}
	}
	return true
}

// CheckServers runs fn on each known server, in order, until fn returns false.
func (m *Manager) CheckServers(fn func(srv *metadata.Server) bool) {
	_ = m.checkServers(fn)
}

// getServerList is a convenience method which hides the locking semantics
// of atomic.Value from the caller.
func (m *Manager) getServerList() serverList {
	if m == nil {
		return serverList{}
	}
	return m.listValue.Load().(serverList)
}

// saveServerList is a convenience method which hides the locking semantics
// of atomic.Value from the caller.
func (m *Manager) saveServerList(l serverList) {
	m.listValue.Store(l)
}
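
// As a rule, reads go through getServerList with no lock at all, while
// writers serialize on listLock, build a fresh serverList (copying the slice
// rather than mutating the shared one), and publish it via saveServerList.
// A minimal sketch of that read-copy-update pattern, mirroring AddServer
// above and RemoveServer below:
//
//	m.listLock.Lock()
//	defer m.listLock.Unlock()
//	l := m.getServerList()                                     // snapshot
//	l.servers = append([]*metadata.Server(nil), l.servers...)  // private copy
//	// ... mutate l.servers ...
//	m.saveServerList(l)                                        // publish atomically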

// New is the only way to safely create a new Manager struct.
func New(logger hclog.Logger, shutdownCh chan struct{}, clusterInfo ManagerSerfCluster, connPoolPinger Pinger, serverName string, rb Rebalancer) (m *Manager) {
	if logger == nil {
		logger = hclog.New(&hclog.LoggerOptions{})
	}

	m = new(Manager)
	m.logger = logger.Named(logging.Manager)
	m.clusterInfo = clusterInfo       // can't pass *consul.Client: import cycle
	m.connPoolPinger = connPoolPinger // can't pass *consul.ConnPool: import cycle
	m.rebalanceTimer = time.NewTimer(delayer.MinDelay)
	m.shutdownCh = shutdownCh
	m.rebalancer = rb
	m.serverName = serverName
	atomic.StoreInt32(&m.offline, 1)

	l := serverList{}
	l.servers = make([]*metadata.Server, 0)
	m.saveServerList(l)
	return m
}
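
// A minimal sketch of how a caller wires up a Manager (serf, pool, and
// rebalanceFn are hypothetical placeholders for the caller's own Serf
// cluster, connection pool, and Rebalancer callback):
//
//	shutdownCh := make(chan struct{})
//	m := New(logger, shutdownCh, serf, pool, "", rebalanceFn)
//	go m.Run()              // periodically rebalances until shutdown
//	defer close(shutdownCh) // stops Run
//
//	m.AddServer(server)     // typically driven by Serf member events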

// NotifyFailedServer marks the passed in server as "failed" by rotating it
// to the end of the server list.
func (m *Manager) NotifyFailedServer(s *metadata.Server) {
	l := m.getServerList()

	// If the server being failed is not the first server on the list,
	// this is a noop.  If, however, the server is failed and first on
	// the list, acquire the lock, retest, and take the penalty of moving
	// the server to the end of the list.

	// Only rotate the server list when there is more than one server
	if len(l.servers) > 1 && l.servers[0].Name == s.Name &&
		// Use atomic.CAS to emulate a TryLock().
		atomic.CompareAndSwapInt32(&m.notifyFailedBarrier, 0, 1) {
		defer atomic.StoreInt32(&m.notifyFailedBarrier, 0)

		// Grab a lock, retest, and take the hit of cycling the first
		// server to the end.
		m.listLock.Lock()
		defer m.listLock.Unlock()
		l = m.getServerList()

		if len(l.servers) > 1 && l.servers[0].Name == s.Name {
			l.servers = l.cycleServer()
			m.saveServerList(l)
			m.logger.Debug("cycled away from server", "server", s.String())
		}
	}
}
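
// The CAS on notifyFailedBarrier above acts as a TryLock: callers that lose
// the race return immediately instead of queueing on listLock.  In effect it
// is equivalent to this hypothetical helper, shown only to illustrate the
// pattern:
//
//	func (m *Manager) tryNotifyBarrier() bool {
//		return atomic.CompareAndSwapInt32(&m.notifyFailedBarrier, 0, 1)
//	}
//	// A successful caller must later release the barrier with
//	// atomic.StoreInt32(&m.notifyFailedBarrier, 0).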

// NumServers takes out an internal "read lock" and returns the number of
// servers.  NumServers includes both healthy and unhealthy servers.
func (m *Manager) NumServers() int {
	l := m.getServerList()
	return len(l.servers)
}

func (m *Manager) healthyServer(server *metadata.Server) bool {
	// Check to see if the manager is trying to ping itself. This
	// is a small optimization to avoid performing an unnecessary
	// RPC call.
	// If this is true, we know there are healthy servers for this
	// manager and we don't need to continue.
	if m.serverName != "" && server.Name == m.serverName {
		return true
	}
	if ok, err := m.connPoolPinger.Ping(server.Datacenter, server.ShortName, server.Addr); !ok {
		m.logger.Debug("pinging server failed",
			"server", server.String(),
			"error", err,
		)
		return false
	}
	return true
}

// RebalanceServers shuffles the list of servers managed by this Manager.  The
// server at the front of the list is selected for the next RPC.  RPC calls
// that fail for a particular server are rotated to the end of the list.  This
// method reshuffles the list periodically in order to redistribute work
// across all known consul servers (i.e. guarantee that the order of servers
// in the server list is not positively correlated with the age of a server
// in the Consul cluster).  Periodically shuffling the server list prevents
// long-lived clients from fixating on long-lived servers.
//
// Unhealthy servers are removed when serf notices the server has been
// deregistered.  Before the newly shuffled server list is saved, the new
// remote endpoint is tested to ensure it's responsive.
func (m *Manager) RebalanceServers() {
	// Obtain a copy of the current serverList
	l := m.getServerList()

	// Shuffle servers so we have a chance of picking a new one.
	l.shuffleServers()

	// Iterate through the shuffled server list to find an assumed
	// healthy server.  NOTE: Do not iterate on the list directly because
	// this loop mutates the server list in-place.
	var foundHealthyServer bool
	for i := 0; i < len(l.servers); i++ {
		// Always test the first server. Failed servers are cycled
		// while Serf detects the node has failed.
		if m.healthyServer(l.servers[0]) {
			foundHealthyServer = true
			break
		}
		l.servers = l.cycleServer()
	}

	// If no healthy servers were found, sleep and wait for Serf to make
	// the world a happy place again. Update the offline status.
	if foundHealthyServer {
		atomic.StoreInt32(&m.offline, 0)
	} else {
		atomic.StoreInt32(&m.offline, 1)
		m.logger.Debug("No healthy servers during rebalance, aborting")
		return
	}

	// Verify that all servers are present
	if m.reconcileServerList(&l) {
		m.logger.Debug("Rebalanced servers, new active server",
			"number_of_servers", len(l.servers),
			"active_server", l.servers[0].String(),
		)
	}
	// else {
	// reconcileServerList failed because Serf removed the server
	// that was at the front of the list that had successfully
	// been Ping'ed.  Between the Ping and reconcile, a Serf
	// event had shown up removing the node.
	//
	// Instead of doing any heroics, "freeze in place" and
	// continue to use the existing connection until the next
	// rebalance occurs.
	// }
}

// reconcileServerList returns true when the first server in serverList
// exists in the receiver's serverList.  If true, the merged serverList is
// stored as the receiver's serverList.  Returns false if the first server
// does not exist in the list (i.e. was removed by Serf during a
// PingConsulServer() call).  Newly added servers are appended to the list
// and other missing servers are removed from the list.
func (m *Manager) reconcileServerList(l *serverList) bool {
	m.listLock.Lock()
	defer m.listLock.Unlock()

	// newServerCfg is a serverList that has been kept up to date with
	// Serf node join and node leave events.
	newServerCfg := m.getServerList()

	// If Serf has removed all nodes, or there is no selected server
	// (zero nodes in serverList), abort early.
	if len(newServerCfg.servers) == 0 || len(l.servers) == 0 {
		return false
	}

	type targetServer struct {
		server *metadata.Server

		//   'b' == both
		//   'o' == original
		//   'n' == new
		state byte
	}
	mergedList := make(map[metadata.Key]*targetServer, len(l.servers))
	for _, s := range l.servers {
		mergedList[*s.Key()] = &targetServer{server: s, state: 'o'}
	}
	for _, s := range newServerCfg.servers {
		k := s.Key()
		_, found := mergedList[*k]
		if found {
			mergedList[*k].state = 'b'
		} else {
			mergedList[*k] = &targetServer{server: s, state: 'n'}
		}
	}

	// Ensure the selected server has not been removed by Serf
	selectedServerKey := l.servers[0].Key()
	if v, found := mergedList[*selectedServerKey]; found && v.state == 'o' {
		return false
	}

	// Append any new servers and remove any old servers
	for k, v := range mergedList {
		switch v.state {
		case 'b':
			// Do nothing, server exists in both
		case 'o':
			// Server has been removed
			l.removeServerByKey(&k)
		case 'n':
			// Server added
			l.servers = append(l.servers, v.server)
		default:
			panic("unknown merge list state")
		}
	}

	m.saveServerList(*l)
	return true
}
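
// To illustrate the merge above with concrete values (a sketch; s1, s2, s3
// stand in for servers keyed by name): if the pinged list l holds [s2, s1]
// while Serf's up-to-date list holds [s2, s3], the merged states are
//
//	s2: 'b'  // present in both lists      -> kept
//	s1: 'o'  // only in the original list  -> removed
//	s3: 'n'  // only in Serf's new list    -> appended
//
// Since the selected server l.servers[0] (s2) is still known to Serf, the
// merge is saved and l.servers becomes [s2, s3].  Had s2 itself been in
// state 'o', reconcileServerList would have returned false instead.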

// RemoveServer takes out an internal write lock and removes a server from
// the server list.
func (m *Manager) RemoveServer(s *metadata.Server) {
	m.listLock.Lock()
	defer m.listLock.Unlock()
	l := m.getServerList()

	// Remove the server if known
	for i := range l.servers {
		if l.servers[i].Name == s.Name {
			newServers := make([]*metadata.Server, 0, len(l.servers)-1)
			newServers = append(newServers, l.servers[:i]...)
			newServers = append(newServers, l.servers[i+1:]...)
			l.servers = newServers

			m.saveServerList(l)
			return
		}
	}
}

// ResetRebalanceTimer resets the rebalance timer.  This method exists for
// testing and should not be used directly.
func (m *Manager) ResetRebalanceTimer() {
	m.listLock.Lock()
	defer m.listLock.Unlock()
	m.rebalanceTimer.Reset(delayer.MinDelay)
}

// Run periodically shuffles the list of servers to evenly distribute load.
// Run exits when shutdownCh is closed.
//
// When a server fails it is moved to the end of the list, and new servers are
// appended to the end of the list. Run ensures that load is distributed evenly
// to all servers by randomly shuffling the list.
func (m *Manager) Run() {
	for {
		select {
		case <-m.rebalanceTimer.C:
			m.rebalancer()
			m.RebalanceServers()
			delay := delayer.Delay(len(m.getServerList().servers), m.clusterInfo.NumNodes())
			m.rebalanceTimer.Reset(delay)

		case <-m.shutdownCh:
			m.logger.Info("shutting down")
			return
		}
	}
}

// delayer is used to calculate the time to wait between calls to rebalance the
// servers. Rebalancing is necessary to ensure that load is balanced evenly
// across all the servers.
//
// The values used by delayer must balance perfectly distributed server load
// against the overhead of a client reconnecting to a server. Rebalancing on
// every request would cause a lot of unnecessary load as clients reconnect,
// whereas never rebalancing would lead to situations where one or two servers
// handle a lot more requests than others.
//
// These values result in a minimum delay of 120-180s. Once the number of
// nodes per server exceeds 11520, the delay is instead determined by
// multiplying the node/server ratio by 15.625ms.
var delayer = rebalanceDelayer{
	MinDelay:  2 * time.Minute,
	MaxJitter: time.Minute,
	// Once the number of nodes per server exceeds 11520 this value is used
	// to increase the delay between rebalances to set a limit on the number
	// of reconnections per server in a given time frame.
	//
	// A higher value comes at the cost of increased recovery time after a
	// partition.
	//
	// For example, in a 100,000 node consul cluster with 5 servers, it will
	// take ~5min for all clients to rebalance their connections.  If
	// 99,995 agents are in the minority talking to only one server, it
	// will take ~26min for all clients to rebalance.  A 10K cluster in
	// the same scenario will take ~2.6min to rebalance.
	DelayPerNode: 15*time.Millisecond + 625*time.Microsecond,
}

type rebalanceDelayer struct {
	// MinDelay is the minimum delay that may be returned by Delay.
	MinDelay time.Duration
	// MaxJitter is added to MinDelay to ensure there is some randomness in
	// the delay.
	MaxJitter time.Duration
	// DelayPerNode is the duration to add for each node when calculating the
	// delay. The total is divided by the number of servers to arrive at the
	// final delay value.
	DelayPerNode time.Duration
}

func (d *rebalanceDelayer) Delay(servers int, nodes int) time.Duration {
	min := d.MinDelay + time.Duration(rand.Int63n(int64(d.MaxJitter)))
	if servers == 0 {
		return min
	}

	delay := time.Duration(float64(nodes) * float64(d.DelayPerNode) / float64(servers))
	if delay < min {
		return min
	}
	return delay
}
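
// Worked example of the formula above (a sketch; the figures match the
// DelayPerNode comment): with 100,000 nodes and 5 servers,
//
//	delay = 100000 * 15.625ms / 5 = 312.5s ≈ 5.2 minutes
//
// whereas 10,000 nodes talking to a single server yields
// 10000 * 15.625ms = 156.25s ≈ 2.6 minutes, and below roughly 11,520 nodes
// per server the jittered 120-180s minimum applies instead.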