1// Copyright 2015 The Prometheus Authors 2// Licensed under the Apache License, Version 2.0 (the "License"); 3// you may not use this file except in compliance with the License. 4// You may obtain a copy of the License at 5// 6// http://www.apache.org/licenses/LICENSE-2.0 7// 8// Unless required by applicable law or agreed to in writing, software 9// distributed under the License is distributed on an "AS IS" BASIS, 10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11// See the License for the specific language governing permissions and 12// limitations under the License. 13 14package influxdb 15 16import ( 17 "encoding/json" 18 "fmt" 19 "math" 20 "strings" 21 22 "github.com/prometheus/client_golang/prometheus" 23 "github.com/prometheus/common/log" 24 "github.com/prometheus/common/model" 25 "github.com/prometheus/prometheus/storage/remote" 26 27 influx "github.com/influxdata/influxdb/client/v2" 28) 29 30// Client allows sending batches of Prometheus samples to InfluxDB. 31type Client struct { 32 client influx.Client 33 database string 34 retentionPolicy string 35 ignoredSamples prometheus.Counter 36} 37 38// NewClient creates a new Client. 39func NewClient(conf influx.HTTPConfig, db string, rp string) *Client { 40 c, err := influx.NewHTTPClient(conf) 41 // Currently influx.NewClient() *should* never return an error. 42 if err != nil { 43 log.Fatal(err) 44 } 45 46 return &Client{ 47 client: c, 48 database: db, 49 retentionPolicy: rp, 50 ignoredSamples: prometheus.NewCounter( 51 prometheus.CounterOpts{ 52 Name: "prometheus_influxdb_ignored_samples_total", 53 Help: "The total number of samples not sent to InfluxDB due to unsupported float values (Inf, -Inf, NaN).", 54 }, 55 ), 56 } 57} 58 59// tagsFromMetric extracts InfluxDB tags from a Prometheus metric. 60func tagsFromMetric(m model.Metric) map[string]string { 61 tags := make(map[string]string, len(m)-1) 62 for l, v := range m { 63 if l != model.MetricNameLabel { 64 tags[string(l)] = string(v) 65 } 66 } 67 return tags 68} 69 70// Write sends a batch of samples to InfluxDB via its HTTP API. 71func (c *Client) Write(samples model.Samples) error { 72 points := make([]*influx.Point, 0, len(samples)) 73 for _, s := range samples { 74 v := float64(s.Value) 75 if math.IsNaN(v) || math.IsInf(v, 0) { 76 log.Debugf("cannot send value %f to InfluxDB, skipping sample %#v", v, s) 77 c.ignoredSamples.Inc() 78 continue 79 } 80 p, err := influx.NewPoint( 81 string(s.Metric[model.MetricNameLabel]), 82 tagsFromMetric(s.Metric), 83 map[string]interface{}{"value": v}, 84 s.Timestamp.Time(), 85 ) 86 if err != nil { 87 return err 88 } 89 points = append(points, p) 90 } 91 92 bps, err := influx.NewBatchPoints(influx.BatchPointsConfig{ 93 Precision: "ms", 94 Database: c.database, 95 RetentionPolicy: c.retentionPolicy, 96 }) 97 if err != nil { 98 return err 99 } 100 bps.AddPoints(points) 101 return c.client.Write(bps) 102} 103 104func (c *Client) Read(req *remote.ReadRequest) (*remote.ReadResponse, error) { 105 labelsToSeries := map[string]*remote.TimeSeries{} 106 for _, q := range req.Queries { 107 command, err := c.buildCommand(q) 108 if err != nil { 109 return nil, err 110 } 111 112 query := influx.NewQuery(command, c.database, "ms") 113 resp, err := c.client.Query(query) 114 if err != nil { 115 return nil, err 116 } 117 if resp.Err != "" { 118 return nil, fmt.Errorf(resp.Err) 119 } 120 121 if err = mergeResult(labelsToSeries, resp.Results); err != nil { 122 return nil, err 123 } 124 } 125 126 resp := remote.ReadResponse{ 127 Results: []*remote.QueryResult{ 128 {Timeseries: make([]*remote.TimeSeries, 0, len(labelsToSeries))}, 129 }, 130 } 131 for _, ts := range labelsToSeries { 132 resp.Results[0].Timeseries = append(resp.Results[0].Timeseries, ts) 133 } 134 return &resp, nil 135} 136 137func (c *Client) buildCommand(q *remote.Query) (string, error) { 138 matchers := make([]string, 0, len(q.Matchers)) 139 // If we don't find a metric name matcher, query all metrics 140 // (InfluxDB measurements) by default. 141 from := "FROM /.+/" 142 for _, m := range q.Matchers { 143 if m.Name == model.MetricNameLabel { 144 switch m.Type { 145 case remote.MatchType_EQUAL: 146 from = fmt.Sprintf("FROM %q.%q", c.retentionPolicy, m.Value) 147 case remote.MatchType_REGEX_MATCH: 148 from = fmt.Sprintf("FROM %q./^%s$/", c.retentionPolicy, escapeSlashes(m.Value)) 149 default: 150 // TODO: Figure out how to support these efficiently. 151 return "", fmt.Errorf("non-equal or regex-non-equal matchers are not supported on the metric name yet") 152 } 153 continue 154 } 155 156 switch m.Type { 157 case remote.MatchType_EQUAL: 158 matchers = append(matchers, fmt.Sprintf("%q = '%s'", m.Name, escapeSingleQuotes(m.Value))) 159 case remote.MatchType_NOT_EQUAL: 160 matchers = append(matchers, fmt.Sprintf("%q != '%s'", m.Name, escapeSingleQuotes(m.Value))) 161 case remote.MatchType_REGEX_MATCH: 162 matchers = append(matchers, fmt.Sprintf("%q =~ /^%s$/", m.Name, escapeSlashes(m.Value))) 163 case remote.MatchType_REGEX_NO_MATCH: 164 matchers = append(matchers, fmt.Sprintf("%q !~ /^%s$/", m.Name, escapeSlashes(m.Value))) 165 default: 166 return "", fmt.Errorf("unknown match type %v", m.Type) 167 } 168 } 169 matchers = append(matchers, fmt.Sprintf("time >= %vms", q.StartTimestampMs)) 170 matchers = append(matchers, fmt.Sprintf("time <= %vms", q.EndTimestampMs)) 171 172 return fmt.Sprintf("SELECT value %s WHERE %v GROUP BY *", from, strings.Join(matchers, " AND ")), nil 173} 174 175func escapeSingleQuotes(str string) string { 176 return strings.Replace(str, `'`, `\'`, -1) 177} 178 179func escapeSlashes(str string) string { 180 return strings.Replace(str, `/`, `\/`, -1) 181} 182 183func mergeResult(labelsToSeries map[string]*remote.TimeSeries, results []influx.Result) error { 184 for _, r := range results { 185 for _, s := range r.Series { 186 k := concatLabels(s.Tags) 187 ts, ok := labelsToSeries[k] 188 if !ok { 189 ts = &remote.TimeSeries{ 190 Labels: tagsToLabelPairs(s.Name, s.Tags), 191 } 192 labelsToSeries[k] = ts 193 } 194 195 samples, err := valuesToSamples(s.Values) 196 if err != nil { 197 return err 198 } 199 200 ts.Samples = mergeSamples(ts.Samples, samples) 201 } 202 } 203 return nil 204} 205 206func concatLabels(labels map[string]string) string { 207 // 0xff cannot cannot occur in valid UTF-8 sequences, so use it 208 // as a separator here. 209 separator := "\xff" 210 pairs := make([]string, 0, len(labels)) 211 for k, v := range labels { 212 pairs = append(pairs, k+separator+v) 213 } 214 return strings.Join(pairs, separator) 215} 216 217func tagsToLabelPairs(name string, tags map[string]string) []*remote.LabelPair { 218 pairs := make([]*remote.LabelPair, 0, len(tags)) 219 for k, v := range tags { 220 if v == "" { 221 // If we select metrics with different sets of labels names, 222 // InfluxDB returns *all* possible tag names on all returned 223 // series, with empty tag values on series where they don't 224 // apply. In Prometheus, an empty label value is equivalent 225 // to a non-existent label, so we just skip empty ones here 226 // to make the result correct. 227 continue 228 } 229 pairs = append(pairs, &remote.LabelPair{ 230 Name: k, 231 Value: v, 232 }) 233 } 234 pairs = append(pairs, &remote.LabelPair{ 235 Name: model.MetricNameLabel, 236 Value: name, 237 }) 238 return pairs 239} 240 241func valuesToSamples(values [][]interface{}) ([]*remote.Sample, error) { 242 samples := make([]*remote.Sample, 0, len(values)) 243 for _, v := range values { 244 if len(v) != 2 { 245 return nil, fmt.Errorf("bad sample tuple length, expected [<timestamp>, <value>], got %v", v) 246 } 247 248 jsonTimestamp, ok := v[0].(json.Number) 249 if !ok { 250 return nil, fmt.Errorf("bad timestamp: %v", v[0]) 251 } 252 253 jsonValue, ok := v[1].(json.Number) 254 if !ok { 255 return nil, fmt.Errorf("bad sample value: %v", v[1]) 256 } 257 258 timestamp, err := jsonTimestamp.Int64() 259 if err != nil { 260 return nil, fmt.Errorf("unable to convert sample timestamp to int64: %v", err) 261 } 262 263 value, err := jsonValue.Float64() 264 if err != nil { 265 return nil, fmt.Errorf("unable to convert sample value to float64: %v", err) 266 } 267 268 samples = append(samples, &remote.Sample{ 269 TimestampMs: timestamp, 270 Value: value, 271 }) 272 } 273 return samples, nil 274} 275 276// mergeSamples merges two lists of sample pairs and removes duplicate 277// timestamps. It assumes that both lists are sorted by timestamp. 278func mergeSamples(a, b []*remote.Sample) []*remote.Sample { 279 result := make([]*remote.Sample, 0, len(a)+len(b)) 280 i, j := 0, 0 281 for i < len(a) && j < len(b) { 282 if a[i].TimestampMs < b[j].TimestampMs { 283 result = append(result, a[i]) 284 i++ 285 } else if a[i].TimestampMs > b[j].TimestampMs { 286 result = append(result, b[j]) 287 j++ 288 } else { 289 result = append(result, a[i]) 290 i++ 291 j++ 292 } 293 } 294 result = append(result, a[i:]...) 295 result = append(result, b[j:]...) 296 return result 297} 298 299// Name identifies the client as an InfluxDB client. 300func (c Client) Name() string { 301 return "influxdb" 302} 303 304// Describe implements prometheus.Collector. 305func (c *Client) Describe(ch chan<- *prometheus.Desc) { 306 ch <- c.ignoredSamples.Desc() 307} 308 309// Collect implements prometheus.Collector. 310func (c *Client) Collect(ch chan<- prometheus.Metric) { 311 ch <- c.ignoredSamples 312} 313