1// Copyright (c) 2017 Uber Technologies, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package jaeger
16
17import (
18	"testing"
19	"time"
20
21	"github.com/stretchr/testify/assert"
22	"github.com/stretchr/testify/require"
23
24	"github.com/uber/jaeger-client-go/testutils"
25	"github.com/uber/jaeger-client-go/thrift"
26	j "github.com/uber/jaeger-client-go/thrift-gen/jaeger"
27)
28
29var (
30	testTracer, _ = NewTracer("svcName", NewConstSampler(false), NewNullReporter())
31	jaegerTracer  = testTracer.(*Tracer)
32)
33
34func getThriftSpanByteLength(t *testing.T, span *Span) int {
35	jSpan := BuildJaegerThrift(span)
36	transport := thrift.NewTMemoryBufferLen(1000)
37	protocolFactory := thrift.NewTCompactProtocolFactory()
38	err := jSpan.Write(protocolFactory.GetProtocol(transport))
39	require.NoError(t, err)
40	return transport.Len()
41}
42
43func getThriftProcessByteLengthFromTracer(t *testing.T, tracer *Tracer) int {
44	process := buildJaegerProcessThrift(tracer)
45	return getThriftProcessByteLength(t, process)
46}
47
48func getThriftProcessByteLength(t *testing.T, process *j.Process) int {
49	transport := thrift.NewTMemoryBufferLen(1000)
50	protocolFactory := thrift.NewTCompactProtocolFactory()
51	err := process.Write(protocolFactory.GetProtocol(transport))
52	require.NoError(t, err)
53	return transport.Len()
54}
55
56func newSpan() *Span {
57	span := &Span{operationName: "test-span", tracer: jaegerTracer}
58	span.context.samplingState = &samplingState{}
59	return span
60}
61
62func TestEmitBatchOverhead(t *testing.T) {
63	transport := thrift.NewTMemoryBufferLen(1000)
64	protocolFactory := thrift.NewTCompactProtocolFactory()
65	client := j.NewAgentClientFactory(transport, protocolFactory)
66
67	span := newSpan()
68	spanSize := getThriftSpanByteLength(t, span)
69
70	tests := []int{1, 2, 14, 15, 377, 500, 65000, 0xFFFF}
71	for i, n := range tests {
72		transport.Reset()
73		batch := make([]*j.Span, n)
74		processTags := make([]*j.Tag, n)
75		for x := 0; x < n; x++ {
76			batch[x] = BuildJaegerThrift(span)
77			processTags[x] = &j.Tag{}
78		}
79		process := &j.Process{ServiceName: "svcName", Tags: processTags}
80		client.SeqId = -2 // this causes the longest encoding of varint32 as 5 bytes
81		err := client.EmitBatch(&j.Batch{Process: process, Spans: batch})
82		processSize := getThriftProcessByteLength(t, process)
83		require.NoError(t, err)
84		overhead := transport.Len() - n*spanSize - processSize
85		assert.True(t, overhead <= emitBatchOverhead,
86			"test %d, n=%d, expected overhead %d <= %d", i, n, overhead, emitBatchOverhead)
87		t.Logf("span count: %d, overhead: %d", n, overhead)
88	}
89}
90
91func TestUDPSenderFlush(t *testing.T) {
92	agent, err := testutils.StartMockAgent()
93	require.NoError(t, err)
94	defer agent.Close()
95
96	span := newSpan()
97	spanSize := getThriftSpanByteLength(t, span)
98	processSize := getThriftProcessByteLengthFromTracer(t, jaegerTracer)
99
100	sender, err := NewUDPTransport(agent.SpanServerAddr(), 5*spanSize+processSize+emitBatchOverhead)
101	require.NoError(t, err)
102	udpSender := sender.(*udpSender)
103
104	// test empty flush
105	n, err := sender.Flush()
106	require.NoError(t, err)
107	assert.Equal(t, 0, n)
108
109	// test early flush
110	n, err = sender.Append(span)
111	require.NoError(t, err)
112	assert.Equal(t, 0, n, "span should be in buffer, not flushed")
113	buffer := udpSender.spanBuffer
114	require.Equal(t, 1, len(buffer), "span should be in buffer, not flushed")
115	assert.Equal(t, BuildJaegerThrift(span), buffer[0], "span should be in buffer, not flushed")
116
117	n, err = sender.Flush()
118	require.NoError(t, err)
119	assert.Equal(t, 1, n)
120	assert.Equal(t, 0, len(udpSender.spanBuffer), "buffer should become empty")
121	assert.Equal(t, processSize, udpSender.byteBufferSize, "buffer size counter should be equal to the processSize")
122	assert.Nil(t, buffer[0], "buffer should not keep reference to the span")
123
124	for i := 0; i < 10000; i++ {
125		batches := agent.GetJaegerBatches()
126		if len(batches) > 0 {
127			break
128		}
129		time.Sleep(1 * time.Millisecond)
130	}
131	batches := agent.GetJaegerBatches()
132	require.Equal(t, 1, len(batches), "agent should have received the batch")
133	require.Equal(t, 1, len(batches[0].Spans))
134	assert.Equal(t, span.operationName, batches[0].Spans[0].OperationName)
135}
136
137func TestUDPSenderAppend(t *testing.T) {
138	agent, err := testutils.StartMockAgent()
139	require.NoError(t, err)
140	defer agent.Close()
141
142	span := newSpan()
143	spanSize := getThriftSpanByteLength(t, span)
144	processSize := getThriftProcessByteLengthFromTracer(t, jaegerTracer)
145
146	tests := []struct {
147		bufferSizeOffset      int
148		expectFlush           bool
149		expectSpansFlushed    int
150		expectBatchesFlushed  int
151		manualFlush           bool
152		expectSpansFlushed2   int
153		expectBatchesFlushed2 int
154		description           string
155	}{
156		{1, false, 0, 0, true, 5, 1, "in test: buffer bigger than 5 spans"},
157		{0, true, 5, 1, false, 0, 0, "in test: buffer fits exactly 5 spans"},
158		{-1, true, 4, 1, true, 1, 1, "in test: buffer smaller than 5 spans"},
159	}
160
161	for _, test := range tests {
162		bufferSize := 5*spanSize + test.bufferSizeOffset + processSize + emitBatchOverhead
163		sender, err := NewUDPTransport(agent.SpanServerAddr(), bufferSize)
164		require.NoError(t, err, test.description)
165
166		agent.ResetJaegerBatches()
167		for i := 0; i < 5; i++ {
168			n, err := sender.Append(span)
169			require.NoError(t, err, test.description)
170			if i < 4 {
171				assert.Equal(t, 0, n, test.description)
172			} else {
173				assert.Equal(t, test.expectSpansFlushed, n, test.description)
174			}
175		}
176		if test.expectFlush {
177			time.Sleep(5 * time.Millisecond)
178		}
179		batches := agent.GetJaegerBatches()
180		require.Equal(t, test.expectBatchesFlushed, len(batches), test.description)
181		var spans []*j.Span
182		if test.expectBatchesFlushed > 0 {
183			spans = batches[0].Spans
184		}
185		require.Equal(t, test.expectSpansFlushed, len(spans), test.description)
186		for i := 0; i < test.expectSpansFlushed; i++ {
187			assert.Equal(t, span.operationName, spans[i].OperationName, test.description)
188		}
189
190		if test.manualFlush {
191			agent.ResetJaegerBatches()
192			n, err := sender.Flush()
193			require.NoError(t, err, test.description)
194			assert.Equal(t, test.expectSpansFlushed2, n, test.description)
195
196			time.Sleep(5 * time.Millisecond)
197			batches = agent.GetJaegerBatches()
198			require.Equal(t, test.expectBatchesFlushed2, len(batches), test.description)
199			spans = []*j.Span{}
200			if test.expectBatchesFlushed2 > 0 {
201				spans = batches[0].Spans
202			}
203			require.Equal(t, test.expectSpansFlushed2, len(spans), test.description)
204			for i := 0; i < test.expectSpansFlushed2; i++ {
205				assert.Equal(t, span.operationName, spans[i].OperationName, test.description)
206			}
207		}
208
209	}
210}
211
212func TestUDPSenderHugeSpan(t *testing.T) {
213	agent, err := testutils.StartMockAgent()
214	require.NoError(t, err)
215	defer agent.Close()
216
217	span := newSpan()
218	spanSize := getThriftSpanByteLength(t, span)
219
220	sender, err := NewUDPTransport(agent.SpanServerAddr(), spanSize/2+emitBatchOverhead)
221	require.NoError(t, err)
222
223	n, err := sender.Append(span)
224	assert.Equal(t, errSpanTooLarge, err)
225	assert.Equal(t, 1, n)
226}
227