1package dependency
2
3import (
4	"log"
5	"net/url"
6	"sort"
7	"time"
8
9	"github.com/hashicorp/consul/api"
10	"github.com/pkg/errors"
11)
12
13var (
14	// Ensure implements
15	_ Dependency = (*CatalogDatacentersQuery)(nil)
16
17	// CatalogDatacentersQuerySleepTime is the amount of time to sleep between
18	// queries, since the endpoint does not support blocking queries.
19	CatalogDatacentersQuerySleepTime = 15 * time.Second
20)
21
// CatalogDatacentersQuery is the dependency that queries the list of all
// datacenters.
type CatalogDatacentersQuery struct {
	// ignoreFailing, when true, makes Fetch drop every datacenter whose
	// catalog services query returns an error.
	ignoreFailing bool

	// stopCh is closed by Stop; Fetch selects on it while waiting between
	// polls so that the wait can be interrupted.
	stopCh chan struct{}
}
28
29// NewCatalogDatacentersQuery creates a new datacenter dependency.
30func NewCatalogDatacentersQuery(ignoreFailing bool) (*CatalogDatacentersQuery, error) {
31	return &CatalogDatacentersQuery{
32		ignoreFailing: ignoreFailing,
33		stopCh:        make(chan struct{}, 1),
34	}, nil
35}
36
37// Fetch queries the Consul API defined by the given client and returns a slice
38// of strings representing the datacenters
39func (d *CatalogDatacentersQuery) Fetch(clients *ClientSet, opts *QueryOptions) (interface{}, *ResponseMetadata, error) {
40	opts = opts.Merge(&QueryOptions{})
41
42	log.Printf("[TRACE] %s: GET %s", d, &url.URL{
43		Path:     "/v1/catalog/datacenters",
44		RawQuery: opts.String(),
45	})
46
47	// This is pretty ghetto, but the datacenters endpoint does not support
48	// blocking queries, so we are going to "fake it until we make it". When we
49	// first query, the LastIndex will be "0", meaning we should immediately
50	// return data, but future calls will include a LastIndex. If we have a
51	// LastIndex in the query metadata, sleep for 15 seconds before asking Consul
52	// again.
53	//
54	// This is probably okay given the frequency in which datacenters actually
55	// change, but is technically not edge-triggering.
56	if opts.WaitIndex != 0 {
57		log.Printf("[TRACE] %s: long polling for %s", d, CatalogDatacentersQuerySleepTime)
58
59		select {
60		case <-d.stopCh:
61			return nil, nil, ErrStopped
62		case <-time.After(CatalogDatacentersQuerySleepTime):
63		}
64	}
65
66	result, err := clients.Consul().Catalog().Datacenters()
67	if err != nil {
68		return nil, nil, errors.Wrapf(err, d.String())
69	}
70
71	// If the user opted in for skipping "down" datacenters, figure out which
72	// datacenters are down.
73	if d.ignoreFailing {
74		dcs := make([]string, 0, len(result))
75		for _, dc := range result {
76			if _, _, err := clients.Consul().Catalog().Services(&api.QueryOptions{
77				Datacenter:        dc,
78				AllowStale:        false,
79				RequireConsistent: true,
80			}); err == nil {
81				dcs = append(dcs, dc)
82			}
83		}
84		result = dcs
85	}
86
87	log.Printf("[TRACE] %s: returned %d results", d, len(result))
88
89	sort.Strings(result)
90
91	return respWithMetadata(result)
92}
93
94// CanShare returns if this dependency is shareable.
95func (d *CatalogDatacentersQuery) CanShare() bool {
96	return true
97}
98
99// String returns the human-friendly version of this dependency.
100func (d *CatalogDatacentersQuery) String() string {
101	return "catalog.datacenters"
102}
103
104// Stop terminates this dependency's fetch.
105func (d *CatalogDatacentersQuery) Stop() {
106	close(d.stopCh)
107}
108
109// Type returns the type of this dependency.
110func (d *CatalogDatacentersQuery) Type() Type {
111	return TypeConsul
112}
113