1// Copyright The OpenTelemetry Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package otlpmetricgrpc_test 16 17import ( 18 "context" 19 "fmt" 20 "net" 21 "runtime" 22 "strings" 23 "sync" 24 "testing" 25 "time" 26 27 "google.golang.org/grpc/metadata" 28 29 "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/otlpmetrictest" 30 31 "google.golang.org/grpc" 32 33 collectormetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1" 34 metricpb "go.opentelemetry.io/proto/otlp/metrics/v1" 35) 36 37func makeMockCollector(t *testing.T, mockConfig *mockConfig) *mockCollector { 38 return &mockCollector{ 39 t: t, 40 metricSvc: &mockMetricService{ 41 storage: otlpmetrictest.NewMetricsStorage(), 42 errors: mockConfig.errors, 43 }, 44 } 45} 46 47type mockMetricService struct { 48 collectormetricpb.UnimplementedMetricsServiceServer 49 50 requests int 51 errors []error 52 53 headers metadata.MD 54 mu sync.RWMutex 55 storage otlpmetrictest.MetricsStorage 56 delay time.Duration 57} 58 59func (mms *mockMetricService) getHeaders() metadata.MD { 60 mms.mu.RLock() 61 defer mms.mu.RUnlock() 62 return mms.headers 63} 64 65func (mms *mockMetricService) getMetrics() []*metricpb.Metric { 66 mms.mu.RLock() 67 defer mms.mu.RUnlock() 68 return mms.storage.GetMetrics() 69} 70 71func (mms *mockMetricService) Export(ctx context.Context, exp *collectormetricpb.ExportMetricsServiceRequest) (*collectormetricpb.ExportMetricsServiceResponse, error) { 72 if mms.delay > 0 { 73 time.Sleep(mms.delay) 74 } 75 76 mms.mu.Lock() 77 defer func() { 78 mms.requests++ 79 mms.mu.Unlock() 80 }() 81 82 reply := &collectormetricpb.ExportMetricsServiceResponse{} 83 if mms.requests < len(mms.errors) { 84 idx := mms.requests 85 return reply, mms.errors[idx] 86 } 87 88 mms.headers, _ = metadata.FromIncomingContext(ctx) 89 mms.storage.AddMetrics(exp) 90 return reply, nil 91} 92 93type mockCollector struct { 94 t *testing.T 95 96 metricSvc *mockMetricService 97 98 endpoint string 99 ln *listener 100 stopFunc func() 101 stopOnce sync.Once 102} 103 104type mockConfig struct { 105 errors []error 106 endpoint string 107} 108 109var _ collectormetricpb.MetricsServiceServer = (*mockMetricService)(nil) 110 111var errAlreadyStopped = fmt.Errorf("already stopped") 112 113func (mc *mockCollector) stop() error { 114 var err = errAlreadyStopped 115 mc.stopOnce.Do(func() { 116 err = nil 117 if mc.stopFunc != nil { 118 mc.stopFunc() 119 } 120 }) 121 // Give it sometime to shutdown. 122 <-time.After(160 * time.Millisecond) 123 124 // Wait for services to finish reading/writing. 125 // Getting the lock ensures the metricSvc is done flushing. 126 mc.metricSvc.mu.Lock() 127 defer mc.metricSvc.mu.Unlock() 128 return err 129} 130 131func (mc *mockCollector) Stop() error { 132 return mc.stop() 133} 134 135func (mc *mockCollector) getHeaders() metadata.MD { 136 return mc.metricSvc.getHeaders() 137} 138 139func (mc *mockCollector) getMetrics() []*metricpb.Metric { 140 return mc.metricSvc.getMetrics() 141} 142 143func (mc *mockCollector) GetMetrics() []*metricpb.Metric { 144 return mc.getMetrics() 145} 146 147// runMockCollector is a helper function to create a mock Collector 148func runMockCollector(t *testing.T) *mockCollector { 149 return runMockCollectorAtEndpoint(t, "localhost:0") 150} 151 152func runMockCollectorAtEndpoint(t *testing.T, endpoint string) *mockCollector { 153 return runMockCollectorWithConfig(t, &mockConfig{endpoint: endpoint}) 154} 155 156func runMockCollectorWithConfig(t *testing.T, mockConfig *mockConfig) *mockCollector { 157 ln, err := net.Listen("tcp", mockConfig.endpoint) 158 if err != nil { 159 t.Fatalf("Failed to get an endpoint: %v", err) 160 } 161 162 srv := grpc.NewServer() 163 mc := makeMockCollector(t, mockConfig) 164 collectormetricpb.RegisterMetricsServiceServer(srv, mc.metricSvc) 165 mc.ln = newListener(ln) 166 go func() { 167 _ = srv.Serve((net.Listener)(mc.ln)) 168 }() 169 170 mc.endpoint = ln.Addr().String() 171 // srv.Stop calls Close on mc.ln. 172 mc.stopFunc = srv.Stop 173 174 return mc 175} 176 177type listener struct { 178 closeOnce sync.Once 179 wrapped net.Listener 180 C chan struct{} 181} 182 183func newListener(wrapped net.Listener) *listener { 184 return &listener{ 185 wrapped: wrapped, 186 C: make(chan struct{}, 1), 187 } 188} 189 190func (l *listener) Close() error { return l.wrapped.Close() } 191 192func (l *listener) Addr() net.Addr { return l.wrapped.Addr() } 193 194// Accept waits for and returns the next connection to the listener. It will 195// send a signal on l.C that a connection has been made before returning. 196func (l *listener) Accept() (net.Conn, error) { 197 conn, err := l.wrapped.Accept() 198 if err != nil { 199 // Go 1.16 exported net.ErrClosed that could clean up this check, but to 200 // remain backwards compatible with previous versions of Go that we 201 // support the following string evaluation is used instead to keep in line 202 // with the previously recommended way to check this: 203 // https://github.com/golang/go/issues/4373#issuecomment-353076799 204 if strings.Contains(err.Error(), "use of closed network connection") { 205 // If the listener has been closed, do not allow callers of 206 // WaitForConn to wait for a connection that will never come. 207 l.closeOnce.Do(func() { close(l.C) }) 208 } 209 return conn, err 210 } 211 212 select { 213 case l.C <- struct{}{}: 214 default: 215 // If C is full, assume nobody is listening and move on. 216 } 217 return conn, nil 218} 219 220// WaitForConn will wait indefintely for a connection to be estabilished with 221// the listener before returning. 222func (l *listener) WaitForConn() { 223 for { 224 select { 225 case <-l.C: 226 return 227 default: 228 runtime.Gosched() 229 } 230 } 231} 232