1// Copyright The OpenTelemetry Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package otlphttp_test 16 17import ( 18 "bytes" 19 "compress/gzip" 20 "context" 21 "crypto/tls" 22 "fmt" 23 "io" 24 "io/ioutil" 25 "net" 26 "net/http" 27 "sync" 28 "testing" 29 30 "github.com/gogo/protobuf/jsonpb" 31 "github.com/stretchr/testify/assert" 32 "github.com/stretchr/testify/require" 33 34 collectormetricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/metrics/v1" 35 collectortracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/trace/v1" 36 metricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/metrics/v1" 37 tracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/trace/v1" 38 "go.opentelemetry.io/otel/exporters/otlp/internal/otlptest" 39 "go.opentelemetry.io/otel/exporters/otlp/otlphttp" 40) 41 42type mockCollector struct { 43 endpoint string 44 server *http.Server 45 46 spanLock sync.Mutex 47 spansStorage otlptest.SpansStorage 48 49 metricLock sync.Mutex 50 metricsStorage otlptest.MetricsStorage 51 52 injectHTTPStatus []int 53 injectContentType string 54 55 clientTLSConfig *tls.Config 56 expectedHeaders map[string]string 57} 58 59func (c *mockCollector) Stop() error { 60 return c.server.Shutdown(context.Background()) 61} 62 63func (c *mockCollector) MustStop(t *testing.T) { 64 assert.NoError(t, c.server.Shutdown(context.Background())) 65} 66 67func (c *mockCollector) GetSpans() []*tracepb.Span { 68 c.spanLock.Lock() 69 defer c.spanLock.Unlock() 70 return c.spansStorage.GetSpans() 71} 72 73func (c *mockCollector) GetResourceSpans() []*tracepb.ResourceSpans { 74 c.spanLock.Lock() 75 defer c.spanLock.Unlock() 76 return c.spansStorage.GetResourceSpans() 77} 78 79func (c *mockCollector) GetMetrics() []*metricpb.Metric { 80 c.metricLock.Lock() 81 defer c.metricLock.Unlock() 82 return c.metricsStorage.GetMetrics() 83} 84 85func (c *mockCollector) Endpoint() string { 86 return c.endpoint 87} 88 89func (c *mockCollector) ClientTLSConfig() *tls.Config { 90 return c.clientTLSConfig 91} 92 93func (c *mockCollector) serveMetrics(w http.ResponseWriter, r *http.Request) { 94 if !c.checkHeaders(r) { 95 w.WriteHeader(http.StatusBadRequest) 96 return 97 } 98 response := collectormetricpb.ExportMetricsServiceResponse{} 99 rawResponse, err := response.Marshal() 100 if err != nil { 101 w.WriteHeader(http.StatusInternalServerError) 102 return 103 } 104 if injectedStatus := c.getInjectHTTPStatus(); injectedStatus != 0 { 105 writeReply(w, rawResponse, injectedStatus, c.injectContentType) 106 return 107 } 108 rawRequest, err := readRequest(r) 109 if err != nil { 110 w.WriteHeader(http.StatusInternalServerError) 111 return 112 } 113 request, err := unmarshalMetricsRequest(rawRequest, r.Header.Get("content-type")) 114 if err != nil { 115 w.WriteHeader(http.StatusBadRequest) 116 return 117 } 118 writeReply(w, rawResponse, 0, c.injectContentType) 119 c.metricLock.Lock() 120 defer c.metricLock.Unlock() 121 c.metricsStorage.AddMetrics(request) 122} 123 124func unmarshalMetricsRequest(rawRequest []byte, contentType string) (*collectormetricpb.ExportMetricsServiceRequest, error) { 125 request := &collectormetricpb.ExportMetricsServiceRequest{} 126 if contentType == "application/json" { 127 err := jsonpb.UnmarshalString(string(rawRequest), request) 128 return request, err 129 } 130 err := request.Unmarshal(rawRequest) 131 return request, err 132} 133 134func (c *mockCollector) serveTraces(w http.ResponseWriter, r *http.Request) { 135 if !c.checkHeaders(r) { 136 w.WriteHeader(http.StatusBadRequest) 137 return 138 } 139 response := collectortracepb.ExportTraceServiceResponse{} 140 rawResponse, err := response.Marshal() 141 if err != nil { 142 w.WriteHeader(http.StatusInternalServerError) 143 return 144 } 145 if injectedStatus := c.getInjectHTTPStatus(); injectedStatus != 0 { 146 writeReply(w, rawResponse, injectedStatus, c.injectContentType) 147 return 148 } 149 rawRequest, err := readRequest(r) 150 if err != nil { 151 w.WriteHeader(http.StatusInternalServerError) 152 return 153 } 154 155 request, err := unmarshalTraceRequest(rawRequest, r.Header.Get("content-type")) 156 if err != nil { 157 w.WriteHeader(http.StatusBadRequest) 158 return 159 } 160 writeReply(w, rawResponse, 0, c.injectContentType) 161 c.spanLock.Lock() 162 defer c.spanLock.Unlock() 163 c.spansStorage.AddSpans(request) 164} 165 166func unmarshalTraceRequest(rawRequest []byte, contentType string) (*collectortracepb.ExportTraceServiceRequest, error) { 167 request := &collectortracepb.ExportTraceServiceRequest{} 168 if contentType == "application/json" { 169 err := jsonpb.UnmarshalString(string(rawRequest), request) 170 return request, err 171 } 172 err := request.Unmarshal(rawRequest) 173 return request, err 174} 175 176func (c *mockCollector) checkHeaders(r *http.Request) bool { 177 for k, v := range c.expectedHeaders { 178 got := r.Header.Get(k) 179 if got != v { 180 return false 181 } 182 } 183 return true 184} 185 186func (c *mockCollector) getInjectHTTPStatus() int { 187 if len(c.injectHTTPStatus) == 0 { 188 return 0 189 } 190 status := c.injectHTTPStatus[0] 191 c.injectHTTPStatus = c.injectHTTPStatus[1:] 192 if len(c.injectHTTPStatus) == 0 { 193 c.injectHTTPStatus = nil 194 } 195 return status 196} 197 198func readRequest(r *http.Request) ([]byte, error) { 199 if r.Header.Get("Content-Encoding") == "gzip" { 200 return readGzipBody(r.Body) 201 } 202 return ioutil.ReadAll(r.Body) 203} 204 205func readGzipBody(body io.Reader) ([]byte, error) { 206 rawRequest := bytes.Buffer{} 207 gunzipper, err := gzip.NewReader(body) 208 if err != nil { 209 return nil, err 210 } 211 defer gunzipper.Close() 212 _, err = io.Copy(&rawRequest, gunzipper) 213 if err != nil { 214 return nil, err 215 } 216 return rawRequest.Bytes(), nil 217} 218 219func writeReply(w http.ResponseWriter, rawResponse []byte, injectHTTPStatus int, injectContentType string) { 220 status := http.StatusOK 221 if injectHTTPStatus != 0 { 222 status = injectHTTPStatus 223 } 224 contentType := "application/x-protobuf" 225 if injectContentType != "" { 226 contentType = injectContentType 227 } 228 w.Header().Set("Content-Type", contentType) 229 w.WriteHeader(status) 230 _, _ = w.Write(rawResponse) 231} 232 233type mockCollectorConfig struct { 234 MetricsURLPath string 235 TracesURLPath string 236 Port int 237 InjectHTTPStatus []int 238 InjectContentType string 239 WithTLS bool 240 ExpectedHeaders map[string]string 241} 242 243func (c *mockCollectorConfig) fillInDefaults() { 244 if c.MetricsURLPath == "" { 245 c.MetricsURLPath = otlphttp.DefaultMetricsPath 246 } 247 if c.TracesURLPath == "" { 248 c.TracesURLPath = otlphttp.DefaultTracesPath 249 } 250} 251 252func runMockCollector(t *testing.T, cfg mockCollectorConfig) *mockCollector { 253 cfg.fillInDefaults() 254 ln, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", cfg.Port)) 255 require.NoError(t, err) 256 _, portStr, err := net.SplitHostPort(ln.Addr().String()) 257 require.NoError(t, err) 258 m := &mockCollector{ 259 endpoint: fmt.Sprintf("localhost:%s", portStr), 260 spansStorage: otlptest.NewSpansStorage(), 261 metricsStorage: otlptest.NewMetricsStorage(), 262 injectHTTPStatus: cfg.InjectHTTPStatus, 263 injectContentType: cfg.InjectContentType, 264 expectedHeaders: cfg.ExpectedHeaders, 265 } 266 mux := http.NewServeMux() 267 mux.Handle(cfg.MetricsURLPath, http.HandlerFunc(m.serveMetrics)) 268 mux.Handle(cfg.TracesURLPath, http.HandlerFunc(m.serveTraces)) 269 server := &http.Server{ 270 Handler: mux, 271 } 272 if cfg.WithTLS { 273 pem, err := generateWeakCertificate() 274 require.NoError(t, err) 275 tlsCertificate, err := tls.X509KeyPair(pem.Certificate, pem.PrivateKey) 276 require.NoError(t, err) 277 server.TLSConfig = &tls.Config{ 278 Certificates: []tls.Certificate{tlsCertificate}, 279 } 280 281 m.clientTLSConfig = &tls.Config{ 282 InsecureSkipVerify: true, 283 } 284 } 285 go func() { 286 if cfg.WithTLS { 287 _ = server.ServeTLS(ln, "", "") 288 } else { 289 _ = server.Serve(ln) 290 } 291 }() 292 m.server = server 293 return m 294} 295