1// Copyright (c) 2017 Uber Technologies, Inc. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package jaeger 16 17import ( 18 "errors" 19 "io" 20 "strings" 21 "sync" 22 "sync/atomic" 23 "testing" 24 "time" 25 26 "github.com/opentracing/opentracing-go" 27 "github.com/opentracing/opentracing-go/ext" 28 "github.com/stretchr/testify/assert" 29 "github.com/stretchr/testify/require" 30 "github.com/uber/jaeger-lib/metrics" 31 "github.com/uber/jaeger-lib/metrics/metricstest" 32 33 "github.com/uber/jaeger-client-go/log" 34 "github.com/uber/jaeger-client-go/testutils" 35 j "github.com/uber/jaeger-client-go/thrift-gen/jaeger" 36) 37 38type reporterSuite struct { 39 tracer opentracing.Tracer 40 closer io.Closer 41 serviceName string 42 reporter *remoteReporter 43 sender *fakeSender 44 metricsFactory *metricstest.Factory 45 logger *log.BytesBufferLogger 46} 47 48func makeReporterSuite(t *testing.T, opts ...ReporterOption) *reporterSuite { 49 return makeReporterSuiteWithSender(t, &fakeSender{bufferSize: 5}, opts...) 50} 51 52func makeReporterSuiteWithSender(t *testing.T, sender *fakeSender, opts ...ReporterOption) *reporterSuite { 53 s := &reporterSuite{ 54 metricsFactory: metricstest.NewFactory(0), 55 serviceName: "DOOP", 56 sender: sender, 57 logger: &log.BytesBufferLogger{}, 58 } 59 metrics := NewMetrics(s.metricsFactory, nil) 60 opts = append([]ReporterOption{ 61 ReporterOptions.Metrics(metrics), 62 ReporterOptions.Logger(s.logger), 63 ReporterOptions.BufferFlushInterval(100 * time.Second), 64 }, opts...) 65 s.reporter = NewRemoteReporter(s.sender, opts...).(*remoteReporter) 66 s.tracer, s.closer = NewTracer( 67 "reporter-test-service", 68 NewConstSampler(true), 69 s.reporter, 70 // TracerOptions.Metrics(metrics), 71 ) 72 require.NotNil(t, s.tracer) 73 return s 74} 75 76func (s *reporterSuite) close() { 77 s.closer.Close() 78} 79 80func (s *reporterSuite) assertCounter(t *testing.T, name string, tags map[string]string, expectedValue int64) { 81 getValue := func() int64 { 82 counters, _ := s.metricsFactory.Snapshot() 83 key := metrics.GetKey(name, tags, "|", "=") 84 return counters[key] 85 } 86 for i := 0; i < 1000; i++ { 87 if getValue() == expectedValue { 88 break 89 } 90 time.Sleep(time.Millisecond) 91 } 92 assert.Equal(t, expectedValue, getValue(), "expected counter: name=%s, tags=%+v", name, tags) 93} 94 95func (s *reporterSuite) assertLogsContain(t *testing.T, expectedLogs string) { 96 for i := 0; i < 1000; i++ { 97 if s.logger.String() == expectedLogs { 98 break 99 } 100 time.Sleep(time.Millisecond) 101 } 102 assert.Contains(t, s.logger.String(), expectedLogs, "expected logs: %s", expectedLogs) 103} 104 105func TestRemoteReporterAppend(t *testing.T) { 106 s := makeReporterSuite(t) 107 defer s.close() 108 s.tracer.StartSpan("sp1").Finish() 109 s.sender.assertBufferedSpans(t, 1) 110} 111 112func TestRemoteReporterAppendAndPeriodicFlush(t *testing.T) { 113 s := makeReporterSuite(t, ReporterOptions.BufferFlushInterval(50*time.Millisecond)) 114 defer s.close() 115 s.tracer.StartSpan("sp1").Finish() 116 s.sender.assertBufferedSpans(t, 1) 117 // here we wait for periodic flush to occur 118 s.sender.assertFlushedSpans(t, 1) 119 s.assertCounter(t, "jaeger.tracer.reporter_spans", map[string]string{"result": "ok"}, 1) 120} 121 122func TestRemoteReporterFlushViaAppend(t *testing.T) { 123 s := makeReporterSuiteWithSender(t, &fakeSender{bufferSize: 2}) 124 defer s.close() 125 s.tracer.StartSpan("sp1").Finish() 126 s.tracer.StartSpan("sp2").Finish() 127 s.sender.assertFlushedSpans(t, 2) 128 s.tracer.StartSpan("sp3").Finish() 129 s.sender.assertBufferedSpans(t, 1) 130 s.assertCounter(t, "jaeger.tracer.reporter_spans", map[string]string{"result": "ok"}, 2) 131 s.assertCounter(t, "jaeger.tracer.reporter_spans", map[string]string{"result": "err"}, 0) 132} 133 134func TestRemoteReporterFailedFlushViaAppend(t *testing.T) { 135 s := makeReporterSuiteWithSender(t, &fakeSender{bufferSize: 2, flushErr: errors.New("flush error")}, ReporterOptions.BufferFlushInterval(100*time.Second)) 136 s.tracer.StartSpan("sp1").Finish() 137 s.tracer.StartSpan("sp2").Finish() 138 s.sender.assertFlushedSpans(t, 2) 139 s.assertLogsContain(t, "ERROR: error reporting Jaeger span \"sp2\": flush error\n") 140 s.assertCounter(t, "jaeger.tracer.reporter_spans", map[string]string{"result": "err"}, 2) 141 s.assertCounter(t, "jaeger.tracer.reporter_spans", map[string]string{"result": "ok"}, 0) 142 s.close() // causes explicit flush that also fails with the same error 143 s.assertLogsContain(t, "ERROR: error reporting Jaeger span \"sp2\": flush error\n") 144 s.assertLogsContain(t, "ERROR: failed to flush Jaeger spans to server: flush error\n") 145} 146 147func TestRemoteReporterAppendWithPoolAllocator(t *testing.T) { 148 s := makeReporterSuiteWithSender(t, &fakeSender{bufferSize: 100}, ReporterOptions.BufferFlushInterval(time.Millisecond*10)) 149 TracerOptions.PoolSpans(true)(s.tracer.(*Tracer)) 150 for i := 0; i < 100; i++ { 151 s.tracer.StartSpan("sp").Finish() 152 } 153 time.Sleep(time.Second) 154 s.sender.assertFlushedSpans(t, 100) 155 s.close() // causes explicit flush that also fails with the same error 156} 157 158func TestRemoteReporterDroppedSpans(t *testing.T) { 159 s := makeReporterSuite(t, ReporterOptions.QueueSize(1)) 160 defer s.close() 161 162 s.reporter.sendCloseEvent() // manually shut down the worker 163 s.tracer.StartSpan("s1").Finish() // this span should be added to the queue 164 s.tracer.StartSpan("s2").Finish() // this span should be dropped since the queue is full 165 166 s.metricsFactory.AssertCounterMetrics(t, 167 metricstest.ExpectedMetric{ 168 Name: "jaeger.tracer.reporter_spans", 169 Tags: map[string]string{"result": "ok"}, 170 Value: 0, 171 }, 172 metricstest.ExpectedMetric{ 173 Name: "jaeger.tracer.reporter_spans", 174 Tags: map[string]string{"result": "dropped"}, 175 Value: 1, 176 }, 177 ) 178 179 go s.reporter.processQueue() // restart the worker so that Close() doesn't deadlock 180} 181 182func TestRemoteReporterDoubleClose(t *testing.T) { 183 logger := &log.BytesBufferLogger{} 184 reporter := NewRemoteReporter(&fakeSender{}, ReporterOptions.QueueSize(1), ReporterOptions.Logger(logger)) 185 reporter.Close() 186 reporter.Close() 187 assert.Contains(t, logger.String(), "ERROR: Repeated attempt to close the reporter is ignored\n") 188} 189 190func TestRemoteReporterReportAfterClose(t *testing.T) { 191 s := makeReporterSuite(t) 192 span := s.tracer.StartSpan("leela") 193 194 s.close() // Close the tracer, which also closes and flushes the reporter 195 196 assert.EqualValues(t, 1, atomic.LoadInt64(&s.reporter.closed), "reporter state must be closed") 197 select { 198 case <-s.reporter.queue: 199 t.Fatal("Reporter queue must be empty") 200 default: 201 // expected to get here 202 } 203 204 span.Finish() 205 item := <-s.reporter.queue 206 assert.Equal(t, span, item.span, "since the reporter is closed and its worker routing finished, the span should be in the queue") 207} 208 209func TestUDPReporter(t *testing.T) { 210 agent, err := testutils.StartMockAgent() 211 require.NoError(t, err) 212 defer agent.Close() 213 214 testRemoteReporterWithSender(t, 215 func(m *Metrics) (Transport, error) { 216 return NewUDPTransport(agent.SpanServerAddr(), 0) 217 }, 218 func() []*j.Batch { 219 return agent.GetJaegerBatches() 220 }) 221} 222 223func testRemoteReporterWithSender( 224 t *testing.T, 225 senderFactory func(m *Metrics) (Transport, error), 226 getBatches func() []*j.Batch, 227) { 228 metricsFactory := metricstest.NewFactory(0) 229 metrics := NewMetrics(metricsFactory, nil) 230 231 sender, err := senderFactory(metrics) 232 require.NoError(t, err) 233 reporter := NewRemoteReporter(sender, ReporterOptions.Metrics(metrics)).(*remoteReporter) 234 235 tracer, closer := NewTracer( 236 "reporter-test-service", 237 NewConstSampler(true), 238 reporter, 239 TracerOptions.Metrics(metrics)) 240 241 span := tracer.StartSpan("leela") 242 ext.SpanKindRPCClient.Set(span) 243 ext.PeerService.Set(span, "downstream") 244 span.Finish() 245 closer.Close() // close the tracer, which also closes and flushes the reporter 246 247 // UDP transport uses fire and forget, so we need to wait for spans to get to the agent 248 for i := 0; i < 1000; i++ { 249 time.Sleep(1 * time.Millisecond) 250 if batches := getBatches(); len(batches) > 0 { 251 break 252 } 253 } 254 255 batches := getBatches() 256 require.Equal(t, 1, len(batches)) 257 require.Equal(t, 1, len(batches[0].Spans)) 258 assert.Equal(t, "leela", batches[0].Spans[0].OperationName) 259 assert.Equal(t, "reporter-test-service", batches[0].Process.ServiceName) 260 tag := findJaegerTag("peer.service", batches[0].Spans[0].Tags) 261 assert.NotNil(t, tag) 262 assert.Equal(t, "downstream", *tag.VStr) 263 264 metricsFactory.AssertCounterMetrics(t, []metricstest.ExpectedMetric{ 265 {Name: "jaeger.tracer.reporter_spans", Tags: map[string]string{"result": "ok"}, Value: 1}, 266 {Name: "jaeger.tracer.reporter_spans", Tags: map[string]string{"result": "err"}, Value: 0}, 267 }...) 268} 269 270func TestMemoryReporterReport(t *testing.T) { 271 reporter := NewInMemoryReporter() 272 tracer, closer := NewTracer("DOOP", NewConstSampler(true), reporter) 273 defer closer.Close() 274 tracer.StartSpan("leela").Finish() 275 assert.Len(t, reporter.GetSpans(), 1, "expected number of spans submitted") 276 assert.Equal(t, 1, reporter.SpansSubmitted(), "expected number of spans submitted") 277 reporter.Reset() 278 assert.Len(t, reporter.GetSpans(), 0, "expected number of spans submitted") 279 assert.Equal(t, 0, reporter.SpansSubmitted(), "expected number of spans submitted") 280} 281 282func TestCompositeReporterReport(t *testing.T) { 283 reporter1 := NewInMemoryReporter() 284 reporter2 := NewInMemoryReporter() 285 reporter3 := NewCompositeReporter(reporter1, reporter2) 286 tracer, closer := NewTracer("DOOP", NewConstSampler(true), reporter3) 287 defer closer.Close() 288 tracer.StartSpan("leela").Finish() 289 assert.Len(t, reporter1.GetSpans(), 1, "expected number of spans submitted") 290 assert.Len(t, reporter2.GetSpans(), 1, "expected number of spans submitted") 291} 292 293func TestLoggingReporter(t *testing.T) { 294 logger := &log.BytesBufferLogger{} 295 reporter := NewLoggingReporter(logger) 296 tracer, closer := NewTracer("test", NewConstSampler(true), reporter) 297 defer closer.Close() // will call Close on the reporter 298 tracer.StartSpan("sp1").Finish() 299 assert.True(t, strings.HasPrefix(logger.String(), "INFO: Reporting span")) 300} 301 302type fakeSender struct { 303 bufferSize int 304 appendErr error 305 flushErr error 306 307 spans []*Span 308 flushed []*Span 309 mutex sync.Mutex 310} 311 312func (s *fakeSender) Append(span *Span) (int, error) { 313 s.mutex.Lock() 314 defer s.mutex.Unlock() 315 316 s.spans = append(s.spans, span) 317 if n := len(s.spans); n == s.bufferSize { 318 return s.flushNoLock() 319 } 320 return 0, s.appendErr 321} 322 323func (s *fakeSender) Flush() (int, error) { 324 s.mutex.Lock() 325 defer s.mutex.Unlock() 326 return s.flushNoLock() 327} 328 329func (s *fakeSender) flushNoLock() (int, error) { 330 n := len(s.spans) 331 s.flushed = append(s.flushed, s.spans...) 332 s.spans = nil 333 return n, s.flushErr 334} 335 336func (s *fakeSender) Close() error { return nil } 337 338func (s *fakeSender) BufferedSpans() []*Span { 339 s.mutex.Lock() 340 defer s.mutex.Unlock() 341 res := make([]*Span, len(s.spans)) 342 copy(res, s.spans) 343 return res 344} 345 346func (s *fakeSender) FlushedSpans() []*Span { 347 s.mutex.Lock() 348 defer s.mutex.Unlock() 349 res := make([]*Span, len(s.flushed)) 350 copy(res, s.flushed) 351 return res 352} 353 354func (s *fakeSender) assertBufferedSpans(t *testing.T, count int) { 355 for i := 0; i < 1000; i++ { 356 if len(s.BufferedSpans()) == count { 357 break 358 } 359 time.Sleep(time.Millisecond) 360 } 361 assert.Len(t, s.BufferedSpans(), count) 362} 363 364func (s *fakeSender) assertFlushedSpans(t *testing.T, count int) { 365 for i := 0; i < 1000; i++ { 366 if len(s.FlushedSpans()) == count { 367 break 368 } 369 time.Sleep(time.Millisecond) 370 } 371 assert.Len(t, s.FlushedSpans(), count) 372} 373 374func findDomainLog(span *Span, key string) *opentracing.LogRecord { 375 for _, log := range span.logs { 376 if log.Fields[0].Value().(string) == key { 377 return &log 378 } 379 } 380 return nil 381} 382 383func findDomainTag(span *Span, key string) *Tag { 384 for _, tag := range span.tags { 385 if tag.key == key { 386 return &tag 387 } 388 } 389 return nil 390} 391