1// Copyright 2013 The Prometheus Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14package opentsdb
15
16import (
17	"bytes"
18	"context"
19	"encoding/json"
20	"io"
21	"io/ioutil"
22	"math"
23	"net/http"
24	"net/url"
25	"time"
26
27	"github.com/go-kit/kit/log"
28	"github.com/go-kit/kit/log/level"
29	"github.com/pkg/errors"
30	"github.com/prometheus/common/model"
31)
32
33const (
34	putEndpoint     = "/api/put"
35	contentTypeJSON = "application/json"
36)
37
38// Client allows sending batches of Prometheus samples to OpenTSDB.
39type Client struct {
40	logger log.Logger
41
42	url     string
43	timeout time.Duration
44}
45
46// NewClient creates a new Client.
47func NewClient(logger log.Logger, url string, timeout time.Duration) *Client {
48	return &Client{
49		logger:  logger,
50		url:     url,
51		timeout: timeout,
52	}
53}
54
55// StoreSamplesRequest is used for building a JSON request for storing samples
56// via the OpenTSDB.
57type StoreSamplesRequest struct {
58	Metric    TagValue            `json:"metric"`
59	Timestamp int64               `json:"timestamp"`
60	Value     float64             `json:"value"`
61	Tags      map[string]TagValue `json:"tags"`
62}
63
64// tagsFromMetric translates Prometheus metric into OpenTSDB tags.
65func tagsFromMetric(m model.Metric) map[string]TagValue {
66	tags := make(map[string]TagValue, len(m)-1)
67	for l, v := range m {
68		if l == model.MetricNameLabel {
69			continue
70		}
71		tags[string(l)] = TagValue(v)
72	}
73	return tags
74}
75
76// Write sends a batch of samples to OpenTSDB via its HTTP API.
77func (c *Client) Write(samples model.Samples) error {
78	reqs := make([]StoreSamplesRequest, 0, len(samples))
79	for _, s := range samples {
80		v := float64(s.Value)
81		if math.IsNaN(v) || math.IsInf(v, 0) {
82			level.Debug(c.logger).Log("msg", "cannot send value to OpenTSDB, skipping sample", "value", v, "sample", s)
83			continue
84		}
85		metric := TagValue(s.Metric[model.MetricNameLabel])
86		reqs = append(reqs, StoreSamplesRequest{
87			Metric:    metric,
88			Timestamp: s.Timestamp.Unix(),
89			Value:     v,
90			Tags:      tagsFromMetric(s.Metric),
91		})
92	}
93
94	u, err := url.Parse(c.url)
95	if err != nil {
96		return err
97	}
98
99	u.Path = putEndpoint
100
101	buf, err := json.Marshal(reqs)
102	if err != nil {
103		return err
104	}
105
106	ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
107	defer cancel()
108
109	req, err := http.NewRequest("POST", u.String(), bytes.NewBuffer(buf))
110	if err != nil {
111		return err
112	}
113	req.Header.Set("Content-Type", contentTypeJSON)
114	resp, err := http.DefaultClient.Do(req.WithContext(ctx))
115	if err != nil {
116		return err
117	}
118	defer func() {
119		io.Copy(ioutil.Discard, resp.Body)
120		resp.Body.Close()
121	}()
122
123	// API returns status code 204 for successful writes.
124	// http://opentsdb.net/docs/build/html/api_http/put.html
125	if resp.StatusCode == http.StatusNoContent {
126		return nil
127	}
128
129	// API returns status code 400 on error, encoding error details in the
130	// response content in JSON.
131	buf, err = ioutil.ReadAll(resp.Body)
132	if err != nil {
133		return err
134	}
135
136	var r map[string]int
137	if err := json.Unmarshal(buf, &r); err != nil {
138		return err
139	}
140	return errors.Errorf("failed to write %d samples to OpenTSDB, %d succeeded", r["failed"], r["success"])
141}
142
143// Name identifies the client as an OpenTSDB client.
144func (c Client) Name() string {
145	return "opentsdb"
146}
147