1// Copyright 2015 The Prometheus Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14// Package prometheus provides bindings to the Prometheus HTTP API:
15// http://prometheus.io/docs/querying/api/
16package prometheus
17
18import (
19	"encoding/json"
20	"fmt"
21	"io/ioutil"
22	"net"
23	"net/http"
24	"net/url"
25	"path"
26	"strconv"
27	"strings"
28	"time"
29
30	"github.com/prometheus/common/model"
31	"golang.org/x/net/context"
32	"golang.org/x/net/context/ctxhttp"
33)
34
35const (
36	statusAPIError = 422
37	apiPrefix      = "/api/v1"
38
39	epQuery       = "/query"
40	epQueryRange  = "/query_range"
41	epLabelValues = "/label/:name/values"
42	epSeries      = "/series"
43)
44
45type ErrorType string
46
47const (
48	// The different API error types.
49	ErrBadData     ErrorType = "bad_data"
50	ErrTimeout               = "timeout"
51	ErrCanceled              = "canceled"
52	ErrExec                  = "execution"
53	ErrBadResponse           = "bad_response"
54)
55
56// Error is an error returned by the API.
57type Error struct {
58	Type ErrorType
59	Msg  string
60}
61
62func (e *Error) Error() string {
63	return fmt.Sprintf("%s: %s", e.Type, e.Msg)
64}
65
66// CancelableTransport is like net.Transport but provides
67// per-request cancelation functionality.
68type CancelableTransport interface {
69	http.RoundTripper
70	CancelRequest(req *http.Request)
71}
72
73var DefaultTransport CancelableTransport = &http.Transport{
74	Proxy: http.ProxyFromEnvironment,
75	Dial: (&net.Dialer{
76		Timeout:   30 * time.Second,
77		KeepAlive: 30 * time.Second,
78	}).Dial,
79	TLSHandshakeTimeout: 10 * time.Second,
80}
81
82// Config defines configuration parameters for a new client.
83type Config struct {
84	// The address of the Prometheus to connect to.
85	Address string
86
87	// Transport is used by the Client to drive HTTP requests. If not
88	// provided, DefaultTransport will be used.
89	Transport CancelableTransport
90}
91
92func (cfg *Config) transport() CancelableTransport {
93	if cfg.Transport == nil {
94		return DefaultTransport
95	}
96	return cfg.Transport
97}
98
99type Client interface {
100	url(ep string, args map[string]string) *url.URL
101	do(context.Context, *http.Request) (*http.Response, []byte, error)
102}
103
104// New returns a new Client.
105//
106// It is safe to use the returned Client from multiple goroutines.
107func New(cfg Config) (Client, error) {
108	u, err := url.Parse(cfg.Address)
109	if err != nil {
110		return nil, err
111	}
112	u.Path = strings.TrimRight(u.Path, "/") + apiPrefix
113
114	return &httpClient{
115		endpoint:  u,
116		transport: cfg.transport(),
117	}, nil
118}
119
120type httpClient struct {
121	endpoint  *url.URL
122	transport CancelableTransport
123}
124
125func (c *httpClient) url(ep string, args map[string]string) *url.URL {
126	p := path.Join(c.endpoint.Path, ep)
127
128	for arg, val := range args {
129		arg = ":" + arg
130		p = strings.Replace(p, arg, val, -1)
131	}
132
133	u := *c.endpoint
134	u.Path = p
135
136	return &u
137}
138
139func (c *httpClient) do(ctx context.Context, req *http.Request) (*http.Response, []byte, error) {
140	resp, err := ctxhttp.Do(ctx, &http.Client{Transport: c.transport}, req)
141
142	defer func() {
143		if resp != nil {
144			resp.Body.Close()
145		}
146	}()
147
148	if err != nil {
149		return nil, nil, err
150	}
151
152	var body []byte
153	done := make(chan struct{})
154	go func() {
155		body, err = ioutil.ReadAll(resp.Body)
156		close(done)
157	}()
158
159	select {
160	case <-ctx.Done():
161		err = resp.Body.Close()
162		<-done
163		if err == nil {
164			err = ctx.Err()
165		}
166	case <-done:
167	}
168
169	return resp, body, err
170}
171
172// apiClient wraps a regular client and processes successful API responses.
173// Successful also includes responses that errored at the API level.
174type apiClient struct {
175	Client
176}
177
178type apiResponse struct {
179	Status    string          `json:"status"`
180	Data      json.RawMessage `json:"data"`
181	ErrorType ErrorType       `json:"errorType"`
182	Error     string          `json:"error"`
183}
184
185func (c apiClient) do(ctx context.Context, req *http.Request) (*http.Response, []byte, error) {
186	resp, body, err := c.Client.do(ctx, req)
187	if err != nil {
188		return resp, body, err
189	}
190
191	code := resp.StatusCode
192
193	if code/100 != 2 && code != statusAPIError {
194		return resp, body, &Error{
195			Type: ErrBadResponse,
196			Msg:  fmt.Sprintf("bad response code %d", resp.StatusCode),
197		}
198	}
199
200	var result apiResponse
201
202	if err = json.Unmarshal(body, &result); err != nil {
203		return resp, body, &Error{
204			Type: ErrBadResponse,
205			Msg:  err.Error(),
206		}
207	}
208
209	if (code == statusAPIError) != (result.Status == "error") {
210		err = &Error{
211			Type: ErrBadResponse,
212			Msg:  "inconsistent body for response code",
213		}
214	}
215
216	if code == statusAPIError && result.Status == "error" {
217		err = &Error{
218			Type: result.ErrorType,
219			Msg:  result.Error,
220		}
221	}
222
223	return resp, []byte(result.Data), err
224}
225
226// Range represents a sliced time range.
227type Range struct {
228	// The boundaries of the time range.
229	Start, End time.Time
230	// The maximum time between two slices within the boundaries.
231	Step time.Duration
232}
233
234// queryResult contains result data for a query.
235type queryResult struct {
236	Type   model.ValueType `json:"resultType"`
237	Result interface{}     `json:"result"`
238
239	// The decoded value.
240	v model.Value
241}
242
243func (qr *queryResult) UnmarshalJSON(b []byte) error {
244	v := struct {
245		Type   model.ValueType `json:"resultType"`
246		Result json.RawMessage `json:"result"`
247	}{}
248
249	err := json.Unmarshal(b, &v)
250	if err != nil {
251		return err
252	}
253
254	switch v.Type {
255	case model.ValScalar:
256		var sv model.Scalar
257		err = json.Unmarshal(v.Result, &sv)
258		qr.v = &sv
259
260	case model.ValVector:
261		var vv model.Vector
262		err = json.Unmarshal(v.Result, &vv)
263		qr.v = vv
264
265	case model.ValMatrix:
266		var mv model.Matrix
267		err = json.Unmarshal(v.Result, &mv)
268		qr.v = mv
269
270	default:
271		err = fmt.Errorf("unexpected value type %q", v.Type)
272	}
273	return err
274}
275
276// QueryAPI provides bindings the Prometheus's query API.
277type QueryAPI interface {
278	// Query performs a query for the given time.
279	Query(ctx context.Context, query string, ts time.Time) (model.Value, error)
280	// Query performs a query for the given range.
281	QueryRange(ctx context.Context, query string, r Range) (model.Value, error)
282}
283
284// NewQueryAPI returns a new QueryAPI for the client.
285//
286// It is safe to use the returned QueryAPI from multiple goroutines.
287func NewQueryAPI(c Client) QueryAPI {
288	return &httpQueryAPI{client: apiClient{c}}
289}
290
291type httpQueryAPI struct {
292	client Client
293}
294
295func (h *httpQueryAPI) Query(ctx context.Context, query string, ts time.Time) (model.Value, error) {
296	u := h.client.url(epQuery, nil)
297	q := u.Query()
298
299	q.Set("query", query)
300	q.Set("time", ts.Format(time.RFC3339Nano))
301
302	u.RawQuery = q.Encode()
303
304	req, _ := http.NewRequest("GET", u.String(), nil)
305
306	_, body, err := h.client.do(ctx, req)
307	if err != nil {
308		return nil, err
309	}
310
311	var qres queryResult
312	err = json.Unmarshal(body, &qres)
313
314	return model.Value(qres.v), err
315}
316
317func (h *httpQueryAPI) QueryRange(ctx context.Context, query string, r Range) (model.Value, error) {
318	u := h.client.url(epQueryRange, nil)
319	q := u.Query()
320
321	var (
322		start = r.Start.Format(time.RFC3339Nano)
323		end   = r.End.Format(time.RFC3339Nano)
324		step  = strconv.FormatFloat(r.Step.Seconds(), 'f', 3, 64)
325	)
326
327	q.Set("query", query)
328	q.Set("start", start)
329	q.Set("end", end)
330	q.Set("step", step)
331
332	u.RawQuery = q.Encode()
333
334	req, _ := http.NewRequest("GET", u.String(), nil)
335
336	_, body, err := h.client.do(ctx, req)
337	if err != nil {
338		return nil, err
339	}
340
341	var qres queryResult
342	err = json.Unmarshal(body, &qres)
343
344	return model.Value(qres.v), err
345}
346