1package api
2
3import (
4	"bytes"
5	"crypto/tls"
6	"encoding/json"
7	"fmt"
8	"io"
9	"log"
10	"net"
11	"net/http"
12	"net/url"
13	"os"
14	"strconv"
15	"strings"
16	"time"
17)
18
// QueryOptions are used to parameterize a query. Every field is optional:
// setQueryOptions only sends the options that are not left at their zero
// value, and a nil *QueryOptions leaves the request untouched.
type QueryOptions struct {
	// Providing a datacenter overwrites the DC provided
	// by the Config. Sent as the "dc" query parameter.
	Datacenter string

	// AllowStale allows any Consul server (non-leader) to service
	// a read. This allows for lower latency and higher throughput.
	// Sent as the "stale" query parameter with an empty value.
	AllowStale bool

	// RequireConsistent forces the read to be fully consistent.
	// This is more expensive but prevents ever performing a stale
	// read. Sent as the "consistent" query parameter with an empty value.
	RequireConsistent bool

	// WaitIndex is used to enable a blocking query. Waits
	// until the timeout or the next index is reached.
	// Sent as the "index" query parameter.
	WaitIndex uint64

	// WaitTime is used to bound the duration of a wait.
	// Defaults to that of the Config, but can be overridden.
	// Sent as the "wait" query parameter in whole milliseconds.
	WaitTime time.Duration

	// Token is used to provide a per-request ACL token
	// which overrides the agent's default token. It is sent as the
	// "token" query parameter and replaces the Config token, if any.
	Token string
}
46
// WriteOptions are used to parameterize a write. Fields left empty are not
// sent by setWriteOptions, and a nil *WriteOptions leaves the request
// untouched.
type WriteOptions struct {
	// Providing a datacenter overwrites the DC provided
	// by the Config. Sent as the "dc" query parameter.
	Datacenter string

	// Token is used to provide a per-request ACL token
	// which overrides the agent's default token. It is sent as the
	// "token" query parameter and replaces the Config token, if any.
	Token string
}
57
// QueryMeta is used to return meta data about a query. The first three
// fields are filled in by parseQueryMeta from the response headers.
type QueryMeta struct {
	// LastIndex. This can be used as a WaitIndex to perform
	// a blocking query. Parsed from the X-Consul-Index header.
	LastIndex uint64

	// Time of last contact from the leader for the
	// server servicing the request. Parsed from the
	// X-Consul-LastContact header, which is read as milliseconds.
	LastContact time.Duration

	// Is there a known leader. True only when the
	// X-Consul-KnownLeader header is exactly "true".
	KnownLeader bool

	// How long did the request take. This is the duration
	// reported by doRequest for the HTTP round trip.
	RequestTime time.Duration
}
74
// WriteMeta is used to return meta data about a write.
type WriteMeta struct {
	// How long did the request take. This is the duration
	// reported by doRequest for the HTTP round trip.
	RequestTime time.Duration
}
80
// HttpBasicAuth is used to authenticate http client with HTTP Basic
// Authentication. When Config.HttpAuth is set, toHTTP applies these
// credentials to every outgoing request.
type HttpBasicAuth struct {
	// Username to use for HTTP Basic Authentication
	Username string

	// Password to use for HTTP Basic Authentication. May be empty.
	Password string
}
89
// Config is used to configure the creation of a client. NewClient fills in
// an empty Address or Scheme and a nil HttpClient from DefaultConfig.
type Config struct {
	// Address is the address of the Consul server. NewClient treats an
	// address containing "unix://" as a Unix domain socket path.
	Address string

	// Scheme is the URI scheme for the Consul server
	Scheme string

	// Datacenter to use. If not provided, the default agent datacenter is used.
	// When set it is sent as the "dc" query parameter on every request.
	Datacenter string

	// HttpClient is the client to use. Default will be
	// used if not provided.
	HttpClient *http.Client

	// HttpAuth is the auth info to use for http access.
	// Leave nil to send requests without basic authentication.
	HttpAuth *HttpBasicAuth

	// WaitTime limits how long a Watch will block. If not provided,
	// the agent default values will be used. When non-zero it is sent
	// as the "wait" query parameter on every request.
	WaitTime time.Duration

	// Token is the ACL token attached to every request as the "token"
	// query parameter. A Token set in QueryOptions or WriteOptions
	// replaces it for that single request.
	Token string
}
116
117// DefaultConfig returns a default configuration for the client
118func DefaultConfig() *Config {
119	config := &Config{
120		Address:    "127.0.0.1:8500",
121		Scheme:     "http",
122		HttpClient: http.DefaultClient,
123	}
124
125	if addr := os.Getenv("CONSUL_HTTP_ADDR"); addr != "" {
126		config.Address = addr
127	}
128
129	if token := os.Getenv("CONSUL_HTTP_TOKEN"); token != "" {
130		config.Token = token
131	}
132
133	if auth := os.Getenv("CONSUL_HTTP_AUTH"); auth != "" {
134		var username, password string
135		if strings.Contains(auth, ":") {
136			split := strings.SplitN(auth, ":", 2)
137			username = split[0]
138			password = split[1]
139		} else {
140			username = auth
141		}
142
143		config.HttpAuth = &HttpBasicAuth{
144			Username: username,
145			Password: password,
146		}
147	}
148
149	if ssl := os.Getenv("CONSUL_HTTP_SSL"); ssl != "" {
150		enabled, err := strconv.ParseBool(ssl)
151		if err != nil {
152			log.Printf("[WARN] client: could not parse CONSUL_HTTP_SSL: %s", err)
153		}
154
155		if enabled {
156			config.Scheme = "https"
157		}
158	}
159
160	if verify := os.Getenv("CONSUL_HTTP_SSL_VERIFY"); verify != "" {
161		doVerify, err := strconv.ParseBool(verify)
162		if err != nil {
163			log.Printf("[WARN] client: could not parse CONSUL_HTTP_SSL_VERIFY: %s", err)
164		}
165
166		if !doVerify {
167			config.HttpClient.Transport = &http.Transport{
168				TLSClientConfig: &tls.Config{
169					InsecureSkipVerify: true,
170				},
171			}
172		}
173	}
174
175	return config
176}
177
// Client provides a client to the Consul API. It holds its own copy of the
// Config given to NewClient; pointer fields such as HttpClient and HttpAuth
// still refer to the same objects as the caller's Config.
type Client struct {
	config Config
}
182
183// NewClient returns a new client
184func NewClient(config *Config) (*Client, error) {
185	// bootstrap the config
186	defConfig := DefaultConfig()
187
188	if len(config.Address) == 0 {
189		config.Address = defConfig.Address
190	}
191
192	if len(config.Scheme) == 0 {
193		config.Scheme = defConfig.Scheme
194	}
195
196	if config.HttpClient == nil {
197		config.HttpClient = defConfig.HttpClient
198	}
199
200	if parts := strings.SplitN(config.Address, "unix://", 2); len(parts) == 2 {
201		config.HttpClient = &http.Client{
202			Transport: &http.Transport{
203				Dial: func(_, _ string) (net.Conn, error) {
204					return net.Dial("unix", parts[1])
205				},
206			},
207		}
208		config.Address = parts[1]
209	}
210
211	client := &Client{
212		config: *config,
213	}
214	return client, nil
215}
216
// request is used to help build up a request. It is created by
// Client.newRequest and turned into an *http.Request by toHTTP.
//
// config points at the owning client's configuration. method, url and
// params describe the target; toHTTP encodes params into url.RawQuery.
// body is an explicit request body; when it is nil and obj is not, toHTTP
// JSON-encodes obj and uses the result as the body.
type request struct {
	config *Config
	method string
	url    *url.URL
	params url.Values
	body   io.Reader
	obj    interface{}
}
226
227// setQueryOptions is used to annotate the request with
228// additional query options
229func (r *request) setQueryOptions(q *QueryOptions) {
230	if q == nil {
231		return
232	}
233	if q.Datacenter != "" {
234		r.params.Set("dc", q.Datacenter)
235	}
236	if q.AllowStale {
237		r.params.Set("stale", "")
238	}
239	if q.RequireConsistent {
240		r.params.Set("consistent", "")
241	}
242	if q.WaitIndex != 0 {
243		r.params.Set("index", strconv.FormatUint(q.WaitIndex, 10))
244	}
245	if q.WaitTime != 0 {
246		r.params.Set("wait", durToMsec(q.WaitTime))
247	}
248	if q.Token != "" {
249		r.params.Set("token", q.Token)
250	}
251}
252
// durToMsec renders a duration as a whole number of milliseconds followed
// by the "ms" suffix. Any sub-millisecond remainder is truncated.
func durToMsec(dur time.Duration) string {
	wholeMillis := int64(dur / time.Millisecond)
	return fmt.Sprintf("%d", wholeMillis) + "ms"
}
257
258// setWriteOptions is used to annotate the request with
259// additional write options
260func (r *request) setWriteOptions(q *WriteOptions) {
261	if q == nil {
262		return
263	}
264	if q.Datacenter != "" {
265		r.params.Set("dc", q.Datacenter)
266	}
267	if q.Token != "" {
268		r.params.Set("token", q.Token)
269	}
270}
271
272// toHTTP converts the request to an HTTP request
273func (r *request) toHTTP() (*http.Request, error) {
274	// Encode the query parameters
275	r.url.RawQuery = r.params.Encode()
276
277	// Check if we should encode the body
278	if r.body == nil && r.obj != nil {
279		if b, err := encodeBody(r.obj); err != nil {
280			return nil, err
281		} else {
282			r.body = b
283		}
284	}
285
286	// Create the HTTP request
287	req, err := http.NewRequest(r.method, r.url.RequestURI(), r.body)
288	if err != nil {
289		return nil, err
290	}
291
292	req.URL.Host = r.url.Host
293	req.URL.Scheme = r.url.Scheme
294	req.Host = r.url.Host
295
296	// Setup auth
297	if r.config.HttpAuth != nil {
298		req.SetBasicAuth(r.config.HttpAuth.Username, r.config.HttpAuth.Password)
299	}
300
301	return req, nil
302}
303
304// newRequest is used to create a new request
305func (c *Client) newRequest(method, path string) *request {
306	r := &request{
307		config: &c.config,
308		method: method,
309		url: &url.URL{
310			Scheme: c.config.Scheme,
311			Host:   c.config.Address,
312			Path:   path,
313		},
314		params: make(map[string][]string),
315	}
316	if c.config.Datacenter != "" {
317		r.params.Set("dc", c.config.Datacenter)
318	}
319	if c.config.WaitTime != 0 {
320		r.params.Set("wait", durToMsec(r.config.WaitTime))
321	}
322	if c.config.Token != "" {
323		r.params.Set("token", r.config.Token)
324	}
325	return r
326}
327
328// doRequest runs a request with our client
329func (c *Client) doRequest(r *request) (time.Duration, *http.Response, error) {
330	req, err := r.toHTTP()
331	if err != nil {
332		return 0, nil, err
333	}
334	start := time.Now()
335	resp, err := c.config.HttpClient.Do(req)
336	diff := time.Now().Sub(start)
337	return diff, resp, err
338}
339
340// Query is used to do a GET request against an endpoint
341// and deserialize the response into an interface using
342// standard Consul conventions.
343func (c *Client) query(endpoint string, out interface{}, q *QueryOptions) (*QueryMeta, error) {
344	r := c.newRequest("GET", endpoint)
345	r.setQueryOptions(q)
346	rtt, resp, err := requireOK(c.doRequest(r))
347	if err != nil {
348		return nil, err
349	}
350	defer resp.Body.Close()
351
352	qm := &QueryMeta{}
353	parseQueryMeta(resp, qm)
354	qm.RequestTime = rtt
355
356	if err := decodeBody(resp, out); err != nil {
357		return nil, err
358	}
359	return qm, nil
360}
361
362// write is used to do a PUT request against an endpoint
363// and serialize/deserialized using the standard Consul conventions.
364func (c *Client) write(endpoint string, in, out interface{}, q *WriteOptions) (*WriteMeta, error) {
365	r := c.newRequest("PUT", endpoint)
366	r.setWriteOptions(q)
367	r.obj = in
368	rtt, resp, err := requireOK(c.doRequest(r))
369	if err != nil {
370		return nil, err
371	}
372	defer resp.Body.Close()
373
374	wm := &WriteMeta{RequestTime: rtt}
375	if out != nil {
376		if err := decodeBody(resp, &out); err != nil {
377			return nil, err
378		}
379	}
380	return wm, nil
381}
382
383// parseQueryMeta is used to help parse query meta-data
384func parseQueryMeta(resp *http.Response, q *QueryMeta) error {
385	header := resp.Header
386
387	// Parse the X-Consul-Index
388	index, err := strconv.ParseUint(header.Get("X-Consul-Index"), 10, 64)
389	if err != nil {
390		return fmt.Errorf("Failed to parse X-Consul-Index: %v", err)
391	}
392	q.LastIndex = index
393
394	// Parse the X-Consul-LastContact
395	last, err := strconv.ParseUint(header.Get("X-Consul-LastContact"), 10, 64)
396	if err != nil {
397		return fmt.Errorf("Failed to parse X-Consul-LastContact: %v", err)
398	}
399	q.LastContact = time.Duration(last) * time.Millisecond
400
401	// Parse the X-Consul-KnownLeader
402	switch header.Get("X-Consul-KnownLeader") {
403	case "true":
404		q.KnownLeader = true
405	default:
406		q.KnownLeader = false
407	}
408	return nil
409}
410
// decodeBody is used to JSON decode a response body into out. It reads a
// single JSON value from resp.Body and does not close the body.
func decodeBody(resp *http.Response, out interface{}) error {
	return json.NewDecoder(resp.Body).Decode(out)
}
416
// encodeBody is used to encode a request body. obj is JSON-encoded into an
// in-memory buffer, which is returned as the reader. On an encoding error
// the returned reader is nil.
func encodeBody(obj interface{}) (io.Reader, error) {
	encoded := new(bytes.Buffer)
	if err := json.NewEncoder(encoded).Encode(obj); err != nil {
		return nil, err
	}
	return encoded, nil
}
426
// requireOK is used to wrap doRequest and check for a 200. Its parameters
// mirror doRequest's results so the call can be nested directly.
//
// A non-nil e is passed through unchanged. Any status other than 200 is
// turned into an error carrying the status code and the response body. In
// both failure cases the body is closed here and a nil response is
// returned; on success the response is returned untouched and closing its
// body is left to the caller.
func requireOK(d time.Duration, resp *http.Response, e error) (time.Duration, *http.Response, error) {
	switch {
	case e != nil:
		if resp != nil {
			resp.Body.Close()
		}
		return d, nil, e

	case resp.StatusCode != http.StatusOK:
		// Capture what the server sent so it shows up in the error; the
		// result of the copy itself is not checked.
		var payload bytes.Buffer
		io.Copy(&payload, resp.Body)
		resp.Body.Close()
		return d, nil, fmt.Errorf("Unexpected response code: %d (%s)", resp.StatusCode, payload.Bytes())
	}
	return d, resp, nil
}
443