1import os
2import socket
3import tempfile
4import threading
5import time
6
7import mock
8import msgpack
9import pytest
10from six.moves import BaseHTTPServer
11from six.moves import socketserver
12
13from ddtrace.constants import KEEP_SPANS_RATE_KEY
14from ddtrace.internal.compat import PY3
15from ddtrace.internal.compat import get_connection_response
16from ddtrace.internal.compat import httplib
17from ddtrace.internal.encoding import MSGPACK_ENCODERS
18from ddtrace.internal.uds import UDSHTTPConnection
19from ddtrace.internal.writer import AgentWriter
20from ddtrace.internal.writer import LogWriter
21from ddtrace.internal.writer import Response
22from ddtrace.internal.writer import _human_size
23from ddtrace.span import Span
24from tests.utils import AnyInt
25from tests.utils import BaseTestCase
26from tests.utils import override_env
27
28
class DummyOutput:
    """In-memory stand-in for an output stream that records every write."""

    def __init__(self):
        # Messages handed to ``write``, in call order.
        self.entries = list()

    def write(self, message):
        """Record *message* instead of emitting it anywhere."""
        self.entries += [message]

    def flush(self):
        """Do nothing; there is no underlying buffer to flush."""
        return None
38
39
class AgentWriterTests(BaseTestCase):
    """Metric and drop-accounting tests for ``AgentWriter``.

    Every writer in this class targets ``_AGENT_URL``. The tests either expect
    HTTP errors when flushing to it, or replace ``writer._put`` with a mock
    (``test_keep_rate``).
    """

    N_TRACES = 11

    # Agent URL shared by every test in this class.
    _AGENT_URL = "http://asdf:1234"

    @staticmethod
    def _build_trace(trace_id, name="name", n_spans=5):
        """Return a list of ``n_spans`` spans that all belong to ``trace_id``.

        NOTE: ``j - 1 or None`` parses as ``(j - 1) or None``, so span 0 gets the
        parent id ``-1`` and span 1 gets ``None``. The expression is kept exactly
        as-is so the generated spans do not change; ``test_drop_reason_buffer_full``
        uses a specific ``buffer_size`` and is presumably sensitive to payload size.
        """
        return [
            Span(tracer=None, name=name, trace_id=trace_id, span_id=j, parent_id=j - 1 or None)
            for j in range(n_spans)
        ]

    @classmethod
    def _oversized_trace(cls, trace_id):
        """Return a 1024-span trace with 5000-character span names.

        The ``*too_big`` tests expect the writer to drop such a trace.
        """
        return cls._build_trace(trace_id, name="a" * 5000, n_spans=2 ** 10)

    @classmethod
    def _write_small_traces(cls, writer, n_traces=10):
        """Write ``n_traces`` five-span traces with trace ids ``0..n_traces-1``."""
        for trace_id in range(n_traces):
            writer.write(cls._build_trace(trace_id))

    @staticmethod
    def _bad_endpoint_metric_calls(writer, n_traces, n_spans):
        """Return the ``statsd.distribution`` calls expected after a failed flush."""
        return [
            mock.call("datadog.tracer.buffer.accepted.traces", n_traces, tags=[]),
            mock.call("datadog.tracer.buffer.accepted.spans", n_spans, tags=[]),
            mock.call("datadog.tracer.http.requests", writer.RETRY_ATTEMPTS, tags=[]),
            mock.call("datadog.tracer.http.errors", 1, tags=["type:err"]),
            mock.call("datadog.tracer.http.dropped.bytes", AnyInt(), tags=[]),
        ]

    def test_metrics_disabled(self):
        """With ``report_metrics=False`` no statsd metric is emitted."""
        statsd = mock.Mock()
        writer = AgentWriter(agent_url=self._AGENT_URL, dogstatsd=statsd, report_metrics=False)
        self._write_small_traces(writer)
        writer.stop()
        writer.join()

        statsd.increment.assert_not_called()
        statsd.distribution.assert_not_called()

    def test_metrics_bad_endpoint(self):
        """A failed flush reports accepted, request, error and dropped-bytes metrics."""
        statsd = mock.Mock()
        writer = AgentWriter(agent_url=self._AGENT_URL, dogstatsd=statsd, report_metrics=True)
        self._write_small_traces(writer)
        writer.stop()
        writer.join()

        statsd.distribution.assert_has_calls(
            self._bad_endpoint_metric_calls(writer, n_traces=10, n_spans=50),
            any_order=True,
        )

    def test_metrics_trace_too_big(self):
        """An oversized trace is additionally reported as dropped with ``reason:t_too_big``."""
        statsd = mock.Mock()
        writer = AgentWriter(agent_url=self._AGENT_URL, dogstatsd=statsd, report_metrics=True)
        self._write_small_traces(writer)
        # The oversized trace shares the id of the last small trace (9).
        writer.write(self._oversized_trace(trace_id=9))
        writer.stop()
        writer.join()

        # Order is irrelevant because of any_order=True.
        expected_calls = self._bad_endpoint_metric_calls(writer, n_traces=10, n_spans=50) + [
            mock.call("datadog.tracer.buffer.dropped.traces", 1, tags=["reason:t_too_big"]),
            mock.call("datadog.tracer.buffer.dropped.bytes", AnyInt(), tags=["reason:t_too_big"]),
        ]
        statsd.distribution.assert_has_calls(expected_calls, any_order=True)

    def test_metrics_multi(self):
        """Metrics are reported again for a second batch after the first flush."""
        statsd = mock.Mock()
        writer = AgentWriter(agent_url=self._AGENT_URL, dogstatsd=statsd, report_metrics=True)

        # First batch: flushed explicitly.
        self._write_small_traces(writer)
        writer.flush_queue()
        statsd.distribution.assert_has_calls(
            self._bad_endpoint_metric_calls(writer, n_traces=10, n_spans=50),
            any_order=True,
        )

        statsd.reset_mock()

        # Second batch: flushed by stopping the writer.
        self._write_small_traces(writer)
        writer.stop()
        writer.join()

        statsd.distribution.assert_has_calls(
            self._bad_endpoint_metric_calls(writer, n_traces=10, n_spans=50),
            any_order=True,
        )

    def test_write_sync(self):
        """In ``sync_mode`` a single ``write`` already produces the flush metrics."""
        statsd = mock.Mock()
        writer = AgentWriter(agent_url=self._AGENT_URL, dogstatsd=statsd, report_metrics=True, sync_mode=True)
        writer.write(self._build_trace(1))
        statsd.distribution.assert_has_calls(
            self._bad_endpoint_metric_calls(writer, n_traces=1, n_spans=5),
            any_order=True,
        )

    def test_drop_reason_bad_endpoint(self):
        """A failed flush counts one HTTP error and ten dropped traces."""
        statsd = mock.Mock()
        writer_metrics_reset = mock.Mock()
        writer = AgentWriter(agent_url=self._AGENT_URL, dogstatsd=statsd, report_metrics=False)
        # Replace the reset hook so the counters can still be read after the flush.
        writer._metrics_reset = writer_metrics_reset
        self._write_small_traces(writer)
        writer.stop()
        writer.join()

        writer_metrics_reset.assert_called_once()

        assert 1 == writer._metrics["http.errors"]["count"]
        assert 10 == writer._metrics["http.dropped.traces"]["count"]

    def test_drop_reason_trace_too_big(self):
        """An oversized trace is counted as a buffer drop tagged ``reason:t_too_big``."""
        statsd = mock.Mock()
        writer_metrics_reset = mock.Mock()
        writer = AgentWriter(agent_url=self._AGENT_URL, dogstatsd=statsd, report_metrics=False)
        # Replace the reset hook so the counters can still be read after the flush.
        writer._metrics_reset = writer_metrics_reset
        self._write_small_traces(writer)
        # The oversized trace shares the id of the last small trace (9).
        writer.write(self._oversized_trace(trace_id=9))
        writer.stop()
        writer.join()

        writer_metrics_reset.assert_called_once()

        assert 1 == writer._metrics["buffer.dropped.traces"]["count"]
        assert ["reason:t_too_big"] == writer._metrics["buffer.dropped.traces"]["tags"]

    def test_drop_reason_buffer_full(self):
        """With ``buffer_size=5300`` the eleventh trace is dropped with ``reason:full``."""
        statsd = mock.Mock()
        writer_metrics_reset = mock.Mock()
        writer = AgentWriter(agent_url=self._AGENT_URL, buffer_size=5300, dogstatsd=statsd, report_metrics=False)
        # Replace the reset hook so the counters can still be read after the flush.
        writer._metrics_reset = writer_metrics_reset
        self._write_small_traces(writer)
        # One more small trace, sharing the id of the last one written (9).
        writer.write(self._build_trace(9, name="a"))
        writer.stop()
        writer.join()

        writer_metrics_reset.assert_called_once()

        assert 1 == writer._metrics["buffer.dropped.traces"]["count"]
        assert ["reason:full"] == writer._metrics["buffer.dropped.traces"]["tags"]

    def test_drop_reason_encoding_error(self):
        """When the encoder raises on ``encode``, all ten traces are counted as encoder drops."""
        n_traces = 10
        statsd = mock.Mock()
        writer_encoder = mock.Mock()
        # Make ``len(writer_encoder)`` report the number of traces written below.
        writer_encoder.__len__ = (lambda *args: n_traces).__get__(writer_encoder)
        writer_metrics_reset = mock.Mock()
        writer_encoder.encode.side_effect = Exception
        writer = AgentWriter(agent_url=self._AGENT_URL, dogstatsd=statsd, report_metrics=False)
        writer._encoder = writer_encoder
        writer._metrics_reset = writer_metrics_reset
        self._write_small_traces(writer, n_traces)

        writer.stop()
        writer.join()

        writer_metrics_reset.assert_called_once()

        assert 10 == writer._metrics["encoder.dropped.traces"]["count"]

    def test_keep_rate(self):
        """The keep-rate metric on each payload reflects drops from earlier flushes."""
        statsd = mock.Mock()
        writer_run_periodic = mock.Mock()
        writer_put = mock.Mock()
        writer_put.return_value = Response(status=200)
        writer = AgentWriter(agent_url=self._AGENT_URL, dogstatsd=statsd, report_metrics=False)
        writer.run_periodic = writer_run_periodic
        writer._put = writer_put

        traces = [self._build_trace(i) for i in range(4)]
        traces_too_big = [self._oversized_trace(i) for i in range(4)]

        # 1. We write 4 traces successfully.
        for trace in traces:
            writer.write(trace)
        writer.flush_queue()

        payload = msgpack.unpackb(writer_put.call_args.args[0])
        # No previous drops.
        assert 0.0 == writer._drop_sma.get()
        # 4 traces written.
        assert 4 == len(payload)
        # 100% of traces kept (refers to the past).
        # No traces sent before now so 100% kept.
        for trace in payload:
            assert 1.0 == trace[0]["metrics"].get(KEEP_SPANS_RATE_KEY, -1)

        # 2. We fail to write 4 traces because of size limitation.
        for trace in traces_too_big:
            writer.write(trace)
        writer.flush_queue()

        # 50% of traces were dropped historically.
        # 4 successfully written before and 4 dropped now.
        assert 0.5 == writer._drop_sma.get()
        # put not called since no new traces are available.
        writer_put.assert_called_once()

        # 3. We write 2 traces successfully.
        for trace in traces[:2]:
            writer.write(trace)
        writer.flush_queue()

        payload = msgpack.unpackb(writer_put.call_args.args[0])
        # 40% of traces were dropped historically.
        assert 0.4 == writer._drop_sma.get()
        # 2 traces written.
        assert 2 == len(payload)
        # 50% of traces kept (refers to the past).
        # We had 4 successfully written and 4 dropped.
        for trace in payload:
            assert 0.5 == trace[0]["metrics"].get(KEEP_SPANS_RATE_KEY, -1)

        # 4. We write 1 trace successfully and fail to write 3.
        writer.write(traces[0])
        for trace in traces_too_big[:3]:
            writer.write(trace)
        writer.flush_queue()

        payload = msgpack.unpackb(writer_put.call_args.args[0])
        # 50% of traces were dropped historically.
        assert 0.5 == writer._drop_sma.get()
        # 1 trace written.
        assert 1 == len(payload)
        # 60% of traces kept (refers to the past).
        # We had 4 successfully written, then 4 dropped, then 2 written.
        for trace in payload:
            assert 0.6 == trace[0]["metrics"].get(KEEP_SPANS_RATE_KEY, -1)
309
310
class LogWriterTests(BaseTestCase):
    """Checks that ``LogWriter`` produces one output entry per written trace."""

    N_TRACES = 11

    def create_writer(self):
        """Return a ``LogWriter`` after writing ``N_TRACES`` seven-span traces to it.

        The ``DummyOutput`` it writes to is stored on ``self.output`` so the
        calling test can inspect what was emitted.
        """
        self.output = DummyOutput()
        log_writer = LogWriter(out=self.output)
        for trace_id in range(self.N_TRACES):
            spans = []
            for span_id in range(7):
                # ``span_id - 1 or None`` parses as ``(span_id - 1) or None``.
                parent = span_id - 1 or None
                spans.append(Span(tracer=None, name="name", trace_id=trace_id, span_id=span_id, parent_id=parent))
            log_writer.write(spans)
        return log_writer

    def test_log_writer(self):
        """Every written trace yields exactly one entry in the output."""
        self.create_writer()
        n_entries = len(self.output.entries)
        self.assertEqual(n_entries, self.N_TRACES)
326
327
def test_humansize():
    """``_human_size`` renders byte counts using power-of-1000 units."""
    expectations = [
        (0, "0B"),
        (999, "999B"),
        (1000, "1KB"),
        (10000, "10KB"),
        (100000, "100KB"),
        (1000000, "1MB"),
        (10000000, "10MB"),
        (1000000000, "1GB"),
    ]
    for n_bytes, rendered in expectations:
        assert _human_size(n_bytes) == rendered
337
338
class _BaseHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    """Shared base for the test handlers: plain-text error bodies and silenced logging."""

    # ``send_error`` bodies consist of the bare message as text/plain.
    error_content_type = "text/plain"
    error_message_format = "%(message)s\n"

    @staticmethod
    def log_message(format, *args):  # noqa: A002
        """Discard the log message instead of writing it anywhere."""
        return None
346
347
class _APIEndpointRequestHandlerTest(_BaseHTTPRequestHandler):
    """Handler answering every PUT with status 200, optionally checking the path prefix."""

    # When not None, each PUT request path must start with this prefix.
    expected_path_prefix = None

    def do_PUT(self):
        """Assert the configured path prefix (if any), then reply 200."""
        required_prefix = self.expected_path_prefix
        assert required_prefix is None or self.path.startswith(required_prefix)
        self.send_error(200, "OK")
356
357
class _TimeoutAPIEndpointRequestHandlerTest(_BaseHTTPRequestHandler):
    """Handler whose PUT sleeps for five seconds and writes no response."""

    def do_PUT(self):
        # This server sleeps longer than our timeout
        stall_seconds = 5
        time.sleep(stall_seconds)
362
363
class _ResetAPIEndpointRequestHandlerTest(_BaseHTTPRequestHandler):
    """Handler whose PUT returns immediately without writing any response."""

    def do_PUT(self):
        # Intentionally send nothing back to the client.
        return None
367
368
# Address the TCP test servers bind to and the writers connect to.
_HOST = "0.0.0.0"
# Consecutive ports: path-asserting server, timeout server, reset server.
_PORT = 8743
_TIMEOUT_PORT = _PORT + 1
_RESET_PORT = _PORT + 2
373
374
class UDSHTTPServer(socketserver.UnixStreamServer, BaseHTTPServer.HTTPServer):
    """HTTP server built on ``UnixStreamServer`` instead of a TCP socket."""

    def server_bind(self):
        """Bind by delegating explicitly to ``HTTPServer.server_bind``."""
        http_server_cls = BaseHTTPServer.HTTPServer
        http_server_cls.server_bind(self)
378
379
def _make_uds_server(path, request_handler, startup_timeout=5.0):
    """Start a ``UDSHTTPServer`` on ``path`` in a daemon thread and wait until it answers.

    Readiness is probed by sending ``PUT /`` over the socket until the server
    replies with status 200.

    :param path: filesystem path of the Unix socket to bind.
    :param request_handler: request handler class passed to the server.
    :param startup_timeout: maximum number of seconds to wait for a 200 reply.
    :returns: ``(server, thread)``.
    :raises RuntimeError: if no 200 reply is received within ``startup_timeout``.
        Any error raised by a probe request is propagated as well. In both cases
        the server is shut down and closed first, so neither the thread nor the
        socket is leaked.
    """
    server = UDSHTTPServer(path, request_handler)
    t = threading.Thread(target=server.serve_forever)
    # Set daemon just in case something fails
    t.daemon = True
    t.start()

    # Wait for the server to start, but never forever.
    deadline = time.time() + startup_timeout
    try:
        while True:
            conn = UDSHTTPConnection(server.server_address, _HOST, 2019)
            try:
                conn.request("PUT", "/")
                status = get_connection_response(conn).status
            finally:
                conn.close()
            if status == 200:
                break
            if time.time() >= deadline:
                raise RuntimeError(
                    "UDS test server at %r did not answer 200 within %s seconds" % (path, startup_timeout)
                )
            time.sleep(0.01)
    except Exception:
        # Do not leave a serving thread and a bound socket behind on failure.
        server.shutdown()
        t.join()
        server.server_close()
        raise

    return server, t
399
400
@pytest.fixture
def endpoint_uds_server():
    """Yield a running ``UDSHTTPServer`` that asserts PUT paths start with ``/v0.``.

    The socket lives inside a private ``mkdtemp`` directory rather than at a
    ``tempfile.mktemp()`` name, which is deprecated because another process can
    claim the name before it is used. On teardown the server is stopped, its
    socket is closed, and the socket file and directory are removed.
    """
    socket_dir = tempfile.mkdtemp()
    socket_name = os.path.join(socket_dir, "agent.sock")
    handler = _APIEndpointRequestHandlerTest
    try:
        server, thread = _make_uds_server(socket_name, handler)
        try:
            # Set only after startup: the readiness probe PUTs "/", which would
            # fail the handler's prefix assertion.
            handler.expected_path_prefix = "/v0."
            yield server
        finally:
            handler.expected_path_prefix = None
            server.shutdown()
            thread.join()
            # shutdown() only stops the serve loop; the socket must be closed explicitly.
            server.server_close()
    finally:
        if os.path.exists(socket_name):
            os.unlink(socket_name)
        os.rmdir(socket_dir)
414
415
def _make_server(port, request_handler):
    """Start an HTTP server on ``(_HOST, port)`` in a background daemon thread.

    :returns: ``(server, thread)``; the thread is already running ``serve_forever``.
    """
    address = (_HOST, port)
    http_server = BaseHTTPServer.HTTPServer(address, request_handler)
    worker = threading.Thread(target=http_server.serve_forever)
    # Set daemon just in case something fails
    worker.daemon = True
    worker.start()
    return http_server, worker
423
424
@pytest.fixture(scope="module")
def endpoint_test_timeout_server():
    """Run the sleeping-handler server on ``_TIMEOUT_PORT`` for the whole module.

    Yields the serving thread. On teardown the serve loop is stopped, the thread
    is joined and the listening socket is closed.
    """
    server, thread = _make_server(_TIMEOUT_PORT, _TimeoutAPIEndpointRequestHandlerTest)
    try:
        yield thread
    finally:
        server.shutdown()
        thread.join()
        # shutdown() only stops the serve loop; close the listening socket too.
        server.server_close()
433
434
@pytest.fixture(scope="module")
def endpoint_test_reset_server():
    """Run the no-response-handler server on ``_RESET_PORT`` for the whole module.

    Yields the serving thread. On teardown the serve loop is stopped, the thread
    is joined and the listening socket is closed.
    """
    server, thread = _make_server(_RESET_PORT, _ResetAPIEndpointRequestHandlerTest)
    try:
        yield thread
    finally:
        server.shutdown()
        thread.join()
        # shutdown() only stops the serve loop; close the listening socket too.
        server.server_close()
443
444
@pytest.fixture
def endpoint_assert_path():
    """Run a path-asserting server on ``_PORT`` and yield a function to configure it.

    The yielded ``configure(expected_path_prefix=None)`` sets the prefix every
    PUT path must start with and returns the serving thread. On teardown the
    prefix is reset, the server is stopped and its listening socket is closed.
    """
    handler = _APIEndpointRequestHandlerTest
    server, thread = _make_server(_PORT, handler)

    def configure(expected_path_prefix=None):
        handler.expected_path_prefix = expected_path_prefix
        return thread

    try:
        yield configure
    finally:
        handler.expected_path_prefix = None
        server.shutdown()
        thread.join()
        # This fixture runs once per test; close the socket so it is not leaked
        # each time (shutdown() only stops the serve loop).
        server.server_close()
460
461
def test_agent_url_path(endpoint_assert_path):
    """Flushes succeed and hit the expected path prefix for each agent URL shape."""
    cases = [
        # test without base path, then without base path nor trailing slash
        ("/v0.", ["http://%s:%s/" % (_HOST, _PORT), "http://%s:%s" % (_HOST, _PORT)]),
        # test with a base path
        ("/test/v0.", ["http://%s:%s/test/" % (_HOST, _PORT)]),
    ]
    for expected_prefix, agent_urls in cases:
        endpoint_assert_path(expected_prefix)
        for agent_url in agent_urls:
            writer = AgentWriter(agent_url=agent_url)
            writer._encoder.put([Span(None, "foobar")])
            writer.flush_queue(raise_exc=True)
479
480
def test_flush_connection_timeout_connect():
    """Flushing to port 2019, which no server in this module listens on, raises a socket error."""
    agent_url = "http://%s:%s" % (_HOST, 2019)
    writer = AgentWriter(agent_url=agent_url)
    expected_error = OSError if PY3 else socket.error
    with pytest.raises(expected_error):
        writer._encoder.put([Span(None, "foobar")])
        writer.flush_queue(raise_exc=True)
490
491
def test_flush_connection_timeout(endpoint_test_timeout_server):
    """Flushing to the sleeping server raises ``socket.timeout``."""
    agent_url = "http://%s:%s" % (_HOST, _TIMEOUT_PORT)
    writer = AgentWriter(agent_url=agent_url)
    with pytest.raises(socket.timeout):
        writer._encoder.put([Span(None, "foobar")])
        writer.flush_queue(raise_exc=True)
497
498
def test_flush_connection_reset(endpoint_test_reset_server):
    """Flushing to the server that never responds raises a bad-status or reset error."""
    # ConnectionResetError only exists on Python 3, so it is added conditionally.
    exc_types = (httplib.BadStatusLine,)
    if PY3:
        exc_types = exc_types + (ConnectionResetError,)

    agent_url = "http://%s:%s" % (_HOST, _RESET_PORT)
    writer = AgentWriter(agent_url=agent_url)
    with pytest.raises(exc_types):
        writer._encoder.put([Span(None, "foobar")])
        writer.flush_queue(raise_exc=True)
508
509
def test_flush_connection_uds(endpoint_uds_server):
    """A writer configured with a ``unix://`` URL flushes to the UDS server without raising."""
    socket_path = endpoint_uds_server.server_address
    agent_url = "unix://%s" % socket_path
    writer = AgentWriter(agent_url=agent_url)
    writer._encoder.put([Span(None, "foobar")])
    writer.flush_queue(raise_exc=True)
514
515
def test_flush_queue_raise():
    """``flush_queue`` raises the connection error only when ``raise_exc=True``."""
    writer = AgentWriter(agent_url="http://dne:1234")

    # Should not raise
    writer.write([])
    writer.flush_queue(raise_exc=False)

    if PY3:
        expected_error = OSError
    else:
        expected_error = IOError
    with pytest.raises(expected_error):
        writer.write([])
        writer.flush_queue(raise_exc=True)
527
528
def test_racing_start():
    """One hundred threads each write one trace; the encoder must end up holding all of them."""
    n_threads = 100
    writer = AgentWriter(agent_url="http://dne:1234")

    def write_one(index):
        writer.write([Span(None, str(index))])

    workers = []
    for index in range(n_threads):
        workers.append(threading.Thread(target=write_one, args=(index,)))

    # Start every thread before joining any of them so the writes overlap.
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()

    assert len(writer._encoder) == n_threads
543
544
def test_additional_headers():
    """Headers listed in ``_DD_TRACE_WRITER_ADDITIONAL_HEADERS`` end up in ``writer._headers``."""
    env = dict(_DD_TRACE_WRITER_ADDITIONAL_HEADERS="additional-header:additional-value,header2:value2")
    expected_headers = [
        ("additional-header", "additional-value"),
        ("header2", "value2"),
    ]
    with override_env(env):
        writer = AgentWriter(agent_url="http://localhost:9126")
        for header, value in expected_headers:
            assert writer._headers[header] == value
550
551
def test_bad_encoding(monkeypatch):
    """Constructing a writer with ``DD_TRACE_API_VERSION=foo`` raises ``ValueError``."""
    monkeypatch.setenv("DD_TRACE_API_VERSION", "foo")
    agent_url = "http://localhost:9126"

    with pytest.raises(ValueError):
        AgentWriter(agent_url=agent_url)
557
558
@pytest.mark.parametrize(
    "init_api_version,api_version,endpoint,encoder_cls",
    [
        (None, "v0.3", "v0.3/traces", MSGPACK_ENCODERS["v0.3"]),
        ("v0.3", "v0.3", "v0.3/traces", MSGPACK_ENCODERS["v0.3"]),
        ("v0.4", "v0.4", "v0.4/traces", MSGPACK_ENCODERS["v0.4"]),
        ("v0.5", "v0.5", "v0.5/traces", MSGPACK_ENCODERS["v0.5"]),
    ],
)
def test_writer_recreate_api_version(init_api_version, api_version, endpoint, encoder_cls):
    """API version, endpoint and encoder type are the same before and after ``recreate()``."""

    def check(candidate):
        assert candidate._api_version == api_version
        assert candidate._endpoint == endpoint
        assert isinstance(candidate._encoder, encoder_cls)

    writer = AgentWriter(agent_url="http://dne:1234", api_version=init_api_version)
    check(writer)

    writer = writer.recreate()
    check(writer)
578