1package router 2 3import ( 4 "fmt" 5 "sort" 6 "sync" 7 8 "github.com/hashicorp/consul/agent/metadata" 9 "github.com/hashicorp/consul/agent/structs" 10 "github.com/hashicorp/consul/lib" 11 "github.com/hashicorp/consul/logging" 12 "github.com/hashicorp/consul/types" 13 "github.com/hashicorp/go-hclog" 14 "github.com/hashicorp/serf/coordinate" 15 "github.com/hashicorp/serf/serf" 16) 17 18// Router keeps track of a set of network areas and their associated Serf 19// membership of Consul servers. It then indexes this by datacenter to provide 20// healthy routes to servers by datacenter. 21type Router struct { 22 // logger is used for diagnostic output. 23 logger hclog.Logger 24 25 // localDatacenter has the name of the router's home datacenter. This is 26 // used to short-circuit RTT calculations for local servers. 27 localDatacenter string 28 29 // serverName has the name of the router's server. This is used to 30 // short-circuit pinging to itself. 31 serverName string 32 33 // areas maps area IDs to structures holding information about that 34 // area. 35 areas map[types.AreaID]*areaInfo 36 37 // managers is an index from datacenter names to a list of server 38 // managers for that datacenter. This is used to quickly lookup routes. 39 managers map[string][]*Manager 40 41 // routeFn is a hook to actually do the routing. 42 routeFn func(datacenter string) (*Manager, *metadata.Server, bool) 43 44 // isShutdown prevents adding new routes to a router after it is shut 45 // down. 46 isShutdown bool 47 48 // This top-level lock covers all the internal state. 49 sync.RWMutex 50} 51 52// RouterSerfCluster is an interface wrapper around Serf in order to make this 53// easier to unit test. 
54type RouterSerfCluster interface { 55 NumNodes() int 56 Members() []serf.Member 57 GetCoordinate() (*coordinate.Coordinate, error) 58 GetCachedCoordinate(name string) (*coordinate.Coordinate, bool) 59} 60 61// managerInfo holds a server manager for a datacenter along with its associated 62// shutdown channel. 63type managerInfo struct { 64 // manager is notified about servers for this datacenter. 65 manager *Manager 66 67 // shutdownCh is only given to this manager so we can shut it down when 68 // all servers for this datacenter are gone. 69 shutdownCh chan struct{} 70} 71 72// areaInfo holds information about a given network area. 73type areaInfo struct { 74 // cluster is the Serf instance for this network area. 75 cluster RouterSerfCluster 76 77 // pinger is used to ping servers in this network area when trying to 78 // find a new, healthy server to talk to. 79 pinger Pinger 80 81 // managers maps datacenter names to managers for that datacenter in 82 // this area. 83 managers map[string]*managerInfo 84 85 // useTLS specifies whether to use TLS to communicate for this network area. 86 useTLS bool 87} 88 89// NewRouter returns a new Router with the given configuration. 90func NewRouter(logger hclog.Logger, localDatacenter, serverName string) *Router { 91 if logger == nil { 92 logger = hclog.New(&hclog.LoggerOptions{}) 93 } 94 95 router := &Router{ 96 logger: logger.Named(logging.Router), 97 localDatacenter: localDatacenter, 98 serverName: serverName, 99 areas: make(map[types.AreaID]*areaInfo), 100 managers: make(map[string][]*Manager), 101 } 102 103 // Hook the direct route lookup by default. 104 router.routeFn = router.findDirectRoute 105 106 return router 107} 108 109// Shutdown removes all areas from the router, which stops all their respective 110// managers. No new areas can be added after the router is shut down. 
func (r *Router) Shutdown() {
	r.Lock()
	defer r.Unlock()

	for areaID, area := range r.areas {
		for datacenter, info := range area.managers {
			r.removeManagerFromIndex(datacenter, info.manager)
			close(info.shutdownCh)
		}

		// Deleting the key currently being visited during a range is safe.
		delete(r.areas, areaID)
	}

	r.isShutdown = true
}

// AddArea registers a new network area with the router and seeds managers
// from the cluster's current Consul server members. It returns an error if
// the router has been shut down, if areaID is already registered, or if a
// member cannot be added (the underlying error is wrapped).
func (r *Router) AddArea(areaID types.AreaID, cluster RouterSerfCluster, pinger Pinger) error {
	r.Lock()
	defer r.Unlock()

	if r.isShutdown {
		return fmt.Errorf("cannot add area, router is shut down")
	}

	if _, ok := r.areas[areaID]; ok {
		return fmt.Errorf("area ID %q already exists", areaID)
	}

	area := &areaInfo{
		cluster:  cluster,
		pinger:   pinger,
		managers: make(map[string]*managerInfo),
	}
	r.areas[areaID] = area

	// Do an initial populate of the manager so that we don't have to wait
	// for events to fire. This lets us attempt to use all the known servers
	// initially, and then will quickly detect that they are failed if we
	// can't reach them.
	for _, m := range cluster.Members() {
		ok, parts := metadata.IsConsulServer(m)
		if !ok {
			r.logger.Warn("Non-server in server-only area",
				"non_server", m.Name,
				"area", areaID,
			)
			continue
		}

		// NOTE(review): on failure the area stays registered with whatever
		// managers were created so far; addServer currently never fails.
		if err := r.addServer(area, parts); err != nil {
			return fmt.Errorf("failed to add server %q to area %q: %w", m.Name, areaID, err)
		}
	}

	return nil
}

// GetServerMetadataByAddr returns the metadata of the server in datacenter dc
// whose address string equals addr, searching every manager indexed for dc.
// If no such server is known, nil is returned.
func (r *Router) GetServerMetadataByAddr(dc, addr string) *metadata.Server {
	r.RLock()
	defer r.RUnlock()
	if ms, ok := r.managers[dc]; ok {
		for _, m := range ms {
			for _, s := range m.getServerList().servers {
				if s.Addr.String() == addr {
					return s
				}
			}
		}
	}
	return nil
}

// removeManagerFromIndex does cleanup to take a manager out of the index of
// datacenters. This assumes the lock is already held for writing, and will
// panic if the given manager isn't found.
func (r *Router) removeManagerFromIndex(datacenter string, manager *Manager) {
	managers := r.managers[datacenter]
	for i, m := range managers {
		if m != manager {
			continue
		}

		// Shift the tail left and clear the vacated final slot. Deleting via
		// append would leave a duplicate pointer in the shared backing array,
		// which can keep an already-removed manager reachable.
		last := len(managers) - 1
		copy(managers[i:], managers[i+1:])
		managers[last] = nil
		managers = managers[:last]

		if len(managers) == 0 {
			delete(r.managers, datacenter)
		} else {
			r.managers[datacenter] = managers
		}
		return
	}
	panic("managers index out of sync")
}

// TLSEnabled reports whether TLS is enabled for the given area ID. It returns
// an error if the area is not registered.
func (r *Router) TLSEnabled(areaID types.AreaID) (bool, error) {
	r.RLock()
	defer r.RUnlock()

	area, ok := r.areas[areaID]
	if !ok {
		return false, fmt.Errorf("area ID %q does not exist", areaID)
	}

	return area.useTLS, nil
}

// RemoveArea removes an existing network area from the router, taking its
// managers out of the datacenter index and closing their shutdown channels.
func (r *Router) RemoveArea(areaID types.AreaID) error {
	r.Lock()
	defer r.Unlock()

	area, ok := r.areas[areaID]
	if !ok {
		return fmt.Errorf("area ID %q does not exist", areaID)
	}

	// Remove all of this area's managers from the index and shut them down.
	for datacenter, info := range area.managers {
		r.removeManagerFromIndex(datacenter, info.manager)
		close(info.shutdownCh)
	}

	delete(r.areas, areaID)
	return nil
}

// addServer does the work of AddServer (and of AddArea's initial population);
// the caller must already hold the write lock.
func (r *Router) addServer(area *areaInfo, s *metadata.Server) error {
	dc := s.Datacenter

	info, exists := area.managers[dc]
	if !exists {
		// First server seen for this datacenter in this area: build a
		// dedicated manager, register it with both the area and the router
		// index, then start it.
		stopCh := make(chan struct{})
		mgr := New(r.logger, stopCh, area.cluster, area.pinger, r.serverName)
		info = &managerInfo{manager: mgr, shutdownCh: stopCh}
		area.managers[dc] = info
		r.managers[dc] = append(r.managers[dc], mgr)
		go mgr.Start()
	}

	// Tag the server so its manager knows to ping it over TLS when the
	// area requires it.
	if area.useTLS {
		s.UseTLS = true
	}

	info.manager.AddServer(s)
	return nil
}

// AddServer should be called whenever a new server joins an area; it is
// typically wired to that area's Serf event handler. It returns an error if
// areaID is not registered.
func (r *Router) AddServer(areaID types.AreaID, s *metadata.Server) error {
	r.Lock()
	defer r.Unlock()

	if area, ok := r.areas[areaID]; ok {
		return r.addServer(area, s)
	}
	return fmt.Errorf("area ID %q does not exist", areaID)
}

// RemoveServer should be called whenever a server leaves an area; it is
// typically wired to that area's Serf event handler. Once a datacenter's
// manager holds no servers, it is dropped from the index and shut down.
func (r *Router) RemoveServer(areaID types.AreaID, s *metadata.Server) error {
	r.Lock()
	defer r.Unlock()

	area, ok := r.areas[areaID]
	if !ok {
		return fmt.Errorf("area ID %q does not exist", areaID)
	}

	// Serf events arrive with no guaranteed timing, so a manager that is
	// already gone is not treated as an error.
	info, found := area.managers[s.Datacenter]
	if !found {
		return nil
	}

	mgr := info.manager
	mgr.RemoveServer(s)

	// Keep managers that still hold servers; drop empty ones so request
	// routing doesn't waste time on them.
	if mgr.NumServers() != 0 {
		return nil
	}
	r.removeManagerFromIndex(s.Datacenter, mgr)
	close(info.shutdownCh)
	delete(area.managers, s.Datacenter)
	return nil
}

// FailServer should be called whenever a server in an area fails; it is
// typically wired to that area's Serf event handler. Traffic is shifted away
// from the server right away, but the server stays in its manager's list.
func (r *Router) FailServer(areaID types.AreaID, s *metadata.Server) error {
	r.RLock()
	defer r.RUnlock()

	area, ok := r.areas[areaID]
	if !ok {
		return fmt.Errorf("area ID %q does not exist", areaID)
	}

	// Serf event timing isn't deterministic; a missing manager is fine.
	if info, found := area.managers[s.Datacenter]; found {
		info.manager.NotifyFailedServer(s)
	}
	return nil
}

// FindRoute returns a server with a route to the given datacenter, together
// with the manager that owns it; the boolean reports whether one was found.
// The server may be a best-effort unhealthy one. If a problem occurs while
// using it, the caller should report it to the returned manager via
// NotifyFailedServer(). The lookup itself is delegated to the routeFn hook.
func (r *Router) FindRoute(datacenter string) (*Manager, *metadata.Server, bool) {
	lookup := r.routeFn
	return lookup(datacenter)
}

// findDirectRoute is the default routing hook: it only considers managers
// indexed for datacenter, i.e. datacenters reachable through one of this
// router's own areas.
346func (r *Router) findDirectRoute(datacenter string) (*Manager, *metadata.Server, bool) { 347 r.RLock() 348 defer r.RUnlock() 349 350 // Get the list of managers for this datacenter. This will usually just 351 // have one entry, but it's possible to have a user-defined area + WAN. 352 managers, ok := r.managers[datacenter] 353 if !ok { 354 return nil, nil, false 355 } 356 357 // Try each manager until we get a server. 358 for _, manager := range managers { 359 if manager.IsOffline() { 360 continue 361 } 362 363 if s := manager.FindServer(); s != nil { 364 return manager, s, true 365 } 366 } 367 368 // Didn't find a route (even via an unhealthy server). 369 return nil, nil, false 370} 371 372// CheckServers returns thwo things 373// 1. bool to indicate whether any servers were processed 374// 2. error if any propagated from the fn 375// 376// The fn called should return a bool indicating whether checks should continue and an error 377// If an error is returned then checks will stop immediately 378func (r *Router) CheckServers(dc string, fn func(srv *metadata.Server) bool) { 379 r.RLock() 380 defer r.RUnlock() 381 382 managers, ok := r.managers[dc] 383 if !ok { 384 return 385 } 386 387 for _, m := range managers { 388 if !m.checkServers(fn) { 389 return 390 } 391 } 392} 393 394// GetDatacenters returns a list of datacenters known to the router, sorted by 395// name. 
func (r *Router) GetDatacenters() []string {
	r.RLock()
	defer r.RUnlock()

	dcs := make([]string, 0, len(r.managers))
	for dc := range r.managers {
		dcs = append(dcs, dc)
	}

	sort.Strings(dcs)
	return dcs
}

// HasDatacenter reports whether the router has at least one manager for dc,
// i.e. whether dc is reachable through any registered area (the WAN or a
// user-defined area).
func (r *Router) HasDatacenter(dc string) bool {
	r.RLock()
	defer r.RUnlock()
	_, ok := r.managers[dc]
	return ok
}

// datacenterSorter takes a list of DC names and a parallel vector of distances
// and implements sort.Interface, keeping both structures coherent and sorting
// by distance.
type datacenterSorter struct {
	Names []string
	Vec   []float64
}

// See sort.Interface.
func (n *datacenterSorter) Len() int {
	return len(n.Names)
}

// See sort.Interface.
func (n *datacenterSorter) Swap(i, j int) {
	n.Names[i], n.Names[j] = n.Names[j], n.Names[i]
	n.Vec[i], n.Vec[j] = n.Vec[j], n.Vec[i]
}

// See sort.Interface.
func (n *datacenterSorter) Less(i, j int) bool {
	return n.Vec[i] < n.Vec[j]
}

// GetDatacentersByDistance returns a list of datacenters known to the router,
// sorted by median RTT from this server to the servers in each datacenter. If
// there are multiple areas that reach a given datacenter, this will use the
// lowest RTT for the sort.
//
// Servers in the local datacenter count as zero RTT, and servers that have
// left are skipped. Datacenters with equal RTT are ordered by name. An error
// is returned if any area fails to report this server's own coordinate.
func (r *Router) GetDatacentersByDistance() ([]string, error) {
	r.RLock()
	defer r.RUnlock()

	// Go through each area and aggregate the median RTT from the current
	// server to the other servers in each datacenter.
	dcs := make(map[string]float64)
	for areaID, info := range r.areas {
		index := make(map[string][]float64)
		coord, err := info.cluster.GetCoordinate()
		if err != nil {
			return nil, err
		}

		for _, m := range info.cluster.Members() {
			ok, parts := metadata.IsConsulServer(m)
			if !ok {
				r.logger.Warn("Non-server in server-only area",
					"non_server", m.Name,
					"area", areaID,
				)
				continue
			}

			if m.Status == serf.StatusLeft {
				r.logger.Debug("server in area left, skipping",
					"server", m.Name,
					"area", areaID,
				)
				continue
			}

			existing := index[parts.Datacenter]
			if parts.Datacenter == r.localDatacenter {
				// Everything in the local datacenter looks like zero RTT.
				index[parts.Datacenter] = append(existing, 0.0)
			} else {
				// It's OK to get a nil coordinate back, ComputeDistance
				// will put the RTT at positive infinity.
				other, _ := info.cluster.GetCachedCoordinate(parts.Name)
				rtt := lib.ComputeDistance(coord, other)
				index[parts.Datacenter] = append(existing, rtt)
			}
		}

		// Compute the median RTT between this server and the servers
		// in each datacenter. We keep the lowest RTT to each DC in the
		// aggregate map, since a given DC might appear in multiple areas.
		for dc, rtts := range index {
			sort.Float64s(rtts)
			rtt := rtts[len(rtts)/2] // upper median when the count is even

			if current, ok := dcs[dc]; !ok || rtt < current {
				dcs[dc] = rtt
			}
		}
	}

	// First sort by DC name, since we do a stable sort later.
	names := make([]string, 0, len(dcs))
	for dc := range dcs {
		names = append(names, dc)
	}
	sort.Strings(names)

	// Then stable sort by median RTT.
	rtts := make([]float64, 0, len(dcs))
	for _, dc := range names {
		rtts = append(rtts, dcs[dc])
	}
	sort.Stable(&datacenterSorter{names, rtts})
	return names, nil
}

// GetDatacenterMaps returns a structure with the raw network coordinates of
// each known server, organized by datacenter and network area. It includes
// only servers that have not left and that have a cached coordinate. The
// order of the returned entries is unspecified, and the error result is
// currently always nil.
func (r *Router) GetDatacenterMaps() ([]structs.DatacenterMap, error) {
	r.RLock()
	defer r.RUnlock()

	var maps []structs.DatacenterMap
	for areaID, info := range r.areas {
		index := make(map[string]structs.Coordinates)
		for _, m := range info.cluster.Members() {
			ok, parts := metadata.IsConsulServer(m)
			if !ok {
				r.logger.Warn("Non-server in server-only area",
					"non_server", m.Name,
					"area", areaID,
				)
				continue
			}

			if m.Status == serf.StatusLeft {
				r.logger.Debug("server in area left, skipping",
					"server", m.Name,
					"area", areaID,
				)
				continue
			}

			coord, ok := info.cluster.GetCachedCoordinate(parts.Name)
			if ok {
				entry := &structs.Coordinate{
					Node:  parts.Name,
					Coord: coord,
				}
				existing := index[parts.Datacenter]
				index[parts.Datacenter] = append(existing, entry)
			}
		}

		for dc, coords := range index {
			entry := structs.DatacenterMap{
				Datacenter:  dc,
				AreaID:      areaID,
				Coordinates: coords,
			}
			maps = append(maps, entry)
		}
	}
	return maps, nil
}