1// Copyright 2013 The Prometheus Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14package opentsdb
15
16import (
17	"bytes"
18	"encoding/json"
19	"fmt"
20	"io/ioutil"
21	"math"
22	"net/http"
23	"net/url"
24	"time"
25
26	"github.com/prometheus/common/log"
27	"github.com/prometheus/common/model"
28	"golang.org/x/net/context"
29	"golang.org/x/net/context/ctxhttp"
30)
31
const (
	// contentTypeJSON is the Content-Type sent with every write request.
	contentTypeJSON = "application/json"

	// putEndpoint is the URL path that write requests are posted to.
	putEndpoint = "/api/put"
)
36
// Client allows sending batches of Prometheus samples to OpenTSDB.
type Client struct {
	// url is the address of the OpenTSDB server. Write parses it and
	// replaces its path with the put endpoint before posting.
	url string

	// timeout is the deadline applied to the context of each write request.
	timeout time.Duration
}
42
43// NewClient creates a new Client.
44func NewClient(url string, timeout time.Duration) *Client {
45	return &Client{
46		url:     url,
47		timeout: timeout,
48	}
49}
50
51// StoreSamplesRequest is used for building a JSON request for storing samples
52// via the OpenTSDB.
53type StoreSamplesRequest struct {
54	Metric    TagValue            `json:"metric"`
55	Timestamp int64               `json:"timestamp"`
56	Value     float64             `json:"value"`
57	Tags      map[string]TagValue `json:"tags"`
58}
59
60// tagsFromMetric translates Prometheus metric into OpenTSDB tags.
61func tagsFromMetric(m model.Metric) map[string]TagValue {
62	tags := make(map[string]TagValue, len(m)-1)
63	for l, v := range m {
64		if l == model.MetricNameLabel {
65			continue
66		}
67		tags[string(l)] = TagValue(v)
68	}
69	return tags
70}
71
72// Write sends a batch of samples to OpenTSDB via its HTTP API.
73func (c *Client) Write(samples model.Samples) error {
74	reqs := make([]StoreSamplesRequest, 0, len(samples))
75	for _, s := range samples {
76		v := float64(s.Value)
77		if math.IsNaN(v) || math.IsInf(v, 0) {
78			log.Debugf("cannot send value %f to OpenTSDB, skipping sample %#v", v, s)
79			continue
80		}
81		metric := TagValue(s.Metric[model.MetricNameLabel])
82		reqs = append(reqs, StoreSamplesRequest{
83			Metric:    metric,
84			Timestamp: s.Timestamp.Unix(),
85			Value:     v,
86			Tags:      tagsFromMetric(s.Metric),
87		})
88	}
89
90	u, err := url.Parse(c.url)
91	if err != nil {
92		return err
93	}
94
95	u.Path = putEndpoint
96
97	buf, err := json.Marshal(reqs)
98	if err != nil {
99		return err
100	}
101
102	ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
103	defer cancel()
104
105	resp, err := ctxhttp.Post(ctx, http.DefaultClient, u.String(), contentTypeJSON, bytes.NewBuffer(buf))
106	if err != nil {
107		return err
108	}
109	defer resp.Body.Close()
110
111	// API returns status code 204 for successful writes.
112	// http://opentsdb.net/docs/build/html/api_http/put.html
113	if resp.StatusCode == http.StatusNoContent {
114		return nil
115	}
116
117	// API returns status code 400 on error, encoding error details in the
118	// response content in JSON.
119	buf, err = ioutil.ReadAll(resp.Body)
120	if err != nil {
121		return err
122	}
123
124	var r map[string]int
125	if err := json.Unmarshal(buf, &r); err != nil {
126		return err
127	}
128	return fmt.Errorf("failed to write %d samples to OpenTSDB, %d succeeded", r["failed"], r["success"])
129}
130
131// Name identifies the client as an OpenTSDB client.
132func (c Client) Name() string {
133	return "opentsdb"
134}
135