package router

import (
	"fmt"
	"sort"
	"sync"

	"github.com/hashicorp/go-hclog"
	"github.com/hashicorp/serf/coordinate"
	"github.com/hashicorp/serf/serf"

	"github.com/hashicorp/consul/agent/metadata"
	"github.com/hashicorp/consul/agent/structs"
	"github.com/hashicorp/consul/lib"
	"github.com/hashicorp/consul/logging"
	"github.com/hashicorp/consul/types"
)

// Router keeps track of a set of network areas and their associated Serf
// membership of Consul servers. It then indexes this by datacenter to provide
// healthy routes to servers by datacenter.
//
// All mutable fields are guarded by the embedded RWMutex. Exported methods
// take the lock themselves; unexported helpers (addServer,
// maybeInitializeManager, removeManagerFromIndex) expect the caller to already
// hold it for writing.
type Router struct {
	// logger is used for diagnostic output.
	logger hclog.Logger

	// localDatacenter has the name of the router's home datacenter. This is
	// used to short-circuit RTT calculations for local servers.
	localDatacenter string

	// serverName has the name of the router's server. This is used to
	// short-circuit pinging to itself.
	serverName string

	// areas maps area IDs to structures holding information about that
	// area.
	areas map[types.AreaID]*areaInfo

	// managers is an index from datacenter names to a list of server
	// managers for that datacenter. This is used to quickly lookup routes.
	// A datacenter key is deleted once its last manager is removed, so a
	// present key always has at least one manager.
	managers map[string][]*Manager

	// routeFn is a hook to actually do the routing. NewRouter points it at
	// findDirectRoute; FindRoute simply delegates to it.
	routeFn func(datacenter string) (*Manager, *metadata.Server, bool)

	// grpcServerTracker is used to balance grpc connections across servers,
	// and has callbacks for adding or removing a server.
	grpcServerTracker ServerTracker

	// isShutdown prevents adding new routes to a router after it is shut
	// down. It is set by Shutdown and checked by AddArea.
	isShutdown bool

	// This top-level lock covers all the internal state.
	sync.RWMutex
}

// RouterSerfCluster is an interface wrapper around Serf in order to make this
// easier to unit test.
59type RouterSerfCluster interface { 60 NumNodes() int 61 Members() []serf.Member 62 GetCoordinate() (*coordinate.Coordinate, error) 63 GetCachedCoordinate(name string) (*coordinate.Coordinate, bool) 64} 65 66// managerInfo holds a server manager for a datacenter along with its associated 67// shutdown channel. 68type managerInfo struct { 69 // manager is notified about servers for this datacenter. 70 manager *Manager 71 72 // shutdownCh is only given to this manager so we can shut it down when 73 // all servers for this datacenter are gone. 74 shutdownCh chan struct{} 75} 76 77// areaInfo holds information about a given network area. 78type areaInfo struct { 79 // cluster is the Serf instance for this network area. 80 cluster RouterSerfCluster 81 82 // pinger is used to ping servers in this network area when trying to 83 // find a new, healthy server to talk to. 84 pinger Pinger 85 86 // managers maps datacenter names to managers for that datacenter in 87 // this area. 88 managers map[string]*managerInfo 89 90 // useTLS specifies whether to use TLS to communicate for this network area. 91 useTLS bool 92} 93 94// NewRouter returns a new Router with the given configuration. 95func NewRouter(logger hclog.Logger, localDatacenter, serverName string, tracker ServerTracker) *Router { 96 if logger == nil { 97 logger = hclog.New(&hclog.LoggerOptions{}) 98 } 99 if tracker == nil { 100 tracker = NoOpServerTracker{} 101 } 102 103 router := &Router{ 104 logger: logger.Named(logging.Router), 105 localDatacenter: localDatacenter, 106 serverName: serverName, 107 areas: make(map[types.AreaID]*areaInfo), 108 managers: make(map[string][]*Manager), 109 grpcServerTracker: tracker, 110 } 111 112 // Hook the direct route lookup by default. 113 router.routeFn = router.findDirectRoute 114 115 return router 116} 117 118// Shutdown removes all areas from the router, which stops all their respective 119// managers. No new areas can be added after the router is shut down. 
120func (r *Router) Shutdown() { 121 r.Lock() 122 defer r.Unlock() 123 124 for areaID, area := range r.areas { 125 for datacenter, info := range area.managers { 126 r.removeManagerFromIndex(datacenter, info.manager) 127 close(info.shutdownCh) 128 } 129 130 delete(r.areas, areaID) 131 } 132 133 r.isShutdown = true 134} 135 136// AddArea registers a new network area with the router. 137func (r *Router) AddArea(areaID types.AreaID, cluster RouterSerfCluster, pinger Pinger) error { 138 r.Lock() 139 defer r.Unlock() 140 141 if r.isShutdown { 142 return fmt.Errorf("cannot add area, router is shut down") 143 } 144 145 if _, ok := r.areas[areaID]; ok { 146 return fmt.Errorf("area ID %q already exists", areaID) 147 } 148 149 area := &areaInfo{ 150 cluster: cluster, 151 pinger: pinger, 152 managers: make(map[string]*managerInfo), 153 } 154 r.areas[areaID] = area 155 156 // always ensure we have a started manager for the LAN area 157 if areaID == types.AreaLAN { 158 r.logger.Info("Initializing LAN area manager") 159 r.maybeInitializeManager(area, r.localDatacenter) 160 } 161 162 // Do an initial populate of the manager so that we don't have to wait 163 // for events to fire. This lets us attempt to use all the known servers 164 // initially, and then will quickly detect that they are failed if we 165 // can't reach them. 166 for _, m := range cluster.Members() { 167 ok, parts := metadata.IsConsulServer(m) 168 if !ok { 169 if areaID != types.AreaLAN { 170 r.logger.Warn("Non-server in server-only area", 171 "non_server", m.Name, 172 "area", areaID, 173 ) 174 } 175 continue 176 } 177 178 if err := r.addServer(area, parts); err != nil { 179 return fmt.Errorf("failed to add server %q to area %q: %v", m.Name, areaID, err) 180 } 181 } 182 183 return nil 184} 185 186// GetServerMetadataByAddr returns server metadata by dc and address. If it 187// didn't find anything, nil is returned. 
188func (r *Router) GetServerMetadataByAddr(dc, addr string) *metadata.Server { 189 r.RLock() 190 defer r.RUnlock() 191 if ms, ok := r.managers[dc]; ok { 192 for _, m := range ms { 193 for _, s := range m.getServerList().servers { 194 if s.Addr.String() == addr { 195 return s 196 } 197 } 198 } 199 } 200 return nil 201} 202 203// removeManagerFromIndex does cleanup to take a manager out of the index of 204// datacenters. This assumes the lock is already held for writing, and will 205// panic if the given manager isn't found. 206func (r *Router) removeManagerFromIndex(datacenter string, manager *Manager) { 207 managers := r.managers[datacenter] 208 for i := 0; i < len(managers); i++ { 209 if managers[i] == manager { 210 r.managers[datacenter] = append(managers[:i], managers[i+1:]...) 211 if len(r.managers[datacenter]) == 0 { 212 delete(r.managers, datacenter) 213 } 214 return 215 } 216 } 217 panic("managers index out of sync") 218} 219 220// Returns whether TLS is enabled for the given area ID 221func (r *Router) TLSEnabled(areaID types.AreaID) (bool, error) { 222 r.RLock() 223 defer r.RUnlock() 224 225 area, ok := r.areas[areaID] 226 if !ok { 227 return false, fmt.Errorf("area ID %q does not exist", areaID) 228 } 229 230 return area.useTLS, nil 231} 232 233// RemoveArea removes an existing network area from the router. 234func (r *Router) RemoveArea(areaID types.AreaID) error { 235 r.Lock() 236 defer r.Unlock() 237 238 area, ok := r.areas[areaID] 239 if !ok { 240 return fmt.Errorf("area ID %q does not exist", areaID) 241 } 242 243 // Remove all of this area's managers from the index and shut them down. 244 for datacenter, info := range area.managers { 245 r.removeManagerFromIndex(datacenter, info.manager) 246 close(info.shutdownCh) 247 } 248 249 delete(r.areas, areaID) 250 return nil 251} 252 253// maybeInitializeManager will initialize a new manager for the given area/dc 254// if its not already created. 
Calling this function should only be done if 255// holding a write lock on the Router. 256func (r *Router) maybeInitializeManager(area *areaInfo, dc string) *Manager { 257 info, ok := area.managers[dc] 258 if ok { 259 return info.manager 260 } 261 262 shutdownCh := make(chan struct{}) 263 rb := r.grpcServerTracker.NewRebalancer(dc) 264 manager := New(r.logger, shutdownCh, area.cluster, area.pinger, r.serverName, rb) 265 info = &managerInfo{ 266 manager: manager, 267 shutdownCh: shutdownCh, 268 } 269 area.managers[dc] = info 270 271 managers := r.managers[dc] 272 r.managers[dc] = append(managers, manager) 273 go manager.Run() 274 275 return manager 276} 277 278// addServer does the work of AddServer once the write lock is held. 279func (r *Router) addServer(area *areaInfo, s *metadata.Server) error { 280 // Make the manager on the fly if this is the first we've seen of it, 281 // and add it to the index. 282 manager := r.maybeInitializeManager(area, s.Datacenter) 283 284 // If TLS is enabled for the area, set it on the server so the manager 285 // knows to use TLS when pinging it. 286 if area.useTLS { 287 s.UseTLS = true 288 } 289 290 manager.AddServer(s) 291 r.grpcServerTracker.AddServer(s) 292 return nil 293} 294 295// AddServer should be called whenever a new server joins an area. This is 296// typically hooked into the Serf event handler area for this area. 297func (r *Router) AddServer(areaID types.AreaID, s *metadata.Server) error { 298 r.Lock() 299 defer r.Unlock() 300 301 area, ok := r.areas[areaID] 302 if !ok { 303 return fmt.Errorf("area ID %q does not exist", areaID) 304 } 305 return r.addServer(area, s) 306} 307 308// RemoveServer should be called whenever a server is removed from an area. This 309// is typically hooked into the Serf event handler area for this area. 
310func (r *Router) RemoveServer(areaID types.AreaID, s *metadata.Server) error { 311 r.Lock() 312 defer r.Unlock() 313 314 area, ok := r.areas[areaID] 315 if !ok { 316 return fmt.Errorf("area ID %q does not exist", areaID) 317 } 318 319 // If the manager has already been removed we just quietly exit. This 320 // can get called by Serf events, so the timing isn't totally 321 // deterministic. 322 info, ok := area.managers[s.Datacenter] 323 if !ok { 324 return nil 325 } 326 info.manager.RemoveServer(s) 327 r.grpcServerTracker.RemoveServer(s) 328 329 // If this manager is empty then remove it so we don't accumulate cruft 330 // and waste time during request routing. 331 if num := info.manager.NumServers(); num == 0 { 332 r.removeManagerFromIndex(s.Datacenter, info.manager) 333 close(info.shutdownCh) 334 delete(area.managers, s.Datacenter) 335 } 336 337 return nil 338} 339 340// FailServer should be called whenever a server is failed in an area. This 341// is typically hooked into the Serf event handler area for this area. We will 342// immediately shift traffic away from this server, but it will remain in the 343// list of servers. 344func (r *Router) FailServer(areaID types.AreaID, s *metadata.Server) error { 345 r.RLock() 346 defer r.RUnlock() 347 348 area, ok := r.areas[areaID] 349 if !ok { 350 return fmt.Errorf("area ID %q does not exist", areaID) 351 } 352 353 // If the manager has already been removed we just quietly exit. This 354 // can get called by Serf events, so the timing isn't totally 355 // deterministic. 356 info, ok := area.managers[s.Datacenter] 357 if !ok { 358 return nil 359 } 360 361 info.manager.NotifyFailedServer(s) 362 return nil 363} 364 365// FindRoute returns a healthy server with a route to the given datacenter. The 366// Boolean return parameter will indicate if a server was available. In some 367// cases this may return a best-effort unhealthy server that can be used for a 368// connection attempt. 
If any problem occurs with the given server, the caller 369// should feed that back to the manager associated with the server, which is 370// also returned, by calling NotifyFailedServer(). 371func (r *Router) FindRoute(datacenter string) (*Manager, *metadata.Server, bool) { 372 return r.routeFn(datacenter) 373} 374 375// FindLANRoute returns a healthy server within the local datacenter. In some 376// cases this may return a best-effort unhealthy server that can be used for a 377// connection attempt. If any problem occurs with the given server, the caller 378// should feed that back to the manager associated with the server, which is 379// also returned, by calling NotifyFailedServer(). 380func (r *Router) FindLANRoute() (*Manager, *metadata.Server) { 381 mgr := r.GetLANManager() 382 383 if mgr == nil { 384 return nil, nil 385 } 386 387 return mgr, mgr.FindServer() 388} 389 390// FindLANServer will look for a server in the local datacenter. 391// This function may return a nil value if no server is available. 392func (r *Router) FindLANServer() *metadata.Server { 393 _, srv := r.FindLANRoute() 394 return srv 395} 396 397// findDirectRoute looks for a route to the given datacenter if it's directly 398// adjacent to the server. 399func (r *Router) findDirectRoute(datacenter string) (*Manager, *metadata.Server, bool) { 400 r.RLock() 401 defer r.RUnlock() 402 403 // Get the list of managers for this datacenter. This will usually just 404 // have one entry, but it's possible to have a user-defined area + WAN. 405 managers, ok := r.managers[datacenter] 406 if !ok { 407 return nil, nil, false 408 } 409 410 // Try each manager until we get a server. 411 for _, manager := range managers { 412 if manager.IsOffline() { 413 continue 414 } 415 416 if s := manager.FindServer(); s != nil { 417 return manager, s, true 418 } 419 } 420 421 // Didn't find a route (even via an unhealthy server). 422 return nil, nil, false 423} 424 425// CheckServers returns thwo things 426// 1. 
bool to indicate whether any servers were processed 427// 2. error if any propagated from the fn 428// 429// The fn called should return a bool indicating whether checks should continue and an error 430// If an error is returned then checks will stop immediately 431func (r *Router) CheckServers(dc string, fn func(srv *metadata.Server) bool) { 432 r.RLock() 433 defer r.RUnlock() 434 435 managers, ok := r.managers[dc] 436 if !ok { 437 return 438 } 439 440 for _, m := range managers { 441 if !m.checkServers(fn) { 442 return 443 } 444 } 445} 446 447// GetDatacenters returns a list of datacenters known to the router, sorted by 448// name. 449func (r *Router) GetDatacenters() []string { 450 r.RLock() 451 defer r.RUnlock() 452 453 dcs := make([]string, 0, len(r.managers)) 454 for dc := range r.managers { 455 dcs = append(dcs, dc) 456 } 457 458 sort.Strings(dcs) 459 return dcs 460} 461 462// GetRemoteDatacenters returns a list of remote datacenters known to the router, sorted by 463// name. 464func (r *Router) GetRemoteDatacenters(local string) []string { 465 r.RLock() 466 defer r.RUnlock() 467 468 dcs := make([]string, 0, len(r.managers)) 469 for dc := range r.managers { 470 if dc == local { 471 continue 472 } 473 dcs = append(dcs, dc) 474 } 475 476 sort.Strings(dcs) 477 return dcs 478} 479 480// HasDatacenter checks whether dc is defined in WAN 481func (r *Router) HasDatacenter(dc string) bool { 482 r.RLock() 483 defer r.RUnlock() 484 _, ok := r.managers[dc] 485 return ok 486} 487 488// GetLANManager returns the Manager for the LAN area and the local datacenter 489func (r *Router) GetLANManager() *Manager { 490 r.RLock() 491 defer r.RUnlock() 492 493 area, ok := r.areas[types.AreaLAN] 494 if !ok { 495 return nil 496 } 497 498 managerInfo, ok := area.managers[r.localDatacenter] 499 if !ok { 500 return nil 501 } 502 503 return managerInfo.manager 504} 505 506// datacenterSorter takes a list of DC names and a parallel vector of distances 507// and implements sort.Interface, 
keeping both structures coherent and sorting 508// by distance. 509type datacenterSorter struct { 510 Names []string 511 Vec []float64 512} 513 514// See sort.Interface. 515func (n *datacenterSorter) Len() int { 516 return len(n.Names) 517} 518 519// See sort.Interface. 520func (n *datacenterSorter) Swap(i, j int) { 521 n.Names[i], n.Names[j] = n.Names[j], n.Names[i] 522 n.Vec[i], n.Vec[j] = n.Vec[j], n.Vec[i] 523} 524 525// See sort.Interface. 526func (n *datacenterSorter) Less(i, j int) bool { 527 return n.Vec[i] < n.Vec[j] 528} 529 530// GetDatacentersByDistance returns a list of datacenters known to the router, 531// sorted by median RTT from this server to the servers in each datacenter. If 532// there are multiple areas that reach a given datacenter, this will use the 533// lowest RTT for the sort. 534func (r *Router) GetDatacentersByDistance() ([]string, error) { 535 r.RLock() 536 defer r.RUnlock() 537 538 // Go through each area and aggregate the median RTT from the current 539 // server to the other servers in each datacenter. 540 dcs := make(map[string]float64) 541 for areaID, info := range r.areas { 542 index := make(map[string][]float64) 543 coord, err := info.cluster.GetCoordinate() 544 if err != nil { 545 return nil, err 546 } 547 548 for _, m := range info.cluster.Members() { 549 ok, parts := metadata.IsConsulServer(m) 550 if !ok { 551 if areaID != types.AreaLAN { 552 r.logger.Warn("Non-server in server-only area", 553 "non_server", m.Name, 554 "area", areaID, 555 "func", "GetDatacentersByDistance", 556 ) 557 } 558 continue 559 } 560 561 if m.Status == serf.StatusLeft { 562 r.logger.Debug("server in area left, skipping", 563 "server", m.Name, 564 "area", areaID, 565 "func", "GetDatacentersByDistance", 566 ) 567 continue 568 } 569 570 existing := index[parts.Datacenter] 571 if parts.Datacenter == r.localDatacenter { 572 // Everything in the local datacenter looks like zero RTT. 
573 index[parts.Datacenter] = append(existing, 0.0) 574 } else { 575 // It's OK to get a nil coordinate back, ComputeDistance 576 // will put the RTT at positive infinity. 577 other, _ := info.cluster.GetCachedCoordinate(parts.Name) 578 rtt := lib.ComputeDistance(coord, other) 579 index[parts.Datacenter] = append(existing, rtt) 580 } 581 } 582 583 // Compute the median RTT between this server and the servers 584 // in each datacenter. We accumulate the lowest RTT to each DC 585 // in the master map, since a given DC might appear in multiple 586 // areas. 587 for dc, rtts := range index { 588 sort.Float64s(rtts) 589 rtt := rtts[len(rtts)/2] 590 591 current, ok := dcs[dc] 592 if !ok || (ok && rtt < current) { 593 dcs[dc] = rtt 594 } 595 } 596 } 597 598 // First sort by DC name, since we do a stable sort later. 599 names := make([]string, 0, len(dcs)) 600 for dc := range dcs { 601 names = append(names, dc) 602 } 603 sort.Strings(names) 604 605 // Then stable sort by median RTT. 606 rtts := make([]float64, 0, len(dcs)) 607 for _, dc := range names { 608 rtts = append(rtts, dcs[dc]) 609 } 610 sort.Stable(&datacenterSorter{names, rtts}) 611 return names, nil 612} 613 614// GetDatacenterMaps returns a structure with the raw network coordinates of 615// each known server, organized by datacenter and network area. 
616func (r *Router) GetDatacenterMaps() ([]structs.DatacenterMap, error) { 617 r.RLock() 618 defer r.RUnlock() 619 620 var maps []structs.DatacenterMap 621 for areaID, info := range r.areas { 622 index := make(map[string]structs.Coordinates) 623 for _, m := range info.cluster.Members() { 624 ok, parts := metadata.IsConsulServer(m) 625 if !ok { 626 if areaID != types.AreaLAN { 627 r.logger.Warn("Non-server in server-only area", 628 "non_server", m.Name, 629 "area", areaID, 630 "func", "GetDatacenterMaps", 631 ) 632 } 633 continue 634 } 635 636 if m.Status == serf.StatusLeft { 637 r.logger.Debug("server in area left, skipping", 638 "server", m.Name, 639 "area", areaID, 640 "func", "GetDatacenterMaps", 641 ) 642 continue 643 } 644 645 coord, ok := info.cluster.GetCachedCoordinate(parts.Name) 646 if ok { 647 entry := &structs.Coordinate{ 648 Node: parts.Name, 649 Coord: coord, 650 } 651 existing := index[parts.Datacenter] 652 index[parts.Datacenter] = append(existing, entry) 653 } 654 } 655 656 for dc, coords := range index { 657 entry := structs.DatacenterMap{ 658 Datacenter: dc, 659 AreaID: areaID, 660 Coordinates: coords, 661 } 662 maps = append(maps, entry) 663 } 664 } 665 return maps, nil 666} 667