package router

import (
	"fmt"
	"sort"
	"sync"

	"github.com/hashicorp/go-hclog"
	"github.com/hashicorp/serf/coordinate"
	"github.com/hashicorp/serf/serf"

	"github.com/hashicorp/consul/agent/metadata"
	"github.com/hashicorp/consul/agent/structs"
	"github.com/hashicorp/consul/lib"
	"github.com/hashicorp/consul/logging"
	"github.com/hashicorp/consul/types"
)

// Router keeps track of a set of network areas and their associated Serf
// membership of Consul servers. It then indexes this by datacenter to provide
// healthy routes to servers in each datacenter.
type Router struct {
	// logger is used for diagnostic output.
	logger hclog.Logger

	// localDatacenter has the name of the router's home datacenter. This is
	// used to short-circuit RTT calculations for local servers.
	localDatacenter string

	// serverName has the name of the router's server. This is used to
	// short-circuit pinging to itself.
	serverName string

	// areas maps area IDs to structures holding information about that
	// area.
	areas map[types.AreaID]*areaInfo

	// managers is an index from datacenter names to a list of server
	// managers for that datacenter. This is used to quickly look up routes.
	managers map[string][]*Manager

	// routeFn is a hook to actually do the routing.
	routeFn func(datacenter string) (*Manager, *metadata.Server, bool)

	// grpcServerTracker is used to balance grpc connections across servers,
	// and has callbacks for adding or removing a server.
	grpcServerTracker ServerTracker

	// isShutdown prevents adding new routes to a router after it is shut
	// down.
	isShutdown bool

	// This top-level lock covers all the internal state.
	sync.RWMutex
}

// RouterSerfCluster is an interface wrapper around Serf in order to make this
// easier to unit test.
type RouterSerfCluster interface {
	NumNodes() int
	Members() []serf.Member
	GetCoordinate() (*coordinate.Coordinate, error)
	GetCachedCoordinate(name string) (*coordinate.Coordinate, bool)
}

// managerInfo holds a server manager for a datacenter along with its associated
// shutdown channel.
type managerInfo struct {
	// manager is notified about servers for this datacenter.
	manager *Manager

	// shutdownCh is only given to this manager so we can shut it down when
	// all servers for this datacenter are gone.
	shutdownCh chan struct{}
}

// areaInfo holds information about a given network area.
type areaInfo struct {
	// cluster is the Serf instance for this network area.
	cluster RouterSerfCluster

	// pinger is used to ping servers in this network area when trying to
	// find a new, healthy server to talk to.
	pinger Pinger

	// managers maps datacenter names to managers for that datacenter in
	// this area.
	managers map[string]*managerInfo

	// useTLS specifies whether to use TLS to communicate for this network area.
	useTLS bool
}

// NewRouter returns a new Router with the given configuration.
func NewRouter(logger hclog.Logger, localDatacenter, serverName string, tracker ServerTracker) *Router {
	if logger == nil {
		logger = hclog.New(&hclog.LoggerOptions{})
	}
	if tracker == nil {
		tracker = NoOpServerTracker{}
	}

	router := &Router{
		logger:            logger.Named(logging.Router),
		localDatacenter:   localDatacenter,
		serverName:        serverName,
		areas:             make(map[types.AreaID]*areaInfo),
		managers:          make(map[string][]*Manager),
		grpcServerTracker: tracker,
	}

	// Hook the direct route lookup by default.
	router.routeFn = router.findDirectRoute

	return router
}
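
// Minimal construction sketch (the datacenter and server name values are
// examples only, not taken from this package): a nil tracker falls back to
// NoOpServerTracker, and a nil logger gets a default hclog logger.
//
//	r := NewRouter(hclog.Default(), "dc1", "server-1.dc1", nil)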

// Shutdown removes all areas from the router, which stops all their respective
// managers. No new areas can be added after the router is shut down.
func (r *Router) Shutdown() {
	r.Lock()
	defer r.Unlock()

	for areaID, area := range r.areas {
		for datacenter, info := range area.managers {
			r.removeManagerFromIndex(datacenter, info.manager)
			close(info.shutdownCh)
		}

		delete(r.areas, areaID)
	}

	r.isShutdown = true
}

// AddArea registers a new network area with the router.
func (r *Router) AddArea(areaID types.AreaID, cluster RouterSerfCluster, pinger Pinger) error {
	r.Lock()
	defer r.Unlock()

	if r.isShutdown {
		return fmt.Errorf("cannot add area, router is shut down")
	}

	if _, ok := r.areas[areaID]; ok {
		return fmt.Errorf("area ID %q already exists", areaID)
	}

	area := &areaInfo{
		cluster:  cluster,
		pinger:   pinger,
		managers: make(map[string]*managerInfo),
	}
	r.areas[areaID] = area

	// Always ensure we have a started manager for the LAN area.
	if areaID == types.AreaLAN {
		r.logger.Info("Initializing LAN area manager")
		r.maybeInitializeManager(area, r.localDatacenter)
	}

	// Do an initial populate of the manager so that we don't have to wait
	// for events to fire. This lets us attempt to use all the known servers
	// initially, and then quickly detect any that have failed if we can't
	// reach them.
	for _, m := range cluster.Members() {
		ok, parts := metadata.IsConsulServer(m)
		if !ok {
			if areaID != types.AreaLAN {
				r.logger.Warn("Non-server in server-only area",
					"non_server", m.Name,
					"area", areaID,
				)
			}
			continue
		}

		if err := r.addServer(area, parts); err != nil {
			return fmt.Errorf("failed to add server %q to area %q: %v", m.Name, areaID, err)
		}
	}

	return nil
}

// GetServerMetadataByAddr returns server metadata by dc and address. If no
// matching server is found, nil is returned.
func (r *Router) GetServerMetadataByAddr(dc, addr string) *metadata.Server {
	r.RLock()
	defer r.RUnlock()
	if ms, ok := r.managers[dc]; ok {
		for _, m := range ms {
			for _, s := range m.getServerList().servers {
				if s.Addr.String() == addr {
					return s
				}
			}
		}
	}
	return nil
}

// removeManagerFromIndex does cleanup to take a manager out of the index of
// datacenters. This assumes the lock is already held for writing, and will
// panic if the given manager isn't found.
func (r *Router) removeManagerFromIndex(datacenter string, manager *Manager) {
	managers := r.managers[datacenter]
	for i := 0; i < len(managers); i++ {
		if managers[i] == manager {
			r.managers[datacenter] = append(managers[:i], managers[i+1:]...)
			if len(r.managers[datacenter]) == 0 {
				delete(r.managers, datacenter)
			}
			return
		}
	}
	panic("managers index out of sync")
}

// TLSEnabled returns whether TLS is enabled for the given area ID.
func (r *Router) TLSEnabled(areaID types.AreaID) (bool, error) {
	r.RLock()
	defer r.RUnlock()

	area, ok := r.areas[areaID]
	if !ok {
		return false, fmt.Errorf("area ID %q does not exist", areaID)
	}

	return area.useTLS, nil
}

// RemoveArea removes an existing network area from the router.
func (r *Router) RemoveArea(areaID types.AreaID) error {
	r.Lock()
	defer r.Unlock()

	area, ok := r.areas[areaID]
	if !ok {
		return fmt.Errorf("area ID %q does not exist", areaID)
	}

	// Remove all of this area's managers from the index and shut them down.
	for datacenter, info := range area.managers {
		r.removeManagerFromIndex(datacenter, info.manager)
		close(info.shutdownCh)
	}

	delete(r.areas, areaID)
	return nil
}

// maybeInitializeManager will initialize a new manager for the given area/dc
// if it's not already created. This function should only be called while
// holding a write lock on the Router.
func (r *Router) maybeInitializeManager(area *areaInfo, dc string) *Manager {
	info, ok := area.managers[dc]
	if ok {
		return info.manager
	}

	shutdownCh := make(chan struct{})
	rb := r.grpcServerTracker.NewRebalancer(dc)
	manager := New(r.logger, shutdownCh, area.cluster, area.pinger, r.serverName, rb)
	info = &managerInfo{
		manager:    manager,
		shutdownCh: shutdownCh,
	}
	area.managers[dc] = info

	managers := r.managers[dc]
	r.managers[dc] = append(managers, manager)
	go manager.Run()

	return manager
}

// addServer does the work of AddServer once the write lock is held.
func (r *Router) addServer(area *areaInfo, s *metadata.Server) error {
	// Make the manager on the fly if this is the first we've seen of it,
	// and add it to the index.
	manager := r.maybeInitializeManager(area, s.Datacenter)

	// If TLS is enabled for the area, set it on the server so the manager
	// knows to use TLS when pinging it.
	if area.useTLS {
		s.UseTLS = true
	}

	manager.AddServer(s)
	r.grpcServerTracker.AddServer(s)
	return nil
}

// AddServer should be called whenever a new server joins an area. This is
// typically hooked into the Serf event handler for this area.
func (r *Router) AddServer(areaID types.AreaID, s *metadata.Server) error {
	r.Lock()
	defer r.Unlock()

	area, ok := r.areas[areaID]
	if !ok {
		return fmt.Errorf("area ID %q does not exist", areaID)
	}
	return r.addServer(area, s)
}

// RemoveServer should be called whenever a server is removed from an area. This
// is typically hooked into the Serf event handler for this area.
func (r *Router) RemoveServer(areaID types.AreaID, s *metadata.Server) error {
	r.Lock()
	defer r.Unlock()

	area, ok := r.areas[areaID]
	if !ok {
		return fmt.Errorf("area ID %q does not exist", areaID)
	}

	// If the manager has already been removed we just quietly exit. This
	// can get called by Serf events, so the timing isn't totally
	// deterministic.
	info, ok := area.managers[s.Datacenter]
	if !ok {
		return nil
	}
	info.manager.RemoveServer(s)
	r.grpcServerTracker.RemoveServer(s)

	// If this manager is empty then remove it so we don't accumulate cruft
	// and waste time during request routing.
	if num := info.manager.NumServers(); num == 0 {
		r.removeManagerFromIndex(s.Datacenter, info.manager)
		close(info.shutdownCh)
		delete(area.managers, s.Datacenter)
	}

	return nil
}

// FailServer should be called whenever a server is failed in an area. This
// is typically hooked into the Serf event handler for this area. We will
// immediately shift traffic away from this server, but it will remain in the
// list of servers.
func (r *Router) FailServer(areaID types.AreaID, s *metadata.Server) error {
	r.RLock()
	defer r.RUnlock()

	area, ok := r.areas[areaID]
	if !ok {
		return fmt.Errorf("area ID %q does not exist", areaID)
	}

	// If the manager has already been removed we just quietly exit. This
	// can get called by Serf events, so the timing isn't totally
	// deterministic.
	info, ok := area.managers[s.Datacenter]
	if !ok {
		return nil
	}

	info.manager.NotifyFailedServer(s)
	return nil
}
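
// Illustrative sketch (not part of this package's API): how a Serf
// member-event handler might feed AddServer/RemoveServer/FailServer. The
// handler name, event plumbing, and error handling are assumptions here.
//
//	func handleMemberEvent(r *Router, areaID types.AreaID, e serf.MemberEvent) {
//		for _, m := range e.Members {
//			ok, srv := metadata.IsConsulServer(m)
//			if !ok {
//				continue
//			}
//			switch e.Type {
//			case serf.EventMemberJoin:
//				r.AddServer(areaID, srv)
//			case serf.EventMemberLeave, serf.EventMemberReap:
//				r.RemoveServer(areaID, srv)
//			case serf.EventMemberFailed:
//				r.FailServer(areaID, srv)
//			}
//		}
//	}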

// FindRoute returns a healthy server with a route to the given datacenter. The
// Boolean return parameter will indicate if a server was available. In some
// cases this may return a best-effort unhealthy server that can be used for a
// connection attempt. If any problem occurs with the given server, the caller
// should feed that back to the manager associated with the server, which is
// also returned, by calling NotifyFailedServer().
func (r *Router) FindRoute(datacenter string) (*Manager, *metadata.Server, bool) {
	return r.routeFn(datacenter)
}
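
// Illustrative caller sketch (rpcCall is a hypothetical placeholder for the
// actual request, and the datacenter name is an example): route to a remote
// datacenter and feed failures back to the returned manager.
//
//	if manager, server, ok := r.FindRoute("dc2"); ok {
//		if err := rpcCall(server); err != nil {
//			manager.NotifyFailedServer(server)
//		}
//	}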

// FindLANRoute returns a healthy server within the local datacenter. In some
// cases this may return a best-effort unhealthy server that can be used for a
// connection attempt. If any problem occurs with the given server, the caller
// should feed that back to the manager associated with the server, which is
// also returned, by calling NotifyFailedServer().
func (r *Router) FindLANRoute() (*Manager, *metadata.Server) {
	mgr := r.GetLANManager()

	if mgr == nil {
		return nil, nil
	}

	return mgr, mgr.FindServer()
}

// FindLANServer will look for a server in the local datacenter.
// This function may return a nil value if no server is available.
func (r *Router) FindLANServer() *metadata.Server {
	_, srv := r.FindLANRoute()
	return srv
}

// findDirectRoute looks for a route to the given datacenter if it's directly
// adjacent to the server.
func (r *Router) findDirectRoute(datacenter string) (*Manager, *metadata.Server, bool) {
	r.RLock()
	defer r.RUnlock()

	// Get the list of managers for this datacenter. This will usually just
	// have one entry, but it's possible to have a user-defined area + WAN.
	managers, ok := r.managers[datacenter]
	if !ok {
		return nil, nil, false
	}

	// Try each manager until we get a server.
	for _, manager := range managers {
		if manager.IsOffline() {
			continue
		}

		if s := manager.FindServer(); s != nil {
			return manager, s, true
		}
	}

	// Didn't find a route (even via an unhealthy server).
	return nil, nil, false
}

// CheckServers walks the servers known for the given datacenter, calling fn
// for each one. The fn should return a bool indicating whether iteration
// should continue; returning false stops the walk immediately. If the
// datacenter is unknown this is a no-op.
func (r *Router) CheckServers(dc string, fn func(srv *metadata.Server) bool) {
	r.RLock()
	defer r.RUnlock()

	managers, ok := r.managers[dc]
	if !ok {
		return
	}

	for _, m := range managers {
		if !m.checkServers(fn) {
			return
		}
	}
}
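
// Illustrative caller sketch: count the servers the router knows for a
// datacenter by visiting them all (the counter and datacenter name are
// examples, not part of this package).
//
//	var n int
//	r.CheckServers("dc1", func(srv *metadata.Server) bool {
//		n++
//		return true // keep iterating over every server
//	})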

// GetDatacenters returns a list of datacenters known to the router, sorted by
// name.
func (r *Router) GetDatacenters() []string {
	r.RLock()
	defer r.RUnlock()

	dcs := make([]string, 0, len(r.managers))
	for dc := range r.managers {
		dcs = append(dcs, dc)
	}

	sort.Strings(dcs)
	return dcs
}

// GetRemoteDatacenters returns a list of remote datacenters known to the router,
// sorted by name.
func (r *Router) GetRemoteDatacenters(local string) []string {
	r.RLock()
	defer r.RUnlock()

	dcs := make([]string, 0, len(r.managers))
	for dc := range r.managers {
		if dc == local {
			continue
		}
		dcs = append(dcs, dc)
	}

	sort.Strings(dcs)
	return dcs
}

// HasDatacenter checks whether the router knows about any servers in the
// given datacenter.
func (r *Router) HasDatacenter(dc string) bool {
	r.RLock()
	defer r.RUnlock()
	_, ok := r.managers[dc]
	return ok
}

// GetLANManager returns the Manager for the LAN area and the local datacenter.
func (r *Router) GetLANManager() *Manager {
	r.RLock()
	defer r.RUnlock()

	area, ok := r.areas[types.AreaLAN]
	if !ok {
		return nil
	}

	managerInfo, ok := area.managers[r.localDatacenter]
	if !ok {
		return nil
	}

	return managerInfo.manager
}

// datacenterSorter takes a list of DC names and a parallel vector of distances
// and implements sort.Interface, keeping both structures coherent and sorting
// by distance.
type datacenterSorter struct {
	Names []string
	Vec   []float64
}

// See sort.Interface.
func (n *datacenterSorter) Len() int {
	return len(n.Names)
}

// See sort.Interface.
func (n *datacenterSorter) Swap(i, j int) {
	n.Names[i], n.Names[j] = n.Names[j], n.Names[i]
	n.Vec[i], n.Vec[j] = n.Vec[j], n.Vec[i]
}

// See sort.Interface.
func (n *datacenterSorter) Less(i, j int) bool {
	return n.Vec[i] < n.Vec[j]
}

// GetDatacentersByDistance returns a list of datacenters known to the router,
// sorted by median RTT from this server to the servers in each datacenter. If
// there are multiple areas that reach a given datacenter, this will use the
// lowest RTT for the sort.
func (r *Router) GetDatacentersByDistance() ([]string, error) {
	r.RLock()
	defer r.RUnlock()

	// Go through each area and aggregate the median RTT from the current
	// server to the other servers in each datacenter.
	dcs := make(map[string]float64)
	for areaID, info := range r.areas {
		index := make(map[string][]float64)
		coord, err := info.cluster.GetCoordinate()
		if err != nil {
			return nil, err
		}

		for _, m := range info.cluster.Members() {
			ok, parts := metadata.IsConsulServer(m)
			if !ok {
				if areaID != types.AreaLAN {
					r.logger.Warn("Non-server in server-only area",
						"non_server", m.Name,
						"area", areaID,
						"func", "GetDatacentersByDistance",
					)
				}
				continue
			}

			if m.Status == serf.StatusLeft {
				r.logger.Debug("server in area left, skipping",
					"server", m.Name,
					"area", areaID,
					"func", "GetDatacentersByDistance",
				)
				continue
			}

			existing := index[parts.Datacenter]
			if parts.Datacenter == r.localDatacenter {
				// Everything in the local datacenter looks like zero RTT.
				index[parts.Datacenter] = append(existing, 0.0)
			} else {
				// It's OK to get a nil coordinate back, ComputeDistance
				// will put the RTT at positive infinity.
				other, _ := info.cluster.GetCachedCoordinate(parts.Name)
				rtt := lib.ComputeDistance(coord, other)
				index[parts.Datacenter] = append(existing, rtt)
			}
		}

		// Compute the median RTT between this server and the servers
		// in each datacenter. We accumulate the lowest RTT to each DC
		// in the master map, since a given DC might appear in multiple
		// areas.
		for dc, rtts := range index {
			sort.Float64s(rtts)
			rtt := rtts[len(rtts)/2]

			current, ok := dcs[dc]
			if !ok || rtt < current {
				dcs[dc] = rtt
			}
		}
	}

	// First sort by DC name, since we do a stable sort later.
	names := make([]string, 0, len(dcs))
	for dc := range dcs {
		names = append(names, dc)
	}
	sort.Strings(names)

	// Then stable sort by median RTT.
	rtts := make([]float64, 0, len(dcs))
	for _, dc := range names {
		rtts = append(rtts, dcs[dc])
	}
	sort.Stable(&datacenterSorter{names, rtts})
	return names, nil
}
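
// Illustrative note on the aggregation above: with sorted RTTs of [0.002,
// 0.005, 0.009] seconds to a datacenter's servers, rtts[len(rtts)/2] picks
// 0.005; with an even count it picks the upper of the two middle values. When
// the same datacenter is reachable through more than one area, the lowest such
// value is what drives the final sort.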

// GetDatacenterMaps returns a structure with the raw network coordinates of
// each known server, organized by datacenter and network area.
func (r *Router) GetDatacenterMaps() ([]structs.DatacenterMap, error) {
	r.RLock()
	defer r.RUnlock()

	var maps []structs.DatacenterMap
	for areaID, info := range r.areas {
		index := make(map[string]structs.Coordinates)
		for _, m := range info.cluster.Members() {
			ok, parts := metadata.IsConsulServer(m)
			if !ok {
				if areaID != types.AreaLAN {
					r.logger.Warn("Non-server in server-only area",
						"non_server", m.Name,
						"area", areaID,
						"func", "GetDatacenterMaps",
					)
				}
				continue
			}

			if m.Status == serf.StatusLeft {
				r.logger.Debug("server in area left, skipping",
					"server", m.Name,
					"area", areaID,
					"func", "GetDatacenterMaps",
				)
				continue
			}

			coord, ok := info.cluster.GetCachedCoordinate(parts.Name)
			if ok {
				entry := &structs.Coordinate{
					Node:  parts.Name,
					Coord: coord,
				}
				existing := index[parts.Datacenter]
				index[parts.Datacenter] = append(existing, entry)
			}
		}

		for dc, coords := range index {
			entry := structs.DatacenterMap{
				Datacenter:  dc,
				AreaID:      areaID,
				Coordinates: coords,
			}
			maps = append(maps, entry)
		}
	}
	return maps, nil
}