1// Copyright The OpenTelemetry Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package otlpgrpc_test 16 17import ( 18 "context" 19 "fmt" 20 "net" 21 "sync" 22 "testing" 23 "time" 24 25 "google.golang.org/grpc" 26 metadata "google.golang.org/grpc/metadata" 27 28 collectormetricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/metrics/v1" 29 collectortracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/trace/v1" 30 metricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/metrics/v1" 31 tracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/trace/v1" 32 "go.opentelemetry.io/otel/exporters/otlp/internal/otlptest" 33) 34 35func makeMockCollector(t *testing.T) *mockCollector { 36 return &mockCollector{ 37 t: t, 38 traceSvc: &mockTraceService{ 39 storage: otlptest.NewSpansStorage(), 40 }, 41 metricSvc: &mockMetricService{ 42 storage: otlptest.NewMetricsStorage(), 43 }, 44 } 45} 46 47type mockTraceService struct { 48 mu sync.RWMutex 49 storage otlptest.SpansStorage 50 headers metadata.MD 51} 52 53func (mts *mockTraceService) getHeaders() metadata.MD { 54 mts.mu.RLock() 55 defer mts.mu.RUnlock() 56 return mts.headers 57} 58 59func (mts *mockTraceService) getSpans() []*tracepb.Span { 60 mts.mu.RLock() 61 defer mts.mu.RUnlock() 62 return mts.storage.GetSpans() 63} 64 65func (mts *mockTraceService) getResourceSpans() []*tracepb.ResourceSpans { 66 mts.mu.RLock() 67 defer mts.mu.RUnlock() 68 return mts.storage.GetResourceSpans() 69} 70 71func (mts *mockTraceService) Export(ctx context.Context, exp *collectortracepb.ExportTraceServiceRequest) (*collectortracepb.ExportTraceServiceResponse, error) { 72 reply := &collectortracepb.ExportTraceServiceResponse{} 73 mts.mu.Lock() 74 defer mts.mu.Unlock() 75 mts.headers, _ = metadata.FromIncomingContext(ctx) 76 mts.storage.AddSpans(exp) 77 return reply, nil 78} 79 80type mockMetricService struct { 81 mu sync.RWMutex 82 storage otlptest.MetricsStorage 83} 84 85func (mms *mockMetricService) getMetrics() []*metricpb.Metric { 86 mms.mu.RLock() 87 defer mms.mu.RUnlock() 88 return mms.storage.GetMetrics() 89} 90 91func (mms *mockMetricService) Export(ctx context.Context, exp *collectormetricpb.ExportMetricsServiceRequest) (*collectormetricpb.ExportMetricsServiceResponse, error) { 92 reply := &collectormetricpb.ExportMetricsServiceResponse{} 93 mms.mu.Lock() 94 defer mms.mu.Unlock() 95 mms.storage.AddMetrics(exp) 96 return reply, nil 97} 98 99type mockCollector struct { 100 t *testing.T 101 102 traceSvc *mockTraceService 103 metricSvc *mockMetricService 104 105 endpoint string 106 stopFunc func() error 107 stopOnce sync.Once 108} 109 110var _ collectortracepb.TraceServiceServer = (*mockTraceService)(nil) 111var _ collectormetricpb.MetricsServiceServer = (*mockMetricService)(nil) 112 113var errAlreadyStopped = fmt.Errorf("already stopped") 114 115func (mc *mockCollector) stop() error { 116 var err = errAlreadyStopped 117 mc.stopOnce.Do(func() { 118 if mc.stopFunc != nil { 119 err = mc.stopFunc() 120 } 121 }) 122 // Give it sometime to shutdown. 123 <-time.After(160 * time.Millisecond) 124 125 // Wait for services to finish reading/writing. 126 var wg sync.WaitGroup 127 wg.Add(1) 128 go func() { 129 // Getting the lock ensures the traceSvc is done flushing. 130 mc.traceSvc.mu.Lock() 131 defer mc.traceSvc.mu.Unlock() 132 wg.Done() 133 }() 134 wg.Add(1) 135 go func() { 136 // Getting the lock ensures the metricSvc is done flushing. 137 mc.metricSvc.mu.Lock() 138 defer mc.metricSvc.mu.Unlock() 139 wg.Done() 140 }() 141 wg.Wait() 142 return err 143} 144 145func (mc *mockCollector) Stop() error { 146 return mc.stop() 147} 148 149func (mc *mockCollector) getSpans() []*tracepb.Span { 150 return mc.traceSvc.getSpans() 151} 152 153func (mc *mockCollector) getResourceSpans() []*tracepb.ResourceSpans { 154 return mc.traceSvc.getResourceSpans() 155} 156 157func (mc *mockCollector) GetResourceSpans() []*tracepb.ResourceSpans { 158 return mc.getResourceSpans() 159} 160 161func (mc *mockCollector) getHeaders() metadata.MD { 162 return mc.traceSvc.getHeaders() 163} 164 165func (mc *mockCollector) getMetrics() []*metricpb.Metric { 166 return mc.metricSvc.getMetrics() 167} 168 169func (mc *mockCollector) GetMetrics() []*metricpb.Metric { 170 return mc.getMetrics() 171} 172 173// runMockCollector is a helper function to create a mock Collector 174func runMockCollector(t *testing.T) *mockCollector { 175 return runMockCollectorAtEndpoint(t, "localhost:0") 176} 177 178func runMockCollectorAtEndpoint(t *testing.T, endpoint string) *mockCollector { 179 ln, err := net.Listen("tcp", endpoint) 180 if err != nil { 181 t.Fatalf("Failed to get an endpoint: %v", err) 182 } 183 184 srv := grpc.NewServer() 185 mc := makeMockCollector(t) 186 collectortracepb.RegisterTraceServiceServer(srv, mc.traceSvc) 187 collectormetricpb.RegisterMetricsServiceServer(srv, mc.metricSvc) 188 go func() { 189 _ = srv.Serve(ln) 190 }() 191 192 deferFunc := func() error { 193 srv.Stop() 194 return ln.Close() 195 } 196 197 _, collectorPortStr, _ := net.SplitHostPort(ln.Addr().String()) 198 199 mc.endpoint = "localhost:" + collectorPortStr 200 mc.stopFunc = deferFunc 201 202 return mc 203} 204