1package api 2 3import ( 4 "bytes" 5 "crypto/tls" 6 "encoding/json" 7 "fmt" 8 "io" 9 "log" 10 "net" 11 "net/http" 12 "net/url" 13 "os" 14 "strconv" 15 "strings" 16 "time" 17) 18 19// QueryOptions are used to parameterize a query 20type QueryOptions struct { 21 // Providing a datacenter overwrites the DC provided 22 // by the Config 23 Datacenter string 24 25 // AllowStale allows any Consul server (non-leader) to service 26 // a read. This allows for lower latency and higher throughput 27 AllowStale bool 28 29 // RequireConsistent forces the read to be fully consistent. 30 // This is more expensive but prevents ever performing a stale 31 // read. 32 RequireConsistent bool 33 34 // WaitIndex is used to enable a blocking query. Waits 35 // until the timeout or the next index is reached 36 WaitIndex uint64 37 38 // WaitTime is used to bound the duration of a wait. 39 // Defaults to that of the Config, but can be overriden. 40 WaitTime time.Duration 41 42 // Token is used to provide a per-request ACL token 43 // which overrides the agent's default token. 44 Token string 45} 46 47// WriteOptions are used to parameterize a write 48type WriteOptions struct { 49 // Providing a datacenter overwrites the DC provided 50 // by the Config 51 Datacenter string 52 53 // Token is used to provide a per-request ACL token 54 // which overrides the agent's default token. 55 Token string 56} 57 58// QueryMeta is used to return meta data about a query 59type QueryMeta struct { 60 // LastIndex. This can be used as a WaitIndex to perform 61 // a blocking query 62 LastIndex uint64 63 64 // Time of last contact from the leader for the 65 // server servicing the request 66 LastContact time.Duration 67 68 // Is there a known leader 69 KnownLeader bool 70 71 // How long did the request take 72 RequestTime time.Duration 73} 74 75// WriteMeta is used to return meta data about a write 76type WriteMeta struct { 77 // How long did the request take 78 RequestTime time.Duration 79} 80 81// HttpBasicAuth is used to authenticate http client with HTTP Basic Authentication 82type HttpBasicAuth struct { 83 // Username to use for HTTP Basic Authentication 84 Username string 85 86 // Password to use for HTTP Basic Authentication 87 Password string 88} 89 90// Config is used to configure the creation of a client 91type Config struct { 92 // Address is the address of the Consul server 93 Address string 94 95 // Scheme is the URI scheme for the Consul server 96 Scheme string 97 98 // Datacenter to use. If not provided, the default agent datacenter is used. 99 Datacenter string 100 101 // HttpClient is the client to use. Default will be 102 // used if not provided. 103 HttpClient *http.Client 104 105 // HttpAuth is the auth info to use for http access. 106 HttpAuth *HttpBasicAuth 107 108 // WaitTime limits how long a Watch will block. If not provided, 109 // the agent default values will be used. 110 WaitTime time.Duration 111 112 // Token is used to provide a per-request ACL token 113 // which overrides the agent's default token. 114 Token string 115} 116 117// DefaultConfig returns a default configuration for the client 118func DefaultConfig() *Config { 119 config := &Config{ 120 Address: "127.0.0.1:8500", 121 Scheme: "http", 122 HttpClient: http.DefaultClient, 123 } 124 125 if addr := os.Getenv("CONSUL_HTTP_ADDR"); addr != "" { 126 config.Address = addr 127 } 128 129 if token := os.Getenv("CONSUL_HTTP_TOKEN"); token != "" { 130 config.Token = token 131 } 132 133 if auth := os.Getenv("CONSUL_HTTP_AUTH"); auth != "" { 134 var username, password string 135 if strings.Contains(auth, ":") { 136 split := strings.SplitN(auth, ":", 2) 137 username = split[0] 138 password = split[1] 139 } else { 140 username = auth 141 } 142 143 config.HttpAuth = &HttpBasicAuth{ 144 Username: username, 145 Password: password, 146 } 147 } 148 149 if ssl := os.Getenv("CONSUL_HTTP_SSL"); ssl != "" { 150 enabled, err := strconv.ParseBool(ssl) 151 if err != nil { 152 log.Printf("[WARN] client: could not parse CONSUL_HTTP_SSL: %s", err) 153 } 154 155 if enabled { 156 config.Scheme = "https" 157 } 158 } 159 160 if verify := os.Getenv("CONSUL_HTTP_SSL_VERIFY"); verify != "" { 161 doVerify, err := strconv.ParseBool(verify) 162 if err != nil { 163 log.Printf("[WARN] client: could not parse CONSUL_HTTP_SSL_VERIFY: %s", err) 164 } 165 166 if !doVerify { 167 config.HttpClient.Transport = &http.Transport{ 168 TLSClientConfig: &tls.Config{ 169 InsecureSkipVerify: true, 170 }, 171 } 172 } 173 } 174 175 return config 176} 177 178// Client provides a client to the Consul API 179type Client struct { 180 config Config 181} 182 183// NewClient returns a new client 184func NewClient(config *Config) (*Client, error) { 185 // bootstrap the config 186 defConfig := DefaultConfig() 187 188 if len(config.Address) == 0 { 189 config.Address = defConfig.Address 190 } 191 192 if len(config.Scheme) == 0 { 193 config.Scheme = defConfig.Scheme 194 } 195 196 if config.HttpClient == nil { 197 config.HttpClient = defConfig.HttpClient 198 } 199 200 if parts := strings.SplitN(config.Address, "unix://", 2); len(parts) == 2 { 201 config.HttpClient = &http.Client{ 202 Transport: &http.Transport{ 203 Dial: func(_, _ string) (net.Conn, error) { 204 return net.Dial("unix", parts[1]) 205 }, 206 }, 207 } 208 config.Address = parts[1] 209 } 210 211 client := &Client{ 212 config: *config, 213 } 214 return client, nil 215} 216 217// request is used to help build up a request 218type request struct { 219 config *Config 220 method string 221 url *url.URL 222 params url.Values 223 body io.Reader 224 obj interface{} 225} 226 227// setQueryOptions is used to annotate the request with 228// additional query options 229func (r *request) setQueryOptions(q *QueryOptions) { 230 if q == nil { 231 return 232 } 233 if q.Datacenter != "" { 234 r.params.Set("dc", q.Datacenter) 235 } 236 if q.AllowStale { 237 r.params.Set("stale", "") 238 } 239 if q.RequireConsistent { 240 r.params.Set("consistent", "") 241 } 242 if q.WaitIndex != 0 { 243 r.params.Set("index", strconv.FormatUint(q.WaitIndex, 10)) 244 } 245 if q.WaitTime != 0 { 246 r.params.Set("wait", durToMsec(q.WaitTime)) 247 } 248 if q.Token != "" { 249 r.params.Set("token", q.Token) 250 } 251} 252 253// durToMsec converts a duration to a millisecond specified string 254func durToMsec(dur time.Duration) string { 255 return fmt.Sprintf("%dms", dur/time.Millisecond) 256} 257 258// setWriteOptions is used to annotate the request with 259// additional write options 260func (r *request) setWriteOptions(q *WriteOptions) { 261 if q == nil { 262 return 263 } 264 if q.Datacenter != "" { 265 r.params.Set("dc", q.Datacenter) 266 } 267 if q.Token != "" { 268 r.params.Set("token", q.Token) 269 } 270} 271 272// toHTTP converts the request to an HTTP request 273func (r *request) toHTTP() (*http.Request, error) { 274 // Encode the query parameters 275 r.url.RawQuery = r.params.Encode() 276 277 // Check if we should encode the body 278 if r.body == nil && r.obj != nil { 279 if b, err := encodeBody(r.obj); err != nil { 280 return nil, err 281 } else { 282 r.body = b 283 } 284 } 285 286 // Create the HTTP request 287 req, err := http.NewRequest(r.method, r.url.RequestURI(), r.body) 288 if err != nil { 289 return nil, err 290 } 291 292 req.URL.Host = r.url.Host 293 req.URL.Scheme = r.url.Scheme 294 req.Host = r.url.Host 295 296 // Setup auth 297 if r.config.HttpAuth != nil { 298 req.SetBasicAuth(r.config.HttpAuth.Username, r.config.HttpAuth.Password) 299 } 300 301 return req, nil 302} 303 304// newRequest is used to create a new request 305func (c *Client) newRequest(method, path string) *request { 306 r := &request{ 307 config: &c.config, 308 method: method, 309 url: &url.URL{ 310 Scheme: c.config.Scheme, 311 Host: c.config.Address, 312 Path: path, 313 }, 314 params: make(map[string][]string), 315 } 316 if c.config.Datacenter != "" { 317 r.params.Set("dc", c.config.Datacenter) 318 } 319 if c.config.WaitTime != 0 { 320 r.params.Set("wait", durToMsec(r.config.WaitTime)) 321 } 322 if c.config.Token != "" { 323 r.params.Set("token", r.config.Token) 324 } 325 return r 326} 327 328// doRequest runs a request with our client 329func (c *Client) doRequest(r *request) (time.Duration, *http.Response, error) { 330 req, err := r.toHTTP() 331 if err != nil { 332 return 0, nil, err 333 } 334 start := time.Now() 335 resp, err := c.config.HttpClient.Do(req) 336 diff := time.Now().Sub(start) 337 return diff, resp, err 338} 339 340// Query is used to do a GET request against an endpoint 341// and deserialize the response into an interface using 342// standard Consul conventions. 343func (c *Client) query(endpoint string, out interface{}, q *QueryOptions) (*QueryMeta, error) { 344 r := c.newRequest("GET", endpoint) 345 r.setQueryOptions(q) 346 rtt, resp, err := requireOK(c.doRequest(r)) 347 if err != nil { 348 return nil, err 349 } 350 defer resp.Body.Close() 351 352 qm := &QueryMeta{} 353 parseQueryMeta(resp, qm) 354 qm.RequestTime = rtt 355 356 if err := decodeBody(resp, out); err != nil { 357 return nil, err 358 } 359 return qm, nil 360} 361 362// write is used to do a PUT request against an endpoint 363// and serialize/deserialized using the standard Consul conventions. 364func (c *Client) write(endpoint string, in, out interface{}, q *WriteOptions) (*WriteMeta, error) { 365 r := c.newRequest("PUT", endpoint) 366 r.setWriteOptions(q) 367 r.obj = in 368 rtt, resp, err := requireOK(c.doRequest(r)) 369 if err != nil { 370 return nil, err 371 } 372 defer resp.Body.Close() 373 374 wm := &WriteMeta{RequestTime: rtt} 375 if out != nil { 376 if err := decodeBody(resp, &out); err != nil { 377 return nil, err 378 } 379 } 380 return wm, nil 381} 382 383// parseQueryMeta is used to help parse query meta-data 384func parseQueryMeta(resp *http.Response, q *QueryMeta) error { 385 header := resp.Header 386 387 // Parse the X-Consul-Index 388 index, err := strconv.ParseUint(header.Get("X-Consul-Index"), 10, 64) 389 if err != nil { 390 return fmt.Errorf("Failed to parse X-Consul-Index: %v", err) 391 } 392 q.LastIndex = index 393 394 // Parse the X-Consul-LastContact 395 last, err := strconv.ParseUint(header.Get("X-Consul-LastContact"), 10, 64) 396 if err != nil { 397 return fmt.Errorf("Failed to parse X-Consul-LastContact: %v", err) 398 } 399 q.LastContact = time.Duration(last) * time.Millisecond 400 401 // Parse the X-Consul-KnownLeader 402 switch header.Get("X-Consul-KnownLeader") { 403 case "true": 404 q.KnownLeader = true 405 default: 406 q.KnownLeader = false 407 } 408 return nil 409} 410 411// decodeBody is used to JSON decode a body 412func decodeBody(resp *http.Response, out interface{}) error { 413 dec := json.NewDecoder(resp.Body) 414 return dec.Decode(out) 415} 416 417// encodeBody is used to encode a request body 418func encodeBody(obj interface{}) (io.Reader, error) { 419 buf := bytes.NewBuffer(nil) 420 enc := json.NewEncoder(buf) 421 if err := enc.Encode(obj); err != nil { 422 return nil, err 423 } 424 return buf, nil 425} 426 427// requireOK is used to wrap doRequest and check for a 200 428func requireOK(d time.Duration, resp *http.Response, e error) (time.Duration, *http.Response, error) { 429 if e != nil { 430 if resp != nil { 431 resp.Body.Close() 432 } 433 return d, nil, e 434 } 435 if resp.StatusCode != 200 { 436 var buf bytes.Buffer 437 io.Copy(&buf, resp.Body) 438 resp.Body.Close() 439 return d, nil, fmt.Errorf("Unexpected response code: %d (%s)", resp.StatusCode, buf.Bytes()) 440 } 441 return d, resp, nil 442} 443