1// Copyright 2015 The Prometheus Authors 2// Licensed under the Apache License, Version 2.0 (the "License"); 3// you may not use this file except in compliance with the License. 4// You may obtain a copy of the License at 5// 6// http://www.apache.org/licenses/LICENSE-2.0 7// 8// Unless required by applicable law or agreed to in writing, software 9// distributed under the License is distributed on an "AS IS" BASIS, 10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11// See the License for the specific language governing permissions and 12// limitations under the License. 13 14package influxdb 15 16import ( 17 "encoding/json" 18 "fmt" 19 "math" 20 "os" 21 "strings" 22 23 "github.com/go-kit/kit/log" 24 "github.com/go-kit/kit/log/level" 25 "github.com/prometheus/client_golang/prometheus" 26 "github.com/prometheus/common/model" 27 "github.com/prometheus/prometheus/prompb" 28 29 influx "github.com/influxdata/influxdb/client/v2" 30) 31 32// Client allows sending batches of Prometheus samples to InfluxDB. 33type Client struct { 34 logger log.Logger 35 36 client influx.Client 37 database string 38 retentionPolicy string 39 ignoredSamples prometheus.Counter 40} 41 42// NewClient creates a new Client. 43func NewClient(logger log.Logger, conf influx.HTTPConfig, db string, rp string) *Client { 44 c, err := influx.NewHTTPClient(conf) 45 // Currently influx.NewClient() *should* never return an error. 46 if err != nil { 47 level.Error(logger).Log("err", err) 48 os.Exit(1) 49 } 50 51 if logger == nil { 52 logger = log.NewNopLogger() 53 } 54 55 return &Client{ 56 logger: logger, 57 client: c, 58 database: db, 59 retentionPolicy: rp, 60 ignoredSamples: prometheus.NewCounter( 61 prometheus.CounterOpts{ 62 Name: "prometheus_influxdb_ignored_samples_total", 63 Help: "The total number of samples not sent to InfluxDB due to unsupported float values (Inf, -Inf, NaN).", 64 }, 65 ), 66 } 67} 68 69// tagsFromMetric extracts InfluxDB tags from a Prometheus metric. 70func tagsFromMetric(m model.Metric) map[string]string { 71 tags := make(map[string]string, len(m)-1) 72 for l, v := range m { 73 if l != model.MetricNameLabel { 74 tags[string(l)] = string(v) 75 } 76 } 77 return tags 78} 79 80// Write sends a batch of samples to InfluxDB via its HTTP API. 81func (c *Client) Write(samples model.Samples) error { 82 points := make([]*influx.Point, 0, len(samples)) 83 for _, s := range samples { 84 v := float64(s.Value) 85 if math.IsNaN(v) || math.IsInf(v, 0) { 86 level.Debug(c.logger).Log("msg", "cannot send to InfluxDB, skipping sample", "value", v, "sample", s) 87 c.ignoredSamples.Inc() 88 continue 89 } 90 p, err := influx.NewPoint( 91 string(s.Metric[model.MetricNameLabel]), 92 tagsFromMetric(s.Metric), 93 map[string]interface{}{"value": v}, 94 s.Timestamp.Time(), 95 ) 96 if err != nil { 97 return err 98 } 99 points = append(points, p) 100 } 101 102 bps, err := influx.NewBatchPoints(influx.BatchPointsConfig{ 103 Precision: "ms", 104 Database: c.database, 105 RetentionPolicy: c.retentionPolicy, 106 }) 107 if err != nil { 108 return err 109 } 110 bps.AddPoints(points) 111 return c.client.Write(bps) 112} 113 114func (c *Client) Read(req *prompb.ReadRequest) (*prompb.ReadResponse, error) { 115 labelsToSeries := map[string]*prompb.TimeSeries{} 116 for _, q := range req.Queries { 117 command, err := c.buildCommand(q) 118 if err != nil { 119 return nil, err 120 } 121 122 query := influx.NewQuery(command, c.database, "ms") 123 resp, err := c.client.Query(query) 124 if err != nil { 125 return nil, err 126 } 127 if resp.Err != "" { 128 return nil, fmt.Errorf(resp.Err) 129 } 130 131 if err = mergeResult(labelsToSeries, resp.Results); err != nil { 132 return nil, err 133 } 134 } 135 136 resp := prompb.ReadResponse{ 137 Results: []*prompb.QueryResult{ 138 {Timeseries: make([]*prompb.TimeSeries, 0, len(labelsToSeries))}, 139 }, 140 } 141 for _, ts := range labelsToSeries { 142 resp.Results[0].Timeseries = append(resp.Results[0].Timeseries, ts) 143 } 144 return &resp, nil 145} 146 147func (c *Client) buildCommand(q *prompb.Query) (string, error) { 148 matchers := make([]string, 0, len(q.Matchers)) 149 // If we don't find a metric name matcher, query all metrics 150 // (InfluxDB measurements) by default. 151 from := "FROM /.+/" 152 for _, m := range q.Matchers { 153 if m.Name == model.MetricNameLabel { 154 switch m.Type { 155 case prompb.LabelMatcher_EQ: 156 from = fmt.Sprintf("FROM %q.%q", c.retentionPolicy, m.Value) 157 case prompb.LabelMatcher_RE: 158 from = fmt.Sprintf("FROM %q./^%s$/", c.retentionPolicy, escapeSlashes(m.Value)) 159 default: 160 // TODO: Figure out how to support these efficiently. 161 return "", fmt.Errorf("non-equal or regex-non-equal matchers are not supported on the metric name yet") 162 } 163 continue 164 } 165 166 switch m.Type { 167 case prompb.LabelMatcher_EQ: 168 matchers = append(matchers, fmt.Sprintf("%q = '%s'", m.Name, escapeSingleQuotes(m.Value))) 169 case prompb.LabelMatcher_NEQ: 170 matchers = append(matchers, fmt.Sprintf("%q != '%s'", m.Name, escapeSingleQuotes(m.Value))) 171 case prompb.LabelMatcher_RE: 172 matchers = append(matchers, fmt.Sprintf("%q =~ /^%s$/", m.Name, escapeSlashes(m.Value))) 173 case prompb.LabelMatcher_NRE: 174 matchers = append(matchers, fmt.Sprintf("%q !~ /^%s$/", m.Name, escapeSlashes(m.Value))) 175 default: 176 return "", fmt.Errorf("unknown match type %v", m.Type) 177 } 178 } 179 matchers = append(matchers, fmt.Sprintf("time >= %vms", q.StartTimestampMs)) 180 matchers = append(matchers, fmt.Sprintf("time <= %vms", q.EndTimestampMs)) 181 182 return fmt.Sprintf("SELECT value %s WHERE %v GROUP BY *", from, strings.Join(matchers, " AND ")), nil 183} 184 185func escapeSingleQuotes(str string) string { 186 return strings.Replace(str, `'`, `\'`, -1) 187} 188 189func escapeSlashes(str string) string { 190 return strings.Replace(str, `/`, `\/`, -1) 191} 192 193func mergeResult(labelsToSeries map[string]*prompb.TimeSeries, results []influx.Result) error { 194 for _, r := range results { 195 for _, s := range r.Series { 196 k := concatLabels(s.Tags) 197 ts, ok := labelsToSeries[k] 198 if !ok { 199 ts = &prompb.TimeSeries{ 200 Labels: tagsToLabelPairs(s.Name, s.Tags), 201 } 202 labelsToSeries[k] = ts 203 } 204 205 samples, err := valuesToSamples(s.Values) 206 if err != nil { 207 return err 208 } 209 210 ts.Samples = mergeSamples(ts.Samples, samples) 211 } 212 } 213 return nil 214} 215 216func concatLabels(labels map[string]string) string { 217 // 0xff cannot cannot occur in valid UTF-8 sequences, so use it 218 // as a separator here. 219 separator := "\xff" 220 pairs := make([]string, 0, len(labels)) 221 for k, v := range labels { 222 pairs = append(pairs, k+separator+v) 223 } 224 return strings.Join(pairs, separator) 225} 226 227func tagsToLabelPairs(name string, tags map[string]string) []*prompb.Label { 228 pairs := make([]*prompb.Label, 0, len(tags)) 229 for k, v := range tags { 230 if v == "" { 231 // If we select metrics with different sets of labels names, 232 // InfluxDB returns *all* possible tag names on all returned 233 // series, with empty tag values on series where they don't 234 // apply. In Prometheus, an empty label value is equivalent 235 // to a non-existent label, so we just skip empty ones here 236 // to make the result correct. 237 continue 238 } 239 pairs = append(pairs, &prompb.Label{ 240 Name: k, 241 Value: v, 242 }) 243 } 244 pairs = append(pairs, &prompb.Label{ 245 Name: model.MetricNameLabel, 246 Value: name, 247 }) 248 return pairs 249} 250 251func valuesToSamples(values [][]interface{}) ([]prompb.Sample, error) { 252 samples := make([]prompb.Sample, 0, len(values)) 253 for _, v := range values { 254 if len(v) != 2 { 255 return nil, fmt.Errorf("bad sample tuple length, expected [<timestamp>, <value>], got %v", v) 256 } 257 258 jsonTimestamp, ok := v[0].(json.Number) 259 if !ok { 260 return nil, fmt.Errorf("bad timestamp: %v", v[0]) 261 } 262 263 jsonValue, ok := v[1].(json.Number) 264 if !ok { 265 return nil, fmt.Errorf("bad sample value: %v", v[1]) 266 } 267 268 timestamp, err := jsonTimestamp.Int64() 269 if err != nil { 270 return nil, fmt.Errorf("unable to convert sample timestamp to int64: %v", err) 271 } 272 273 value, err := jsonValue.Float64() 274 if err != nil { 275 return nil, fmt.Errorf("unable to convert sample value to float64: %v", err) 276 } 277 278 samples = append(samples, prompb.Sample{ 279 Timestamp: timestamp, 280 Value: value, 281 }) 282 } 283 return samples, nil 284} 285 286// mergeSamples merges two lists of sample pairs and removes duplicate 287// timestamps. It assumes that both lists are sorted by timestamp. 288func mergeSamples(a, b []prompb.Sample) []prompb.Sample { 289 result := make([]prompb.Sample, 0, len(a)+len(b)) 290 i, j := 0, 0 291 for i < len(a) && j < len(b) { 292 if a[i].Timestamp < b[j].Timestamp { 293 result = append(result, a[i]) 294 i++ 295 } else if a[i].Timestamp > b[j].Timestamp { 296 result = append(result, b[j]) 297 j++ 298 } else { 299 result = append(result, a[i]) 300 i++ 301 j++ 302 } 303 } 304 result = append(result, a[i:]...) 305 result = append(result, b[j:]...) 306 return result 307} 308 309// Name identifies the client as an InfluxDB client. 310func (c Client) Name() string { 311 return "influxdb" 312} 313 314// Describe implements prometheus.Collector. 315func (c *Client) Describe(ch chan<- *prometheus.Desc) { 316 ch <- c.ignoredSamples.Desc() 317} 318 319// Collect implements prometheus.Collector. 320func (c *Client) Collect(ch chan<- prometheus.Metric) { 321 ch <- c.ignoredSamples 322} 323