1package router
2
3import (
4	"fmt"
5	"sort"
6	"sync"
7
8	"github.com/hashicorp/consul/agent/metadata"
9	"github.com/hashicorp/consul/agent/structs"
10	"github.com/hashicorp/consul/lib"
11	"github.com/hashicorp/consul/logging"
12	"github.com/hashicorp/consul/types"
13	"github.com/hashicorp/go-hclog"
14	"github.com/hashicorp/serf/coordinate"
15	"github.com/hashicorp/serf/serf"
16)
17
// Router keeps track of a set of network areas and the Consul servers that
// Serf reports as members of each area. It indexes those servers by
// datacenter so callers can look up a route to a server in a given
// datacenter.
//
// The embedded RWMutex covers the internal state below.
type Router struct {
	// logger is used for diagnostic output.
	logger hclog.Logger

	// localDatacenter has the name of the router's home datacenter. Servers
	// in this datacenter are treated as zero RTT when datacenters are sorted
	// by distance, which short-circuits the RTT calculation for them.
	localDatacenter string

	// serverName has the name of the router's server. It is handed to every
	// Manager the router creates (presumably so the server does not ping
	// itself — confirm against Manager).
	serverName string

	// areas maps area IDs to structures holding information about that
	// area.
	areas map[types.AreaID]*areaInfo

	// managers is an index from datacenter names to the server managers
	// (at most one per area) that can reach that datacenter. It is used to
	// quickly look up routes. A datacenter's entry is deleted once its list
	// becomes empty.
	managers map[string][]*Manager

	// routeFn is a hook to actually do the routing. NewRouter points it at
	// findDirectRoute.
	routeFn func(datacenter string) (*Manager, *metadata.Server, bool)

	// isShutdown prevents adding new areas to a router after it is shut
	// down.
	isShutdown bool

	// This top-level lock covers all the internal state.
	sync.RWMutex
}
51
52// RouterSerfCluster is an interface wrapper around Serf in order to make this
53// easier to unit test.
54type RouterSerfCluster interface {
55	NumNodes() int
56	Members() []serf.Member
57	GetCoordinate() (*coordinate.Coordinate, error)
58	GetCachedCoordinate(name string) (*coordinate.Coordinate, bool)
59}
60
// managerInfo pairs the server manager for one datacenter within one area
// with the shutdown channel that was handed to it when it was created.
type managerInfo struct {
	// manager is notified about servers for this datacenter.
	manager *Manager

	// shutdownCh is given only to this manager. The router closes it to
	// stop the manager when its area is removed, when the router shuts down,
	// or when the last server for this datacenter is removed.
	shutdownCh chan struct{}
}
71
// areaInfo holds the router's state for a single network area.
type areaInfo struct {
	// cluster is the Serf instance for this network area.
	cluster RouterSerfCluster

	// pinger is used to ping servers in this network area when trying to
	// find a new, healthy server to talk to. It is passed to every Manager
	// created for this area.
	pinger Pinger

	// managers maps datacenter names to managers for that datacenter in
	// this area.
	managers map[string]*managerInfo

	// useTLS specifies whether to use TLS to communicate for this network area.
	// When true, addServer sets UseTLS on every server added through the area.
	// NOTE(review): nothing in this file sets useTLS (AddArea leaves it false);
	// presumably it is set elsewhere in the package — confirm.
	useTLS bool
}
88
89// NewRouter returns a new Router with the given configuration.
90func NewRouter(logger hclog.Logger, localDatacenter, serverName string) *Router {
91	if logger == nil {
92		logger = hclog.New(&hclog.LoggerOptions{})
93	}
94
95	router := &Router{
96		logger:          logger.Named(logging.Router),
97		localDatacenter: localDatacenter,
98		serverName:      serverName,
99		areas:           make(map[types.AreaID]*areaInfo),
100		managers:        make(map[string][]*Manager),
101	}
102
103	// Hook the direct route lookup by default.
104	router.routeFn = router.findDirectRoute
105
106	return router
107}
108
109// Shutdown removes all areas from the router, which stops all their respective
110// managers. No new areas can be added after the router is shut down.
111func (r *Router) Shutdown() {
112	r.Lock()
113	defer r.Unlock()
114
115	for areaID, area := range r.areas {
116		for datacenter, info := range area.managers {
117			r.removeManagerFromIndex(datacenter, info.manager)
118			close(info.shutdownCh)
119		}
120
121		delete(r.areas, areaID)
122	}
123
124	r.isShutdown = true
125}
126
// AddArea registers the network area areaID, backed by the given Serf cluster
// and pinger. It fails if the router has been shut down or if the area ID is
// already registered. The area's current server members are loaded right away,
// so routing can use them without waiting for Serf events. Unreachable servers
// are then expected to be detected as failed quickly. Non-server members are
// logged and skipped.
func (r *Router) AddArea(areaID types.AreaID, cluster RouterSerfCluster, pinger Pinger) error {
	r.Lock()
	defer r.Unlock()

	if r.isShutdown {
		return fmt.Errorf("cannot add area, router is shut down")
	}
	if _, dup := r.areas[areaID]; dup {
		return fmt.Errorf("area ID %q already exists", areaID)
	}

	area := &areaInfo{
		cluster:  cluster,
		pinger:   pinger,
		managers: make(map[string]*managerInfo),
	}
	r.areas[areaID] = area

	// Seed the area with every server Serf already knows about.
	members := cluster.Members()
	for i := range members {
		member := members[i]
		isServer, srv := metadata.IsConsulServer(member)
		if !isServer {
			r.logger.Warn("Non-server in server-only area", "non_server", member.Name, "area", areaID)
			continue
		}
		if err := r.addServer(area, srv); err != nil {
			return fmt.Errorf("failed to add server %q to area %q: %v", member.Name, areaID, err)
		}
	}

	return nil
}
168
169// GetServerMetadataByAddr returns server metadata by dc and address. If it
170// didn't find anything, nil is returned.
171func (r *Router) GetServerMetadataByAddr(dc, addr string) *metadata.Server {
172	r.RLock()
173	defer r.RUnlock()
174	if ms, ok := r.managers[dc]; ok {
175		for _, m := range ms {
176			for _, s := range m.getServerList().servers {
177				if s.Addr.String() == addr {
178					return s
179				}
180			}
181		}
182	}
183	return nil
184}
185
186// removeManagerFromIndex does cleanup to take a manager out of the index of
187// datacenters. This assumes the lock is already held for writing, and will
188// panic if the given manager isn't found.
189func (r *Router) removeManagerFromIndex(datacenter string, manager *Manager) {
190	managers := r.managers[datacenter]
191	for i := 0; i < len(managers); i++ {
192		if managers[i] == manager {
193			r.managers[datacenter] = append(managers[:i], managers[i+1:]...)
194			if len(r.managers[datacenter]) == 0 {
195				delete(r.managers, datacenter)
196			}
197			return
198		}
199	}
200	panic("managers index out of sync")
201}
202
203// Returns whether TLS is enabled for the given area ID
204func (r *Router) TLSEnabled(areaID types.AreaID) (bool, error) {
205	r.RLock()
206	defer r.RUnlock()
207
208	area, ok := r.areas[areaID]
209	if !ok {
210		return false, fmt.Errorf("area ID %q does not exist", areaID)
211	}
212
213	return area.useTLS, nil
214}
215
216// RemoveArea removes an existing network area from the router.
217func (r *Router) RemoveArea(areaID types.AreaID) error {
218	r.Lock()
219	defer r.Unlock()
220
221	area, ok := r.areas[areaID]
222	if !ok {
223		return fmt.Errorf("area ID %q does not exist", areaID)
224	}
225
226	// Remove all of this area's managers from the index and shut them down.
227	for datacenter, info := range area.managers {
228		r.removeManagerFromIndex(datacenter, info.manager)
229		close(info.shutdownCh)
230	}
231
232	delete(r.areas, areaID)
233	return nil
234}
235
236// addServer does the work of AddServer once the write lock is held.
237func (r *Router) addServer(area *areaInfo, s *metadata.Server) error {
238	// Make the manager on the fly if this is the first we've seen of it,
239	// and add it to the index.
240	info, ok := area.managers[s.Datacenter]
241	if !ok {
242		shutdownCh := make(chan struct{})
243		manager := New(r.logger, shutdownCh, area.cluster, area.pinger, r.serverName)
244		info = &managerInfo{
245			manager:    manager,
246			shutdownCh: shutdownCh,
247		}
248		area.managers[s.Datacenter] = info
249
250		managers := r.managers[s.Datacenter]
251		r.managers[s.Datacenter] = append(managers, manager)
252		go manager.Start()
253	}
254
255	// If TLS is enabled for the area, set it on the server so the manager
256	// knows to use TLS when pinging it.
257	if area.useTLS {
258		s.UseTLS = true
259	}
260
261	info.manager.AddServer(s)
262	return nil
263}
264
265// AddServer should be called whenever a new server joins an area. This is
266// typically hooked into the Serf event handler area for this area.
267func (r *Router) AddServer(areaID types.AreaID, s *metadata.Server) error {
268	r.Lock()
269	defer r.Unlock()
270
271	area, ok := r.areas[areaID]
272	if !ok {
273		return fmt.Errorf("area ID %q does not exist", areaID)
274	}
275	return r.addServer(area, s)
276}
277
278// RemoveServer should be called whenever a server is removed from an area. This
279// is typically hooked into the Serf event handler area for this area.
280func (r *Router) RemoveServer(areaID types.AreaID, s *metadata.Server) error {
281	r.Lock()
282	defer r.Unlock()
283
284	area, ok := r.areas[areaID]
285	if !ok {
286		return fmt.Errorf("area ID %q does not exist", areaID)
287	}
288
289	// If the manager has already been removed we just quietly exit. This
290	// can get called by Serf events, so the timing isn't totally
291	// deterministic.
292	info, ok := area.managers[s.Datacenter]
293	if !ok {
294		return nil
295	}
296	info.manager.RemoveServer(s)
297
298	// If this manager is empty then remove it so we don't accumulate cruft
299	// and waste time during request routing.
300	if num := info.manager.NumServers(); num == 0 {
301		r.removeManagerFromIndex(s.Datacenter, info.manager)
302		close(info.shutdownCh)
303		delete(area.managers, s.Datacenter)
304	}
305
306	return nil
307}
308
309// FailServer should be called whenever a server is failed in an area. This
310// is typically hooked into the Serf event handler area for this area. We will
311// immediately shift traffic away from this server, but it will remain in the
312// list of servers.
313func (r *Router) FailServer(areaID types.AreaID, s *metadata.Server) error {
314	r.RLock()
315	defer r.RUnlock()
316
317	area, ok := r.areas[areaID]
318	if !ok {
319		return fmt.Errorf("area ID %q does not exist", areaID)
320	}
321
322	// If the manager has already been removed we just quietly exit. This
323	// can get called by Serf events, so the timing isn't totally
324	// deterministic.
325	info, ok := area.managers[s.Datacenter]
326	if !ok {
327		return nil
328	}
329
330	info.manager.NotifyFailedServer(s)
331	return nil
332}
333
334// FindRoute returns a healthy server with a route to the given datacenter. The
335// Boolean return parameter will indicate if a server was available. In some
336// cases this may return a best-effort unhealthy server that can be used for a
337// connection attempt. If any problem occurs with the given server, the caller
338// should feed that back to the manager associated with the server, which is
339// also returned, by calling NotifyFailedServer().
340func (r *Router) FindRoute(datacenter string) (*Manager, *metadata.Server, bool) {
341	return r.routeFn(datacenter)
342}
343
344// findDirectRoute looks for a route to the given datacenter if it's directly
345// adjacent to the server.
346func (r *Router) findDirectRoute(datacenter string) (*Manager, *metadata.Server, bool) {
347	r.RLock()
348	defer r.RUnlock()
349
350	// Get the list of managers for this datacenter. This will usually just
351	// have one entry, but it's possible to have a user-defined area + WAN.
352	managers, ok := r.managers[datacenter]
353	if !ok {
354		return nil, nil, false
355	}
356
357	// Try each manager until we get a server.
358	for _, manager := range managers {
359		if manager.IsOffline() {
360			continue
361		}
362
363		if s := manager.FindServer(); s != nil {
364			return manager, s, true
365		}
366	}
367
368	// Didn't find a route (even via an unhealthy server).
369	return nil, nil, false
370}
371
372// CheckServers returns thwo things
373// 1. bool to indicate whether any servers were processed
374// 2. error if any propagated from the fn
375//
376// The fn called should return a bool indicating whether checks should continue and an error
377// If an error is returned then checks will stop immediately
378func (r *Router) CheckServers(dc string, fn func(srv *metadata.Server) bool) {
379	r.RLock()
380	defer r.RUnlock()
381
382	managers, ok := r.managers[dc]
383	if !ok {
384		return
385	}
386
387	for _, m := range managers {
388		if !m.checkServers(fn) {
389			return
390		}
391	}
392}
393
// GetDatacenters lists, in ascending name order, every datacenter for which
// the router currently holds at least one manager. The result is never nil.
func (r *Router) GetDatacenters() []string {
	r.RLock()
	defer r.RUnlock()

	names := make([]string, len(r.managers))
	i := 0
	for name := range r.managers {
		names[i] = name
		i++
	}

	sort.Strings(names)
	return names
}
408
409// HasDatacenter checks whether dc is defined in WAN
410func (r *Router) HasDatacenter(dc string) bool {
411	r.RLock()
412	defer r.RUnlock()
413	_, ok := r.managers[dc]
414	return ok
415}
416
// datacenterSorter implements sort.Interface over two parallel slices. Names
// holds datacenter names and Vec holds the distance for the name at the same
// index. Swaps always move both slices together so they stay aligned, and
// entries are ordered by ascending distance.
type datacenterSorter struct {
	Names []string
	Vec   []float64
}

// Len reports how many datacenters are being sorted (see sort.Interface).
func (d *datacenterSorter) Len() int {
	count := len(d.Names)
	return count
}

// Swap exchanges entries i and j in both parallel slices (see sort.Interface).
func (d *datacenterSorter) Swap(i, j int) {
	names, dists := d.Names, d.Vec
	names[i], names[j] = names[j], names[i]
	dists[i], dists[j] = dists[j], dists[i]
}

// Less orders entries by ascending distance (see sort.Interface).
func (d *datacenterSorter) Less(i, j int) bool {
	left, right := d.Vec[i], d.Vec[j]
	return left < right
}
440
// GetDatacentersByDistance returns the datacenters known to the router's
// areas, sorted by estimated RTT from this server to the servers in each
// datacenter.
//
// For each area, the RTT to a datacenter is the middle element of that area's
// sorted RTT samples for the datacenter (the upper median when the sample
// count is even). When several areas reach the same datacenter, the lowest of
// those per-area values is used. Datacenters with equal RTT are ordered by
// name.
//
// Servers in the local datacenter count as zero RTT. A server without a cached
// coordinate is passed to lib.ComputeDistance as nil, which puts its RTT at
// positive infinity. Members that are not Consul servers, or that have left,
// are skipped. If the coordinate for any area cannot be fetched, that error is
// returned and no list is produced.
func (r *Router) GetDatacentersByDistance() ([]string, error) {
	r.RLock()
	defer r.RUnlock()

	// dcs accumulates the lowest per-area median RTT to each datacenter.
	dcs := make(map[string]float64)
	for areaID, info := range r.areas {
		coord, err := info.cluster.GetCoordinate()
		if err != nil {
			return nil, err
		}

		// index collects this area's RTT samples, keyed by datacenter.
		index := make(map[string][]float64)
		for _, m := range info.cluster.Members() {
			ok, parts := metadata.IsConsulServer(m)
			if !ok {
				r.logger.Warn("Non-server in server-only area",
					"non_server", m.Name,
					"area", areaID,
				)
				continue
			}

			if m.Status == serf.StatusLeft {
				r.logger.Debug("server in area left, skipping",
					"server", m.Name,
					"area", areaID,
				)
				continue
			}

			// Everything in the local datacenter looks like zero RTT.
			rtt := 0.0
			if parts.Datacenter != r.localDatacenter {
				// A nil coordinate is fine here: ComputeDistance will put
				// the RTT at positive infinity.
				other, _ := info.cluster.GetCachedCoordinate(parts.Name)
				rtt = lib.ComputeDistance(coord, other)
			}
			index[parts.Datacenter] = append(index[parts.Datacenter], rtt)
		}

		// Reduce each datacenter's samples to their (upper) median and keep
		// the lowest value seen across areas.
		for dc, rtts := range index {
			sort.Float64s(rtts)
			rtt := rtts[len(rtts)/2]
			if current, seen := dcs[dc]; !seen || rtt < current {
				dcs[dc] = rtt
			}
		}
	}

	// Sort by name first, so that the stable sort by RTT below breaks ties by
	// name.
	names := make([]string, 0, len(dcs))
	for dc := range dcs {
		names = append(names, dc)
	}
	sort.Strings(names)

	rtts := make([]float64, 0, len(names))
	for _, dc := range names {
		rtts = append(rtts, dcs[dc])
	}
	sort.Stable(&datacenterSorter{names, rtts})
	return names, nil
}
520
// GetDatacenterMaps returns the raw network coordinates of each known server,
// grouped into one entry per (datacenter, network area) pair.
//
// Entries are sorted by datacenter name and then by area ID, so the result is
// deterministic even though it is built from Go maps. Within an entry, the
// coordinates follow the order of the area's member list. Members that are not
// Consul servers, that have left, or that have no cached coordinate are
// omitted. The result is nil when there is nothing to report. The error is
// currently always nil.
func (r *Router) GetDatacenterMaps() ([]structs.DatacenterMap, error) {
	r.RLock()
	defer r.RUnlock()

	var maps []structs.DatacenterMap
	for areaID, info := range r.areas {
		index := make(map[string]structs.Coordinates)
		for _, m := range info.cluster.Members() {
			ok, parts := metadata.IsConsulServer(m)
			if !ok {
				r.logger.Warn("Non-server in server-only area",
					"non_server", m.Name,
					"area", areaID,
				)
				continue
			}

			if m.Status == serf.StatusLeft {
				r.logger.Debug("server in area left, skipping",
					"server", m.Name,
					"area", areaID,
				)
				continue
			}

			coord, ok := info.cluster.GetCachedCoordinate(parts.Name)
			if !ok {
				continue
			}
			index[parts.Datacenter] = append(index[parts.Datacenter], &structs.Coordinate{
				Node:  parts.Name,
				Coord: coord,
			})
		}

		for dc, coords := range index {
			maps = append(maps, structs.DatacenterMap{
				Datacenter:  dc,
				AreaID:      areaID,
				Coordinates: coords,
			})
		}
	}

	// Both r.areas and index are maps, so the order of entries above is
	// randomized. (datacenter, area) pairs are unique, so this sort gives a
	// total, deterministic order.
	sort.Slice(maps, func(i, j int) bool {
		if maps[i].Datacenter != maps[j].Datacenter {
			return maps[i].Datacenter < maps[j].Datacenter
		}
		return maps[i].AreaID < maps[j].AreaID
	})
	return maps, nil
}
570