import os
import socket
import tempfile
import threading
import time

import mock
import msgpack
import pytest
from six.moves import BaseHTTPServer
from six.moves import socketserver

from ddtrace.constants import KEEP_SPANS_RATE_KEY
from ddtrace.internal.compat import PY3
from ddtrace.internal.compat import get_connection_response
from ddtrace.internal.compat import httplib
from ddtrace.internal.encoding import MSGPACK_ENCODERS
from ddtrace.internal.uds import UDSHTTPConnection
from ddtrace.internal.writer import AgentWriter
from ddtrace.internal.writer import LogWriter
from ddtrace.internal.writer import Response
from ddtrace.internal.writer import _human_size
from ddtrace.span import Span
from tests.utils import AnyInt
from tests.utils import BaseTestCase
from tests.utils import override_env


class DummyOutput:
    """In-memory stand-in for a writable stream; records every written message."""

    def __init__(self):
        self.entries = []

    def write(self, message):
        """Append ``message`` to ``self.entries``."""
        self.entries.append(message)

    def flush(self):
        """No-op; present so the object can be used where a stream is flushed."""
        pass


class AgentWriterTests(BaseTestCase):
    """Tests for ``AgentWriter`` metrics, drop accounting and keep rate.

    Most tests point the writer at ``http://asdf:1234`` and assert that an HTTP
    error is recorded, i.e. the endpoint is expected to be unreachable.
    """

    N_TRACES = 11

    def test_metrics_disabled(self):
        """With ``report_metrics=False`` nothing is sent to the statsd client."""
        statsd = mock.Mock()
        writer = AgentWriter(agent_url="http://asdf:1234", dogstatsd=statsd, report_metrics=False)
        for i in range(10):
            writer.write(
                [Span(tracer=None, name="name", trace_id=i, span_id=j, parent_id=j - 1 or None) for j in range(5)]
            )
        writer.stop()
        writer.join()

        statsd.increment.assert_not_called()
        statsd.distribution.assert_not_called()

    def test_metrics_bad_endpoint(self):
        """A failed flush reports accepted traces/spans plus HTTP error metrics."""
        statsd = mock.Mock()
        writer = AgentWriter(agent_url="http://asdf:1234", dogstatsd=statsd, report_metrics=True)
        for i in range(10):
            writer.write(
                [Span(tracer=None, name="name", trace_id=i, span_id=j, parent_id=j - 1 or None) for j in range(5)]
            )
        writer.stop()
        writer.join()

        statsd.distribution.assert_has_calls(
            [
                mock.call("datadog.tracer.buffer.accepted.traces", 10, tags=[]),
                mock.call("datadog.tracer.buffer.accepted.spans", 50, tags=[]),
                mock.call("datadog.tracer.http.requests", writer.RETRY_ATTEMPTS, tags=[]),
                mock.call("datadog.tracer.http.errors", 1, tags=["type:err"]),
                mock.call("datadog.tracer.http.dropped.bytes", AnyInt(), tags=[]),
            ],
            any_order=True,
        )

    def test_metrics_trace_too_big(self):
        """An oversized trace is reported as dropped with ``reason:t_too_big``."""
        statsd = mock.Mock()
        writer = AgentWriter(agent_url="http://asdf:1234", dogstatsd=statsd, report_metrics=True)
        for i in range(10):
            writer.write(
                [Span(tracer=None, name="name", trace_id=i, span_id=j, parent_id=j - 1 or None) for j in range(5)]
            )
        # ``i`` deliberately keeps its final loop value here; only the size of
        # this trace (1024 spans with 5000-character names) matters.
        writer.write(
            [Span(tracer=None, name="a" * 5000, trace_id=i, span_id=j, parent_id=j - 1 or None) for j in range(2 ** 10)]
        )
        writer.stop()
        writer.join()

        statsd.distribution.assert_has_calls(
            [
                mock.call("datadog.tracer.buffer.accepted.traces", 10, tags=[]),
                mock.call("datadog.tracer.buffer.accepted.spans", 50, tags=[]),
                mock.call("datadog.tracer.buffer.dropped.traces", 1, tags=["reason:t_too_big"]),
                mock.call("datadog.tracer.buffer.dropped.bytes", AnyInt(), tags=["reason:t_too_big"]),
                mock.call("datadog.tracer.http.requests", writer.RETRY_ATTEMPTS, tags=[]),
                mock.call("datadog.tracer.http.errors", 1, tags=["type:err"]),
                mock.call("datadog.tracer.http.dropped.bytes", AnyInt(), tags=[]),
            ],
            any_order=True,
        )

    def test_metrics_multi(self):
        """Metrics are reported afresh for each flush, not accumulated across flushes."""
        statsd = mock.Mock()
        writer = AgentWriter(agent_url="http://asdf:1234", dogstatsd=statsd, report_metrics=True)
        for i in range(10):
            writer.write(
                [Span(tracer=None, name="name", trace_id=i, span_id=j, parent_id=j - 1 or None) for j in range(5)]
            )
        writer.flush_queue()
        statsd.distribution.assert_has_calls(
            [
                mock.call("datadog.tracer.buffer.accepted.traces", 10, tags=[]),
                mock.call("datadog.tracer.buffer.accepted.spans", 50, tags=[]),
                mock.call("datadog.tracer.http.requests", writer.RETRY_ATTEMPTS, tags=[]),
                mock.call("datadog.tracer.http.errors", 1, tags=["type:err"]),
                mock.call("datadog.tracer.http.dropped.bytes", AnyInt(), tags=[]),
            ],
            any_order=True,
        )

        # Forget the first flush so the second round of assertions only sees
        # calls made after this point.
        statsd.reset_mock()

        for i in range(10):
            writer.write(
                [Span(tracer=None, name="name", trace_id=i, span_id=j, parent_id=j - 1 or None) for j in range(5)]
            )
        writer.stop()
        writer.join()

        statsd.distribution.assert_has_calls(
            [
                mock.call("datadog.tracer.buffer.accepted.traces", 10, tags=[]),
                mock.call("datadog.tracer.buffer.accepted.spans", 50, tags=[]),
                mock.call("datadog.tracer.http.requests", writer.RETRY_ATTEMPTS, tags=[]),
                mock.call("datadog.tracer.http.errors", 1, tags=["type:err"]),
                mock.call("datadog.tracer.http.dropped.bytes", AnyInt(), tags=[]),
            ],
            any_order=True,
        )

    def test_write_sync(self):
        """In ``sync_mode`` a single ``write`` reports metrics without an explicit flush."""
        statsd = mock.Mock()
        writer = AgentWriter(agent_url="http://asdf:1234", dogstatsd=statsd, report_metrics=True, sync_mode=True)
        writer.write([Span(tracer=None, name="name", trace_id=1, span_id=j, parent_id=j - 1 or None) for j in range(5)])
        statsd.distribution.assert_has_calls(
            [
                mock.call("datadog.tracer.buffer.accepted.traces", 1, tags=[]),
                mock.call("datadog.tracer.buffer.accepted.spans", 5, tags=[]),
                mock.call("datadog.tracer.http.requests", writer.RETRY_ATTEMPTS, tags=[]),
                mock.call("datadog.tracer.http.errors", 1, tags=["type:err"]),
                mock.call("datadog.tracer.http.dropped.bytes", AnyInt(), tags=[]),
            ],
            any_order=True,
        )

    def test_drop_reason_bad_endpoint(self):
        """A failed send counts one HTTP error and ten dropped traces."""
        statsd = mock.Mock()
        writer_metrics_reset = mock.Mock()
        writer = AgentWriter(agent_url="http://asdf:1234", dogstatsd=statsd, report_metrics=False)
        # Replace the reset hook with a mock so ``writer._metrics`` is still
        # populated when inspected below.
        writer._metrics_reset = writer_metrics_reset
        for i in range(10):
            writer.write(
                [Span(tracer=None, name="name", trace_id=i, span_id=j, parent_id=j - 1 or None) for j in range(5)]
            )
        writer.stop()
        writer.join()

        writer_metrics_reset.assert_called_once()

        assert 1 == writer._metrics["http.errors"]["count"]
        assert 10 == writer._metrics["http.dropped.traces"]["count"]

    def test_drop_reason_trace_too_big(self):
        """An oversized trace is counted as a buffer drop tagged ``reason:t_too_big``."""
        statsd = mock.Mock()
        writer_metrics_reset = mock.Mock()
        writer = AgentWriter(agent_url="http://asdf:1234", dogstatsd=statsd, report_metrics=False)
        writer._metrics_reset = writer_metrics_reset
        for i in range(10):
            writer.write(
                [Span(tracer=None, name="name", trace_id=i, span_id=j, parent_id=j - 1 or None) for j in range(5)]
            )
        writer.write(
            [Span(tracer=None, name="a" * 5000, trace_id=i, span_id=j, parent_id=j - 1 or None) for j in range(2 ** 10)]
        )
        writer.stop()
        writer.join()

        writer_metrics_reset.assert_called_once()

        assert 1 == writer._metrics["buffer.dropped.traces"]["count"]
        assert ["reason:t_too_big"] == writer._metrics["buffer.dropped.traces"]["tags"]

    def test_drop_reason_buffer_full(self):
        """With a small ``buffer_size`` the extra trace is dropped with ``reason:full``."""
        statsd = mock.Mock()
        writer_metrics_reset = mock.Mock()
        writer = AgentWriter(agent_url="http://asdf:1234", buffer_size=5300, dogstatsd=statsd, report_metrics=False)
        writer._metrics_reset = writer_metrics_reset
        for i in range(10):
            writer.write(
                [Span(tracer=None, name="name", trace_id=i, span_id=j, parent_id=j - 1 or None) for j in range(5)]
            )
        writer.write([Span(tracer=None, name="a", trace_id=i, span_id=j, parent_id=j - 1 or None) for j in range(5)])
        writer.stop()
        writer.join()

        writer_metrics_reset.assert_called_once()

        assert 1 == writer._metrics["buffer.dropped.traces"]["count"]
        assert ["reason:full"] == writer._metrics["buffer.dropped.traces"]["tags"]

    def test_drop_reason_encoding_error(self):
        """When the encoder raises on ``encode``, all buffered traces count as encoder drops."""
        n_traces = 10
        statsd = mock.Mock()
        writer_encoder = mock.Mock()
        # Give the mock encoder a length of ``n_traces``.
        writer_encoder.__len__ = (lambda *args: n_traces).__get__(writer_encoder)
        writer_metrics_reset = mock.Mock()
        writer_encoder.encode.side_effect = Exception
        writer = AgentWriter(agent_url="http://asdf:1234", dogstatsd=statsd, report_metrics=False)
        writer._encoder = writer_encoder
        writer._metrics_reset = writer_metrics_reset
        for i in range(n_traces):
            writer.write(
                [Span(tracer=None, name="name", trace_id=i, span_id=j, parent_id=j - 1 or None) for j in range(5)]
            )

        writer.stop()
        writer.join()

        writer_metrics_reset.assert_called_once()

        assert 10 == writer._metrics["encoder.dropped.traces"]["count"]

    def test_keep_rate(self):
        """The keep rate attached to sent traces reflects the drop history before that flush."""
        statsd = mock.Mock()
        writer_run_periodic = mock.Mock()
        writer_put = mock.Mock()
        writer_put.return_value = Response(status=200)
        writer = AgentWriter(agent_url="http://asdf:1234", dogstatsd=statsd, report_metrics=False)
        # Mock out the periodic run and the HTTP put so flushes are driven
        # manually and always "succeed" with a 200 response.
        writer.run_periodic = writer_run_periodic
        writer._put = writer_put

        traces = [
            [Span(tracer=None, name="name", trace_id=i, span_id=j, parent_id=j - 1 or None) for j in range(5)]
            for i in range(4)
        ]

        traces_too_big = [
            [Span(tracer=None, name="a" * 5000, trace_id=i, span_id=j, parent_id=j - 1 or None) for j in range(2 ** 10)]
            for i in range(4)
        ]

        # 1. We write 4 traces successfully.
        for trace in traces:
            writer.write(trace)
        writer.flush_queue()

        payload = msgpack.unpackb(writer_put.call_args.args[0])
        # No previous drops.
        assert 0.0 == writer._drop_sma.get()
        # 4 traces written.
        assert 4 == len(payload)
        # 100% of traces kept (refers to the past).
        # No traces sent before now so 100% kept.
        for trace in payload:
            assert 1.0 == trace[0]["metrics"].get(KEEP_SPANS_RATE_KEY, -1)

        # 2. We fail to write 4 traces because of size limitation.
        for trace in traces_too_big:
            writer.write(trace)
        writer.flush_queue()

        # 50% of traces were dropped historically.
        # 4 successfully written before and 4 dropped now.
        assert 0.5 == writer._drop_sma.get()
        # put not called since no new traces are available.
        writer_put.assert_called_once()

        # 3. We write 2 traces successfully.
        for trace in traces[:2]:
            writer.write(trace)
        writer.flush_queue()

        payload = msgpack.unpackb(writer_put.call_args.args[0])
        # 40% of traces were dropped historically.
        assert 0.4 == writer._drop_sma.get()
        # 2 traces written.
        assert 2 == len(payload)
        # 50% of traces kept (refers to the past).
        # We had 4 successfully written and 4 dropped.
        for trace in payload:
            assert 0.5 == trace[0]["metrics"].get(KEEP_SPANS_RATE_KEY, -1)

        # 4. We write 1 trace successfully and fail to write 3.
        writer.write(traces[0])
        for trace in traces_too_big[:3]:
            writer.write(trace)
        writer.flush_queue()

        payload = msgpack.unpackb(writer_put.call_args.args[0])
        # 50% of traces were dropped historically.
        assert 0.5 == writer._drop_sma.get()
        # 1 trace written.
        assert 1 == len(payload)
        # 60% of traces kept (refers to the past).
        # We had 4 successfully written, then 4 dropped, then 2 written.
        for trace in payload:
            assert 0.6 == trace[0]["metrics"].get(KEEP_SPANS_RATE_KEY, -1)


class LogWriterTests(BaseTestCase):
    """Tests for ``LogWriter`` writing to an in-memory output."""

    N_TRACES = 11

    def create_writer(self):
        """Build a ``LogWriter`` over a ``DummyOutput`` and write ``N_TRACES`` traces to it."""
        self.output = DummyOutput()
        writer = LogWriter(out=self.output)
        for i in range(self.N_TRACES):
            writer.write(
                [Span(tracer=None, name="name", trace_id=i, span_id=j, parent_id=j - 1 or None) for j in range(7)]
            )
        return writer

    def test_log_writer(self):
        """Each written trace produces exactly one output entry."""
        self.create_writer()
        self.assertEqual(len(self.output.entries), self.N_TRACES)


def test_humansize():
    """``_human_size`` renders byte counts with decimal (1000-based) units."""
    assert _human_size(0) == "0B"
    assert _human_size(999) == "999B"
    assert _human_size(1000) == "1KB"
    assert _human_size(10000) == "10KB"
    assert _human_size(100000) == "100KB"
    assert _human_size(1000000) == "1MB"
    assert _human_size(10000000) == "10MB"
    assert _human_size(1000000000) == "1GB"


class _BaseHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    """Base handler for the test servers: plain-text errors and no request logging."""

    error_message_format = "%(message)s\n"
    error_content_type = "text/plain"

    @staticmethod
    def log_message(format, *args):  # noqa: A002
        """Discard log messages so test output stays quiet."""
        pass


class _APIEndpointRequestHandlerTest(_BaseHTTPRequestHandler):
    """Handler that answers PUT with 200 and optionally asserts the request path prefix."""

    # Set by fixtures/tests on the class itself; ``None`` disables the check.
    expected_path_prefix = None

    def do_PUT(self):
        """Check the path prefix (if configured) and reply with status 200."""
        if self.expected_path_prefix is not None:
            assert self.path.startswith(self.expected_path_prefix)
        self.send_error(200, "OK")


class _TimeoutAPIEndpointRequestHandlerTest(_BaseHTTPRequestHandler):
    """Handler that stalls on PUT so the client hits its timeout."""

    def do_PUT(self):
        # This server sleeps longer than our timeout
        time.sleep(5)


class _ResetAPIEndpointRequestHandlerTest(_BaseHTTPRequestHandler):
    """Handler that returns from PUT without writing any response."""

    def do_PUT(self):
        return


_HOST = "0.0.0.0"
_PORT = 8743
_TIMEOUT_PORT = _PORT + 1
_RESET_PORT = _TIMEOUT_PORT + 1


class UDSHTTPServer(socketserver.UnixStreamServer, BaseHTTPServer.HTTPServer):
    """HTTP server listening on a Unix domain socket."""

    def server_bind(self):
        """Bind using ``HTTPServer.server_bind`` explicitly."""
        BaseHTTPServer.HTTPServer.server_bind(self)
def _make_uds_server(path, request_handler):
    """Start a ``UDSHTTPServer`` on ``path`` in a daemon thread and wait until it answers.

    :param path: Filesystem path of the Unix domain socket to bind.
    :param request_handler: Request handler class used by the server.
    :returns: ``(server, thread)`` for the running server.

    NOTE: the readiness loop only exits once a PUT to ``/`` returns status 200,
    so ``request_handler`` must answer PUT with 200.
    """
    server = UDSHTTPServer(path, request_handler)
    t = threading.Thread(target=server.serve_forever)
    # Set daemon just in case something fails
    t.daemon = True
    t.start()

    # Wait for the server to start
    resp = None
    while resp != 200:
        conn = UDSHTTPConnection(server.server_address, _HOST, 2019)
        try:
            conn.request("PUT", "/")
            resp = get_connection_response(conn).status
        finally:
            conn.close()
        time.sleep(0.01)

    return server, t


@pytest.fixture
def endpoint_uds_server():
    """Yield a running UDS HTTP server that asserts request paths start with ``/v0.``."""
    # Create the socket inside a freshly made private directory instead of
    # using the deprecated, race-prone ``tempfile.mktemp()``.
    socket_dir = tempfile.mkdtemp()
    socket_name = os.path.join(socket_dir, "agent.sock")
    handler = _APIEndpointRequestHandlerTest
    server, thread = _make_uds_server(socket_name, handler)
    handler.expected_path_prefix = "/v0."
    try:
        yield server
    finally:
        handler.expected_path_prefix = None
        server.shutdown()
        thread.join()
        # Release the listening socket before removing its path.
        server.server_close()
        os.unlink(socket_name)
        os.rmdir(socket_dir)


def _make_server(port, request_handler):
    """Start an HTTP server on ``(_HOST, port)`` in a daemon thread.

    :returns: ``(server, thread)`` for the running server.
    """
    server = BaseHTTPServer.HTTPServer((_HOST, port), request_handler)
    t = threading.Thread(target=server.serve_forever)
    # Set daemon just in case something fails
    t.daemon = True
    t.start()
    return server, t


@pytest.fixture(scope="module")
def endpoint_test_timeout_server():
    """Module-scoped server on ``_TIMEOUT_PORT`` whose PUT handler stalls."""
    server, thread = _make_server(_TIMEOUT_PORT, _TimeoutAPIEndpointRequestHandlerTest)
    try:
        yield thread
    finally:
        server.shutdown()
        thread.join()
        # Close the listening socket so the port is released.
        server.server_close()


@pytest.fixture(scope="module")
def endpoint_test_reset_server():
    """Module-scoped server on ``_RESET_PORT`` whose PUT handler sends no response."""
    server, thread = _make_server(_RESET_PORT, _ResetAPIEndpointRequestHandlerTest)
    try:
        yield thread
    finally:
        server.shutdown()
        thread.join()
        # Close the listening socket so the port is released.
        server.server_close()


@pytest.fixture
def endpoint_assert_path():
    """Yield a callable that sets the path prefix the server on ``_PORT`` asserts."""
    handler = _APIEndpointRequestHandlerTest
    server, thread = _make_server(_PORT, handler)

    def configure(expected_path_prefix=None):
        handler.expected_path_prefix = expected_path_prefix
        return thread

    try:
        yield configure
    finally:
        handler.expected_path_prefix = None
        server.shutdown()
        thread.join()
        # Function-scoped fixture on a fixed port: close the socket so the next
        # use can bind again.
        server.server_close()


def test_agent_url_path(endpoint_assert_path):
    """Requests go to the ``v0.`` endpoint relative to the agent URL's base path."""
    # test without base path
    endpoint_assert_path("/v0.")
    writer = AgentWriter(agent_url="http://%s:%s/" % (_HOST, _PORT))
    writer._encoder.put([Span(None, "foobar")])
    writer.flush_queue(raise_exc=True)

    # test without base path nor trailing slash
    writer = AgentWriter(agent_url="http://%s:%s" % (_HOST, _PORT))
    writer._encoder.put([Span(None, "foobar")])
    writer.flush_queue(raise_exc=True)

    # test with a base path
    endpoint_assert_path("/test/v0.")
    writer = AgentWriter(agent_url="http://%s:%s/test/" % (_HOST, _PORT))
    writer._encoder.put([Span(None, "foobar")])
    writer.flush_queue(raise_exc=True)


def test_flush_connection_timeout_connect():
    """Flushing to port 2019 raises a socket-level error (presumably nothing listens there)."""
    writer = AgentWriter(agent_url="http://%s:%s" % (_HOST, 2019))
    if PY3:
        exc_type = OSError
    else:
        exc_type = socket.error
    with pytest.raises(exc_type):
        writer._encoder.put([Span(None, "foobar")])
        writer.flush_queue(raise_exc=True)


def test_flush_connection_timeout(endpoint_test_timeout_server):
    """A server that stalls on PUT makes the flush raise ``socket.timeout``."""
    writer = AgentWriter(agent_url="http://%s:%s" % (_HOST, _TIMEOUT_PORT))
    with pytest.raises(socket.timeout):
        writer._encoder.put([Span(None, "foobar")])
        writer.flush_queue(raise_exc=True)


def test_flush_connection_reset(endpoint_test_reset_server):
    """A server that sends no response makes the flush raise a bad-status/reset error."""
    writer = AgentWriter(agent_url="http://%s:%s" % (_HOST, _RESET_PORT))
    if PY3:
        exc_types = (httplib.BadStatusLine, ConnectionResetError)
    else:
        exc_types = (httplib.BadStatusLine,)
    with pytest.raises(exc_types):
        writer._encoder.put([Span(None, "foobar")])
        writer.flush_queue(raise_exc=True)


def test_flush_connection_uds(endpoint_uds_server):
    """Flushing over a ``unix://`` agent URL succeeds against the UDS server."""
    writer = AgentWriter(agent_url="unix://%s" % endpoint_uds_server.server_address)
    writer._encoder.put([Span(None, "foobar")])
    writer.flush_queue(raise_exc=True)


def test_flush_queue_raise():
    """``flush_queue`` only propagates the send error when ``raise_exc=True``."""
    writer = AgentWriter(agent_url="http://dne:1234")

    # Should not raise
    writer.write([])
    writer.flush_queue(raise_exc=False)

    error = OSError if PY3 else IOError
    with pytest.raises(error):
        writer.write([])
        writer.flush_queue(raise_exc=True)


def test_racing_start():
    """Concurrent writes from 100 threads all land in the encoder."""
    writer = AgentWriter(agent_url="http://dne:1234")

    def do_write(i):
        writer.write([Span(None, str(i))])

    ts = [threading.Thread(target=do_write, args=(i,)) for i in range(100)]
    for t in ts:
        t.start()

    for t in ts:
        t.join()

    assert len(writer._encoder) == 100


def test_additional_headers():
    """Headers from ``_DD_TRACE_WRITER_ADDITIONAL_HEADERS`` are added to the writer's headers."""
    with override_env(dict(_DD_TRACE_WRITER_ADDITIONAL_HEADERS="additional-header:additional-value,header2:value2")):
        writer = AgentWriter(agent_url="http://localhost:9126")
        assert writer._headers["additional-header"] == "additional-value"
        assert writer._headers["header2"] == "value2"


def test_bad_encoding(monkeypatch):
    """An unknown ``DD_TRACE_API_VERSION`` makes the constructor raise ``ValueError``."""
    monkeypatch.setenv("DD_TRACE_API_VERSION", "foo")

    with pytest.raises(ValueError):
        AgentWriter(agent_url="http://localhost:9126")


@pytest.mark.parametrize(
    "init_api_version,api_version,endpoint,encoder_cls",
    [
        (None, "v0.3", "v0.3/traces", MSGPACK_ENCODERS["v0.3"]),
        ("v0.3", "v0.3", "v0.3/traces", MSGPACK_ENCODERS["v0.3"]),
        ("v0.4", "v0.4", "v0.4/traces", MSGPACK_ENCODERS["v0.4"]),
        ("v0.5", "v0.5", "v0.5/traces", MSGPACK_ENCODERS["v0.5"]),
    ],
)
def test_writer_recreate_api_version(init_api_version, api_version, endpoint, encoder_cls):
    """``recreate()`` preserves the API version, endpoint and encoder type."""
    writer = AgentWriter(agent_url="http://dne:1234", api_version=init_api_version)
    assert writer._api_version == api_version
    assert writer._endpoint == endpoint
    assert isinstance(writer._encoder, encoder_cls)

    writer = writer.recreate()
    assert writer._api_version == api_version
    assert writer._endpoint == endpoint
    assert isinstance(writer._encoder, encoder_cls)