1// Copyright The OpenTelemetry Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package otlpgrpc_test
16
17import (
18	"context"
19	"fmt"
20	"net"
21	"sync"
22	"testing"
23	"time"
24
25	"google.golang.org/grpc"
26	metadata "google.golang.org/grpc/metadata"
27
28	collectormetricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/metrics/v1"
29	collectortracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/trace/v1"
30	metricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/metrics/v1"
31	tracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/trace/v1"
32	"go.opentelemetry.io/otel/exporters/otlp/internal/otlptest"
33)
34
35func makeMockCollector(t *testing.T) *mockCollector {
36	return &mockCollector{
37		t: t,
38		traceSvc: &mockTraceService{
39			storage: otlptest.NewSpansStorage(),
40		},
41		metricSvc: &mockMetricService{
42			storage: otlptest.NewMetricsStorage(),
43		},
44	}
45}
46
47type mockTraceService struct {
48	mu      sync.RWMutex
49	storage otlptest.SpansStorage
50	headers metadata.MD
51}
52
53func (mts *mockTraceService) getHeaders() metadata.MD {
54	mts.mu.RLock()
55	defer mts.mu.RUnlock()
56	return mts.headers
57}
58
59func (mts *mockTraceService) getSpans() []*tracepb.Span {
60	mts.mu.RLock()
61	defer mts.mu.RUnlock()
62	return mts.storage.GetSpans()
63}
64
65func (mts *mockTraceService) getResourceSpans() []*tracepb.ResourceSpans {
66	mts.mu.RLock()
67	defer mts.mu.RUnlock()
68	return mts.storage.GetResourceSpans()
69}
70
71func (mts *mockTraceService) Export(ctx context.Context, exp *collectortracepb.ExportTraceServiceRequest) (*collectortracepb.ExportTraceServiceResponse, error) {
72	reply := &collectortracepb.ExportTraceServiceResponse{}
73	mts.mu.Lock()
74	defer mts.mu.Unlock()
75	mts.headers, _ = metadata.FromIncomingContext(ctx)
76	mts.storage.AddSpans(exp)
77	return reply, nil
78}
79
80type mockMetricService struct {
81	mu      sync.RWMutex
82	storage otlptest.MetricsStorage
83}
84
85func (mms *mockMetricService) getMetrics() []*metricpb.Metric {
86	mms.mu.RLock()
87	defer mms.mu.RUnlock()
88	return mms.storage.GetMetrics()
89}
90
91func (mms *mockMetricService) Export(ctx context.Context, exp *collectormetricpb.ExportMetricsServiceRequest) (*collectormetricpb.ExportMetricsServiceResponse, error) {
92	reply := &collectormetricpb.ExportMetricsServiceResponse{}
93	mms.mu.Lock()
94	defer mms.mu.Unlock()
95	mms.storage.AddMetrics(exp)
96	return reply, nil
97}
98
99type mockCollector struct {
100	t *testing.T
101
102	traceSvc  *mockTraceService
103	metricSvc *mockMetricService
104
105	endpoint string
106	stopFunc func() error
107	stopOnce sync.Once
108}
109
110var _ collectortracepb.TraceServiceServer = (*mockTraceService)(nil)
111var _ collectormetricpb.MetricsServiceServer = (*mockMetricService)(nil)
112
113var errAlreadyStopped = fmt.Errorf("already stopped")
114
115func (mc *mockCollector) stop() error {
116	var err = errAlreadyStopped
117	mc.stopOnce.Do(func() {
118		if mc.stopFunc != nil {
119			err = mc.stopFunc()
120		}
121	})
122	// Give it sometime to shutdown.
123	<-time.After(160 * time.Millisecond)
124
125	// Wait for services to finish reading/writing.
126	var wg sync.WaitGroup
127	wg.Add(1)
128	go func() {
129		// Getting the lock ensures the traceSvc is done flushing.
130		mc.traceSvc.mu.Lock()
131		defer mc.traceSvc.mu.Unlock()
132		wg.Done()
133	}()
134	wg.Add(1)
135	go func() {
136		// Getting the lock ensures the metricSvc is done flushing.
137		mc.metricSvc.mu.Lock()
138		defer mc.metricSvc.mu.Unlock()
139		wg.Done()
140	}()
141	wg.Wait()
142	return err
143}
144
145func (mc *mockCollector) Stop() error {
146	return mc.stop()
147}
148
149func (mc *mockCollector) getSpans() []*tracepb.Span {
150	return mc.traceSvc.getSpans()
151}
152
153func (mc *mockCollector) getResourceSpans() []*tracepb.ResourceSpans {
154	return mc.traceSvc.getResourceSpans()
155}
156
157func (mc *mockCollector) GetResourceSpans() []*tracepb.ResourceSpans {
158	return mc.getResourceSpans()
159}
160
161func (mc *mockCollector) getHeaders() metadata.MD {
162	return mc.traceSvc.getHeaders()
163}
164
165func (mc *mockCollector) getMetrics() []*metricpb.Metric {
166	return mc.metricSvc.getMetrics()
167}
168
169func (mc *mockCollector) GetMetrics() []*metricpb.Metric {
170	return mc.getMetrics()
171}
172
173// runMockCollector is a helper function to create a mock Collector
174func runMockCollector(t *testing.T) *mockCollector {
175	return runMockCollectorAtEndpoint(t, "localhost:0")
176}
177
178func runMockCollectorAtEndpoint(t *testing.T, endpoint string) *mockCollector {
179	ln, err := net.Listen("tcp", endpoint)
180	if err != nil {
181		t.Fatalf("Failed to get an endpoint: %v", err)
182	}
183
184	srv := grpc.NewServer()
185	mc := makeMockCollector(t)
186	collectortracepb.RegisterTraceServiceServer(srv, mc.traceSvc)
187	collectormetricpb.RegisterMetricsServiceServer(srv, mc.metricSvc)
188	go func() {
189		_ = srv.Serve(ln)
190	}()
191
192	deferFunc := func() error {
193		srv.Stop()
194		return ln.Close()
195	}
196
197	_, collectorPortStr, _ := net.SplitHostPort(ln.Addr().String())
198
199	mc.endpoint = "localhost:" + collectorPortStr
200	mc.stopFunc = deferFunc
201
202	return mc
203}
204