1// Copyright 2018, OpenCensus Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package stackdriver 16 17/* 18The code in this file is responsible for converting OpenCensus Proto metrics 19directly to Stackdriver Metrics. 20*/ 21 22import ( 23 "context" 24 "errors" 25 "fmt" 26 "path" 27 "strings" 28 29 "go.opencensus.io/resource" 30 31 commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" 32 metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" 33 resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1" 34 timestamppb "github.com/golang/protobuf/ptypes/timestamp" 35 distributionpb "google.golang.org/genproto/googleapis/api/distribution" 36 labelpb "google.golang.org/genproto/googleapis/api/label" 37 googlemetricpb "google.golang.org/genproto/googleapis/api/metric" 38 monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres" 39 monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3" 40) 41 42var errNilMetricOrMetricDescriptor = errors.New("non-nil metric or metric descriptor") 43var percentileLabelKey = &metricspb.LabelKey{ 44 Key: "percentile", 45 Description: "the value at a given percentile of a distribution", 46} 47var globalResource = &resource.Resource{Type: "global"} 48var domains = []string{"googleapis.com", "kubernetes.io", "istio.io", "knative.dev"} 49 50// PushMetricsProto exports OpenCensus Metrics Proto to Stackdriver Monitoring synchronously, 51// without de-duping or adding proto metrics to the bundler. 52func (se *statsExporter) PushMetricsProto(ctx context.Context, node *commonpb.Node, rsc *resourcepb.Resource, metrics []*metricspb.Metric) (int, error) { 53 if len(metrics) == 0 { 54 return 0, errNilMetricOrMetricDescriptor 55 } 56 57 // Caches the resources seen so far 58 seenResources := make(map[*resourcepb.Resource]*monitoredrespb.MonitoredResource) 59 60 mb := newMetricsBatcher(ctx, se.o.ProjectID, se.o.NumberOfWorkers, se.c, se.o.Timeout) 61 for _, metric := range metrics { 62 if len(metric.GetTimeseries()) == 0 { 63 // No TimeSeries to export, skip this metric. 64 continue 65 } 66 mappedRsc := se.getResource(rsc, metric, seenResources) 67 if metric.GetMetricDescriptor().GetType() == metricspb.MetricDescriptor_SUMMARY { 68 summaryMtcs := se.convertSummaryMetrics(metric) 69 for _, summaryMtc := range summaryMtcs { 70 if err := se.createMetricDescriptorFromMetricProto(ctx, summaryMtc); err != nil { 71 mb.recordDroppedTimeseries(len(summaryMtc.GetTimeseries()), err) 72 continue 73 } 74 se.protoMetricToTimeSeries(ctx, mappedRsc, summaryMtc, mb) 75 } 76 } else { 77 if err := se.createMetricDescriptorFromMetricProto(ctx, metric); err != nil { 78 mb.recordDroppedTimeseries(len(metric.GetTimeseries()), err) 79 continue 80 } 81 se.protoMetricToTimeSeries(ctx, mappedRsc, metric, mb) 82 } 83 } 84 85 return mb.droppedTimeSeries, mb.close(ctx) 86} 87 88func (se *statsExporter) convertSummaryMetrics(summary *metricspb.Metric) []*metricspb.Metric { 89 var metrics []*metricspb.Metric 90 91 for _, ts := range summary.Timeseries { 92 var percentileTss []*metricspb.TimeSeries 93 var countTss []*metricspb.TimeSeries 94 var sumTss []*metricspb.TimeSeries 95 lvs := ts.GetLabelValues() 96 97 startTime := ts.StartTimestamp 98 for _, pt := range ts.GetPoints() { 99 ptTimestamp := pt.GetTimestamp() 100 summaryValue := pt.GetSummaryValue() 101 if summaryValue.Sum != nil { 102 sumTs := &metricspb.TimeSeries{ 103 LabelValues: lvs, 104 StartTimestamp: startTime, 105 Points: []*metricspb.Point{ 106 { 107 Value: &metricspb.Point_DoubleValue{ 108 DoubleValue: summaryValue.Sum.Value, 109 }, 110 Timestamp: ptTimestamp, 111 }, 112 }, 113 } 114 sumTss = append(sumTss, sumTs) 115 } 116 117 if summaryValue.Count != nil { 118 countTs := &metricspb.TimeSeries{ 119 LabelValues: lvs, 120 StartTimestamp: startTime, 121 Points: []*metricspb.Point{ 122 { 123 Value: &metricspb.Point_Int64Value{ 124 Int64Value: summaryValue.Count.Value, 125 }, 126 Timestamp: ptTimestamp, 127 }, 128 }, 129 } 130 countTss = append(countTss, countTs) 131 } 132 133 snapshot := summaryValue.GetSnapshot() 134 for _, percentileValue := range snapshot.GetPercentileValues() { 135 lvsWithPercentile := lvs[0:] 136 lvsWithPercentile = append(lvsWithPercentile, &metricspb.LabelValue{ 137 HasValue: true, 138 Value: fmt.Sprintf("%f", percentileValue.Percentile), 139 }) 140 percentileTs := &metricspb.TimeSeries{ 141 LabelValues: lvsWithPercentile, 142 StartTimestamp: nil, 143 Points: []*metricspb.Point{ 144 { 145 Value: &metricspb.Point_DoubleValue{ 146 DoubleValue: percentileValue.Value, 147 }, 148 Timestamp: ptTimestamp, 149 }, 150 }, 151 } 152 percentileTss = append(percentileTss, percentileTs) 153 } 154 } 155 156 if len(sumTss) > 0 { 157 metric := &metricspb.Metric{ 158 MetricDescriptor: &metricspb.MetricDescriptor{ 159 Name: fmt.Sprintf("%s_summary_sum", summary.GetMetricDescriptor().GetName()), 160 Description: summary.GetMetricDescriptor().GetDescription(), 161 Type: metricspb.MetricDescriptor_CUMULATIVE_DOUBLE, 162 Unit: summary.GetMetricDescriptor().GetUnit(), 163 LabelKeys: summary.GetMetricDescriptor().GetLabelKeys(), 164 }, 165 Timeseries: sumTss, 166 Resource: summary.Resource, 167 } 168 metrics = append(metrics, metric) 169 } 170 if len(countTss) > 0 { 171 metric := &metricspb.Metric{ 172 MetricDescriptor: &metricspb.MetricDescriptor{ 173 Name: fmt.Sprintf("%s_summary_count", summary.GetMetricDescriptor().GetName()), 174 Description: summary.GetMetricDescriptor().GetDescription(), 175 Type: metricspb.MetricDescriptor_CUMULATIVE_INT64, 176 Unit: "1", 177 LabelKeys: summary.GetMetricDescriptor().GetLabelKeys(), 178 }, 179 Timeseries: countTss, 180 Resource: summary.Resource, 181 } 182 metrics = append(metrics, metric) 183 } 184 if len(percentileTss) > 0 { 185 lks := summary.GetMetricDescriptor().GetLabelKeys()[0:] 186 lks = append(lks, percentileLabelKey) 187 metric := &metricspb.Metric{ 188 MetricDescriptor: &metricspb.MetricDescriptor{ 189 Name: fmt.Sprintf("%s_summary_percentile", summary.GetMetricDescriptor().GetName()), 190 Description: summary.GetMetricDescriptor().GetDescription(), 191 Type: metricspb.MetricDescriptor_GAUGE_DOUBLE, 192 Unit: summary.GetMetricDescriptor().GetUnit(), 193 LabelKeys: lks, 194 }, 195 Timeseries: percentileTss, 196 Resource: summary.Resource, 197 } 198 metrics = append(metrics, metric) 199 } 200 } 201 return metrics 202} 203 204func (se *statsExporter) getResource(rsc *resourcepb.Resource, metric *metricspb.Metric, seenRscs map[*resourcepb.Resource]*monitoredrespb.MonitoredResource) *monitoredrespb.MonitoredResource { 205 var resource = rsc 206 if metric.Resource != nil { 207 resource = metric.Resource 208 } 209 mappedRsc, ok := seenRscs[resource] 210 if !ok { 211 mappedRsc = se.o.MapResource(resourcepbToResource(resource)) 212 seenRscs[resource] = mappedRsc 213 } 214 return mappedRsc 215} 216 217func resourcepbToResource(rsc *resourcepb.Resource) *resource.Resource { 218 if rsc == nil { 219 return globalResource 220 } 221 res := &resource.Resource{ 222 Type: rsc.Type, 223 Labels: make(map[string]string, len(rsc.Labels)), 224 } 225 226 for k, v := range rsc.Labels { 227 res.Labels[k] = v 228 } 229 return res 230} 231 232// protoMetricToTimeSeries converts a metric into a Stackdriver Monitoring v3 API CreateTimeSeriesRequest 233// but it doesn't invoke any remote API. 234func (se *statsExporter) protoMetricToTimeSeries(ctx context.Context, mappedRsc *monitoredrespb.MonitoredResource, metric *metricspb.Metric, mb *metricsBatcher) { 235 if metric == nil || metric.MetricDescriptor == nil { 236 mb.recordDroppedTimeseries(len(metric.GetTimeseries()), errNilMetricOrMetricDescriptor) 237 } 238 239 metricType := se.metricTypeFromProto(metric.GetMetricDescriptor().GetName()) 240 metricLabelKeys := metric.GetMetricDescriptor().GetLabelKeys() 241 metricKind, valueType := protoMetricDescriptorTypeToMetricKind(metric) 242 labelKeys := make([]string, 0, len(metricLabelKeys)) 243 for _, key := range metricLabelKeys { 244 labelKeys = append(labelKeys, sanitize(key.GetKey())) 245 } 246 247 for _, protoTimeSeries := range metric.Timeseries { 248 if len(protoTimeSeries.Points) == 0 { 249 // No points to send just move forward. 250 continue 251 } 252 253 sdPoints, err := se.protoTimeSeriesToMonitoringPoints(protoTimeSeries, metricKind) 254 if err != nil { 255 mb.recordDroppedTimeseries(1, err) 256 continue 257 } 258 259 // Each TimeSeries has labelValues which MUST be correlated 260 // with that from the MetricDescriptor 261 labels, err := labelsPerTimeSeries(se.defaultLabels, labelKeys, protoTimeSeries.GetLabelValues()) 262 if err != nil { 263 mb.recordDroppedTimeseries(1, err) 264 continue 265 } 266 mb.addTimeSeries(&monitoringpb.TimeSeries{ 267 Metric: &googlemetricpb.Metric{ 268 Type: metricType, 269 Labels: labels, 270 }, 271 MetricKind: metricKind, 272 ValueType: valueType, 273 Resource: mappedRsc, 274 Points: sdPoints, 275 }) 276 } 277} 278 279func labelsPerTimeSeries(defaults map[string]labelValue, labelKeys []string, labelValues []*metricspb.LabelValue) (map[string]string, error) { 280 if len(labelKeys) != len(labelValues) { 281 return nil, fmt.Errorf("length mismatch: len(labelKeys)=%d len(labelValues)=%d", len(labelKeys), len(labelValues)) 282 } 283 284 if len(defaults)+len(labelKeys) == 0 { 285 // No labels for this metric 286 return nil, nil 287 } 288 289 labels := make(map[string]string) 290 // Fill in the defaults firstly, irrespective of if the labelKeys and labelValues are mismatched. 291 for key, label := range defaults { 292 labels[key] = label.val 293 } 294 295 for i, labelKey := range labelKeys { 296 labelValue := labelValues[i] 297 if !labelValue.GetHasValue() { 298 continue 299 } 300 labels[labelKey] = labelValue.GetValue() 301 } 302 303 return labels, nil 304} 305 306func (se *statsExporter) createMetricDescriptorFromMetricProto(ctx context.Context, metric *metricspb.Metric) error { 307 // Skip create metric descriptor if configured 308 if se.o.SkipCMD { 309 return nil 310 } 311 312 ctx, cancel := newContextWithTimeout(ctx, se.o.Timeout) 313 defer cancel() 314 315 se.protoMu.Lock() 316 defer se.protoMu.Unlock() 317 318 name := metric.GetMetricDescriptor().GetName() 319 if _, created := se.protoMetricDescriptors[name]; created { 320 return nil 321 } 322 323 if builtinMetric(se.metricTypeFromProto(name)) { 324 se.protoMetricDescriptors[name] = true 325 return nil 326 } 327 328 // Otherwise, we encountered a cache-miss and 329 // should create the metric descriptor remotely. 330 inMD, err := se.protoToMonitoringMetricDescriptor(metric, se.defaultLabels) 331 if err != nil { 332 return err 333 } 334 335 if err = se.createMetricDescriptor(ctx, inMD); err != nil { 336 return err 337 } 338 339 se.protoMetricDescriptors[name] = true 340 return nil 341} 342 343func (se *statsExporter) protoTimeSeriesToMonitoringPoints(ts *metricspb.TimeSeries, metricKind googlemetricpb.MetricDescriptor_MetricKind) ([]*monitoringpb.Point, error) { 344 sptl := make([]*monitoringpb.Point, 0, len(ts.Points)) 345 for _, pt := range ts.Points { 346 // If we have a last value aggregation point i.e. MetricDescriptor_GAUGE 347 // StartTime should be nil. 348 startTime := ts.StartTimestamp 349 if metricKind == googlemetricpb.MetricDescriptor_GAUGE { 350 startTime = nil 351 } 352 spt, err := fromProtoPoint(startTime, pt) 353 if err != nil { 354 return nil, err 355 } 356 sptl = append(sptl, spt) 357 } 358 return sptl, nil 359} 360 361func (se *statsExporter) protoToMonitoringMetricDescriptor(metric *metricspb.Metric, additionalLabels map[string]labelValue) (*googlemetricpb.MetricDescriptor, error) { 362 if metric == nil || metric.MetricDescriptor == nil { 363 return nil, errNilMetricOrMetricDescriptor 364 } 365 366 md := metric.GetMetricDescriptor() 367 metricName := md.GetName() 368 unit := md.GetUnit() 369 description := md.GetDescription() 370 metricType := se.metricTypeFromProto(metricName) 371 displayName := se.displayName(metricName) 372 metricKind, valueType := protoMetricDescriptorTypeToMetricKind(metric) 373 374 sdm := &googlemetricpb.MetricDescriptor{ 375 Name: fmt.Sprintf("projects/%s/metricDescriptors/%s", se.o.ProjectID, metricType), 376 DisplayName: displayName, 377 Description: description, 378 Unit: unit, 379 Type: metricType, 380 MetricKind: metricKind, 381 ValueType: valueType, 382 Labels: labelDescriptorsFromProto(additionalLabels, metric.GetMetricDescriptor().GetLabelKeys()), 383 } 384 385 return sdm, nil 386} 387 388func labelDescriptorsFromProto(defaults map[string]labelValue, protoLabelKeys []*metricspb.LabelKey) []*labelpb.LabelDescriptor { 389 labelDescriptors := make([]*labelpb.LabelDescriptor, 0, len(defaults)+len(protoLabelKeys)) 390 391 // Fill in the defaults first. 392 for key, lbl := range defaults { 393 labelDescriptors = append(labelDescriptors, &labelpb.LabelDescriptor{ 394 Key: sanitize(key), 395 Description: lbl.desc, 396 ValueType: labelpb.LabelDescriptor_STRING, 397 }) 398 } 399 400 // Now fill in those from the metric. 401 for _, protoKey := range protoLabelKeys { 402 labelDescriptors = append(labelDescriptors, &labelpb.LabelDescriptor{ 403 Key: sanitize(protoKey.GetKey()), 404 Description: protoKey.GetDescription(), 405 ValueType: labelpb.LabelDescriptor_STRING, // We only use string tags 406 }) 407 } 408 return labelDescriptors 409} 410 411func (se *statsExporter) metricTypeFromProto(name string) string { 412 prefix := se.o.MetricPrefix 413 if se.o.GetMetricPrefix != nil { 414 prefix = se.o.GetMetricPrefix(name) 415 } 416 if prefix != "" { 417 name = path.Join(prefix, name) 418 } 419 if !hasDomain(name) { 420 // Still needed because the name may or may not have a "/" at the beginning. 421 name = path.Join(defaultDomain, name) 422 } 423 return name 424} 425 426// hasDomain checks if the metric name already has a domain in it. 427func hasDomain(name string) bool { 428 for _, domain := range domains { 429 if strings.Contains(name, domain) { 430 return true 431 } 432 } 433 return false 434} 435 436func fromProtoPoint(startTime *timestamppb.Timestamp, pt *metricspb.Point) (*monitoringpb.Point, error) { 437 if pt == nil { 438 return nil, nil 439 } 440 441 mptv, err := protoToMetricPoint(pt.Value) 442 if err != nil { 443 return nil, err 444 } 445 446 endTime := pt.Timestamp 447 interval := &monitoringpb.TimeInterval{ 448 StartTime: startTime, 449 EndTime: endTime, 450 } 451 if startTime != nil && endTime != nil { 452 interval = toValidTimeIntervalpb(startTime.AsTime(), endTime.AsTime()) 453 } 454 455 return &monitoringpb.Point{ 456 Value: mptv, 457 Interval: interval, 458 }, nil 459} 460 461func protoToMetricPoint(value interface{}) (*monitoringpb.TypedValue, error) { 462 if value == nil { 463 return nil, nil 464 } 465 466 switch v := value.(type) { 467 default: 468 // All the other types are not yet handled. 469 // TODO: (@odeke-em, @songy23) talk to the Stackdriver team to determine 470 // the use cases for: 471 // 472 // *TypedValue_BoolValue 473 // *TypedValue_StringValue 474 // 475 // and then file feature requests on OpenCensus-Specs and then OpenCensus-Proto, 476 // lest we shall error here. 477 // 478 // TODO: Add conversion from SummaryValue when 479 // https://github.com/census-ecosystem/opencensus-go-exporter-stackdriver/issues/66 480 // has been figured out. 481 return nil, fmt.Errorf("protoToMetricPoint: unknown Data type: %T", value) 482 483 case *metricspb.Point_Int64Value: 484 return &monitoringpb.TypedValue{ 485 Value: &monitoringpb.TypedValue_Int64Value{ 486 Int64Value: v.Int64Value, 487 }, 488 }, nil 489 490 case *metricspb.Point_DoubleValue: 491 return &monitoringpb.TypedValue{ 492 Value: &monitoringpb.TypedValue_DoubleValue{ 493 DoubleValue: v.DoubleValue, 494 }, 495 }, nil 496 497 case *metricspb.Point_DistributionValue: 498 dv := v.DistributionValue 499 var mv *monitoringpb.TypedValue_DistributionValue 500 if dv != nil { 501 var mean float64 502 if dv.Count > 0 { 503 mean = float64(dv.Sum) / float64(dv.Count) 504 } 505 mv = &monitoringpb.TypedValue_DistributionValue{ 506 DistributionValue: &distributionpb.Distribution{ 507 Count: dv.Count, 508 Mean: mean, 509 SumOfSquaredDeviation: dv.SumOfSquaredDeviation, 510 }, 511 } 512 513 insertZeroBound := false 514 if bopts := dv.BucketOptions; bopts != nil && bopts.Type != nil { 515 bexp, ok := bopts.Type.(*metricspb.DistributionValue_BucketOptions_Explicit_) 516 if ok && bexp != nil && bexp.Explicit != nil { 517 insertZeroBound = shouldInsertZeroBound(bexp.Explicit.Bounds...) 518 mv.DistributionValue.BucketOptions = &distributionpb.Distribution_BucketOptions{ 519 Options: &distributionpb.Distribution_BucketOptions_ExplicitBuckets{ 520 ExplicitBuckets: &distributionpb.Distribution_BucketOptions_Explicit{ 521 // The first bucket bound should be 0.0 because the Metrics first bucket is 522 // [0, first_bound) but Stackdriver monitoring bucket bounds begin with -infinity 523 // (first bucket is (-infinity, 0)) 524 Bounds: addZeroBoundOnCondition(insertZeroBound, bexp.Explicit.Bounds...), 525 }, 526 }, 527 } 528 } 529 } 530 mv.DistributionValue.BucketCounts = addZeroBucketCountOnCondition(insertZeroBound, bucketCounts(dv.Buckets)...) 531 532 } 533 return &monitoringpb.TypedValue{Value: mv}, nil 534 } 535} 536 537func bucketCounts(buckets []*metricspb.DistributionValue_Bucket) []int64 { 538 bucketCounts := make([]int64, len(buckets)) 539 for i, bucket := range buckets { 540 if bucket != nil { 541 bucketCounts[i] = bucket.Count 542 } 543 } 544 return bucketCounts 545} 546 547func protoMetricDescriptorTypeToMetricKind(m *metricspb.Metric) (googlemetricpb.MetricDescriptor_MetricKind, googlemetricpb.MetricDescriptor_ValueType) { 548 dt := m.GetMetricDescriptor() 549 if dt == nil { 550 return googlemetricpb.MetricDescriptor_METRIC_KIND_UNSPECIFIED, googlemetricpb.MetricDescriptor_VALUE_TYPE_UNSPECIFIED 551 } 552 553 switch dt.Type { 554 case metricspb.MetricDescriptor_CUMULATIVE_INT64: 555 return googlemetricpb.MetricDescriptor_CUMULATIVE, googlemetricpb.MetricDescriptor_INT64 556 557 case metricspb.MetricDescriptor_CUMULATIVE_DOUBLE: 558 return googlemetricpb.MetricDescriptor_CUMULATIVE, googlemetricpb.MetricDescriptor_DOUBLE 559 560 case metricspb.MetricDescriptor_CUMULATIVE_DISTRIBUTION: 561 return googlemetricpb.MetricDescriptor_CUMULATIVE, googlemetricpb.MetricDescriptor_DISTRIBUTION 562 563 case metricspb.MetricDescriptor_GAUGE_DOUBLE: 564 return googlemetricpb.MetricDescriptor_GAUGE, googlemetricpb.MetricDescriptor_DOUBLE 565 566 case metricspb.MetricDescriptor_GAUGE_INT64: 567 return googlemetricpb.MetricDescriptor_GAUGE, googlemetricpb.MetricDescriptor_INT64 568 569 case metricspb.MetricDescriptor_GAUGE_DISTRIBUTION: 570 return googlemetricpb.MetricDescriptor_GAUGE, googlemetricpb.MetricDescriptor_DISTRIBUTION 571 572 default: 573 return googlemetricpb.MetricDescriptor_METRIC_KIND_UNSPECIFIED, googlemetricpb.MetricDescriptor_VALUE_TYPE_UNSPECIFIED 574 } 575} 576