1// Copyright The OpenTelemetry Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package otlpmetricgrpc_test
16
17import (
18	"context"
19	"fmt"
20	"net"
21	"runtime"
22	"strings"
23	"sync"
24	"testing"
25	"time"
26
27	"google.golang.org/grpc/metadata"
28
29	"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/otlpmetrictest"
30
31	"google.golang.org/grpc"
32
33	collectormetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
34	metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
35)
36
37func makeMockCollector(t *testing.T, mockConfig *mockConfig) *mockCollector {
38	return &mockCollector{
39		t: t,
40		metricSvc: &mockMetricService{
41			storage: otlpmetrictest.NewMetricsStorage(),
42			errors:  mockConfig.errors,
43		},
44	}
45}
46
47type mockMetricService struct {
48	collectormetricpb.UnimplementedMetricsServiceServer
49
50	requests int
51	errors   []error
52
53	headers metadata.MD
54	mu      sync.RWMutex
55	storage otlpmetrictest.MetricsStorage
56	delay   time.Duration
57}
58
59func (mms *mockMetricService) getHeaders() metadata.MD {
60	mms.mu.RLock()
61	defer mms.mu.RUnlock()
62	return mms.headers
63}
64
65func (mms *mockMetricService) getMetrics() []*metricpb.Metric {
66	mms.mu.RLock()
67	defer mms.mu.RUnlock()
68	return mms.storage.GetMetrics()
69}
70
71func (mms *mockMetricService) Export(ctx context.Context, exp *collectormetricpb.ExportMetricsServiceRequest) (*collectormetricpb.ExportMetricsServiceResponse, error) {
72	if mms.delay > 0 {
73		time.Sleep(mms.delay)
74	}
75
76	mms.mu.Lock()
77	defer func() {
78		mms.requests++
79		mms.mu.Unlock()
80	}()
81
82	reply := &collectormetricpb.ExportMetricsServiceResponse{}
83	if mms.requests < len(mms.errors) {
84		idx := mms.requests
85		return reply, mms.errors[idx]
86	}
87
88	mms.headers, _ = metadata.FromIncomingContext(ctx)
89	mms.storage.AddMetrics(exp)
90	return reply, nil
91}
92
93type mockCollector struct {
94	t *testing.T
95
96	metricSvc *mockMetricService
97
98	endpoint string
99	ln       *listener
100	stopFunc func()
101	stopOnce sync.Once
102}
103
104type mockConfig struct {
105	errors   []error
106	endpoint string
107}
108
109var _ collectormetricpb.MetricsServiceServer = (*mockMetricService)(nil)
110
111var errAlreadyStopped = fmt.Errorf("already stopped")
112
113func (mc *mockCollector) stop() error {
114	var err = errAlreadyStopped
115	mc.stopOnce.Do(func() {
116		err = nil
117		if mc.stopFunc != nil {
118			mc.stopFunc()
119		}
120	})
121	// Give it sometime to shutdown.
122	<-time.After(160 * time.Millisecond)
123
124	// Wait for services to finish reading/writing.
125	// Getting the lock ensures the metricSvc is done flushing.
126	mc.metricSvc.mu.Lock()
127	defer mc.metricSvc.mu.Unlock()
128	return err
129}
130
131func (mc *mockCollector) Stop() error {
132	return mc.stop()
133}
134
135func (mc *mockCollector) getHeaders() metadata.MD {
136	return mc.metricSvc.getHeaders()
137}
138
139func (mc *mockCollector) getMetrics() []*metricpb.Metric {
140	return mc.metricSvc.getMetrics()
141}
142
143func (mc *mockCollector) GetMetrics() []*metricpb.Metric {
144	return mc.getMetrics()
145}
146
147// runMockCollector is a helper function to create a mock Collector
148func runMockCollector(t *testing.T) *mockCollector {
149	return runMockCollectorAtEndpoint(t, "localhost:0")
150}
151
152func runMockCollectorAtEndpoint(t *testing.T, endpoint string) *mockCollector {
153	return runMockCollectorWithConfig(t, &mockConfig{endpoint: endpoint})
154}
155
156func runMockCollectorWithConfig(t *testing.T, mockConfig *mockConfig) *mockCollector {
157	ln, err := net.Listen("tcp", mockConfig.endpoint)
158	if err != nil {
159		t.Fatalf("Failed to get an endpoint: %v", err)
160	}
161
162	srv := grpc.NewServer()
163	mc := makeMockCollector(t, mockConfig)
164	collectormetricpb.RegisterMetricsServiceServer(srv, mc.metricSvc)
165	mc.ln = newListener(ln)
166	go func() {
167		_ = srv.Serve((net.Listener)(mc.ln))
168	}()
169
170	mc.endpoint = ln.Addr().String()
171	// srv.Stop calls Close on mc.ln.
172	mc.stopFunc = srv.Stop
173
174	return mc
175}
176
177type listener struct {
178	closeOnce sync.Once
179	wrapped   net.Listener
180	C         chan struct{}
181}
182
183func newListener(wrapped net.Listener) *listener {
184	return &listener{
185		wrapped: wrapped,
186		C:       make(chan struct{}, 1),
187	}
188}
189
190func (l *listener) Close() error { return l.wrapped.Close() }
191
192func (l *listener) Addr() net.Addr { return l.wrapped.Addr() }
193
194// Accept waits for and returns the next connection to the listener. It will
195// send a signal on l.C that a connection has been made before returning.
196func (l *listener) Accept() (net.Conn, error) {
197	conn, err := l.wrapped.Accept()
198	if err != nil {
199		// Go 1.16 exported net.ErrClosed that could clean up this check, but to
200		// remain backwards compatible with previous versions of Go that we
201		// support the following string evaluation is used instead to keep in line
202		// with the previously recommended way to check this:
203		// https://github.com/golang/go/issues/4373#issuecomment-353076799
204		if strings.Contains(err.Error(), "use of closed network connection") {
205			// If the listener has been closed, do not allow callers of
206			// WaitForConn to wait for a connection that will never come.
207			l.closeOnce.Do(func() { close(l.C) })
208		}
209		return conn, err
210	}
211
212	select {
213	case l.C <- struct{}{}:
214	default:
215		// If C is full, assume nobody is listening and move on.
216	}
217	return conn, nil
218}
219
220// WaitForConn will wait indefintely for a connection to be estabilished with
221// the listener before returning.
222func (l *listener) WaitForConn() {
223	for {
224		select {
225		case <-l.C:
226			return
227		default:
228			runtime.Gosched()
229		}
230	}
231}
232