// Copyright 2018, OpenCensus Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package stackdriver

/*
The code in this file is responsible for converting OpenCensus Proto metrics
directly to Stackdriver Metrics.
*/

import (
	"context"
	"errors"
	"fmt"
	"path"
	"strings"

	"go.opencensus.io/resource"

	commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
	metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
	resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
	timestamppb "github.com/golang/protobuf/ptypes/timestamp"
	distributionpb "google.golang.org/genproto/googleapis/api/distribution"
	labelpb "google.golang.org/genproto/googleapis/api/label"
	googlemetricpb "google.golang.org/genproto/googleapis/api/metric"
	monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres"
	monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
)

var errNilMetricOrMetricDescriptor = errors.New("nil metric or metric descriptor")
var percentileLabelKey = &metricspb.LabelKey{
	Key:         "percentile",
	Description: "the value at a given percentile of a distribution",
}
var globalResource = &resource.Resource{Type: "global"}
var domains = []string{"googleapis.com", "kubernetes.io", "istio.io", "knative.dev"}

// PushMetricsProto exports OpenCensus Metrics Proto to Stackdriver Monitoring synchronously,
// without de-duping or adding proto metrics to the bundler.
func (se *statsExporter) PushMetricsProto(ctx context.Context, node *commonpb.Node, rsc *resourcepb.Resource, metrics []*metricspb.Metric) (int, error) {
	if len(metrics) == 0 {
		return 0, errNilMetricOrMetricDescriptor
	}

	// Caches the resources seen so far
	seenResources := make(map[*resourcepb.Resource]*monitoredrespb.MonitoredResource)

	mb := newMetricsBatcher(ctx, se.o.ProjectID, se.o.NumberOfWorkers, se.c, se.o.Timeout)
	for _, metric := range metrics {
		if len(metric.GetTimeseries()) == 0 {
			// No TimeSeries to export, skip this metric.
			continue
		}
		mappedRsc := se.getResource(rsc, metric, seenResources)
		if metric.GetMetricDescriptor().GetType() == metricspb.MetricDescriptor_SUMMARY {
			summaryMtcs := se.convertSummaryMetrics(metric)
			for _, summaryMtc := range summaryMtcs {
				if err := se.createMetricDescriptorFromMetricProto(ctx, summaryMtc); err != nil {
					mb.recordDroppedTimeseries(len(summaryMtc.GetTimeseries()), err)
					continue
				}
				se.protoMetricToTimeSeries(ctx, mappedRsc, summaryMtc, mb)
			}
		} else {
			if err := se.createMetricDescriptorFromMetricProto(ctx, metric); err != nil {
				mb.recordDroppedTimeseries(len(metric.GetTimeseries()), err)
				continue
			}
			se.protoMetricToTimeSeries(ctx, mappedRsc, metric, mb)
		}
	}

	return mb.droppedTimeSeries, mb.close(ctx)
}
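// Illustrative usage sketch (a hedged example, not part of this file's API
// surface): pushing proto metrics synchronously. The ctx, node, rsc and
// metrics values are assumed to come from the caller's own setup, and
// "exporter" stands for whatever value exposes PushMetricsProto (the public
// Exporter wrapper in this repository, or this statsExporter internally).
//
//	dropped, err := exporter.PushMetricsProto(ctx, node, rsc, metrics)
//	if err != nil {
//		log.Printf("stackdriver: %d time series were dropped: %v", dropped, err)
//	}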

func (se *statsExporter) convertSummaryMetrics(summary *metricspb.Metric) []*metricspb.Metric {
	var metrics []*metricspb.Metric

	for _, ts := range summary.Timeseries {
		var percentileTss []*metricspb.TimeSeries
		var countTss []*metricspb.TimeSeries
		var sumTss []*metricspb.TimeSeries
		lvs := ts.GetLabelValues()

		startTime := ts.StartTimestamp
		for _, pt := range ts.GetPoints() {
			ptTimestamp := pt.GetTimestamp()
			summaryValue := pt.GetSummaryValue()
			if summaryValue.Sum != nil {
				sumTs := &metricspb.TimeSeries{
					LabelValues:    lvs,
					StartTimestamp: startTime,
					Points: []*metricspb.Point{
						{
							Value: &metricspb.Point_DoubleValue{
								DoubleValue: summaryValue.Sum.Value,
							},
							Timestamp: ptTimestamp,
						},
					},
				}
				sumTss = append(sumTss, sumTs)
			}

			if summaryValue.Count != nil {
				countTs := &metricspb.TimeSeries{
					LabelValues:    lvs,
					StartTimestamp: startTime,
					Points: []*metricspb.Point{
						{
							Value: &metricspb.Point_Int64Value{
								Int64Value: summaryValue.Count.Value,
							},
							Timestamp: ptTimestamp,
						},
					},
				}
				countTss = append(countTss, countTs)
			}

			snapshot := summaryValue.GetSnapshot()
			for _, percentileValue := range snapshot.GetPercentileValues() {
				// Copy the label values before appending the percentile label, so the
				// backing array shared with lvs is not overwritten across iterations.
				lvsWithPercentile := make([]*metricspb.LabelValue, 0, len(lvs)+1)
				lvsWithPercentile = append(lvsWithPercentile, lvs...)
				lvsWithPercentile = append(lvsWithPercentile, &metricspb.LabelValue{
					HasValue: true,
					Value:    fmt.Sprintf("%f", percentileValue.Percentile),
				})
				percentileTs := &metricspb.TimeSeries{
					LabelValues:    lvsWithPercentile,
					StartTimestamp: nil, // percentile series are exported as gauges and carry no start time
					Points: []*metricspb.Point{
						{
							Value: &metricspb.Point_DoubleValue{
								DoubleValue: percentileValue.Value,
							},
							Timestamp: ptTimestamp,
						},
					},
				}
				percentileTss = append(percentileTss, percentileTs)
			}
		}

		if len(sumTss) > 0 {
			metric := &metricspb.Metric{
				MetricDescriptor: &metricspb.MetricDescriptor{
					Name:        fmt.Sprintf("%s_summary_sum", summary.GetMetricDescriptor().GetName()),
					Description: summary.GetMetricDescriptor().GetDescription(),
					Type:        metricspb.MetricDescriptor_CUMULATIVE_DOUBLE,
					Unit:        summary.GetMetricDescriptor().GetUnit(),
					LabelKeys:   summary.GetMetricDescriptor().GetLabelKeys(),
				},
				Timeseries: sumTss,
				Resource:   summary.Resource,
			}
			metrics = append(metrics, metric)
		}
		if len(countTss) > 0 {
			metric := &metricspb.Metric{
				MetricDescriptor: &metricspb.MetricDescriptor{
					Name:        fmt.Sprintf("%s_summary_count", summary.GetMetricDescriptor().GetName()),
					Description: summary.GetMetricDescriptor().GetDescription(),
					Type:        metricspb.MetricDescriptor_CUMULATIVE_INT64,
					Unit:        "1",
					LabelKeys:   summary.GetMetricDescriptor().GetLabelKeys(),
				},
				Timeseries: countTss,
				Resource:   summary.Resource,
			}
			metrics = append(metrics, metric)
		}
		if len(percentileTss) > 0 {
			// Copy the label keys before appending so the summary descriptor's slice is not aliased.
			lks := make([]*metricspb.LabelKey, 0, len(summary.GetMetricDescriptor().GetLabelKeys())+1)
			lks = append(lks, summary.GetMetricDescriptor().GetLabelKeys()...)
			lks = append(lks, percentileLabelKey)
			metric := &metricspb.Metric{
				MetricDescriptor: &metricspb.MetricDescriptor{
					Name:        fmt.Sprintf("%s_summary_percentile", summary.GetMetricDescriptor().GetName()),
					Description: summary.GetMetricDescriptor().GetDescription(),
					Type:        metricspb.MetricDescriptor_GAUGE_DOUBLE,
					Unit:        summary.GetMetricDescriptor().GetUnit(),
					LabelKeys:   lks,
				},
				Timeseries: percentileTss,
				Resource:   summary.Resource,
			}
			metrics = append(metrics, metric)
		}
	}
	return metrics
}
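// For reference, convertSummaryMetrics splits a SUMMARY metric into up to three
// metrics. With a hypothetical input named "rpc_latency", the outputs are roughly:
//
//	rpc_latency_summary_sum        CUMULATIVE_DOUBLE, original label keys
//	rpc_latency_summary_count      CUMULATIVE_INT64, original label keys, unit "1"
//	rpc_latency_summary_percentile GAUGE_DOUBLE, original label keys plus "percentile"
//
// The name "rpc_latency" is only an example; any summary metric follows the same pattern.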

func (se *statsExporter) getResource(rsc *resourcepb.Resource, metric *metricspb.Metric, seenRscs map[*resourcepb.Resource]*monitoredrespb.MonitoredResource) *monitoredrespb.MonitoredResource {
	var res = rsc
	if metric.Resource != nil {
		res = metric.Resource
	}
	mappedRsc, ok := seenRscs[res]
	if !ok {
		mappedRsc = se.o.MapResource(resourcepbToResource(res))
		seenRscs[res] = mappedRsc
	}
	return mappedRsc
}

func resourcepbToResource(rsc *resourcepb.Resource) *resource.Resource {
	if rsc == nil {
		return globalResource
	}
	res := &resource.Resource{
		Type:   rsc.Type,
		Labels: make(map[string]string, len(rsc.Labels)),
	}

	for k, v := range rsc.Labels {
		res.Labels[k] = v
	}
	return res
}

// protoMetricToTimeSeries converts a metric into Stackdriver Monitoring v3 TimeSeries
// and adds them to the metrics batcher; it does not invoke any remote API.
func (se *statsExporter) protoMetricToTimeSeries(ctx context.Context, mappedRsc *monitoredrespb.MonitoredResource, metric *metricspb.Metric, mb *metricsBatcher) {
	if metric == nil || metric.MetricDescriptor == nil {
		mb.recordDroppedTimeseries(len(metric.GetTimeseries()), errNilMetricOrMetricDescriptor)
		return
	}

	metricType := se.metricTypeFromProto(metric.GetMetricDescriptor().GetName())
	metricLabelKeys := metric.GetMetricDescriptor().GetLabelKeys()
	metricKind, valueType := protoMetricDescriptorTypeToMetricKind(metric)
	labelKeys := make([]string, 0, len(metricLabelKeys))
	for _, key := range metricLabelKeys {
		labelKeys = append(labelKeys, sanitize(key.GetKey()))
	}

	for _, protoTimeSeries := range metric.Timeseries {
		if len(protoTimeSeries.Points) == 0 {
			// No points to send; skip this time series.
			continue
		}

		sdPoints, err := se.protoTimeSeriesToMonitoringPoints(protoTimeSeries, metricKind)
		if err != nil {
			mb.recordDroppedTimeseries(1, err)
			continue
		}

		// Each TimeSeries' label values MUST be correlated with the
		// label keys from the MetricDescriptor.
		labels, err := labelsPerTimeSeries(se.defaultLabels, labelKeys, protoTimeSeries.GetLabelValues())
		if err != nil {
			mb.recordDroppedTimeseries(1, err)
			continue
		}
		mb.addTimeSeries(&monitoringpb.TimeSeries{
			Metric: &googlemetricpb.Metric{
				Type:   metricType,
				Labels: labels,
			},
			MetricKind: metricKind,
			ValueType:  valueType,
			Resource:   mappedRsc,
			Points:     sdPoints,
		})
	}
}

func labelsPerTimeSeries(defaults map[string]labelValue, labelKeys []string, labelValues []*metricspb.LabelValue) (map[string]string, error) {
	if len(labelKeys) != len(labelValues) {
		return nil, fmt.Errorf("length mismatch: len(labelKeys)=%d len(labelValues)=%d", len(labelKeys), len(labelValues))
	}

	if len(defaults)+len(labelKeys) == 0 {
		// No labels for this metric
		return nil, nil
	}

	labels := make(map[string]string)
	// Fill in the default labels first.
	for key, label := range defaults {
		labels[key] = label.val
	}

	for i, labelKey := range labelKeys {
		labelValue := labelValues[i]
		if !labelValue.GetHasValue() {
			continue
		}
		labels[labelKey] = labelValue.GetValue()
	}

	return labels, nil
}
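// Sketch of the merge performed by labelsPerTimeSeries, using made-up values:
// given defaults {"opencensus_task": "go-1234@host"}, labelKeys
// ["method", "status"] and labelValues [{HasValue: true, Value: "Get"},
// {HasValue: false}], the result is
//
//	map[string]string{
//		"opencensus_task": "go-1234@host",
//		"method":          "Get",
//	}
//
// Label values with HasValue == false are omitted rather than written as "".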

func (se *statsExporter) createMetricDescriptorFromMetricProto(ctx context.Context, metric *metricspb.Metric) error {
	// Skip creating the metric descriptor if the exporter is configured to do so.
	if se.o.SkipCMD {
		return nil
	}

	ctx, cancel := newContextWithTimeout(ctx, se.o.Timeout)
	defer cancel()

	se.protoMu.Lock()
	defer se.protoMu.Unlock()

	name := metric.GetMetricDescriptor().GetName()
	if _, created := se.protoMetricDescriptors[name]; created {
		return nil
	}

	if builtinMetric(se.metricTypeFromProto(name)) {
		se.protoMetricDescriptors[name] = true
		return nil
	}

	// Otherwise, we encountered a cache-miss and
	// should create the metric descriptor remotely.
	inMD, err := se.protoToMonitoringMetricDescriptor(metric, se.defaultLabels)
	if err != nil {
		return err
	}

	if err = se.createMetricDescriptor(ctx, inMD); err != nil {
		return err
	}

	se.protoMetricDescriptors[name] = true
	return nil
}

func (se *statsExporter) protoTimeSeriesToMonitoringPoints(ts *metricspb.TimeSeries, metricKind googlemetricpb.MetricDescriptor_MetricKind) ([]*monitoringpb.Point, error) {
	sptl := make([]*monitoringpb.Point, 0, len(ts.Points))
	for _, pt := range ts.Points {
		// For a last-value aggregation point, i.e. MetricDescriptor_GAUGE,
		// StartTime should be nil.
		startTime := ts.StartTimestamp
		if metricKind == googlemetricpb.MetricDescriptor_GAUGE {
			startTime = nil
		}
		spt, err := fromProtoPoint(startTime, pt)
		if err != nil {
			return nil, err
		}
		sptl = append(sptl, spt)
	}
	return sptl, nil
}

func (se *statsExporter) protoToMonitoringMetricDescriptor(metric *metricspb.Metric, additionalLabels map[string]labelValue) (*googlemetricpb.MetricDescriptor, error) {
	if metric == nil || metric.MetricDescriptor == nil {
		return nil, errNilMetricOrMetricDescriptor
	}

	md := metric.GetMetricDescriptor()
	metricName := md.GetName()
	unit := md.GetUnit()
	description := md.GetDescription()
	metricType := se.metricTypeFromProto(metricName)
	displayName := se.displayName(metricName)
	metricKind, valueType := protoMetricDescriptorTypeToMetricKind(metric)

	sdm := &googlemetricpb.MetricDescriptor{
		Name:        fmt.Sprintf("projects/%s/metricDescriptors/%s", se.o.ProjectID, metricType),
		DisplayName: displayName,
		Description: description,
		Unit:        unit,
		Type:        metricType,
		MetricKind:  metricKind,
		ValueType:   valueType,
		Labels:      labelDescriptorsFromProto(additionalLabels, metric.GetMetricDescriptor().GetLabelKeys()),
	}

	return sdm, nil
}

func labelDescriptorsFromProto(defaults map[string]labelValue, protoLabelKeys []*metricspb.LabelKey) []*labelpb.LabelDescriptor {
	labelDescriptors := make([]*labelpb.LabelDescriptor, 0, len(defaults)+len(protoLabelKeys))

	// Fill in the defaults first.
	for key, lbl := range defaults {
		labelDescriptors = append(labelDescriptors, &labelpb.LabelDescriptor{
			Key:         sanitize(key),
			Description: lbl.desc,
			ValueType:   labelpb.LabelDescriptor_STRING,
		})
	}

	// Now fill in those from the metric.
	for _, protoKey := range protoLabelKeys {
		labelDescriptors = append(labelDescriptors, &labelpb.LabelDescriptor{
			Key:         sanitize(protoKey.GetKey()),
			Description: protoKey.GetDescription(),
			ValueType:   labelpb.LabelDescriptor_STRING, // We only use string tags
		})
	}
	return labelDescriptors
}

func (se *statsExporter) metricTypeFromProto(name string) string {
	prefix := se.o.MetricPrefix
	if se.o.GetMetricPrefix != nil {
		prefix = se.o.GetMetricPrefix(name)
	}
	if prefix != "" {
		name = path.Join(prefix, name)
	}
	if !hasDomain(name) {
		// path.Join is still needed here because the name may or may not start with a "/".
		name = path.Join(defaultDomain, name)
	}
	return name
}
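// Naming sketch for metricTypeFromProto, assuming no metric prefix is configured
// and defaultDomain (defined elsewhere in this package) is
// "custom.googleapis.com/opencensus":
//
//	"grpc.io/client/roundtrip_latency"            -> "custom.googleapis.com/opencensus/grpc.io/client/roundtrip_latency"
//	"kubernetes.io/container/cpu/core_usage_time" -> "kubernetes.io/container/cpu/core_usage_time" (already has a known domain)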

// hasDomain checks if the metric name already has a domain in it.
func hasDomain(name string) bool {
	for _, domain := range domains {
		if strings.Contains(name, domain) {
			return true
		}
	}
	return false
}

func fromProtoPoint(startTime *timestamppb.Timestamp, pt *metricspb.Point) (*monitoringpb.Point, error) {
	if pt == nil {
		return nil, nil
	}

	mptv, err := protoToMetricPoint(pt.Value)
	if err != nil {
		return nil, err
	}

	endTime := pt.Timestamp
	interval := &monitoringpb.TimeInterval{
		StartTime: startTime,
		EndTime:   endTime,
	}
	if startTime != nil && endTime != nil {
		interval = toValidTimeIntervalpb(startTime.AsTime(), endTime.AsTime())
	}

	return &monitoringpb.Point{
		Value:    mptv,
		Interval: interval,
	}, nil
}

func protoToMetricPoint(value interface{}) (*monitoringpb.TypedValue, error) {
	if value == nil {
		return nil, nil
	}

	switch v := value.(type) {
	default:
		// All the other types are not yet handled.
		// TODO: (@odeke-em, @songy23) talk to the Stackdriver team to determine
		// the use cases for:
		//
		//      *TypedValue_BoolValue
		//      *TypedValue_StringValue
		//
		// and then file feature requests on OpenCensus-Specs and OpenCensus-Proto;
		// until then, these types return an error here.
		//
		// TODO: Add conversion from SummaryValue when
		//      https://github.com/census-ecosystem/opencensus-go-exporter-stackdriver/issues/66
		// has been figured out.
		return nil, fmt.Errorf("protoToMetricPoint: unknown Data type: %T", value)

	case *metricspb.Point_Int64Value:
		return &monitoringpb.TypedValue{
			Value: &monitoringpb.TypedValue_Int64Value{
				Int64Value: v.Int64Value,
			},
		}, nil

	case *metricspb.Point_DoubleValue:
		return &monitoringpb.TypedValue{
			Value: &monitoringpb.TypedValue_DoubleValue{
				DoubleValue: v.DoubleValue,
			},
		}, nil

	case *metricspb.Point_DistributionValue:
		dv := v.DistributionValue
		var mv *monitoringpb.TypedValue_DistributionValue
		if dv != nil {
			var mean float64
			if dv.Count > 0 {
				mean = float64(dv.Sum) / float64(dv.Count)
			}
			mv = &monitoringpb.TypedValue_DistributionValue{
				DistributionValue: &distributionpb.Distribution{
					Count:                 dv.Count,
					Mean:                  mean,
					SumOfSquaredDeviation: dv.SumOfSquaredDeviation,
				},
			}

			insertZeroBound := false
			if bopts := dv.BucketOptions; bopts != nil && bopts.Type != nil {
				bexp, ok := bopts.Type.(*metricspb.DistributionValue_BucketOptions_Explicit_)
				if ok && bexp != nil && bexp.Explicit != nil {
					insertZeroBound = shouldInsertZeroBound(bexp.Explicit.Bounds...)
					mv.DistributionValue.BucketOptions = &distributionpb.Distribution_BucketOptions{
						Options: &distributionpb.Distribution_BucketOptions_ExplicitBuckets{
							ExplicitBuckets: &distributionpb.Distribution_BucketOptions_Explicit{
								// The first bucket bound should be 0.0 because the Metrics first bucket is
								// [0, first_bound) but Stackdriver monitoring bucket bounds begin with -infinity
								// (first bucket is (-infinity, 0))
								Bounds: addZeroBoundOnCondition(insertZeroBound, bexp.Explicit.Bounds...),
							},
						},
					}
				}
			}
			mv.DistributionValue.BucketCounts = addZeroBucketCountOnCondition(insertZeroBound, bucketCounts(dv.Buckets)...)

		}
		return &monitoringpb.TypedValue{Value: mv}, nil
	}
}
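// Worked example of the zero-bound handling above, with made-up numbers: an
// OpenCensus distribution with explicit bounds [5, 10] and bucket counts
// [2, 3, 1] describes the buckets [0, 5), [5, 10) and [10, +Inf). Because the
// first Stackdriver explicit bucket is (-Inf, bounds[0]), a leading 0 bound and
// a matching 0 count are inserted, yielding bounds [0, 5, 10] and counts
// [0, 2, 3, 1].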

func bucketCounts(buckets []*metricspb.DistributionValue_Bucket) []int64 {
	bucketCounts := make([]int64, len(buckets))
	for i, bucket := range buckets {
		if bucket != nil {
			bucketCounts[i] = bucket.Count
		}
	}
	return bucketCounts
}
546
547func protoMetricDescriptorTypeToMetricKind(m *metricspb.Metric) (googlemetricpb.MetricDescriptor_MetricKind, googlemetricpb.MetricDescriptor_ValueType) {
548	dt := m.GetMetricDescriptor()
549	if dt == nil {
550		return googlemetricpb.MetricDescriptor_METRIC_KIND_UNSPECIFIED, googlemetricpb.MetricDescriptor_VALUE_TYPE_UNSPECIFIED
551	}
552
553	switch dt.Type {
554	case metricspb.MetricDescriptor_CUMULATIVE_INT64:
555		return googlemetricpb.MetricDescriptor_CUMULATIVE, googlemetricpb.MetricDescriptor_INT64
556
557	case metricspb.MetricDescriptor_CUMULATIVE_DOUBLE:
558		return googlemetricpb.MetricDescriptor_CUMULATIVE, googlemetricpb.MetricDescriptor_DOUBLE
559
560	case metricspb.MetricDescriptor_CUMULATIVE_DISTRIBUTION:
561		return googlemetricpb.MetricDescriptor_CUMULATIVE, googlemetricpb.MetricDescriptor_DISTRIBUTION
562
563	case metricspb.MetricDescriptor_GAUGE_DOUBLE:
564		return googlemetricpb.MetricDescriptor_GAUGE, googlemetricpb.MetricDescriptor_DOUBLE
565
566	case metricspb.MetricDescriptor_GAUGE_INT64:
567		return googlemetricpb.MetricDescriptor_GAUGE, googlemetricpb.MetricDescriptor_INT64
568
569	case metricspb.MetricDescriptor_GAUGE_DISTRIBUTION:
570		return googlemetricpb.MetricDescriptor_GAUGE, googlemetricpb.MetricDescriptor_DISTRIBUTION
571
572	default:
573		return googlemetricpb.MetricDescriptor_METRIC_KIND_UNSPECIFIED, googlemetricpb.MetricDescriptor_VALUE_TYPE_UNSPECIFIED
574	}
575}
576