1// Copyright 2013 The Prometheus Authors 2// Licensed under the Apache License, Version 2.0 (the "License"); 3// you may not use this file except in compliance with the License. 4// You may obtain a copy of the License at 5// 6// http://www.apache.org/licenses/LICENSE-2.0 7// 8// Unless required by applicable law or agreed to in writing, software 9// distributed under the License is distributed on an "AS IS" BASIS, 10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11// See the License for the specific language governing permissions and 12// limitations under the License. 13 14package opentsdb 15 16import ( 17 "bytes" 18 "encoding/json" 19 "fmt" 20 "io/ioutil" 21 "math" 22 "net/http" 23 "net/url" 24 "time" 25 26 "github.com/prometheus/common/log" 27 "github.com/prometheus/common/model" 28 "golang.org/x/net/context" 29 "golang.org/x/net/context/ctxhttp" 30) 31 32const ( 33 putEndpoint = "/api/put" 34 contentTypeJSON = "application/json" 35) 36 37// Client allows sending batches of Prometheus samples to OpenTSDB. 38type Client struct { 39 url string 40 timeout time.Duration 41} 42 43// NewClient creates a new Client. 44func NewClient(url string, timeout time.Duration) *Client { 45 return &Client{ 46 url: url, 47 timeout: timeout, 48 } 49} 50 51// StoreSamplesRequest is used for building a JSON request for storing samples 52// via the OpenTSDB. 53type StoreSamplesRequest struct { 54 Metric TagValue `json:"metric"` 55 Timestamp int64 `json:"timestamp"` 56 Value float64 `json:"value"` 57 Tags map[string]TagValue `json:"tags"` 58} 59 60// tagsFromMetric translates Prometheus metric into OpenTSDB tags. 61func tagsFromMetric(m model.Metric) map[string]TagValue { 62 tags := make(map[string]TagValue, len(m)-1) 63 for l, v := range m { 64 if l == model.MetricNameLabel { 65 continue 66 } 67 tags[string(l)] = TagValue(v) 68 } 69 return tags 70} 71 72// Write sends a batch of samples to OpenTSDB via its HTTP API. 73func (c *Client) Write(samples model.Samples) error { 74 reqs := make([]StoreSamplesRequest, 0, len(samples)) 75 for _, s := range samples { 76 v := float64(s.Value) 77 if math.IsNaN(v) || math.IsInf(v, 0) { 78 log.Debugf("cannot send value %f to OpenTSDB, skipping sample %#v", v, s) 79 continue 80 } 81 metric := TagValue(s.Metric[model.MetricNameLabel]) 82 reqs = append(reqs, StoreSamplesRequest{ 83 Metric: metric, 84 Timestamp: s.Timestamp.Unix(), 85 Value: v, 86 Tags: tagsFromMetric(s.Metric), 87 }) 88 } 89 90 u, err := url.Parse(c.url) 91 if err != nil { 92 return err 93 } 94 95 u.Path = putEndpoint 96 97 buf, err := json.Marshal(reqs) 98 if err != nil { 99 return err 100 } 101 102 ctx, cancel := context.WithTimeout(context.Background(), c.timeout) 103 defer cancel() 104 105 resp, err := ctxhttp.Post(ctx, http.DefaultClient, u.String(), contentTypeJSON, bytes.NewBuffer(buf)) 106 if err != nil { 107 return err 108 } 109 defer resp.Body.Close() 110 111 // API returns status code 204 for successful writes. 112 // http://opentsdb.net/docs/build/html/api_http/put.html 113 if resp.StatusCode == http.StatusNoContent { 114 return nil 115 } 116 117 // API returns status code 400 on error, encoding error details in the 118 // response content in JSON. 119 buf, err = ioutil.ReadAll(resp.Body) 120 if err != nil { 121 return err 122 } 123 124 var r map[string]int 125 if err := json.Unmarshal(buf, &r); err != nil { 126 return err 127 } 128 return fmt.Errorf("failed to write %d samples to OpenTSDB, %d succeeded", r["failed"], r["success"]) 129} 130 131// Name identifies the client as an OpenTSDB client. 132func (c Client) Name() string { 133 return "opentsdb" 134} 135