1package dependency 2 3import ( 4 "log" 5 "net/url" 6 "sort" 7 "time" 8 9 "github.com/hashicorp/consul/api" 10 "github.com/pkg/errors" 11) 12 13var ( 14 // Ensure implements 15 _ Dependency = (*CatalogDatacentersQuery)(nil) 16 17 // CatalogDatacentersQuerySleepTime is the amount of time to sleep between 18 // queries, since the endpoint does not support blocking queries. 19 CatalogDatacentersQuerySleepTime = 15 * time.Second 20) 21 22// CatalogDatacentersQuery is the dependency to query all datacenters 23type CatalogDatacentersQuery struct { 24 ignoreFailing bool 25 26 stopCh chan struct{} 27} 28 29// NewCatalogDatacentersQuery creates a new datacenter dependency. 30func NewCatalogDatacentersQuery(ignoreFailing bool) (*CatalogDatacentersQuery, error) { 31 return &CatalogDatacentersQuery{ 32 ignoreFailing: ignoreFailing, 33 stopCh: make(chan struct{}, 1), 34 }, nil 35} 36 37// Fetch queries the Consul API defined by the given client and returns a slice 38// of strings representing the datacenters 39func (d *CatalogDatacentersQuery) Fetch(clients *ClientSet, opts *QueryOptions) (interface{}, *ResponseMetadata, error) { 40 opts = opts.Merge(&QueryOptions{}) 41 42 log.Printf("[TRACE] %s: GET %s", d, &url.URL{ 43 Path: "/v1/catalog/datacenters", 44 RawQuery: opts.String(), 45 }) 46 47 // This is pretty ghetto, but the datacenters endpoint does not support 48 // blocking queries, so we are going to "fake it until we make it". When we 49 // first query, the LastIndex will be "0", meaning we should immediately 50 // return data, but future calls will include a LastIndex. If we have a 51 // LastIndex in the query metadata, sleep for 15 seconds before asking Consul 52 // again. 53 // 54 // This is probably okay given the frequency in which datacenters actually 55 // change, but is technically not edge-triggering. 56 if opts.WaitIndex != 0 { 57 log.Printf("[TRACE] %s: long polling for %s", d, CatalogDatacentersQuerySleepTime) 58 59 select { 60 case <-d.stopCh: 61 return nil, nil, ErrStopped 62 case <-time.After(CatalogDatacentersQuerySleepTime): 63 } 64 } 65 66 result, err := clients.Consul().Catalog().Datacenters() 67 if err != nil { 68 return nil, nil, errors.Wrapf(err, d.String()) 69 } 70 71 // If the user opted in for skipping "down" datacenters, figure out which 72 // datacenters are down. 73 if d.ignoreFailing { 74 dcs := make([]string, 0, len(result)) 75 for _, dc := range result { 76 if _, _, err := clients.Consul().Catalog().Services(&api.QueryOptions{ 77 Datacenter: dc, 78 AllowStale: false, 79 RequireConsistent: true, 80 }); err == nil { 81 dcs = append(dcs, dc) 82 } 83 } 84 result = dcs 85 } 86 87 log.Printf("[TRACE] %s: returned %d results", d, len(result)) 88 89 sort.Strings(result) 90 91 return respWithMetadata(result) 92} 93 94// CanShare returns if this dependency is shareable. 95func (d *CatalogDatacentersQuery) CanShare() bool { 96 return true 97} 98 99// String returns the human-friendly version of this dependency. 100func (d *CatalogDatacentersQuery) String() string { 101 return "catalog.datacenters" 102} 103 104// Stop terminates this dependency's fetch. 105func (d *CatalogDatacentersQuery) Stop() { 106 close(d.stopCh) 107} 108 109// Type returns the type of this dependency. 110func (d *CatalogDatacentersQuery) Type() Type { 111 return TypeConsul 112} 113