1// Copyright (c) 2017 Uber Technologies, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package jaeger
16
17import (
18	"errors"
19	"io"
20	"strings"
21	"sync"
22	"sync/atomic"
23	"testing"
24	"time"
25
26	"github.com/opentracing/opentracing-go"
27	"github.com/opentracing/opentracing-go/ext"
28	"github.com/stretchr/testify/assert"
29	"github.com/stretchr/testify/require"
30	"github.com/uber/jaeger-lib/metrics"
31	"github.com/uber/jaeger-lib/metrics/metricstest"
32
33	"github.com/uber/jaeger-client-go/log"
34	"github.com/uber/jaeger-client-go/testutils"
35	j "github.com/uber/jaeger-client-go/thrift-gen/jaeger"
36)
37
38type reporterSuite struct {
39	tracer         opentracing.Tracer
40	closer         io.Closer
41	serviceName    string
42	reporter       *remoteReporter
43	sender         *fakeSender
44	metricsFactory *metricstest.Factory
45	logger         *log.BytesBufferLogger
46}
47
48func makeReporterSuite(t *testing.T, opts ...ReporterOption) *reporterSuite {
49	return makeReporterSuiteWithSender(t, &fakeSender{bufferSize: 5}, opts...)
50}
51
52func makeReporterSuiteWithSender(t *testing.T, sender *fakeSender, opts ...ReporterOption) *reporterSuite {
53	s := &reporterSuite{
54		metricsFactory: metricstest.NewFactory(0),
55		serviceName:    "DOOP",
56		sender:         sender,
57		logger:         &log.BytesBufferLogger{},
58	}
59	metrics := NewMetrics(s.metricsFactory, nil)
60	opts = append([]ReporterOption{
61		ReporterOptions.Metrics(metrics),
62		ReporterOptions.Logger(s.logger),
63		ReporterOptions.BufferFlushInterval(100 * time.Second),
64	}, opts...)
65	s.reporter = NewRemoteReporter(s.sender, opts...).(*remoteReporter)
66	s.tracer, s.closer = NewTracer(
67		"reporter-test-service",
68		NewConstSampler(true),
69		s.reporter,
70		// TracerOptions.Metrics(metrics),
71	)
72	require.NotNil(t, s.tracer)
73	return s
74}
75
76func (s *reporterSuite) close() {
77	s.closer.Close()
78}
79
80func (s *reporterSuite) assertCounter(t *testing.T, name string, tags map[string]string, expectedValue int64) {
81	getValue := func() int64 {
82		counters, _ := s.metricsFactory.Snapshot()
83		key := metrics.GetKey(name, tags, "|", "=")
84		return counters[key]
85	}
86	for i := 0; i < 1000; i++ {
87		if getValue() == expectedValue {
88			break
89		}
90		time.Sleep(time.Millisecond)
91	}
92	assert.Equal(t, expectedValue, getValue(), "expected counter: name=%s, tags=%+v", name, tags)
93}
94
95func (s *reporterSuite) assertLogsContain(t *testing.T, expectedLogs string) {
96	for i := 0; i < 1000; i++ {
97		if s.logger.String() == expectedLogs {
98			break
99		}
100		time.Sleep(time.Millisecond)
101	}
102	assert.Contains(t, s.logger.String(), expectedLogs, "expected logs: %s", expectedLogs)
103}
104
105func TestRemoteReporterAppend(t *testing.T) {
106	s := makeReporterSuite(t)
107	defer s.close()
108	s.tracer.StartSpan("sp1").Finish()
109	s.sender.assertBufferedSpans(t, 1)
110}
111
112func TestRemoteReporterAppendAndPeriodicFlush(t *testing.T) {
113	s := makeReporterSuite(t, ReporterOptions.BufferFlushInterval(50*time.Millisecond))
114	defer s.close()
115	s.tracer.StartSpan("sp1").Finish()
116	s.sender.assertBufferedSpans(t, 1)
117	// here we wait for periodic flush to occur
118	s.sender.assertFlushedSpans(t, 1)
119	s.assertCounter(t, "jaeger.tracer.reporter_spans", map[string]string{"result": "ok"}, 1)
120}
121
122func TestRemoteReporterFlushViaAppend(t *testing.T) {
123	s := makeReporterSuiteWithSender(t, &fakeSender{bufferSize: 2})
124	defer s.close()
125	s.tracer.StartSpan("sp1").Finish()
126	s.tracer.StartSpan("sp2").Finish()
127	s.sender.assertFlushedSpans(t, 2)
128	s.tracer.StartSpan("sp3").Finish()
129	s.sender.assertBufferedSpans(t, 1)
130	s.assertCounter(t, "jaeger.tracer.reporter_spans", map[string]string{"result": "ok"}, 2)
131	s.assertCounter(t, "jaeger.tracer.reporter_spans", map[string]string{"result": "err"}, 0)
132}
133
134func TestRemoteReporterFailedFlushViaAppend(t *testing.T) {
135	s := makeReporterSuiteWithSender(t, &fakeSender{bufferSize: 2, flushErr: errors.New("flush error")}, ReporterOptions.BufferFlushInterval(100*time.Second))
136	s.tracer.StartSpan("sp1").Finish()
137	s.tracer.StartSpan("sp2").Finish()
138	s.sender.assertFlushedSpans(t, 2)
139	s.assertLogsContain(t, "ERROR: error reporting Jaeger span \"sp2\": flush error\n")
140	s.assertCounter(t, "jaeger.tracer.reporter_spans", map[string]string{"result": "err"}, 2)
141	s.assertCounter(t, "jaeger.tracer.reporter_spans", map[string]string{"result": "ok"}, 0)
142	s.close() // causes explicit flush that also fails with the same error
143	s.assertLogsContain(t, "ERROR: error reporting Jaeger span \"sp2\": flush error\n")
144	s.assertLogsContain(t, "ERROR: failed to flush Jaeger spans to server: flush error\n")
145}
146
147func TestRemoteReporterAppendWithPoolAllocator(t *testing.T) {
148	s := makeReporterSuiteWithSender(t, &fakeSender{bufferSize: 100}, ReporterOptions.BufferFlushInterval(time.Millisecond*10))
149	TracerOptions.PoolSpans(true)(s.tracer.(*Tracer))
150	for i := 0; i < 100; i++ {
151		s.tracer.StartSpan("sp").Finish()
152	}
153	time.Sleep(time.Second)
154	s.sender.assertFlushedSpans(t, 100)
155	s.close() // causes explicit flush that also fails with the same error
156}
157
158func TestRemoteReporterDroppedSpans(t *testing.T) {
159	s := makeReporterSuite(t, ReporterOptions.QueueSize(1))
160	defer s.close()
161
162	s.reporter.sendCloseEvent()       // manually shut down the worker
163	s.tracer.StartSpan("s1").Finish() // this span should be added to the queue
164	s.tracer.StartSpan("s2").Finish() // this span should be dropped since the queue is full
165
166	s.metricsFactory.AssertCounterMetrics(t,
167		metricstest.ExpectedMetric{
168			Name:  "jaeger.tracer.reporter_spans",
169			Tags:  map[string]string{"result": "ok"},
170			Value: 0,
171		},
172		metricstest.ExpectedMetric{
173			Name:  "jaeger.tracer.reporter_spans",
174			Tags:  map[string]string{"result": "dropped"},
175			Value: 1,
176		},
177	)
178
179	go s.reporter.processQueue() // restart the worker so that Close() doesn't deadlock
180}
181
182func TestRemoteReporterDoubleClose(t *testing.T) {
183	logger := &log.BytesBufferLogger{}
184	reporter := NewRemoteReporter(&fakeSender{}, ReporterOptions.QueueSize(1), ReporterOptions.Logger(logger))
185	reporter.Close()
186	reporter.Close()
187	assert.Contains(t, logger.String(), "ERROR: Repeated attempt to close the reporter is ignored\n")
188}
189
190func TestRemoteReporterReportAfterClose(t *testing.T) {
191	s := makeReporterSuite(t)
192	span := s.tracer.StartSpan("leela")
193
194	s.close() // Close the tracer, which also closes and flushes the reporter
195
196	assert.EqualValues(t, 1, atomic.LoadInt64(&s.reporter.closed), "reporter state must be closed")
197	select {
198	case <-s.reporter.queue:
199		t.Fatal("Reporter queue must be empty")
200	default:
201		// expected to get here
202	}
203
204	span.Finish()
205	item := <-s.reporter.queue
206	assert.Equal(t, span, item.span, "since the reporter is closed and its worker routing finished, the span should be in the queue")
207}
208
209func TestUDPReporter(t *testing.T) {
210	agent, err := testutils.StartMockAgent()
211	require.NoError(t, err)
212	defer agent.Close()
213
214	testRemoteReporterWithSender(t,
215		func(m *Metrics) (Transport, error) {
216			return NewUDPTransport(agent.SpanServerAddr(), 0)
217		},
218		func() []*j.Batch {
219			return agent.GetJaegerBatches()
220		})
221}
222
223func testRemoteReporterWithSender(
224	t *testing.T,
225	senderFactory func(m *Metrics) (Transport, error),
226	getBatches func() []*j.Batch,
227) {
228	metricsFactory := metricstest.NewFactory(0)
229	metrics := NewMetrics(metricsFactory, nil)
230
231	sender, err := senderFactory(metrics)
232	require.NoError(t, err)
233	reporter := NewRemoteReporter(sender, ReporterOptions.Metrics(metrics)).(*remoteReporter)
234
235	tracer, closer := NewTracer(
236		"reporter-test-service",
237		NewConstSampler(true),
238		reporter,
239		TracerOptions.Metrics(metrics))
240
241	span := tracer.StartSpan("leela")
242	ext.SpanKindRPCClient.Set(span)
243	ext.PeerService.Set(span, "downstream")
244	span.Finish()
245	closer.Close() // close the tracer, which also closes and flushes the reporter
246
247	// UDP transport uses fire and forget, so we need to wait for spans to get to the agent
248	for i := 0; i < 1000; i++ {
249		time.Sleep(1 * time.Millisecond)
250		if batches := getBatches(); len(batches) > 0 {
251			break
252		}
253	}
254
255	batches := getBatches()
256	require.Equal(t, 1, len(batches))
257	require.Equal(t, 1, len(batches[0].Spans))
258	assert.Equal(t, "leela", batches[0].Spans[0].OperationName)
259	assert.Equal(t, "reporter-test-service", batches[0].Process.ServiceName)
260	tag := findJaegerTag("peer.service", batches[0].Spans[0].Tags)
261	assert.NotNil(t, tag)
262	assert.Equal(t, "downstream", *tag.VStr)
263
264	metricsFactory.AssertCounterMetrics(t, []metricstest.ExpectedMetric{
265		{Name: "jaeger.tracer.reporter_spans", Tags: map[string]string{"result": "ok"}, Value: 1},
266		{Name: "jaeger.tracer.reporter_spans", Tags: map[string]string{"result": "err"}, Value: 0},
267	}...)
268}
269
270func TestMemoryReporterReport(t *testing.T) {
271	reporter := NewInMemoryReporter()
272	tracer, closer := NewTracer("DOOP", NewConstSampler(true), reporter)
273	defer closer.Close()
274	tracer.StartSpan("leela").Finish()
275	assert.Len(t, reporter.GetSpans(), 1, "expected number of spans submitted")
276	assert.Equal(t, 1, reporter.SpansSubmitted(), "expected number of spans submitted")
277	reporter.Reset()
278	assert.Len(t, reporter.GetSpans(), 0, "expected number of spans submitted")
279	assert.Equal(t, 0, reporter.SpansSubmitted(), "expected number of spans submitted")
280}
281
282func TestCompositeReporterReport(t *testing.T) {
283	reporter1 := NewInMemoryReporter()
284	reporter2 := NewInMemoryReporter()
285	reporter3 := NewCompositeReporter(reporter1, reporter2)
286	tracer, closer := NewTracer("DOOP", NewConstSampler(true), reporter3)
287	defer closer.Close()
288	tracer.StartSpan("leela").Finish()
289	assert.Len(t, reporter1.GetSpans(), 1, "expected number of spans submitted")
290	assert.Len(t, reporter2.GetSpans(), 1, "expected number of spans submitted")
291}
292
293func TestLoggingReporter(t *testing.T) {
294	logger := &log.BytesBufferLogger{}
295	reporter := NewLoggingReporter(logger)
296	tracer, closer := NewTracer("test", NewConstSampler(true), reporter)
297	defer closer.Close() // will call Close on the reporter
298	tracer.StartSpan("sp1").Finish()
299	assert.True(t, strings.HasPrefix(logger.String(), "INFO: Reporting span"))
300}
301
302type fakeSender struct {
303	bufferSize int
304	appendErr  error
305	flushErr   error
306
307	spans   []*Span
308	flushed []*Span
309	mutex   sync.Mutex
310}
311
312func (s *fakeSender) Append(span *Span) (int, error) {
313	s.mutex.Lock()
314	defer s.mutex.Unlock()
315
316	s.spans = append(s.spans, span)
317	if n := len(s.spans); n == s.bufferSize {
318		return s.flushNoLock()
319	}
320	return 0, s.appendErr
321}
322
323func (s *fakeSender) Flush() (int, error) {
324	s.mutex.Lock()
325	defer s.mutex.Unlock()
326	return s.flushNoLock()
327}
328
329func (s *fakeSender) flushNoLock() (int, error) {
330	n := len(s.spans)
331	s.flushed = append(s.flushed, s.spans...)
332	s.spans = nil
333	return n, s.flushErr
334}
335
336func (s *fakeSender) Close() error { return nil }
337
338func (s *fakeSender) BufferedSpans() []*Span {
339	s.mutex.Lock()
340	defer s.mutex.Unlock()
341	res := make([]*Span, len(s.spans))
342	copy(res, s.spans)
343	return res
344}
345
346func (s *fakeSender) FlushedSpans() []*Span {
347	s.mutex.Lock()
348	defer s.mutex.Unlock()
349	res := make([]*Span, len(s.flushed))
350	copy(res, s.flushed)
351	return res
352}
353
354func (s *fakeSender) assertBufferedSpans(t *testing.T, count int) {
355	for i := 0; i < 1000; i++ {
356		if len(s.BufferedSpans()) == count {
357			break
358		}
359		time.Sleep(time.Millisecond)
360	}
361	assert.Len(t, s.BufferedSpans(), count)
362}
363
364func (s *fakeSender) assertFlushedSpans(t *testing.T, count int) {
365	for i := 0; i < 1000; i++ {
366		if len(s.FlushedSpans()) == count {
367			break
368		}
369		time.Sleep(time.Millisecond)
370	}
371	assert.Len(t, s.FlushedSpans(), count)
372}
373
374func findDomainLog(span *Span, key string) *opentracing.LogRecord {
375	for _, log := range span.logs {
376		if log.Fields[0].Value().(string) == key {
377			return &log
378		}
379	}
380	return nil
381}
382
383func findDomainTag(span *Span, key string) *Tag {
384	for _, tag := range span.tags {
385		if tag.key == key {
386			return &tag
387		}
388	}
389	return nil
390}
391