1// Copyright 2018, OpenCensus Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package stackdriver 16 17import ( 18 "context" 19 "fmt" 20 "net" 21 "sync" 22 "testing" 23 "time" 24 25 "go.opencensus.io/metric/metricdata" 26 "google.golang.org/api/option" 27 "google.golang.org/grpc" 28 29 metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" 30 resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1" 31 32 "github.com/golang/protobuf/ptypes/empty" 33 timestamp "github.com/golang/protobuf/ptypes/timestamp" 34 googlemetricpb "google.golang.org/genproto/googleapis/api/metric" 35 monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres" 36 monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3" 37) 38 39func TestStatsAndMetricsEquivalence(t *testing.T) { 40 startTime := time.Unix(1000, 0) 41 startTimePb := ×tamp.Timestamp{Seconds: 1000} 42 md := metricdata.Descriptor{ 43 Name: "ocagent.io/latency", 44 Description: "The latency of the various methods", 45 Unit: "ms", 46 Type: metricdata.TypeCumulativeInt64, 47 } 48 metricDescriptor := &metricspb.MetricDescriptor{ 49 Name: "ocagent.io/latency", 50 Description: "The latency of the various methods", 51 Unit: "ms", 52 Type: metricspb.MetricDescriptor_CUMULATIVE_INT64, 53 } 54 seenResources := make(map[*resourcepb.Resource]*monitoredrespb.MonitoredResource) 55 56 // Generate some metricdata.Metric and metrics proto. 57 var metrics []*metricdata.Metric 58 var metricPbs []*metricspb.Metric 59 for i := 0; i < 100; i++ { 60 metric := &metricdata.Metric{ 61 Descriptor: md, 62 TimeSeries: []*metricdata.TimeSeries{ 63 { 64 StartTime: startTime, 65 Points: []metricdata.Point{metricdata.NewInt64Point(startTime.Add(time.Duration(1+i)*time.Second), int64(4*(i+2)))}, 66 }, 67 }, 68 } 69 metricPb := &metricspb.Metric{ 70 MetricDescriptor: metricDescriptor, 71 Timeseries: []*metricspb.TimeSeries{ 72 { 73 StartTimestamp: startTimePb, 74 Points: []*metricspb.Point{ 75 { 76 Timestamp: ×tamp.Timestamp{Seconds: int64(1001 + i)}, 77 Value: &metricspb.Point_Int64Value{Int64Value: int64(4 * (i + 2))}, 78 }, 79 }, 80 }, 81 }, 82 } 83 metrics = append(metrics, metric) 84 metricPbs = append(metricPbs, metricPb) 85 } 86 87 // Now perform some exporting. 88 for i, metric := range metrics { 89 se := &statsExporter{ 90 o: Options{ProjectID: "equivalence", MapResource: DefaultMapResource}, 91 } 92 93 ctx := context.Background() 94 sMD, err := se.metricToMpbMetricDescriptor(metric) 95 if err != nil { 96 t.Errorf("#%d: Stats.metricToMpbMetricDescriptor: %v", i, err) 97 } 98 sMDR := &monitoringpb.CreateMetricDescriptorRequest{ 99 Name: fmt.Sprintf("projects/%s", se.o.ProjectID), 100 MetricDescriptor: sMD, 101 } 102 inMD, err := se.protoToMonitoringMetricDescriptor(metricPbs[i], nil) 103 if err != nil { 104 t.Errorf("#%d: Stats.protoMetricDescriptorToMetricDescriptor: %v", i, err) 105 } 106 pMDR := &monitoringpb.CreateMetricDescriptorRequest{ 107 Name: fmt.Sprintf("projects/%s", se.o.ProjectID), 108 MetricDescriptor: inMD, 109 } 110 if diff := cmpMDReq(pMDR, sMDR); diff != "" { 111 t.Fatalf("MetricDescriptor Mismatch -FromMetricsPb +FromMetrics: %s", diff) 112 } 113 114 stss, _ := se.metricToMpbTs(ctx, metric) 115 sctreql := se.combineTimeSeriesToCreateTimeSeriesRequest(stss) 116 allTss, _ := protoMetricToTimeSeries(ctx, se, se.getResource(nil, metricPbs[i], seenResources), metricPbs[i]) 117 pctreql := se.combineTimeSeriesToCreateTimeSeriesRequest(allTss) 118 if diff := cmpTSReqs(pctreql, sctreql); diff != "" { 119 t.Fatalf("TimeSeries Mismatch -FromMetricsPb +FromMetrics: %s", diff) 120 } 121 } 122} 123 124// This test creates and uses a "Stackdriver backend" which receives 125// CreateTimeSeriesRequest and CreateMetricDescriptor requests 126// that the Stackdriver Metrics Proto client then sends to, as it would 127// send to Google Stackdriver backends. 128// 129// This test ensures that the final responses sent by direct stats(metricdata.Metric) exporting 130// are exactly equal to those from metricdata.Metric-->OpenCensus-Proto.Metrics exporting. 131func TestEquivalenceStatsVsMetricsUploads(t *testing.T) { 132 server, addr, doneFn := createFakeServer(t) 133 defer doneFn() 134 135 // Now create a gRPC connection to the fake Stackdriver server. 136 conn, err := grpc.Dial(addr, grpc.WithInsecure()) 137 if err != nil { 138 t.Fatalf("Failed to make a gRPC connection to the server: %v", err) 139 } 140 defer conn.Close() 141 142 // Finally create the OpenCensus stats exporter 143 exporterOptions := Options{ 144 ProjectID: "equivalence", 145 MonitoringClientOptions: []option.ClientOption{option.WithGRPCConn(conn)}, 146 147 // Setting this time delay threshold to a very large value 148 // so that batching is performed deterministically and flushing is 149 // fully controlled by us. 150 BundleDelayThreshold: 2 * time.Hour, 151 MapResource: DefaultMapResource, 152 } 153 se, err := newStatsExporter(exporterOptions) 154 if err != nil { 155 t.Fatalf("Failed to create the statsExporter: %v", err) 156 } 157 158 startTime := time.Unix(1000, 0) 159 startTimePb := ×tamp.Timestamp{Seconds: 1000} 160 161 // Generate the metricdata.Metric. 162 metrics := []*metricdata.Metric{ 163 { 164 Descriptor: metricdata.Descriptor{ 165 Name: "ocagent.io/calls", 166 Description: "The number of the various calls", 167 Unit: "1", 168 Type: metricdata.TypeCumulativeInt64, 169 }, 170 TimeSeries: []*metricdata.TimeSeries{ 171 { 172 StartTime: startTime, 173 Points: []metricdata.Point{metricdata.NewInt64Point(startTime.Add(time.Duration(1)*time.Second), 8)}, 174 }, 175 }, 176 }, 177 { 178 Descriptor: metricdata.Descriptor{ 179 Name: "ocagent.io/latency", 180 Description: "The latency of the various methods", 181 182 Unit: "ms", 183 Type: metricdata.TypeCumulativeDistribution, 184 }, 185 TimeSeries: []*metricdata.TimeSeries{ 186 { 187 StartTime: startTime, 188 Points: []metricdata.Point{ 189 metricdata.NewDistributionPoint( 190 startTime.Add(time.Duration(2)*time.Second), 191 &metricdata.Distribution{ 192 Count: 1, 193 Sum: 125.9, 194 Buckets: []metricdata.Bucket{{Count: 0}, {Count: 1}, {Count: 0}, {Count: 0}, {Count: 0}, {Count: 0}, {Count: 0}}, 195 BucketOptions: &metricdata.BucketOptions{Bounds: []float64{100, 500, 1000, 2000, 4000, 8000, 16000}}, 196 }), 197 }, 198 }, 199 }, 200 }, 201 { 202 Descriptor: metricdata.Descriptor{ 203 Name: "ocagent.io/connections", 204 Description: "The count of various connections instantaneously", 205 Unit: "1", 206 Type: metricdata.TypeGaugeInt64, 207 }, 208 TimeSeries: []*metricdata.TimeSeries{ 209 { 210 Points: []metricdata.Point{metricdata.NewInt64Point(startTime.Add(time.Duration(3)*time.Second), 99)}, 211 }, 212 }, 213 }, 214 { 215 Descriptor: metricdata.Descriptor{ 216 Name: "ocagent.io/uptime", 217 Description: "The total uptime at any instance", 218 Unit: "ms", 219 Type: metricdata.TypeCumulativeFloat64, 220 }, 221 TimeSeries: []*metricdata.TimeSeries{ 222 { 223 StartTime: startTime, 224 Points: []metricdata.Point{metricdata.NewFloat64Point(startTime.Add(time.Duration(1)*time.Second), 199903.97)}, 225 }, 226 }, 227 }, 228 } 229 230 se.ExportMetrics(context.Background(), metrics) 231 se.Flush() 232 233 // Examining the stackdriver metrics that are available. 234 var stackdriverTimeSeriesFromMetrics []*monitoringpb.CreateTimeSeriesRequest 235 server.forEachStackdriverTimeSeries(func(sdt *monitoringpb.CreateTimeSeriesRequest) { 236 stackdriverTimeSeriesFromMetrics = append(stackdriverTimeSeriesFromMetrics, sdt) 237 }) 238 var stackdriverMetricDescriptorsFromMetrics []*monitoringpb.CreateMetricDescriptorRequest 239 server.forEachStackdriverMetricDescriptor(func(sdmd *monitoringpb.CreateMetricDescriptorRequest) { 240 stackdriverMetricDescriptorsFromMetrics = append(stackdriverMetricDescriptorsFromMetrics, sdmd) 241 }) 242 243 // Reset the stackdriverTimeSeries to enable fresh collection 244 // and then comparison with the results from metrics uploads. 245 server.resetStackdriverTimeSeries() 246 server.resetStackdriverMetricDescriptors() 247 248 // Generate the proto Metrics. 249 var metricPbs []*metricspb.Metric 250 metricPbs = append(metricPbs, 251 &metricspb.Metric{ 252 MetricDescriptor: &metricspb.MetricDescriptor{ 253 Name: "ocagent.io/calls", 254 Description: "The number of the various calls", 255 Unit: "1", 256 Type: metricspb.MetricDescriptor_CUMULATIVE_INT64, 257 }, 258 Timeseries: []*metricspb.TimeSeries{ 259 { 260 StartTimestamp: startTimePb, 261 Points: []*metricspb.Point{ 262 { 263 Timestamp: ×tamp.Timestamp{Seconds: int64(1001)}, 264 Value: &metricspb.Point_Int64Value{Int64Value: int64(8)}, 265 }, 266 }, 267 }, 268 }, 269 }, 270 &metricspb.Metric{ 271 MetricDescriptor: &metricspb.MetricDescriptor{ 272 Name: "ocagent.io/latency", 273 Description: "The latency of the various methods", 274 Unit: "ms", 275 Type: metricspb.MetricDescriptor_CUMULATIVE_DISTRIBUTION, 276 }, 277 Timeseries: []*metricspb.TimeSeries{ 278 { 279 StartTimestamp: startTimePb, 280 Points: []*metricspb.Point{ 281 { 282 Timestamp: ×tamp.Timestamp{Seconds: int64(1002)}, 283 Value: &metricspb.Point_DistributionValue{ 284 DistributionValue: &metricspb.DistributionValue{ 285 Count: 1, 286 Sum: 125.9, 287 BucketOptions: &metricspb.DistributionValue_BucketOptions{ 288 Type: &metricspb.DistributionValue_BucketOptions_Explicit_{ 289 Explicit: &metricspb.DistributionValue_BucketOptions_Explicit{Bounds: []float64{100, 500, 1000, 2000, 4000, 8000, 16000}}, 290 }, 291 }, 292 Buckets: []*metricspb.DistributionValue_Bucket{{Count: 0}, {Count: 1}, {Count: 0}, {Count: 0}, {Count: 0}, {Count: 0}, {Count: 0}}, 293 }, 294 }, 295 }, 296 }, 297 }, 298 }, 299 }, 300 &metricspb.Metric{ 301 MetricDescriptor: &metricspb.MetricDescriptor{ 302 Name: "ocagent.io/connections", 303 Description: "The count of various connections instantaneously", 304 Unit: "1", 305 Type: metricspb.MetricDescriptor_GAUGE_INT64, 306 }, 307 Timeseries: []*metricspb.TimeSeries{ 308 { 309 StartTimestamp: startTimePb, 310 Points: []*metricspb.Point{ 311 { 312 Timestamp: ×tamp.Timestamp{Seconds: int64(1003)}, 313 Value: &metricspb.Point_Int64Value{Int64Value: 99}, 314 }, 315 }, 316 }, 317 }, 318 }, 319 &metricspb.Metric{ 320 MetricDescriptor: &metricspb.MetricDescriptor{ 321 Name: "ocagent.io/uptime", 322 Description: "The total uptime at any instance", 323 Unit: "ms", 324 Type: metricspb.MetricDescriptor_CUMULATIVE_DOUBLE, 325 }, 326 Timeseries: []*metricspb.TimeSeries{ 327 { 328 StartTimestamp: startTimePb, 329 Points: []*metricspb.Point{ 330 { 331 Timestamp: ×tamp.Timestamp{Seconds: int64(1001)}, 332 Value: &metricspb.Point_DoubleValue{DoubleValue: 199903.97}, 333 }, 334 }, 335 }, 336 }, 337 }) 338 339 // Export the proto Metrics to the Stackdriver backend. 340 se.PushMetricsProto(context.Background(), nil, nil, metricPbs) 341 se.Flush() 342 343 var stackdriverTimeSeriesFromMetricsPb []*monitoringpb.CreateTimeSeriesRequest 344 server.forEachStackdriverTimeSeries(func(sdt *monitoringpb.CreateTimeSeriesRequest) { 345 stackdriverTimeSeriesFromMetricsPb = append(stackdriverTimeSeriesFromMetricsPb, sdt) 346 }) 347 var stackdriverMetricDescriptorsFromMetricsPb []*monitoringpb.CreateMetricDescriptorRequest 348 server.forEachStackdriverMetricDescriptor(func(sdmd *monitoringpb.CreateMetricDescriptorRequest) { 349 stackdriverMetricDescriptorsFromMetricsPb = append(stackdriverMetricDescriptorsFromMetricsPb, sdmd) 350 }) 351 352 if len(stackdriverTimeSeriesFromMetrics) == 0 { 353 t.Fatalf("Failed to export timeseries with metrics") 354 } 355 356 if len(stackdriverTimeSeriesFromMetricsPb) == 0 { 357 t.Fatalf("Failed to export timeseries with metrics pb") 358 } 359 360 // The results should be equal now 361 if diff := cmpTSReqs(stackdriverTimeSeriesFromMetricsPb, stackdriverTimeSeriesFromMetrics); diff != "" { 362 t.Fatalf("Unexpected CreateTimeSeriesRequests -FromMetricsPb +FromMetrics: %s", diff) 363 } 364 365 // Examining the metric descriptors too. 366 if diff := cmpMDReqs(stackdriverMetricDescriptorsFromMetricsPb, stackdriverMetricDescriptorsFromMetrics); diff != "" { 367 t.Fatalf("Unexpected CreateMetricDescriptorRequests -FromMetricsPb +FromMetrics: %s", diff) 368 } 369} 370 371type fakeMetricsServer struct { 372 monitoringpb.MetricServiceServer 373 mu sync.RWMutex 374 stackdriverTimeSeries []*monitoringpb.CreateTimeSeriesRequest 375 stackdriverMetricDescriptors []*monitoringpb.CreateMetricDescriptorRequest 376} 377 378func createFakeServer(t *testing.T) (*fakeMetricsServer, string, func()) { 379 ln, err := net.Listen("tcp", "localhost:0") 380 if err != nil { 381 t.Fatalf("Failed to bind to an available address: %v", err) 382 } 383 server := new(fakeMetricsServer) 384 srv := grpc.NewServer() 385 monitoringpb.RegisterMetricServiceServer(srv, server) 386 go func() { 387 _ = srv.Serve(ln) 388 }() 389 stop := func() { 390 srv.Stop() 391 _ = ln.Close() 392 } 393 _, agentPortStr, _ := net.SplitHostPort(ln.Addr().String()) 394 return server, "localhost:" + agentPortStr, stop 395} 396 397func (server *fakeMetricsServer) forEachStackdriverTimeSeries(fn func(sdt *monitoringpb.CreateTimeSeriesRequest)) { 398 server.mu.RLock() 399 defer server.mu.RUnlock() 400 401 for _, sdt := range server.stackdriverTimeSeries { 402 fn(sdt) 403 } 404} 405 406func (server *fakeMetricsServer) forEachStackdriverMetricDescriptor(fn func(sdmd *monitoringpb.CreateMetricDescriptorRequest)) { 407 server.mu.RLock() 408 defer server.mu.RUnlock() 409 410 for _, sdmd := range server.stackdriverMetricDescriptors { 411 fn(sdmd) 412 } 413} 414 415func (server *fakeMetricsServer) resetStackdriverTimeSeries() { 416 server.mu.Lock() 417 server.stackdriverTimeSeries = server.stackdriverTimeSeries[:0] 418 server.mu.Unlock() 419} 420 421func (server *fakeMetricsServer) resetStackdriverMetricDescriptors() { 422 server.mu.Lock() 423 server.stackdriverMetricDescriptors = server.stackdriverMetricDescriptors[:0] 424 server.mu.Unlock() 425} 426 427func (server *fakeMetricsServer) CreateMetricDescriptor(ctx context.Context, req *monitoringpb.CreateMetricDescriptorRequest) (*googlemetricpb.MetricDescriptor, error) { 428 server.mu.Lock() 429 server.stackdriverMetricDescriptors = append(server.stackdriverMetricDescriptors, req) 430 server.mu.Unlock() 431 return req.MetricDescriptor, nil 432} 433 434func (server *fakeMetricsServer) CreateTimeSeries(ctx context.Context, req *monitoringpb.CreateTimeSeriesRequest) (*empty.Empty, error) { 435 server.mu.Lock() 436 server.stackdriverTimeSeries = append(server.stackdriverTimeSeries, req) 437 server.mu.Unlock() 438 return new(empty.Empty), nil 439} 440