1// Copyright The OpenTelemetry Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package otlphttp_test
16
17import (
18	"bytes"
19	"compress/gzip"
20	"context"
21	"crypto/tls"
22	"fmt"
23	"io"
24	"io/ioutil"
25	"net"
26	"net/http"
27	"sync"
28	"testing"
29
30	"github.com/gogo/protobuf/jsonpb"
31	"github.com/stretchr/testify/assert"
32	"github.com/stretchr/testify/require"
33
34	collectormetricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/metrics/v1"
35	collectortracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/trace/v1"
36	metricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/metrics/v1"
37	tracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/trace/v1"
38	"go.opentelemetry.io/otel/exporters/otlp/internal/otlptest"
39	"go.opentelemetry.io/otel/exporters/otlp/otlphttp"
40)
41
42type mockCollector struct {
43	endpoint string
44	server   *http.Server
45
46	spanLock     sync.Mutex
47	spansStorage otlptest.SpansStorage
48
49	metricLock     sync.Mutex
50	metricsStorage otlptest.MetricsStorage
51
52	injectHTTPStatus  []int
53	injectContentType string
54
55	clientTLSConfig *tls.Config
56	expectedHeaders map[string]string
57}
58
59func (c *mockCollector) Stop() error {
60	return c.server.Shutdown(context.Background())
61}
62
63func (c *mockCollector) MustStop(t *testing.T) {
64	assert.NoError(t, c.server.Shutdown(context.Background()))
65}
66
67func (c *mockCollector) GetSpans() []*tracepb.Span {
68	c.spanLock.Lock()
69	defer c.spanLock.Unlock()
70	return c.spansStorage.GetSpans()
71}
72
73func (c *mockCollector) GetResourceSpans() []*tracepb.ResourceSpans {
74	c.spanLock.Lock()
75	defer c.spanLock.Unlock()
76	return c.spansStorage.GetResourceSpans()
77}
78
79func (c *mockCollector) GetMetrics() []*metricpb.Metric {
80	c.metricLock.Lock()
81	defer c.metricLock.Unlock()
82	return c.metricsStorage.GetMetrics()
83}
84
85func (c *mockCollector) Endpoint() string {
86	return c.endpoint
87}
88
89func (c *mockCollector) ClientTLSConfig() *tls.Config {
90	return c.clientTLSConfig
91}
92
93func (c *mockCollector) serveMetrics(w http.ResponseWriter, r *http.Request) {
94	if !c.checkHeaders(r) {
95		w.WriteHeader(http.StatusBadRequest)
96		return
97	}
98	response := collectormetricpb.ExportMetricsServiceResponse{}
99	rawResponse, err := response.Marshal()
100	if err != nil {
101		w.WriteHeader(http.StatusInternalServerError)
102		return
103	}
104	if injectedStatus := c.getInjectHTTPStatus(); injectedStatus != 0 {
105		writeReply(w, rawResponse, injectedStatus, c.injectContentType)
106		return
107	}
108	rawRequest, err := readRequest(r)
109	if err != nil {
110		w.WriteHeader(http.StatusInternalServerError)
111		return
112	}
113	request, err := unmarshalMetricsRequest(rawRequest, r.Header.Get("content-type"))
114	if err != nil {
115		w.WriteHeader(http.StatusBadRequest)
116		return
117	}
118	writeReply(w, rawResponse, 0, c.injectContentType)
119	c.metricLock.Lock()
120	defer c.metricLock.Unlock()
121	c.metricsStorage.AddMetrics(request)
122}
123
124func unmarshalMetricsRequest(rawRequest []byte, contentType string) (*collectormetricpb.ExportMetricsServiceRequest, error) {
125	request := &collectormetricpb.ExportMetricsServiceRequest{}
126	if contentType == "application/json" {
127		err := jsonpb.UnmarshalString(string(rawRequest), request)
128		return request, err
129	}
130	err := request.Unmarshal(rawRequest)
131	return request, err
132}
133
134func (c *mockCollector) serveTraces(w http.ResponseWriter, r *http.Request) {
135	if !c.checkHeaders(r) {
136		w.WriteHeader(http.StatusBadRequest)
137		return
138	}
139	response := collectortracepb.ExportTraceServiceResponse{}
140	rawResponse, err := response.Marshal()
141	if err != nil {
142		w.WriteHeader(http.StatusInternalServerError)
143		return
144	}
145	if injectedStatus := c.getInjectHTTPStatus(); injectedStatus != 0 {
146		writeReply(w, rawResponse, injectedStatus, c.injectContentType)
147		return
148	}
149	rawRequest, err := readRequest(r)
150	if err != nil {
151		w.WriteHeader(http.StatusInternalServerError)
152		return
153	}
154
155	request, err := unmarshalTraceRequest(rawRequest, r.Header.Get("content-type"))
156	if err != nil {
157		w.WriteHeader(http.StatusBadRequest)
158		return
159	}
160	writeReply(w, rawResponse, 0, c.injectContentType)
161	c.spanLock.Lock()
162	defer c.spanLock.Unlock()
163	c.spansStorage.AddSpans(request)
164}
165
166func unmarshalTraceRequest(rawRequest []byte, contentType string) (*collectortracepb.ExportTraceServiceRequest, error) {
167	request := &collectortracepb.ExportTraceServiceRequest{}
168	if contentType == "application/json" {
169		err := jsonpb.UnmarshalString(string(rawRequest), request)
170		return request, err
171	}
172	err := request.Unmarshal(rawRequest)
173	return request, err
174}
175
176func (c *mockCollector) checkHeaders(r *http.Request) bool {
177	for k, v := range c.expectedHeaders {
178		got := r.Header.Get(k)
179		if got != v {
180			return false
181		}
182	}
183	return true
184}
185
186func (c *mockCollector) getInjectHTTPStatus() int {
187	if len(c.injectHTTPStatus) == 0 {
188		return 0
189	}
190	status := c.injectHTTPStatus[0]
191	c.injectHTTPStatus = c.injectHTTPStatus[1:]
192	if len(c.injectHTTPStatus) == 0 {
193		c.injectHTTPStatus = nil
194	}
195	return status
196}
197
198func readRequest(r *http.Request) ([]byte, error) {
199	if r.Header.Get("Content-Encoding") == "gzip" {
200		return readGzipBody(r.Body)
201	}
202	return ioutil.ReadAll(r.Body)
203}
204
205func readGzipBody(body io.Reader) ([]byte, error) {
206	rawRequest := bytes.Buffer{}
207	gunzipper, err := gzip.NewReader(body)
208	if err != nil {
209		return nil, err
210	}
211	defer gunzipper.Close()
212	_, err = io.Copy(&rawRequest, gunzipper)
213	if err != nil {
214		return nil, err
215	}
216	return rawRequest.Bytes(), nil
217}
218
219func writeReply(w http.ResponseWriter, rawResponse []byte, injectHTTPStatus int, injectContentType string) {
220	status := http.StatusOK
221	if injectHTTPStatus != 0 {
222		status = injectHTTPStatus
223	}
224	contentType := "application/x-protobuf"
225	if injectContentType != "" {
226		contentType = injectContentType
227	}
228	w.Header().Set("Content-Type", contentType)
229	w.WriteHeader(status)
230	_, _ = w.Write(rawResponse)
231}
232
233type mockCollectorConfig struct {
234	MetricsURLPath    string
235	TracesURLPath     string
236	Port              int
237	InjectHTTPStatus  []int
238	InjectContentType string
239	WithTLS           bool
240	ExpectedHeaders   map[string]string
241}
242
243func (c *mockCollectorConfig) fillInDefaults() {
244	if c.MetricsURLPath == "" {
245		c.MetricsURLPath = otlphttp.DefaultMetricsPath
246	}
247	if c.TracesURLPath == "" {
248		c.TracesURLPath = otlphttp.DefaultTracesPath
249	}
250}
251
252func runMockCollector(t *testing.T, cfg mockCollectorConfig) *mockCollector {
253	cfg.fillInDefaults()
254	ln, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", cfg.Port))
255	require.NoError(t, err)
256	_, portStr, err := net.SplitHostPort(ln.Addr().String())
257	require.NoError(t, err)
258	m := &mockCollector{
259		endpoint:          fmt.Sprintf("localhost:%s", portStr),
260		spansStorage:      otlptest.NewSpansStorage(),
261		metricsStorage:    otlptest.NewMetricsStorage(),
262		injectHTTPStatus:  cfg.InjectHTTPStatus,
263		injectContentType: cfg.InjectContentType,
264		expectedHeaders:   cfg.ExpectedHeaders,
265	}
266	mux := http.NewServeMux()
267	mux.Handle(cfg.MetricsURLPath, http.HandlerFunc(m.serveMetrics))
268	mux.Handle(cfg.TracesURLPath, http.HandlerFunc(m.serveTraces))
269	server := &http.Server{
270		Handler: mux,
271	}
272	if cfg.WithTLS {
273		pem, err := generateWeakCertificate()
274		require.NoError(t, err)
275		tlsCertificate, err := tls.X509KeyPair(pem.Certificate, pem.PrivateKey)
276		require.NoError(t, err)
277		server.TLSConfig = &tls.Config{
278			Certificates: []tls.Certificate{tlsCertificate},
279		}
280
281		m.clientTLSConfig = &tls.Config{
282			InsecureSkipVerify: true,
283		}
284	}
285	go func() {
286		if cfg.WithTLS {
287			_ = server.ServeTLS(ln, "", "")
288		} else {
289			_ = server.Serve(ln)
290		}
291	}()
292	m.server = server
293	return m
294}
295