// Package router provides a Manager for managing metadata.Server objects.
// The router package manages servers from a Consul client's perspective
// (i.e. a list of servers that a client talks with for RPCs).  The router
// package does not provide any API guarantees and should be called only by
// `hashicorp/consul`.
package router

import (
	"math/rand"
	"net"
	"sync"
	"sync/atomic"
	"time"

	"github.com/hashicorp/consul/agent/metadata"
	"github.com/hashicorp/consul/lib"
	"github.com/hashicorp/consul/logging"
	"github.com/hashicorp/go-hclog"
)

const (
	// clientRPCJitterFraction determines the amount of jitter added to
	// clientRPCMinReuseDuration before a connection is expired and a new
	// connection is established in order to rebalance load across consul
	// servers.  The cluster-wide number of connections per second from
	// rebalancing is applied after this jitter to ensure the CPU impact
	// is always finite.  See newRebalanceConnsPerSecPerServer's comment
	// for additional commentary.
	//
	// For example, in a 10K consul cluster with 5x servers, this default
	// averages out to ~13 new connections from rebalancing per server
	// per second (each connection is reused for 120s to 180s).
	clientRPCJitterFraction = 2

	// clientRPCMinReuseDuration controls the minimum amount of time RPC
	// queries are sent over an established connection to a single server
	clientRPCMinReuseDuration = 120 * time.Second

	// Limit the number of new connections a server receives per second
	// for connection rebalancing.  This limit caps the load caused by
	// continual rebalancing efforts when a cluster is in equilibrium.  A
	// lower value comes at the cost of increased recovery time after a
	// partition.  This parameter begins to take effect when there are
	// more than ~48K clients querying 5x servers or at lower server
	// values when there is a partition.
	//
	// For example, in a 100K consul cluster with 5x servers, it will
	// take ~5min for all servers to rebalance their connections.  If
	// 99,995 agents are in the minority talking to only one server, it
	// will take ~26min for all servers to rebalance.  A 10K cluster in
	// the same scenario will take ~2.6min to rebalance.
	newRebalanceConnsPerSecPerServer = 64
)

// ManagerSerfCluster is an interface wrapper around Serf in order to make this
// easier to unit test.
type ManagerSerfCluster interface {
	NumNodes() int
}

// Pinger is an interface wrapping client.ConnPool to prevent a cyclic import
// dependency.
type Pinger interface {
	Ping(dc string, addr net.Addr, version int, useTLS bool) (bool, error)
}

// serverList is a local copy of the struct used to maintain the list of
// Consul servers used by Manager.
//
// NOTE(sean@): We are explicitly relying on the fact that serverList will
// be copied onto the stack.  Please keep this structure light.
type serverList struct {
	// servers tracks the locally known servers.  List membership is
	// maintained by Serf.
	servers []*metadata.Server
}

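// Manager maintains the list of Consul servers known to a client and decides
// which server receives the next RPC: the server at the front of the list is
// used until it fails or the rebalance timer fires, at which point the list
// is shuffled and rebalanced.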
type Manager struct {
	// listValue manages the atomic load/store of a Manager's serverList
	listValue atomic.Value
	listLock  sync.Mutex

	// rebalanceTimer controls the duration of the rebalance interval
	rebalanceTimer *time.Timer

	// shutdownCh is a copy of the channel in consul.Client
	shutdownCh chan struct{}

	logger hclog.Logger

	// clusterInfo is used to estimate the approximate number of nodes in
	// a cluster and limit the rate at which it rebalances server
	// connections.  ManagerSerfCluster is an interface that wraps serf.
	clusterInfo ManagerSerfCluster

	// connPoolPinger is used to test the health of a server in the
	// connection pool.  Pinger is an interface that wraps
	// client.ConnPool.
	connPoolPinger Pinger

	// notifyFailedBarrier acts as a barrier to prevent queuing behind
	// listLock and acts as a TryLock().
	notifyFailedBarrier int32

	// offline is used to indicate that there are no servers, or that all
	// known servers have failed the ping test.
	offline int32
}

// AddServer takes out an internal write lock and adds a new server.  If the
// server is not known, appends the server to the list.  The new server will
// begin seeing use after the rebalance timer fires or enough servers fail
// organically.  If the server is already known, merge the new server
// details.
func (m *Manager) AddServer(s *metadata.Server) {
	m.listLock.Lock()
	defer m.listLock.Unlock()
	l := m.getServerList()

	// Check if this server is known
	found := false
	for idx, existing := range l.servers {
		if existing.Name == s.Name {
			newServers := make([]*metadata.Server, len(l.servers))
			copy(newServers, l.servers)

			// Overwrite the existing server details in order to
			// possibly update metadata (e.g. server version)
			newServers[idx] = s

			l.servers = newServers
			found = true
			break
		}
	}

	// Add to the list if not known
	if !found {
		newServers := make([]*metadata.Server, len(l.servers), len(l.servers)+1)
		copy(newServers, l.servers)
		newServers = append(newServers, s)
		l.servers = newServers
	}

	// Assume we are no longer offline since we've just seen a new server.
	atomic.StoreInt32(&m.offline, 0)

	// Start using this list of servers.
	m.saveServerList(l)
}

// UpdateTLS updates the TLS setting for the servers in this manager
func (m *Manager) UpdateTLS(useTLS bool) {
	m.listLock.Lock()
	defer m.listLock.Unlock()

	list := m.getServerList()
	for _, server := range list.servers {
		server.UseTLS = useTLS
	}
	m.saveServerList(list)
}

// cycleServer returns a new list of servers that has dequeued the first
// server and enqueued it at the end of the list.  cycleServer assumes the
// caller is holding the listLock.  cycleServer does not test or ping
// the next server inline.  cycleServer may be called when the environment
// has just entered an unhealthy situation and blocking on a server test is
// less desirable than just returning the next server in the firing line.  If
// the next server fails, it will fail fast enough and cycleServer will be
// called again.
func (l *serverList) cycleServer() (servers []*metadata.Server) {
	numServers := len(l.servers)
	if numServers < 2 {
		return servers // No action required
	}

	newServers := make([]*metadata.Server, 0, numServers)
	newServers = append(newServers, l.servers[1:]...)
	newServers = append(newServers, l.servers[0])

	return newServers
}

// removeServerByKey performs an inline removal of the first matching server
func (l *serverList) removeServerByKey(targetKey *metadata.Key) {
	for i, s := range l.servers {
		if targetKey.Equal(s.Key()) {
			copy(l.servers[i:], l.servers[i+1:])
			l.servers[len(l.servers)-1] = nil
			l.servers = l.servers[:len(l.servers)-1]
			return
		}
	}
}

// shuffleServers shuffles the server list in place
func (l *serverList) shuffleServers() {
	for i := len(l.servers) - 1; i > 0; i-- {
		j := rand.Int31n(int32(i + 1))
		l.servers[i], l.servers[j] = l.servers[j], l.servers[i]
	}
}

// IsOffline checks to see if all the known servers have failed their ping
// test during the last rebalance.
func (m *Manager) IsOffline() bool {
	offline := atomic.LoadInt32(&m.offline)
	return offline == 1
}

// FindServer takes out an internal "read lock" and searches through the list
// of servers to find a "healthy" server.  If the server is actually
// unhealthy, we rely on Serf to detect this and remove the node from the
// server list.  If the server at the front of the list has failed or fails
// during an RPC call, it is rotated to the end of the list.  If there are no
// servers available, return nil.
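//
// Callers typically pair FindServer with NotifyFailedServer.  A rough,
// hypothetical sketch (doRPC is a placeholder for the caller's RPC code):
//
//	srv := m.FindServer()
//	if srv == nil {
//		// handle the "no servers available" case
//	} else if err := doRPC(srv); err != nil {
//		m.NotifyFailedServer(srv)
//	}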
func (m *Manager) FindServer() *metadata.Server {
	l := m.getServerList()
	numServers := len(l.servers)
	if numServers == 0 {
		m.logger.Warn("No servers available")
		return nil
	}

	// Return whatever is at the front of the list because it is
	// assumed to be the oldest in the server list (unless -
	// hypothetically - the server list was rotated right after a
	// server was added).
	return l.servers[0]
}

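// checkServers applies fn to every known server, stopping at the first
// server for which fn returns false.  It reports whether fn returned true
// for all servers.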
func (m *Manager) checkServers(fn func(srv *metadata.Server) bool) bool {
	for _, srv := range m.getServerList().servers {
		if !fn(srv) {
			return false
		}
	}
	return true
}

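// CheckServers iterates over the known servers and applies fn to each one;
// iteration stops as soon as fn returns false.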
func (m *Manager) CheckServers(fn func(srv *metadata.Server) bool) {
	_ = m.checkServers(fn)
}

// getServerList is a convenience method which hides the locking semantics
// of atomic.Value from the caller.
func (m *Manager) getServerList() serverList {
	return m.listValue.Load().(serverList)
}

// saveServerList is a convenience method which hides the locking semantics
// of atomic.Value from the caller.
func (m *Manager) saveServerList(l serverList) {
	m.listValue.Store(l)
}

// New is the only way to safely create a new Manager struct.
func New(logger hclog.Logger, shutdownCh chan struct{}, clusterInfo ManagerSerfCluster, connPoolPinger Pinger) (m *Manager) {
	if logger == nil {
		logger = hclog.New(&hclog.LoggerOptions{})
	}

	m = new(Manager)
	m.logger = logger.Named(logging.Manager)
	m.clusterInfo = clusterInfo       // can't pass *consul.Client: import cycle
	m.connPoolPinger = connPoolPinger // can't pass *consul.ConnPool: import cycle
	m.rebalanceTimer = time.NewTimer(clientRPCMinReuseDuration)
	m.shutdownCh = shutdownCh
	atomic.StoreInt32(&m.offline, 1)

	l := serverList{}
	l.servers = make([]*metadata.Server, 0)
	m.saveServerList(l)
	return m
}

// NotifyFailedServer marks the passed in server as "failed" by rotating it
// to the end of the server list.
func (m *Manager) NotifyFailedServer(s *metadata.Server) {
	l := m.getServerList()

	// If the server being failed is not the first server on the list,
	// this is a noop.  If, however, the server is failed and first on
	// the list, acquire the lock, retest, and take the penalty of moving
	// the server to the end of the list.

	// Only rotate the server list when there is more than one server
	if len(l.servers) > 1 && l.servers[0].Name == s.Name &&
		// Use atomic.CAS to emulate a TryLock().
		atomic.CompareAndSwapInt32(&m.notifyFailedBarrier, 0, 1) {
		defer atomic.StoreInt32(&m.notifyFailedBarrier, 0)

		// Grab a lock, retest, and take the hit of cycling the first
		// server to the end.
		m.listLock.Lock()
		defer m.listLock.Unlock()
		l = m.getServerList()

		if len(l.servers) > 1 && l.servers[0].Name == s.Name {
			l.servers = l.cycleServer()
			m.saveServerList(l)
			m.logger.Debug("cycled away from server", "server", s.String())
		}
	}
}

// NumServers takes out an internal "read lock" and returns the number of
// servers.  NumServers includes both healthy and unhealthy servers.
func (m *Manager) NumServers() int {
	l := m.getServerList()
	return len(l.servers)
}

// RebalanceServers shuffles the list of servers on this Manager.  The server
// at the front of the list is selected for the next RPC.  Servers that fail
// an RPC call are rotated to the end of the list.  This method reshuffles
// the list periodically in order to redistribute work across all known
// consul servers (i.e. guarantee that the order of servers in the server
// list is not positively correlated with the age of a server in the Consul
// cluster).  Periodically shuffling the server list prevents long-lived
// clients from fixating on long-lived servers.
//
// Unhealthy servers are removed when Serf notices the server has been
// deregistered.  Before the newly shuffled server list is saved, the new
// remote endpoint is tested to ensure it is responsive.
func (m *Manager) RebalanceServers() {
	// Obtain a copy of the current serverList
	l := m.getServerList()

	// Shuffle servers so we have a chance of picking a new one.
	l.shuffleServers()

	// Iterate through the shuffled server list to find an assumed
	// healthy server.  NOTE: Do not iterate on the list directly because
	// this loop mutates the server list in-place.
	var foundHealthyServer bool
	for i := 0; i < len(l.servers); i++ {
		// Always test the first server.  Failed servers are cycled
		// while Serf detects the node has failed.
		srv := l.servers[0]

		ok, err := m.connPoolPinger.Ping(srv.Datacenter, srv.Addr, srv.Version, srv.UseTLS)
		if ok {
			foundHealthyServer = true
			break
		}
		m.logger.Debug("pinging server failed",
			"server", srv.String(),
			"error", err,
		)
		l.servers = l.cycleServer()
	}

	// If no healthy servers were found, sleep and wait for Serf to make
	// the world a happy place again. Update the offline status.
	if foundHealthyServer {
		atomic.StoreInt32(&m.offline, 0)
	} else {
		atomic.StoreInt32(&m.offline, 1)
		m.logger.Debug("No healthy servers during rebalance, aborting")
		return
	}

	// Verify that all servers are present
	if m.reconcileServerList(&l) {
		m.logger.Debug("Rebalanced servers, new active server",
			"number_of_servers", len(l.servers),
			"active_server", l.servers[0].String(),
		)
	} else {
		// reconcileServerList failed because Serf removed the server
		// that was at the front of the list that had successfully
		// been Ping'ed.  Between the Ping and reconcile, a Serf
		// event had shown up removing the node.
		//
		// Instead of doing any heroics, "freeze in place" and
		// continue to use the existing connection until the next
		// rebalance occurs.
	}
}

// reconcileServerList returns true when the first server in serverList
// exists in the receiver's serverList.  If true, the merged serverList is
// stored as the receiver's serverList.  Returns false if the first server
// does not exist in the list (i.e. was removed by Serf during a
// PingConsulServer() call).  Newly added servers are appended to the list
// and other missing servers are removed from the list.
func (m *Manager) reconcileServerList(l *serverList) bool {
	m.listLock.Lock()
	defer m.listLock.Unlock()

	// newServerCfg is a serverList that has been kept up to date with
	// Serf node join and node leave events.
	newServerCfg := m.getServerList()

	// If Serf has removed all nodes, or there is no selected server
	// (zero nodes in serverList), abort early.
	if len(newServerCfg.servers) == 0 || len(l.servers) == 0 {
		return false
	}

	type targetServer struct {
		server *metadata.Server

		//   'b' == both
		//   'o' == original
		//   'n' == new
		state byte
	}
	mergedList := make(map[metadata.Key]*targetServer, len(l.servers))
	for _, s := range l.servers {
		mergedList[*s.Key()] = &targetServer{server: s, state: 'o'}
	}
	for _, s := range newServerCfg.servers {
		k := s.Key()
		_, found := mergedList[*k]
		if found {
			mergedList[*k].state = 'b'
		} else {
			mergedList[*k] = &targetServer{server: s, state: 'n'}
		}
	}

	// Ensure the selected server has not been removed by Serf
	selectedServerKey := l.servers[0].Key()
	if v, found := mergedList[*selectedServerKey]; found && v.state == 'o' {
		return false
	}

	// Append any new servers and remove any old servers
	for k, v := range mergedList {
		switch v.state {
		case 'b':
			// Do nothing, server exists in both
		case 'o':
			// Server has been removed
			l.removeServerByKey(&k)
		case 'n':
			// Server added
			l.servers = append(l.servers, v.server)
		default:
			panic("unknown merge list state")
		}
	}

	m.saveServerList(*l)
	return true
}

// RemoveServer takes out an internal write lock and removes a server from
// the server list.
func (m *Manager) RemoveServer(s *metadata.Server) {
	m.listLock.Lock()
	defer m.listLock.Unlock()
	l := m.getServerList()

	// Remove the server if known
	for i := range l.servers {
		if l.servers[i].Name == s.Name {
			newServers := make([]*metadata.Server, 0, len(l.servers)-1)
			newServers = append(newServers, l.servers[:i]...)
			newServers = append(newServers, l.servers[i+1:]...)
			l.servers = newServers

			m.saveServerList(l)
			return
		}
	}
}

// refreshServerRebalanceTimer is only called once m.rebalanceTimer expires.
func (m *Manager) refreshServerRebalanceTimer() time.Duration {
	l := m.getServerList()
	numServers := len(l.servers)
	// Limit this connection's life based on the size (and health) of the
	// cluster.  Never rebalance a connection more frequently than
	// connReuseLowWatermarkDuration, and make sure we never exceed
	// clusterWideRebalanceConnsPerSec operations/s across numLANMembers.
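	//
	// A rough, illustrative example (assuming lib.RateScaledInterval
	// behaves as max(watermark, numLANMembers/rate)): with 5 servers the
	// cluster-wide budget is 5 * 64 = 320 new connections/s, so the
	// timeout only grows beyond the 120s-180s watermark once the cluster
	// exceeds roughly 38K-58K nodes (~48K on average), consistent with
	// the newRebalanceConnsPerSecPerServer commentary above.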
	clusterWideRebalanceConnsPerSec := float64(numServers * newRebalanceConnsPerSecPerServer)
	connReuseLowWatermarkDuration := clientRPCMinReuseDuration + lib.RandomStagger(clientRPCMinReuseDuration/clientRPCJitterFraction)
	numLANMembers := m.clusterInfo.NumNodes()
	connRebalanceTimeout := lib.RateScaledInterval(clusterWideRebalanceConnsPerSec, connReuseLowWatermarkDuration, numLANMembers)

	m.rebalanceTimer.Reset(connRebalanceTimeout)
	return connRebalanceTimeout
}

// ResetRebalanceTimer resets the rebalance timer.  This method exists for
// testing and should not be used directly.
func (m *Manager) ResetRebalanceTimer() {
	m.listLock.Lock()
	defer m.listLock.Unlock()
	m.rebalanceTimer.Reset(clientRPCMinReuseDuration)
}

// Start is used to start and manage the task of automatically shuffling and
// rebalancing the list of Consul servers.  This maintenance only happens
// periodically based on the expiration of the timer.  Failed servers are
// automatically cycled to the end of the list.  New servers are appended to
// the list.  The order of the server list must be shuffled periodically to
// distribute load across all known and available Consul servers.
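//
// A minimal, hypothetical wiring sketch from a client's point of view
// (serfCluster and connPool stand in for the caller's ManagerSerfCluster
// and Pinger implementations):
//
//	m := router.New(logger, shutdownCh, serfCluster, connPool)
//	go m.Start()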
func (m *Manager) Start() {
	for {
		select {
		case <-m.rebalanceTimer.C:
			m.RebalanceServers()
			m.refreshServerRebalanceTimer()

		case <-m.shutdownCh:
			m.logger.Info("shutting down")
			return
		}
	}
}