1package prometheus 2 3import ( 4 "errors" 5 "fmt" 6 "math" 7 "time" 8 9 "github.com/gogo/protobuf/types" 10 "github.com/influxdata/influxdb/models" 11 "github.com/influxdata/influxdb/services/storage" 12 "github.com/influxdata/influxdb/storage/reads/datatypes" 13 dto "github.com/prometheus/client_model/go" 14 "github.com/prometheus/prometheus/prompb" 15) 16 17const ( 18 // measurementName is the default name used if no Prometheus name can be found on write 19 measurementName = "prom_metric_not_specified" 20 21 // fieldName is the field all prometheus values get written to 22 fieldName = "value" 23 24 // fieldTagKey is the tag key that all field names use in the new storage processor 25 fieldTagKey = "_field" 26 27 // prometheusNameTag is the tag key that Prometheus uses for metric names 28 prometheusNameTag = "__name__" 29 30 // measurementTagKey is the tag key that all measurement names use in the new storage processor 31 measurementTagKey = "_measurement" 32) 33 34// A DroppedValuesError is returned when the prometheus write request contains 35// unsupported float64 values. 36type DroppedValuesError struct { 37 nan uint64 38 ninf uint64 39 inf uint64 40} 41 42// Error returns a descriptive error of the values dropped. 43func (e DroppedValuesError) Error() string { 44 return fmt.Sprintf("dropped unsupported Prometheus values: [NaN = %d, +Inf = %d, -Inf = %d]", e.nan, e.inf, e.ninf) 45} 46 47// WriteRequestToPoints converts a Prometheus remote write request of time series and their 48// samples into Points that can be written into Influx 49func WriteRequestToPoints(req *prompb.WriteRequest) ([]models.Point, error) { 50 var maxPoints int 51 for _, ts := range req.Timeseries { 52 maxPoints += len(ts.Samples) 53 } 54 points := make([]models.Point, 0, maxPoints) 55 56 // Track any dropped values. 57 var nan, inf, ninf uint64 58 59 for _, ts := range req.Timeseries { 60 measurement := measurementName 61 62 tags := make(map[string]string, len(ts.Labels)) 63 for _, l := range ts.Labels { 64 tags[l.Name] = l.Value 65 if l.Name == prometheusNameTag { 66 measurement = l.Value 67 } 68 } 69 70 for _, s := range ts.Samples { 71 if v := s.Value; math.IsNaN(v) { 72 nan++ 73 continue 74 } else if math.IsInf(v, -1) { 75 ninf++ 76 continue 77 } else if math.IsInf(v, 1) { 78 inf++ 79 continue 80 } 81 82 // convert and append 83 t := time.Unix(0, s.Timestamp*int64(time.Millisecond)) 84 fields := map[string]interface{}{fieldName: s.Value} 85 p, err := models.NewPoint(measurement, models.NewTags(tags), fields, t) 86 if err != nil { 87 return nil, err 88 } 89 points = append(points, p) 90 } 91 } 92 93 if nan+inf+ninf > 0 { 94 return points, DroppedValuesError{nan: nan, inf: inf, ninf: ninf} 95 } 96 return points, nil 97} 98 99// ReadRequestToInfluxStorageRequest converts a Prometheus remote read request into one using the 100// new storage API that IFQL uses. 101func ReadRequestToInfluxStorageRequest(req *prompb.ReadRequest, db, rp string) (*datatypes.ReadFilterRequest, error) { 102 if len(req.Queries) != 1 { 103 return nil, errors.New("Prometheus read endpoint currently only supports one query at a time") 104 } 105 q := req.Queries[0] 106 107 src, err := types.MarshalAny(&storage.ReadSource{Database: db, RetentionPolicy: rp}) 108 if err != nil { 109 return nil, err 110 } 111 112 sreq := &datatypes.ReadFilterRequest{ 113 ReadSource: src, 114 Range: datatypes.TimestampRange{ 115 Start: time.Unix(0, q.StartTimestampMs*int64(time.Millisecond)).UnixNano(), 116 End: time.Unix(0, q.EndTimestampMs*int64(time.Millisecond)).UnixNano(), 117 }, 118 } 119 120 pred, err := predicateFromMatchers(q.Matchers) 121 if err != nil { 122 return nil, err 123 } 124 125 sreq.Predicate = pred 126 return sreq, nil 127} 128 129// RemoveInfluxSystemTags will remove tags that are Influx internal (_measurement and _field) 130func RemoveInfluxSystemTags(tags models.Tags) models.Tags { 131 var t models.Tags 132 for _, tt := range tags { 133 if string(tt.Key) == measurementTagKey || string(tt.Key) == fieldTagKey { 134 continue 135 } 136 t = append(t, tt) 137 } 138 139 return t 140} 141 142// predicateFromMatchers takes Prometheus label matchers and converts them to a storage 143// predicate that works with the schema that is written in, which assumes a single field 144// named value 145func predicateFromMatchers(matchers []*prompb.LabelMatcher) (*datatypes.Predicate, error) { 146 left, err := nodeFromMatchers(matchers) 147 if err != nil { 148 return nil, err 149 } 150 right := fieldNode() 151 152 return &datatypes.Predicate{ 153 Root: &datatypes.Node{ 154 NodeType: datatypes.NodeTypeLogicalExpression, 155 Value: &datatypes.Node_Logical_{Logical: datatypes.LogicalAnd}, 156 Children: []*datatypes.Node{left, right}, 157 }, 158 }, nil 159} 160 161// fieldNode returns a datatypes.Node that will match that the fieldTagKey == fieldName 162// which matches how Prometheus data is fed into the system 163func fieldNode() *datatypes.Node { 164 children := []*datatypes.Node{ 165 &datatypes.Node{ 166 NodeType: datatypes.NodeTypeTagRef, 167 Value: &datatypes.Node_TagRefValue{ 168 TagRefValue: fieldTagKey, 169 }, 170 }, 171 &datatypes.Node{ 172 NodeType: datatypes.NodeTypeLiteral, 173 Value: &datatypes.Node_StringValue{ 174 StringValue: fieldName, 175 }, 176 }, 177 } 178 179 return &datatypes.Node{ 180 NodeType: datatypes.NodeTypeComparisonExpression, 181 Value: &datatypes.Node_Comparison_{Comparison: datatypes.ComparisonEqual}, 182 Children: children, 183 } 184} 185 186func nodeFromMatchers(matchers []*prompb.LabelMatcher) (*datatypes.Node, error) { 187 if len(matchers) == 0 { 188 return nil, errors.New("expected matcher") 189 } else if len(matchers) == 1 { 190 return nodeFromMatcher(matchers[0]) 191 } 192 193 left, err := nodeFromMatcher(matchers[0]) 194 if err != nil { 195 return nil, err 196 } 197 198 right, err := nodeFromMatchers(matchers[1:]) 199 if err != nil { 200 return nil, err 201 } 202 203 children := []*datatypes.Node{left, right} 204 return &datatypes.Node{ 205 NodeType: datatypes.NodeTypeLogicalExpression, 206 Value: &datatypes.Node_Logical_{Logical: datatypes.LogicalAnd}, 207 Children: children, 208 }, nil 209} 210 211func nodeFromMatcher(m *prompb.LabelMatcher) (*datatypes.Node, error) { 212 var op datatypes.Node_Comparison 213 switch m.Type { 214 case prompb.LabelMatcher_EQ: 215 op = datatypes.ComparisonEqual 216 case prompb.LabelMatcher_NEQ: 217 op = datatypes.ComparisonNotEqual 218 case prompb.LabelMatcher_RE: 219 op = datatypes.ComparisonRegex 220 case prompb.LabelMatcher_NRE: 221 op = datatypes.ComparisonNotRegex 222 default: 223 return nil, fmt.Errorf("unknown match type %v", m.Type) 224 } 225 226 name := m.Name 227 if m.Name == prometheusNameTag { 228 name = measurementTagKey 229 } 230 231 left := &datatypes.Node{ 232 NodeType: datatypes.NodeTypeTagRef, 233 Value: &datatypes.Node_TagRefValue{ 234 TagRefValue: name, 235 }, 236 } 237 238 var right *datatypes.Node 239 240 if op == datatypes.ComparisonRegex || op == datatypes.ComparisonNotRegex { 241 right = &datatypes.Node{ 242 NodeType: datatypes.NodeTypeLiteral, 243 Value: &datatypes.Node_RegexValue{ 244 // To comply with PromQL, see 245 // https://github.com/prometheus/prometheus/blob/daf382e4a9f5ca380b2b662c8e60755a56675f14/pkg/labels/regexp.go#L30 246 RegexValue: "^(?:" + m.Value + ")$", 247 }, 248 } 249 } else { 250 right = &datatypes.Node{ 251 NodeType: datatypes.NodeTypeLiteral, 252 Value: &datatypes.Node_StringValue{ 253 StringValue: m.Value, 254 }, 255 } 256 } 257 258 children := []*datatypes.Node{left, right} 259 return &datatypes.Node{ 260 NodeType: datatypes.NodeTypeComparisonExpression, 261 Value: &datatypes.Node_Comparison_{Comparison: op}, 262 Children: children, 263 }, nil 264} 265 266// ModelTagsToLabelPairs converts models.Tags to a slice of Prometheus label pairs 267func ModelTagsToLabelPairs(tags models.Tags) []prompb.Label { 268 pairs := make([]prompb.Label, 0, len(tags)) 269 for _, t := range tags { 270 if string(t.Value) == "" { 271 continue 272 } 273 pairs = append(pairs, prompb.Label{ 274 Name: string(t.Key), 275 Value: string(t.Value), 276 }) 277 } 278 return pairs 279} 280 281// TagsToLabelPairs converts a map of Influx tags into a slice of Prometheus label pairs 282func TagsToLabelPairs(tags map[string]string) []*prompb.Label { 283 pairs := make([]*prompb.Label, 0, len(tags)) 284 for k, v := range tags { 285 if v == "" { 286 // If we select metrics with different sets of labels names, 287 // InfluxDB returns *all* possible tag names on all returned 288 // series, with empty tag values on series where they don't 289 // apply. In Prometheus, an empty label value is equivalent 290 // to a non-existent label, so we just skip empty ones here 291 // to make the result correct. 292 continue 293 } 294 pairs = append(pairs, &prompb.Label{ 295 Name: k, 296 Value: v, 297 }) 298 } 299 return pairs 300} 301 302// PrometheusToStatistics converts a prometheus metric family (from Registry.Gather) 303// to a []model.Statistics for /debug/vars . 304// This code is strongly inspired by the telegraf prometheus plugin. 305func PrometheusToStatistics(family []*dto.MetricFamily, tags map[string]string) []models.Statistic { 306 statistics := []models.Statistic{} 307 for _, mf := range family { 308 for _, m := range mf.Metric { 309 newTags := make(map[string]string, len(tags)+len(m.Label)) 310 for key, value := range tags { 311 newTags[key] = value 312 } 313 for _, lp := range m.Label { 314 newTags[lp.GetName()] = lp.GetValue() 315 } 316 317 // reading fields 318 var fields map[string]interface{} 319 if mf.GetType() == dto.MetricType_SUMMARY { 320 // summary metric 321 fields = makeQuantiles(m) 322 fields["count"] = float64(m.GetSummary().GetSampleCount()) 323 fields["sum"] = float64(m.GetSummary().GetSampleSum()) 324 } else if mf.GetType() == dto.MetricType_HISTOGRAM { 325 // histogram metric 326 fields = makeBuckets(m) 327 fields["count"] = float64(m.GetHistogram().GetSampleCount()) 328 fields["sum"] = float64(m.GetHistogram().GetSampleSum()) 329 } else { 330 // standard metric 331 fields = getNameAndValue(m) 332 } 333 334 statistics = append(statistics, models.Statistic{ 335 Name: *mf.Name, 336 Tags: tags, 337 Values: fields, 338 }) 339 } 340 } 341 return statistics 342} 343 344// Get Quantiles from summary metric 345func makeQuantiles(m *dto.Metric) map[string]interface{} { 346 fields := make(map[string]interface{}) 347 for _, q := range m.GetSummary().Quantile { 348 if !math.IsNaN(q.GetValue()) { 349 fields[fmt.Sprint(q.GetQuantile())] = float64(q.GetValue()) 350 } 351 } 352 return fields 353} 354 355// Get Buckets from histogram metric 356func makeBuckets(m *dto.Metric) map[string]interface{} { 357 fields := make(map[string]interface{}) 358 for _, b := range m.GetHistogram().Bucket { 359 fields[fmt.Sprint(b.GetUpperBound())] = float64(b.GetCumulativeCount()) 360 } 361 return fields 362} 363 364// Get name and value from metric 365func getNameAndValue(m *dto.Metric) map[string]interface{} { 366 fields := make(map[string]interface{}) 367 if m.Gauge != nil { 368 if !math.IsNaN(m.GetGauge().GetValue()) { 369 fields["gauge"] = float64(m.GetGauge().GetValue()) 370 } 371 } else if m.Counter != nil { 372 if !math.IsNaN(m.GetCounter().GetValue()) { 373 fields["counter"] = float64(m.GetCounter().GetValue()) 374 } 375 } else if m.Untyped != nil { 376 if !math.IsNaN(m.GetUntyped().GetValue()) { 377 fields["value"] = float64(m.GetUntyped().GetValue()) 378 } 379 } 380 return fields 381} 382