1// Copyright 2015 The Prometheus Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14package influxdb
15
import (
	"encoding/json"
	"fmt"
	"math"
	"sort"
	"strings"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/log"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/storage/remote"

	influx "github.com/influxdata/influxdb/client/v2"
)
29
30// Client allows sending batches of Prometheus samples to InfluxDB.
31type Client struct {
32	client          influx.Client
33	database        string
34	retentionPolicy string
35	ignoredSamples  prometheus.Counter
36}
37
38// NewClient creates a new Client.
39func NewClient(conf influx.HTTPConfig, db string, rp string) *Client {
40	c, err := influx.NewHTTPClient(conf)
41	// Currently influx.NewClient() *should* never return an error.
42	if err != nil {
43		log.Fatal(err)
44	}
45
46	return &Client{
47		client:          c,
48		database:        db,
49		retentionPolicy: rp,
50		ignoredSamples: prometheus.NewCounter(
51			prometheus.CounterOpts{
52				Name: "prometheus_influxdb_ignored_samples_total",
53				Help: "The total number of samples not sent to InfluxDB due to unsupported float values (Inf, -Inf, NaN).",
54			},
55		),
56	}
57}
58
59// tagsFromMetric extracts InfluxDB tags from a Prometheus metric.
60func tagsFromMetric(m model.Metric) map[string]string {
61	tags := make(map[string]string, len(m)-1)
62	for l, v := range m {
63		if l != model.MetricNameLabel {
64			tags[string(l)] = string(v)
65		}
66	}
67	return tags
68}
69
70// Write sends a batch of samples to InfluxDB via its HTTP API.
71func (c *Client) Write(samples model.Samples) error {
72	points := make([]*influx.Point, 0, len(samples))
73	for _, s := range samples {
74		v := float64(s.Value)
75		if math.IsNaN(v) || math.IsInf(v, 0) {
76			log.Debugf("cannot send value %f to InfluxDB, skipping sample %#v", v, s)
77			c.ignoredSamples.Inc()
78			continue
79		}
80		p, err := influx.NewPoint(
81			string(s.Metric[model.MetricNameLabel]),
82			tagsFromMetric(s.Metric),
83			map[string]interface{}{"value": v},
84			s.Timestamp.Time(),
85		)
86		if err != nil {
87			return err
88		}
89		points = append(points, p)
90	}
91
92	bps, err := influx.NewBatchPoints(influx.BatchPointsConfig{
93		Precision:       "ms",
94		Database:        c.database,
95		RetentionPolicy: c.retentionPolicy,
96	})
97	if err != nil {
98		return err
99	}
100	bps.AddPoints(points)
101	return c.client.Write(bps)
102}
103
104func (c *Client) Read(req *remote.ReadRequest) (*remote.ReadResponse, error) {
105	labelsToSeries := map[string]*remote.TimeSeries{}
106	for _, q := range req.Queries {
107		command, err := c.buildCommand(q)
108		if err != nil {
109			return nil, err
110		}
111
112		query := influx.NewQuery(command, c.database, "ms")
113		resp, err := c.client.Query(query)
114		if err != nil {
115			return nil, err
116		}
117		if resp.Err != "" {
118			return nil, fmt.Errorf(resp.Err)
119		}
120
121		if err = mergeResult(labelsToSeries, resp.Results); err != nil {
122			return nil, err
123		}
124	}
125
126	resp := remote.ReadResponse{
127		Results: []*remote.QueryResult{
128			{Timeseries: make([]*remote.TimeSeries, 0, len(labelsToSeries))},
129		},
130	}
131	for _, ts := range labelsToSeries {
132		resp.Results[0].Timeseries = append(resp.Results[0].Timeseries, ts)
133	}
134	return &resp, nil
135}
136
137func (c *Client) buildCommand(q *remote.Query) (string, error) {
138	matchers := make([]string, 0, len(q.Matchers))
139	// If we don't find a metric name matcher, query all metrics
140	// (InfluxDB measurements) by default.
141	from := "FROM /.+/"
142	for _, m := range q.Matchers {
143		if m.Name == model.MetricNameLabel {
144			switch m.Type {
145			case remote.MatchType_EQUAL:
146				from = fmt.Sprintf("FROM %q.%q", c.retentionPolicy, m.Value)
147			case remote.MatchType_REGEX_MATCH:
148				from = fmt.Sprintf("FROM %q./^%s$/", c.retentionPolicy, escapeSlashes(m.Value))
149			default:
150				// TODO: Figure out how to support these efficiently.
151				return "", fmt.Errorf("non-equal or regex-non-equal matchers are not supported on the metric name yet")
152			}
153			continue
154		}
155
156		switch m.Type {
157		case remote.MatchType_EQUAL:
158			matchers = append(matchers, fmt.Sprintf("%q = '%s'", m.Name, escapeSingleQuotes(m.Value)))
159		case remote.MatchType_NOT_EQUAL:
160			matchers = append(matchers, fmt.Sprintf("%q != '%s'", m.Name, escapeSingleQuotes(m.Value)))
161		case remote.MatchType_REGEX_MATCH:
162			matchers = append(matchers, fmt.Sprintf("%q =~ /^%s$/", m.Name, escapeSlashes(m.Value)))
163		case remote.MatchType_REGEX_NO_MATCH:
164			matchers = append(matchers, fmt.Sprintf("%q !~ /^%s$/", m.Name, escapeSlashes(m.Value)))
165		default:
166			return "", fmt.Errorf("unknown match type %v", m.Type)
167		}
168	}
169	matchers = append(matchers, fmt.Sprintf("time >= %vms", q.StartTimestampMs))
170	matchers = append(matchers, fmt.Sprintf("time <= %vms", q.EndTimestampMs))
171
172	return fmt.Sprintf("SELECT value %s WHERE %v GROUP BY *", from, strings.Join(matchers, " AND ")), nil
173}
174
// singleQuotedEscaper escapes the characters that are special inside a
// single-quoted string literal: the backslash and the single quote. Both
// are replaced in one pass, so backslashes introduced by the quote
// escaping are not escaped a second time.
var singleQuotedEscaper = strings.NewReplacer(`\`, `\\`, `'`, `\'`)

// escapeSingleQuotes makes str safe to embed between single quotes in a
// generated query. Single quotes are prefixed with a backslash, and
// backslashes are doubled so that a value ending in a backslash cannot
// swallow the closing quote of the literal.
func escapeSingleQuotes(str string) string {
	return singleQuotedEscaper.Replace(str)
}
178
// escapeSlashes returns str with every forward slash prefixed by a
// backslash, so the result can be placed between the slash delimiters of
// a regular expression literal in a generated query.
func escapeSlashes(str string) string {
	const (
		slash   = "/"
		escaped = `\/`
	)
	// Splitting on the slash and re-joining with the escaped form
	// replaces every occurrence.
	return strings.Join(strings.Split(str, slash), escaped)
}
182
183func mergeResult(labelsToSeries map[string]*remote.TimeSeries, results []influx.Result) error {
184	for _, r := range results {
185		for _, s := range r.Series {
186			k := concatLabels(s.Tags)
187			ts, ok := labelsToSeries[k]
188			if !ok {
189				ts = &remote.TimeSeries{
190					Labels: tagsToLabelPairs(s.Name, s.Tags),
191				}
192				labelsToSeries[k] = ts
193			}
194
195			samples, err := valuesToSamples(s.Values)
196			if err != nil {
197				return err
198			}
199
200			ts.Samples = mergeSamples(ts.Samples, samples)
201		}
202	}
203	return nil
204}
205
// concatLabels builds a string key from a set of labels by joining every
// name and value with a separator byte.
//
// The names are sorted first: map iteration order is not deterministic, so
// without sorting the same label set could produce different keys on
// different calls.
func concatLabels(labels map[string]string) string {
	// 0xff cannot occur in valid UTF-8 sequences, so use it
	// as a separator here.
	const separator = "\xff"

	names := make([]string, 0, len(labels))
	for name := range labels {
		names = append(names, name)
	}
	sort.Strings(names)

	pairs := make([]string, 0, len(names))
	for _, name := range names {
		pairs = append(pairs, name+separator+labels[name])
	}
	return strings.Join(pairs, separator)
}
216
217func tagsToLabelPairs(name string, tags map[string]string) []*remote.LabelPair {
218	pairs := make([]*remote.LabelPair, 0, len(tags))
219	for k, v := range tags {
220		if v == "" {
221			// If we select metrics with different sets of labels names,
222			// InfluxDB returns *all* possible tag names on all returned
223			// series, with empty tag values on series where they don't
224			// apply. In Prometheus, an empty label value is equivalent
225			// to a non-existent label, so we just skip empty ones here
226			// to make the result correct.
227			continue
228		}
229		pairs = append(pairs, &remote.LabelPair{
230			Name:  k,
231			Value: v,
232		})
233	}
234	pairs = append(pairs, &remote.LabelPair{
235		Name:  model.MetricNameLabel,
236		Value: name,
237	})
238	return pairs
239}
240
241func valuesToSamples(values [][]interface{}) ([]*remote.Sample, error) {
242	samples := make([]*remote.Sample, 0, len(values))
243	for _, v := range values {
244		if len(v) != 2 {
245			return nil, fmt.Errorf("bad sample tuple length, expected [<timestamp>, <value>], got %v", v)
246		}
247
248		jsonTimestamp, ok := v[0].(json.Number)
249		if !ok {
250			return nil, fmt.Errorf("bad timestamp: %v", v[0])
251		}
252
253		jsonValue, ok := v[1].(json.Number)
254		if !ok {
255			return nil, fmt.Errorf("bad sample value: %v", v[1])
256		}
257
258		timestamp, err := jsonTimestamp.Int64()
259		if err != nil {
260			return nil, fmt.Errorf("unable to convert sample timestamp to int64: %v", err)
261		}
262
263		value, err := jsonValue.Float64()
264		if err != nil {
265			return nil, fmt.Errorf("unable to convert sample value to float64: %v", err)
266		}
267
268		samples = append(samples, &remote.Sample{
269			TimestampMs: timestamp,
270			Value:       value,
271		})
272	}
273	return samples, nil
274}
275
276// mergeSamples merges two lists of sample pairs and removes duplicate
277// timestamps. It assumes that both lists are sorted by timestamp.
278func mergeSamples(a, b []*remote.Sample) []*remote.Sample {
279	result := make([]*remote.Sample, 0, len(a)+len(b))
280	i, j := 0, 0
281	for i < len(a) && j < len(b) {
282		if a[i].TimestampMs < b[j].TimestampMs {
283			result = append(result, a[i])
284			i++
285		} else if a[i].TimestampMs > b[j].TimestampMs {
286			result = append(result, b[j])
287			j++
288		} else {
289			result = append(result, a[i])
290			i++
291			j++
292		}
293	}
294	result = append(result, a[i:]...)
295	result = append(result, b[j:]...)
296	return result
297}
298
299// Name identifies the client as an InfluxDB client.
300func (c Client) Name() string {
301	return "influxdb"
302}
303
304// Describe implements prometheus.Collector.
305func (c *Client) Describe(ch chan<- *prometheus.Desc) {
306	ch <- c.ignoredSamples.Desc()
307}
308
309// Collect implements prometheus.Collector.
310func (c *Client) Collect(ch chan<- prometheus.Metric) {
311	ch <- c.ignoredSamples
312}
313