1// Copyright (c) 2017 Uber Technologies, Inc. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package jaeger 16 17import ( 18 "testing" 19 "time" 20 21 "github.com/stretchr/testify/assert" 22 "github.com/stretchr/testify/require" 23 24 "github.com/uber/jaeger-client-go/testutils" 25 "github.com/uber/jaeger-client-go/thrift" 26 j "github.com/uber/jaeger-client-go/thrift-gen/jaeger" 27) 28 29var ( 30 testTracer, _ = NewTracer("svcName", NewConstSampler(false), NewNullReporter()) 31 jaegerTracer = testTracer.(*Tracer) 32) 33 34func getThriftSpanByteLength(t *testing.T, span *Span) int { 35 jSpan := BuildJaegerThrift(span) 36 transport := thrift.NewTMemoryBufferLen(1000) 37 protocolFactory := thrift.NewTCompactProtocolFactory() 38 err := jSpan.Write(protocolFactory.GetProtocol(transport)) 39 require.NoError(t, err) 40 return transport.Len() 41} 42 43func getThriftProcessByteLengthFromTracer(t *testing.T, tracer *Tracer) int { 44 process := buildJaegerProcessThrift(tracer) 45 return getThriftProcessByteLength(t, process) 46} 47 48func getThriftProcessByteLength(t *testing.T, process *j.Process) int { 49 transport := thrift.NewTMemoryBufferLen(1000) 50 protocolFactory := thrift.NewTCompactProtocolFactory() 51 err := process.Write(protocolFactory.GetProtocol(transport)) 52 require.NoError(t, err) 53 return transport.Len() 54} 55 56func newSpan() *Span { 57 span := &Span{operationName: "test-span", tracer: jaegerTracer} 58 span.context.samplingState = &samplingState{} 59 return span 60} 61 62func TestEmitBatchOverhead(t *testing.T) { 63 transport := thrift.NewTMemoryBufferLen(1000) 64 protocolFactory := thrift.NewTCompactProtocolFactory() 65 client := j.NewAgentClientFactory(transport, protocolFactory) 66 67 span := newSpan() 68 spanSize := getThriftSpanByteLength(t, span) 69 70 tests := []int{1, 2, 14, 15, 377, 500, 65000, 0xFFFF} 71 for i, n := range tests { 72 transport.Reset() 73 batch := make([]*j.Span, n) 74 processTags := make([]*j.Tag, n) 75 for x := 0; x < n; x++ { 76 batch[x] = BuildJaegerThrift(span) 77 processTags[x] = &j.Tag{} 78 } 79 process := &j.Process{ServiceName: "svcName", Tags: processTags} 80 client.SeqId = -2 // this causes the longest encoding of varint32 as 5 bytes 81 err := client.EmitBatch(&j.Batch{Process: process, Spans: batch}) 82 processSize := getThriftProcessByteLength(t, process) 83 require.NoError(t, err) 84 overhead := transport.Len() - n*spanSize - processSize 85 assert.True(t, overhead <= emitBatchOverhead, 86 "test %d, n=%d, expected overhead %d <= %d", i, n, overhead, emitBatchOverhead) 87 t.Logf("span count: %d, overhead: %d", n, overhead) 88 } 89} 90 91func TestUDPSenderFlush(t *testing.T) { 92 agent, err := testutils.StartMockAgent() 93 require.NoError(t, err) 94 defer agent.Close() 95 96 span := newSpan() 97 spanSize := getThriftSpanByteLength(t, span) 98 processSize := getThriftProcessByteLengthFromTracer(t, jaegerTracer) 99 100 sender, err := NewUDPTransport(agent.SpanServerAddr(), 5*spanSize+processSize+emitBatchOverhead) 101 require.NoError(t, err) 102 udpSender := sender.(*udpSender) 103 104 // test empty flush 105 n, err := sender.Flush() 106 require.NoError(t, err) 107 assert.Equal(t, 0, n) 108 109 // test early flush 110 n, err = sender.Append(span) 111 require.NoError(t, err) 112 assert.Equal(t, 0, n, "span should be in buffer, not flushed") 113 buffer := udpSender.spanBuffer 114 require.Equal(t, 1, len(buffer), "span should be in buffer, not flushed") 115 assert.Equal(t, BuildJaegerThrift(span), buffer[0], "span should be in buffer, not flushed") 116 117 n, err = sender.Flush() 118 require.NoError(t, err) 119 assert.Equal(t, 1, n) 120 assert.Equal(t, 0, len(udpSender.spanBuffer), "buffer should become empty") 121 assert.Equal(t, processSize, udpSender.byteBufferSize, "buffer size counter should be equal to the processSize") 122 assert.Nil(t, buffer[0], "buffer should not keep reference to the span") 123 124 for i := 0; i < 10000; i++ { 125 batches := agent.GetJaegerBatches() 126 if len(batches) > 0 { 127 break 128 } 129 time.Sleep(1 * time.Millisecond) 130 } 131 batches := agent.GetJaegerBatches() 132 require.Equal(t, 1, len(batches), "agent should have received the batch") 133 require.Equal(t, 1, len(batches[0].Spans)) 134 assert.Equal(t, span.operationName, batches[0].Spans[0].OperationName) 135} 136 137func TestUDPSenderAppend(t *testing.T) { 138 agent, err := testutils.StartMockAgent() 139 require.NoError(t, err) 140 defer agent.Close() 141 142 span := newSpan() 143 spanSize := getThriftSpanByteLength(t, span) 144 processSize := getThriftProcessByteLengthFromTracer(t, jaegerTracer) 145 146 tests := []struct { 147 bufferSizeOffset int 148 expectFlush bool 149 expectSpansFlushed int 150 expectBatchesFlushed int 151 manualFlush bool 152 expectSpansFlushed2 int 153 expectBatchesFlushed2 int 154 description string 155 }{ 156 {1, false, 0, 0, true, 5, 1, "in test: buffer bigger than 5 spans"}, 157 {0, true, 5, 1, false, 0, 0, "in test: buffer fits exactly 5 spans"}, 158 {-1, true, 4, 1, true, 1, 1, "in test: buffer smaller than 5 spans"}, 159 } 160 161 for _, test := range tests { 162 bufferSize := 5*spanSize + test.bufferSizeOffset + processSize + emitBatchOverhead 163 sender, err := NewUDPTransport(agent.SpanServerAddr(), bufferSize) 164 require.NoError(t, err, test.description) 165 166 agent.ResetJaegerBatches() 167 for i := 0; i < 5; i++ { 168 n, err := sender.Append(span) 169 require.NoError(t, err, test.description) 170 if i < 4 { 171 assert.Equal(t, 0, n, test.description) 172 } else { 173 assert.Equal(t, test.expectSpansFlushed, n, test.description) 174 } 175 } 176 if test.expectFlush { 177 time.Sleep(5 * time.Millisecond) 178 } 179 batches := agent.GetJaegerBatches() 180 require.Equal(t, test.expectBatchesFlushed, len(batches), test.description) 181 var spans []*j.Span 182 if test.expectBatchesFlushed > 0 { 183 spans = batches[0].Spans 184 } 185 require.Equal(t, test.expectSpansFlushed, len(spans), test.description) 186 for i := 0; i < test.expectSpansFlushed; i++ { 187 assert.Equal(t, span.operationName, spans[i].OperationName, test.description) 188 } 189 190 if test.manualFlush { 191 agent.ResetJaegerBatches() 192 n, err := sender.Flush() 193 require.NoError(t, err, test.description) 194 assert.Equal(t, test.expectSpansFlushed2, n, test.description) 195 196 time.Sleep(5 * time.Millisecond) 197 batches = agent.GetJaegerBatches() 198 require.Equal(t, test.expectBatchesFlushed2, len(batches), test.description) 199 spans = []*j.Span{} 200 if test.expectBatchesFlushed2 > 0 { 201 spans = batches[0].Spans 202 } 203 require.Equal(t, test.expectSpansFlushed2, len(spans), test.description) 204 for i := 0; i < test.expectSpansFlushed2; i++ { 205 assert.Equal(t, span.operationName, spans[i].OperationName, test.description) 206 } 207 } 208 209 } 210} 211 212func TestUDPSenderHugeSpan(t *testing.T) { 213 agent, err := testutils.StartMockAgent() 214 require.NoError(t, err) 215 defer agent.Close() 216 217 span := newSpan() 218 spanSize := getThriftSpanByteLength(t, span) 219 220 sender, err := NewUDPTransport(agent.SpanServerAddr(), spanSize/2+emitBatchOverhead) 221 require.NoError(t, err) 222 223 n, err := sender.Append(span) 224 assert.Equal(t, errSpanTooLarge, err) 225 assert.Equal(t, 1, n) 226} 227