1// Copyright 2015 The Prometheus Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14package influxdb
15
import (
	"encoding/json"
	"fmt"
	"math"
	"os"
	"sort"
	"strings"

	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/prompb"

	influx "github.com/influxdata/influxdb/client/v2"
)
31
32// Client allows sending batches of Prometheus samples to InfluxDB.
33type Client struct {
34	logger log.Logger
35
36	client          influx.Client
37	database        string
38	retentionPolicy string
39	ignoredSamples  prometheus.Counter
40}
41
42// NewClient creates a new Client.
43func NewClient(logger log.Logger, conf influx.HTTPConfig, db string, rp string) *Client {
44	c, err := influx.NewHTTPClient(conf)
45	// Currently influx.NewClient() *should* never return an error.
46	if err != nil {
47		level.Error(logger).Log("err", err)
48		os.Exit(1)
49	}
50
51	if logger == nil {
52		logger = log.NewNopLogger()
53	}
54
55	return &Client{
56		logger:          logger,
57		client:          c,
58		database:        db,
59		retentionPolicy: rp,
60		ignoredSamples: prometheus.NewCounter(
61			prometheus.CounterOpts{
62				Name: "prometheus_influxdb_ignored_samples_total",
63				Help: "The total number of samples not sent to InfluxDB due to unsupported float values (Inf, -Inf, NaN).",
64			},
65		),
66	}
67}
68
69// tagsFromMetric extracts InfluxDB tags from a Prometheus metric.
70func tagsFromMetric(m model.Metric) map[string]string {
71	tags := make(map[string]string, len(m)-1)
72	for l, v := range m {
73		if l != model.MetricNameLabel {
74			tags[string(l)] = string(v)
75		}
76	}
77	return tags
78}
79
80// Write sends a batch of samples to InfluxDB via its HTTP API.
81func (c *Client) Write(samples model.Samples) error {
82	points := make([]*influx.Point, 0, len(samples))
83	for _, s := range samples {
84		v := float64(s.Value)
85		if math.IsNaN(v) || math.IsInf(v, 0) {
86			level.Debug(c.logger).Log("msg", "cannot send  to InfluxDB, skipping sample", "value", v, "sample", s)
87			c.ignoredSamples.Inc()
88			continue
89		}
90		p, err := influx.NewPoint(
91			string(s.Metric[model.MetricNameLabel]),
92			tagsFromMetric(s.Metric),
93			map[string]interface{}{"value": v},
94			s.Timestamp.Time(),
95		)
96		if err != nil {
97			return err
98		}
99		points = append(points, p)
100	}
101
102	bps, err := influx.NewBatchPoints(influx.BatchPointsConfig{
103		Precision:       "ms",
104		Database:        c.database,
105		RetentionPolicy: c.retentionPolicy,
106	})
107	if err != nil {
108		return err
109	}
110	bps.AddPoints(points)
111	return c.client.Write(bps)
112}
113
114func (c *Client) Read(req *prompb.ReadRequest) (*prompb.ReadResponse, error) {
115	labelsToSeries := map[string]*prompb.TimeSeries{}
116	for _, q := range req.Queries {
117		command, err := c.buildCommand(q)
118		if err != nil {
119			return nil, err
120		}
121
122		query := influx.NewQuery(command, c.database, "ms")
123		resp, err := c.client.Query(query)
124		if err != nil {
125			return nil, err
126		}
127		if resp.Err != "" {
128			return nil, fmt.Errorf(resp.Err)
129		}
130
131		if err = mergeResult(labelsToSeries, resp.Results); err != nil {
132			return nil, err
133		}
134	}
135
136	resp := prompb.ReadResponse{
137		Results: []*prompb.QueryResult{
138			{Timeseries: make([]*prompb.TimeSeries, 0, len(labelsToSeries))},
139		},
140	}
141	for _, ts := range labelsToSeries {
142		resp.Results[0].Timeseries = append(resp.Results[0].Timeseries, ts)
143	}
144	return &resp, nil
145}
146
147func (c *Client) buildCommand(q *prompb.Query) (string, error) {
148	matchers := make([]string, 0, len(q.Matchers))
149	// If we don't find a metric name matcher, query all metrics
150	// (InfluxDB measurements) by default.
151	from := "FROM /.+/"
152	for _, m := range q.Matchers {
153		if m.Name == model.MetricNameLabel {
154			switch m.Type {
155			case prompb.LabelMatcher_EQ:
156				from = fmt.Sprintf("FROM %q.%q", c.retentionPolicy, m.Value)
157			case prompb.LabelMatcher_RE:
158				from = fmt.Sprintf("FROM %q./^%s$/", c.retentionPolicy, escapeSlashes(m.Value))
159			default:
160				// TODO: Figure out how to support these efficiently.
161				return "", fmt.Errorf("non-equal or regex-non-equal matchers are not supported on the metric name yet")
162			}
163			continue
164		}
165
166		switch m.Type {
167		case prompb.LabelMatcher_EQ:
168			matchers = append(matchers, fmt.Sprintf("%q = '%s'", m.Name, escapeSingleQuotes(m.Value)))
169		case prompb.LabelMatcher_NEQ:
170			matchers = append(matchers, fmt.Sprintf("%q != '%s'", m.Name, escapeSingleQuotes(m.Value)))
171		case prompb.LabelMatcher_RE:
172			matchers = append(matchers, fmt.Sprintf("%q =~ /^%s$/", m.Name, escapeSlashes(m.Value)))
173		case prompb.LabelMatcher_NRE:
174			matchers = append(matchers, fmt.Sprintf("%q !~ /^%s$/", m.Name, escapeSlashes(m.Value)))
175		default:
176			return "", fmt.Errorf("unknown match type %v", m.Type)
177		}
178	}
179	matchers = append(matchers, fmt.Sprintf("time >= %vms", q.StartTimestampMs))
180	matchers = append(matchers, fmt.Sprintf("time <= %vms", q.EndTimestampMs))
181
182	return fmt.Sprintf("SELECT value %s WHERE %v GROUP BY *", from, strings.Join(matchers, " AND ")), nil
183}
184
// escapeSingleQuotes escapes str so that it can be embedded in a
// single-quoted InfluxQL string literal.
//
// Backslashes are doubled and single quotes are prefixed with a backslash.
// Backslashes must be escaped as well: otherwise a value such as `\'` would
// turn into `\\'`, i.e. an escaped backslash followed by a quote that ends
// the literal early. Both replacements happen in a single pass, so the
// backslashes inserted for quotes are not escaped again.
func escapeSingleQuotes(str string) string {
	return strings.NewReplacer(`\`, `\\`, `'`, `\'`).Replace(str)
}
188
// escapeSlashes prefixes every forward slash in str with a backslash so the
// result can be placed between the slashes of an InfluxQL regex literal.
func escapeSlashes(str string) string {
	segments := strings.Split(str, "/")
	return strings.Join(segments, `\/`)
}
192
193func mergeResult(labelsToSeries map[string]*prompb.TimeSeries, results []influx.Result) error {
194	for _, r := range results {
195		for _, s := range r.Series {
196			k := concatLabels(s.Tags)
197			ts, ok := labelsToSeries[k]
198			if !ok {
199				ts = &prompb.TimeSeries{
200					Labels: tagsToLabelPairs(s.Name, s.Tags),
201				}
202				labelsToSeries[k] = ts
203			}
204
205			samples, err := valuesToSamples(s.Values)
206			if err != nil {
207				return err
208			}
209
210			ts.Samples = mergeSamples(ts.Samples, samples)
211		}
212	}
213	return nil
214}
215
// concatLabels builds a string key that identifies the given label set.
//
// Each label is rendered as name and value joined by a separator, and the
// rendered pairs are joined by the same separator. The pairs are sorted
// first: map iteration order is randomized, so without sorting the same
// label set could produce different keys on different calls.
func concatLabels(labels map[string]string) string {
	// 0xff cannot occur in valid UTF-8 sequences, so use it
	// as a separator here.
	separator := "\xff"
	pairs := make([]string, 0, len(labels))
	for k, v := range labels {
		pairs = append(pairs, k+separator+v)
	}
	sort.Strings(pairs)
	return strings.Join(pairs, separator)
}
226
227func tagsToLabelPairs(name string, tags map[string]string) []*prompb.Label {
228	pairs := make([]*prompb.Label, 0, len(tags))
229	for k, v := range tags {
230		if v == "" {
231			// If we select metrics with different sets of labels names,
232			// InfluxDB returns *all* possible tag names on all returned
233			// series, with empty tag values on series where they don't
234			// apply. In Prometheus, an empty label value is equivalent
235			// to a non-existent label, so we just skip empty ones here
236			// to make the result correct.
237			continue
238		}
239		pairs = append(pairs, &prompb.Label{
240			Name:  k,
241			Value: v,
242		})
243	}
244	pairs = append(pairs, &prompb.Label{
245		Name:  model.MetricNameLabel,
246		Value: name,
247	})
248	return pairs
249}
250
251func valuesToSamples(values [][]interface{}) ([]prompb.Sample, error) {
252	samples := make([]prompb.Sample, 0, len(values))
253	for _, v := range values {
254		if len(v) != 2 {
255			return nil, fmt.Errorf("bad sample tuple length, expected [<timestamp>, <value>], got %v", v)
256		}
257
258		jsonTimestamp, ok := v[0].(json.Number)
259		if !ok {
260			return nil, fmt.Errorf("bad timestamp: %v", v[0])
261		}
262
263		jsonValue, ok := v[1].(json.Number)
264		if !ok {
265			return nil, fmt.Errorf("bad sample value: %v", v[1])
266		}
267
268		timestamp, err := jsonTimestamp.Int64()
269		if err != nil {
270			return nil, fmt.Errorf("unable to convert sample timestamp to int64: %v", err)
271		}
272
273		value, err := jsonValue.Float64()
274		if err != nil {
275			return nil, fmt.Errorf("unable to convert sample value to float64: %v", err)
276		}
277
278		samples = append(samples, prompb.Sample{
279			Timestamp: timestamp,
280			Value:     value,
281		})
282	}
283	return samples, nil
284}
285
286// mergeSamples merges two lists of sample pairs and removes duplicate
287// timestamps. It assumes that both lists are sorted by timestamp.
288func mergeSamples(a, b []prompb.Sample) []prompb.Sample {
289	result := make([]prompb.Sample, 0, len(a)+len(b))
290	i, j := 0, 0
291	for i < len(a) && j < len(b) {
292		if a[i].Timestamp < b[j].Timestamp {
293			result = append(result, a[i])
294			i++
295		} else if a[i].Timestamp > b[j].Timestamp {
296			result = append(result, b[j])
297			j++
298		} else {
299			result = append(result, a[i])
300			i++
301			j++
302		}
303	}
304	result = append(result, a[i:]...)
305	result = append(result, b[j:]...)
306	return result
307}
308
309// Name identifies the client as an InfluxDB client.
310func (c Client) Name() string {
311	return "influxdb"
312}
313
314// Describe implements prometheus.Collector.
315func (c *Client) Describe(ch chan<- *prometheus.Desc) {
316	ch <- c.ignoredSamples.Desc()
317}
318
319// Collect implements prometheus.Collector.
320func (c *Client) Collect(ch chan<- prometheus.Metric) {
321	ch <- c.ignoredSamples
322}
323