1// Copyright 2018, OpenCensus Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package stackdriver
16
17import (
18	"context"
19	"fmt"
20	"net"
21	"sync"
22	"testing"
23	"time"
24
25	"go.opencensus.io/metric/metricdata"
26	"google.golang.org/api/option"
27	"google.golang.org/grpc"
28
29	metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
30	resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
31
32	"github.com/golang/protobuf/ptypes/empty"
33	timestamp "github.com/golang/protobuf/ptypes/timestamp"
34	googlemetricpb "google.golang.org/genproto/googleapis/api/metric"
35	monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres"
36	monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
37)
38
39func TestStatsAndMetricsEquivalence(t *testing.T) {
40	startTime := time.Unix(1000, 0)
41	startTimePb := &timestamp.Timestamp{Seconds: 1000}
42	md := metricdata.Descriptor{
43		Name:        "ocagent.io/latency",
44		Description: "The latency of the various methods",
45		Unit:        "ms",
46		Type:        metricdata.TypeCumulativeInt64,
47	}
48	metricDescriptor := &metricspb.MetricDescriptor{
49		Name:        "ocagent.io/latency",
50		Description: "The latency of the various methods",
51		Unit:        "ms",
52		Type:        metricspb.MetricDescriptor_CUMULATIVE_INT64,
53	}
54	seenResources := make(map[*resourcepb.Resource]*monitoredrespb.MonitoredResource)
55
56	// Generate some metricdata.Metric and metrics proto.
57	var metrics []*metricdata.Metric
58	var metricPbs []*metricspb.Metric
59	for i := 0; i < 100; i++ {
60		metric := &metricdata.Metric{
61			Descriptor: md,
62			TimeSeries: []*metricdata.TimeSeries{
63				{
64					StartTime: startTime,
65					Points:    []metricdata.Point{metricdata.NewInt64Point(startTime.Add(time.Duration(1+i)*time.Second), int64(4*(i+2)))},
66				},
67			},
68		}
69		metricPb := &metricspb.Metric{
70			MetricDescriptor: metricDescriptor,
71			Timeseries: []*metricspb.TimeSeries{
72				{
73					StartTimestamp: startTimePb,
74					Points: []*metricspb.Point{
75						{
76							Timestamp: &timestamp.Timestamp{Seconds: int64(1001 + i)},
77							Value:     &metricspb.Point_Int64Value{Int64Value: int64(4 * (i + 2))},
78						},
79					},
80				},
81			},
82		}
83		metrics = append(metrics, metric)
84		metricPbs = append(metricPbs, metricPb)
85	}
86
87	// Now perform some exporting.
88	for i, metric := range metrics {
89		se := &statsExporter{
90			o: Options{ProjectID: "equivalence", MapResource: DefaultMapResource},
91		}
92
93		ctx := context.Background()
94		sMD, err := se.metricToMpbMetricDescriptor(metric)
95		if err != nil {
96			t.Errorf("#%d: Stats.metricToMpbMetricDescriptor: %v", i, err)
97		}
98		sMDR := &monitoringpb.CreateMetricDescriptorRequest{
99			Name:             fmt.Sprintf("projects/%s", se.o.ProjectID),
100			MetricDescriptor: sMD,
101		}
102		inMD, err := se.protoToMonitoringMetricDescriptor(metricPbs[i], nil)
103		if err != nil {
104			t.Errorf("#%d: Stats.protoMetricDescriptorToMetricDescriptor: %v", i, err)
105		}
106		pMDR := &monitoringpb.CreateMetricDescriptorRequest{
107			Name:             fmt.Sprintf("projects/%s", se.o.ProjectID),
108			MetricDescriptor: inMD,
109		}
110		if diff := cmpMDReq(pMDR, sMDR); diff != "" {
111			t.Fatalf("MetricDescriptor Mismatch -FromMetricsPb +FromMetrics: %s", diff)
112		}
113
114		stss, _ := se.metricToMpbTs(ctx, metric)
115		sctreql := se.combineTimeSeriesToCreateTimeSeriesRequest(stss)
116		allTss, _ := protoMetricToTimeSeries(ctx, se, se.getResource(nil, metricPbs[i], seenResources), metricPbs[i])
117		pctreql := se.combineTimeSeriesToCreateTimeSeriesRequest(allTss)
118		if diff := cmpTSReqs(pctreql, sctreql); diff != "" {
119			t.Fatalf("TimeSeries Mismatch -FromMetricsPb +FromMetrics: %s", diff)
120		}
121	}
122}
123
124// This test creates and uses a "Stackdriver backend" which receives
125// CreateTimeSeriesRequest and CreateMetricDescriptor requests
126// that the Stackdriver Metrics Proto client then sends to, as it would
127// send to Google Stackdriver backends.
128//
129// This test ensures that the final responses sent by direct stats(metricdata.Metric) exporting
130// are exactly equal to those from metricdata.Metric-->OpenCensus-Proto.Metrics exporting.
131func TestEquivalenceStatsVsMetricsUploads(t *testing.T) {
132	server, addr, doneFn := createFakeServer(t)
133	defer doneFn()
134
135	// Now create a gRPC connection to the fake Stackdriver server.
136	conn, err := grpc.Dial(addr, grpc.WithInsecure())
137	if err != nil {
138		t.Fatalf("Failed to make a gRPC connection to the server: %v", err)
139	}
140	defer conn.Close()
141
142	// Finally create the OpenCensus stats exporter
143	exporterOptions := Options{
144		ProjectID:               "equivalence",
145		MonitoringClientOptions: []option.ClientOption{option.WithGRPCConn(conn)},
146
147		// Setting this time delay threshold to a very large value
148		// so that batching is performed deterministically and flushing is
149		// fully controlled by us.
150		BundleDelayThreshold: 2 * time.Hour,
151		MapResource:          DefaultMapResource,
152	}
153	se, err := newStatsExporter(exporterOptions)
154	if err != nil {
155		t.Fatalf("Failed to create the statsExporter: %v", err)
156	}
157
158	startTime := time.Unix(1000, 0)
159	startTimePb := &timestamp.Timestamp{Seconds: 1000}
160
161	// Generate the metricdata.Metric.
162	metrics := []*metricdata.Metric{
163		{
164			Descriptor: metricdata.Descriptor{
165				Name:        "ocagent.io/calls",
166				Description: "The number of the various calls",
167				Unit:        "1",
168				Type:        metricdata.TypeCumulativeInt64,
169			},
170			TimeSeries: []*metricdata.TimeSeries{
171				{
172					StartTime: startTime,
173					Points:    []metricdata.Point{metricdata.NewInt64Point(startTime.Add(time.Duration(1)*time.Second), 8)},
174				},
175			},
176		},
177		{
178			Descriptor: metricdata.Descriptor{
179				Name:        "ocagent.io/latency",
180				Description: "The latency of the various methods",
181
182				Unit: "ms",
183				Type: metricdata.TypeCumulativeDistribution,
184			},
185			TimeSeries: []*metricdata.TimeSeries{
186				{
187					StartTime: startTime,
188					Points: []metricdata.Point{
189						metricdata.NewDistributionPoint(
190							startTime.Add(time.Duration(2)*time.Second),
191							&metricdata.Distribution{
192								Count:         1,
193								Sum:           125.9,
194								Buckets:       []metricdata.Bucket{{Count: 0}, {Count: 1}, {Count: 0}, {Count: 0}, {Count: 0}, {Count: 0}, {Count: 0}},
195								BucketOptions: &metricdata.BucketOptions{Bounds: []float64{100, 500, 1000, 2000, 4000, 8000, 16000}},
196							}),
197					},
198				},
199			},
200		},
201		{
202			Descriptor: metricdata.Descriptor{
203				Name:        "ocagent.io/connections",
204				Description: "The count of various connections instantaneously",
205				Unit:        "1",
206				Type:        metricdata.TypeGaugeInt64,
207			},
208			TimeSeries: []*metricdata.TimeSeries{
209				{
210					Points: []metricdata.Point{metricdata.NewInt64Point(startTime.Add(time.Duration(3)*time.Second), 99)},
211				},
212			},
213		},
214		{
215			Descriptor: metricdata.Descriptor{
216				Name:        "ocagent.io/uptime",
217				Description: "The total uptime at any instance",
218				Unit:        "ms",
219				Type:        metricdata.TypeCumulativeFloat64,
220			},
221			TimeSeries: []*metricdata.TimeSeries{
222				{
223					StartTime: startTime,
224					Points:    []metricdata.Point{metricdata.NewFloat64Point(startTime.Add(time.Duration(1)*time.Second), 199903.97)},
225				},
226			},
227		},
228	}
229
230	se.ExportMetrics(context.Background(), metrics)
231	se.Flush()
232
233	// Examining the stackdriver metrics that are available.
234	var stackdriverTimeSeriesFromMetrics []*monitoringpb.CreateTimeSeriesRequest
235	server.forEachStackdriverTimeSeries(func(sdt *monitoringpb.CreateTimeSeriesRequest) {
236		stackdriverTimeSeriesFromMetrics = append(stackdriverTimeSeriesFromMetrics, sdt)
237	})
238	var stackdriverMetricDescriptorsFromMetrics []*monitoringpb.CreateMetricDescriptorRequest
239	server.forEachStackdriverMetricDescriptor(func(sdmd *monitoringpb.CreateMetricDescriptorRequest) {
240		stackdriverMetricDescriptorsFromMetrics = append(stackdriverMetricDescriptorsFromMetrics, sdmd)
241	})
242
243	// Reset the stackdriverTimeSeries to enable fresh collection
244	// and then comparison with the results from metrics uploads.
245	server.resetStackdriverTimeSeries()
246	server.resetStackdriverMetricDescriptors()
247
248	// Generate the proto Metrics.
249	var metricPbs []*metricspb.Metric
250	metricPbs = append(metricPbs,
251		&metricspb.Metric{
252			MetricDescriptor: &metricspb.MetricDescriptor{
253				Name:        "ocagent.io/calls",
254				Description: "The number of the various calls",
255				Unit:        "1",
256				Type:        metricspb.MetricDescriptor_CUMULATIVE_INT64,
257			},
258			Timeseries: []*metricspb.TimeSeries{
259				{
260					StartTimestamp: startTimePb,
261					Points: []*metricspb.Point{
262						{
263							Timestamp: &timestamp.Timestamp{Seconds: int64(1001)},
264							Value:     &metricspb.Point_Int64Value{Int64Value: int64(8)},
265						},
266					},
267				},
268			},
269		},
270		&metricspb.Metric{
271			MetricDescriptor: &metricspb.MetricDescriptor{
272				Name:        "ocagent.io/latency",
273				Description: "The latency of the various methods",
274				Unit:        "ms",
275				Type:        metricspb.MetricDescriptor_CUMULATIVE_DISTRIBUTION,
276			},
277			Timeseries: []*metricspb.TimeSeries{
278				{
279					StartTimestamp: startTimePb,
280					Points: []*metricspb.Point{
281						{
282							Timestamp: &timestamp.Timestamp{Seconds: int64(1002)},
283							Value: &metricspb.Point_DistributionValue{
284								DistributionValue: &metricspb.DistributionValue{
285									Count: 1,
286									Sum:   125.9,
287									BucketOptions: &metricspb.DistributionValue_BucketOptions{
288										Type: &metricspb.DistributionValue_BucketOptions_Explicit_{
289											Explicit: &metricspb.DistributionValue_BucketOptions_Explicit{Bounds: []float64{100, 500, 1000, 2000, 4000, 8000, 16000}},
290										},
291									},
292									Buckets: []*metricspb.DistributionValue_Bucket{{Count: 0}, {Count: 1}, {Count: 0}, {Count: 0}, {Count: 0}, {Count: 0}, {Count: 0}},
293								},
294							},
295						},
296					},
297				},
298			},
299		},
300		&metricspb.Metric{
301			MetricDescriptor: &metricspb.MetricDescriptor{
302				Name:        "ocagent.io/connections",
303				Description: "The count of various connections instantaneously",
304				Unit:        "1",
305				Type:        metricspb.MetricDescriptor_GAUGE_INT64,
306			},
307			Timeseries: []*metricspb.TimeSeries{
308				{
309					StartTimestamp: startTimePb,
310					Points: []*metricspb.Point{
311						{
312							Timestamp: &timestamp.Timestamp{Seconds: int64(1003)},
313							Value:     &metricspb.Point_Int64Value{Int64Value: 99},
314						},
315					},
316				},
317			},
318		},
319		&metricspb.Metric{
320			MetricDescriptor: &metricspb.MetricDescriptor{
321				Name:        "ocagent.io/uptime",
322				Description: "The total uptime at any instance",
323				Unit:        "ms",
324				Type:        metricspb.MetricDescriptor_CUMULATIVE_DOUBLE,
325			},
326			Timeseries: []*metricspb.TimeSeries{
327				{
328					StartTimestamp: startTimePb,
329					Points: []*metricspb.Point{
330						{
331							Timestamp: &timestamp.Timestamp{Seconds: int64(1001)},
332							Value:     &metricspb.Point_DoubleValue{DoubleValue: 199903.97},
333						},
334					},
335				},
336			},
337		})
338
339	// Export the proto Metrics to the Stackdriver backend.
340	se.PushMetricsProto(context.Background(), nil, nil, metricPbs)
341	se.Flush()
342
343	var stackdriverTimeSeriesFromMetricsPb []*monitoringpb.CreateTimeSeriesRequest
344	server.forEachStackdriverTimeSeries(func(sdt *monitoringpb.CreateTimeSeriesRequest) {
345		stackdriverTimeSeriesFromMetricsPb = append(stackdriverTimeSeriesFromMetricsPb, sdt)
346	})
347	var stackdriverMetricDescriptorsFromMetricsPb []*monitoringpb.CreateMetricDescriptorRequest
348	server.forEachStackdriverMetricDescriptor(func(sdmd *monitoringpb.CreateMetricDescriptorRequest) {
349		stackdriverMetricDescriptorsFromMetricsPb = append(stackdriverMetricDescriptorsFromMetricsPb, sdmd)
350	})
351
352	if len(stackdriverTimeSeriesFromMetrics) == 0 {
353		t.Fatalf("Failed to export timeseries with metrics")
354	}
355
356	if len(stackdriverTimeSeriesFromMetricsPb) == 0 {
357		t.Fatalf("Failed to export timeseries with metrics pb")
358	}
359
360	// The results should be equal now
361	if diff := cmpTSReqs(stackdriverTimeSeriesFromMetricsPb, stackdriverTimeSeriesFromMetrics); diff != "" {
362		t.Fatalf("Unexpected CreateTimeSeriesRequests -FromMetricsPb +FromMetrics: %s", diff)
363	}
364
365	// Examining the metric descriptors too.
366	if diff := cmpMDReqs(stackdriverMetricDescriptorsFromMetricsPb, stackdriverMetricDescriptorsFromMetrics); diff != "" {
367		t.Fatalf("Unexpected CreateMetricDescriptorRequests -FromMetricsPb +FromMetrics: %s", diff)
368	}
369}
370
371type fakeMetricsServer struct {
372	monitoringpb.MetricServiceServer
373	mu                           sync.RWMutex
374	stackdriverTimeSeries        []*monitoringpb.CreateTimeSeriesRequest
375	stackdriverMetricDescriptors []*monitoringpb.CreateMetricDescriptorRequest
376}
377
378func createFakeServer(t *testing.T) (*fakeMetricsServer, string, func()) {
379	ln, err := net.Listen("tcp", "localhost:0")
380	if err != nil {
381		t.Fatalf("Failed to bind to an available address: %v", err)
382	}
383	server := new(fakeMetricsServer)
384	srv := grpc.NewServer()
385	monitoringpb.RegisterMetricServiceServer(srv, server)
386	go func() {
387		_ = srv.Serve(ln)
388	}()
389	stop := func() {
390		srv.Stop()
391		_ = ln.Close()
392	}
393	_, agentPortStr, _ := net.SplitHostPort(ln.Addr().String())
394	return server, "localhost:" + agentPortStr, stop
395}
396
397func (server *fakeMetricsServer) forEachStackdriverTimeSeries(fn func(sdt *monitoringpb.CreateTimeSeriesRequest)) {
398	server.mu.RLock()
399	defer server.mu.RUnlock()
400
401	for _, sdt := range server.stackdriverTimeSeries {
402		fn(sdt)
403	}
404}
405
406func (server *fakeMetricsServer) forEachStackdriverMetricDescriptor(fn func(sdmd *monitoringpb.CreateMetricDescriptorRequest)) {
407	server.mu.RLock()
408	defer server.mu.RUnlock()
409
410	for _, sdmd := range server.stackdriverMetricDescriptors {
411		fn(sdmd)
412	}
413}
414
415func (server *fakeMetricsServer) resetStackdriverTimeSeries() {
416	server.mu.Lock()
417	server.stackdriverTimeSeries = server.stackdriverTimeSeries[:0]
418	server.mu.Unlock()
419}
420
421func (server *fakeMetricsServer) resetStackdriverMetricDescriptors() {
422	server.mu.Lock()
423	server.stackdriverMetricDescriptors = server.stackdriverMetricDescriptors[:0]
424	server.mu.Unlock()
425}
426
427func (server *fakeMetricsServer) CreateMetricDescriptor(ctx context.Context, req *monitoringpb.CreateMetricDescriptorRequest) (*googlemetricpb.MetricDescriptor, error) {
428	server.mu.Lock()
429	server.stackdriverMetricDescriptors = append(server.stackdriverMetricDescriptors, req)
430	server.mu.Unlock()
431	return req.MetricDescriptor, nil
432}
433
434func (server *fakeMetricsServer) CreateTimeSeries(ctx context.Context, req *monitoringpb.CreateTimeSeriesRequest) (*empty.Empty, error) {
435	server.mu.Lock()
436	server.stackdriverTimeSeries = append(server.stackdriverTimeSeries, req)
437	server.mu.Unlock()
438	return new(empty.Empty), nil
439}
440