1// Copyright 2013 The Prometheus Authors 2// Licensed under the Apache License, Version 2.0 (the "License"); 3// you may not use this file except in compliance with the License. 4// You may obtain a copy of the License at 5// 6// http://www.apache.org/licenses/LICENSE-2.0 7// 8// Unless required by applicable law or agreed to in writing, software 9// distributed under the License is distributed on an "AS IS" BASIS, 10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11// See the License for the specific language governing permissions and 12// limitations under the License. 13 14package opentsdb 15 16import ( 17 "bytes" 18 "context" 19 "encoding/json" 20 "io" 21 "io/ioutil" 22 "math" 23 "net/http" 24 "net/url" 25 "time" 26 27 "github.com/go-kit/kit/log" 28 "github.com/go-kit/kit/log/level" 29 "github.com/pkg/errors" 30 "github.com/prometheus/common/model" 31) 32 33const ( 34 putEndpoint = "/api/put" 35 contentTypeJSON = "application/json" 36) 37 38// Client allows sending batches of Prometheus samples to OpenTSDB. 39type Client struct { 40 logger log.Logger 41 42 url string 43 timeout time.Duration 44} 45 46// NewClient creates a new Client. 47func NewClient(logger log.Logger, url string, timeout time.Duration) *Client { 48 return &Client{ 49 logger: logger, 50 url: url, 51 timeout: timeout, 52 } 53} 54 55// StoreSamplesRequest is used for building a JSON request for storing samples 56// via the OpenTSDB. 57type StoreSamplesRequest struct { 58 Metric TagValue `json:"metric"` 59 Timestamp int64 `json:"timestamp"` 60 Value float64 `json:"value"` 61 Tags map[string]TagValue `json:"tags"` 62} 63 64// tagsFromMetric translates Prometheus metric into OpenTSDB tags. 65func tagsFromMetric(m model.Metric) map[string]TagValue { 66 tags := make(map[string]TagValue, len(m)-1) 67 for l, v := range m { 68 if l == model.MetricNameLabel { 69 continue 70 } 71 tags[string(l)] = TagValue(v) 72 } 73 return tags 74} 75 76// Write sends a batch of samples to OpenTSDB via its HTTP API. 77func (c *Client) Write(samples model.Samples) error { 78 reqs := make([]StoreSamplesRequest, 0, len(samples)) 79 for _, s := range samples { 80 v := float64(s.Value) 81 if math.IsNaN(v) || math.IsInf(v, 0) { 82 level.Debug(c.logger).Log("msg", "cannot send value to OpenTSDB, skipping sample", "value", v, "sample", s) 83 continue 84 } 85 metric := TagValue(s.Metric[model.MetricNameLabel]) 86 reqs = append(reqs, StoreSamplesRequest{ 87 Metric: metric, 88 Timestamp: s.Timestamp.Unix(), 89 Value: v, 90 Tags: tagsFromMetric(s.Metric), 91 }) 92 } 93 94 u, err := url.Parse(c.url) 95 if err != nil { 96 return err 97 } 98 99 u.Path = putEndpoint 100 101 buf, err := json.Marshal(reqs) 102 if err != nil { 103 return err 104 } 105 106 ctx, cancel := context.WithTimeout(context.Background(), c.timeout) 107 defer cancel() 108 109 req, err := http.NewRequest("POST", u.String(), bytes.NewBuffer(buf)) 110 if err != nil { 111 return err 112 } 113 req.Header.Set("Content-Type", contentTypeJSON) 114 resp, err := http.DefaultClient.Do(req.WithContext(ctx)) 115 if err != nil { 116 return err 117 } 118 defer func() { 119 io.Copy(ioutil.Discard, resp.Body) 120 resp.Body.Close() 121 }() 122 123 // API returns status code 204 for successful writes. 124 // http://opentsdb.net/docs/build/html/api_http/put.html 125 if resp.StatusCode == http.StatusNoContent { 126 return nil 127 } 128 129 // API returns status code 400 on error, encoding error details in the 130 // response content in JSON. 131 buf, err = ioutil.ReadAll(resp.Body) 132 if err != nil { 133 return err 134 } 135 136 var r map[string]int 137 if err := json.Unmarshal(buf, &r); err != nil { 138 return err 139 } 140 return errors.Errorf("failed to write %d samples to OpenTSDB, %d succeeded", r["failed"], r["success"]) 141} 142 143// Name identifies the client as an OpenTSDB client. 144func (c Client) Name() string { 145 return "opentsdb" 146} 147