1import contextlib 2import os 3import sys 4import tracemalloc 5import unittest 6from unittest.mock import patch 7from test.support.script_helper import (assert_python_ok, assert_python_failure, 8 interpreter_requires_environment) 9from test import support 10 11try: 12 import _testcapi 13except ImportError: 14 _testcapi = None 15 16 17EMPTY_STRING_SIZE = sys.getsizeof(b'') 18INVALID_NFRAME = (-1, 2**30) 19 20 21def get_frames(nframe, lineno_delta): 22 frames = [] 23 frame = sys._getframe(1) 24 for index in range(nframe): 25 code = frame.f_code 26 lineno = frame.f_lineno + lineno_delta 27 frames.append((code.co_filename, lineno)) 28 lineno_delta = 0 29 frame = frame.f_back 30 if frame is None: 31 break 32 return tuple(frames) 33 34def allocate_bytes(size): 35 nframe = tracemalloc.get_traceback_limit() 36 bytes_len = (size - EMPTY_STRING_SIZE) 37 frames = get_frames(nframe, 1) 38 data = b'x' * bytes_len 39 return data, tracemalloc.Traceback(frames) 40 41def create_snapshots(): 42 traceback_limit = 2 43 44 # _tracemalloc._get_traces() returns a list of (domain, size, 45 # traceback_frames) tuples. traceback_frames is a tuple of (filename, 46 # line_number) tuples. 47 raw_traces = [ 48 (0, 10, (('a.py', 2), ('b.py', 4))), 49 (0, 10, (('a.py', 2), ('b.py', 4))), 50 (0, 10, (('a.py', 2), ('b.py', 4))), 51 52 (1, 2, (('a.py', 5), ('b.py', 4))), 53 54 (2, 66, (('b.py', 1),)), 55 56 (3, 7, (('<unknown>', 0),)), 57 ] 58 snapshot = tracemalloc.Snapshot(raw_traces, traceback_limit) 59 60 raw_traces2 = [ 61 (0, 10, (('a.py', 2), ('b.py', 4))), 62 (0, 10, (('a.py', 2), ('b.py', 4))), 63 (0, 10, (('a.py', 2), ('b.py', 4))), 64 65 (2, 2, (('a.py', 5), ('b.py', 4))), 66 (2, 5000, (('a.py', 5), ('b.py', 4))), 67 68 (4, 400, (('c.py', 578),)), 69 ] 70 snapshot2 = tracemalloc.Snapshot(raw_traces2, traceback_limit) 71 72 return (snapshot, snapshot2) 73 74def frame(filename, lineno): 75 return tracemalloc._Frame((filename, lineno)) 76 77def traceback(*frames): 78 return tracemalloc.Traceback(frames) 79 80def traceback_lineno(filename, lineno): 81 return traceback((filename, lineno)) 82 83def traceback_filename(filename): 84 return traceback_lineno(filename, 0) 85 86 87class TestTracemallocEnabled(unittest.TestCase): 88 def setUp(self): 89 if tracemalloc.is_tracing(): 90 self.skipTest("tracemalloc must be stopped before the test") 91 92 tracemalloc.start(1) 93 94 def tearDown(self): 95 tracemalloc.stop() 96 97 def test_get_tracemalloc_memory(self): 98 data = [allocate_bytes(123) for count in range(1000)] 99 size = tracemalloc.get_tracemalloc_memory() 100 self.assertGreaterEqual(size, 0) 101 102 tracemalloc.clear_traces() 103 size2 = tracemalloc.get_tracemalloc_memory() 104 self.assertGreaterEqual(size2, 0) 105 self.assertLessEqual(size2, size) 106 107 def test_get_object_traceback(self): 108 tracemalloc.clear_traces() 109 obj_size = 12345 110 obj, obj_traceback = allocate_bytes(obj_size) 111 traceback = tracemalloc.get_object_traceback(obj) 112 self.assertEqual(traceback, obj_traceback) 113 114 def test_new_reference(self): 115 tracemalloc.clear_traces() 116 # gc.collect() indirectly calls PyList_ClearFreeList() 117 support.gc_collect() 118 119 # Create a list and "destroy it": put it in the PyListObject free list 120 obj = [] 121 obj = None 122 123 # Create a list which should reuse the previously created empty list 124 obj = [] 125 126 nframe = tracemalloc.get_traceback_limit() 127 frames = get_frames(nframe, -3) 128 obj_traceback = tracemalloc.Traceback(frames) 129 130 traceback = tracemalloc.get_object_traceback(obj) 131 self.assertIsNotNone(traceback) 132 self.assertEqual(traceback, obj_traceback) 133 134 def test_set_traceback_limit(self): 135 obj_size = 10 136 137 tracemalloc.stop() 138 self.assertRaises(ValueError, tracemalloc.start, -1) 139 140 tracemalloc.stop() 141 tracemalloc.start(10) 142 obj2, obj2_traceback = allocate_bytes(obj_size) 143 traceback = tracemalloc.get_object_traceback(obj2) 144 self.assertEqual(len(traceback), 10) 145 self.assertEqual(traceback, obj2_traceback) 146 147 tracemalloc.stop() 148 tracemalloc.start(1) 149 obj, obj_traceback = allocate_bytes(obj_size) 150 traceback = tracemalloc.get_object_traceback(obj) 151 self.assertEqual(len(traceback), 1) 152 self.assertEqual(traceback, obj_traceback) 153 154 def find_trace(self, traces, traceback): 155 for trace in traces: 156 if trace[2] == traceback._frames: 157 return trace 158 159 self.fail("trace not found") 160 161 def test_get_traces(self): 162 tracemalloc.clear_traces() 163 obj_size = 12345 164 obj, obj_traceback = allocate_bytes(obj_size) 165 166 traces = tracemalloc._get_traces() 167 trace = self.find_trace(traces, obj_traceback) 168 169 self.assertIsInstance(trace, tuple) 170 domain, size, traceback = trace 171 self.assertEqual(size, obj_size) 172 self.assertEqual(traceback, obj_traceback._frames) 173 174 tracemalloc.stop() 175 self.assertEqual(tracemalloc._get_traces(), []) 176 177 def test_get_traces_intern_traceback(self): 178 # dummy wrappers to get more useful and identical frames in the traceback 179 def allocate_bytes2(size): 180 return allocate_bytes(size) 181 def allocate_bytes3(size): 182 return allocate_bytes2(size) 183 def allocate_bytes4(size): 184 return allocate_bytes3(size) 185 186 # Ensure that two identical tracebacks are not duplicated 187 tracemalloc.stop() 188 tracemalloc.start(4) 189 obj_size = 123 190 obj1, obj1_traceback = allocate_bytes4(obj_size) 191 obj2, obj2_traceback = allocate_bytes4(obj_size) 192 193 traces = tracemalloc._get_traces() 194 195 obj1_traceback._frames = tuple(reversed(obj1_traceback._frames)) 196 obj2_traceback._frames = tuple(reversed(obj2_traceback._frames)) 197 198 trace1 = self.find_trace(traces, obj1_traceback) 199 trace2 = self.find_trace(traces, obj2_traceback) 200 domain1, size1, traceback1 = trace1 201 domain2, size2, traceback2 = trace2 202 self.assertIs(traceback2, traceback1) 203 204 def test_get_traced_memory(self): 205 # Python allocates some internals objects, so the test must tolerate 206 # a small difference between the expected size and the real usage 207 max_error = 2048 208 209 # allocate one object 210 obj_size = 1024 * 1024 211 tracemalloc.clear_traces() 212 obj, obj_traceback = allocate_bytes(obj_size) 213 size, peak_size = tracemalloc.get_traced_memory() 214 self.assertGreaterEqual(size, obj_size) 215 self.assertGreaterEqual(peak_size, size) 216 217 self.assertLessEqual(size - obj_size, max_error) 218 self.assertLessEqual(peak_size - size, max_error) 219 220 # destroy the object 221 obj = None 222 size2, peak_size2 = tracemalloc.get_traced_memory() 223 self.assertLess(size2, size) 224 self.assertGreaterEqual(size - size2, obj_size - max_error) 225 self.assertGreaterEqual(peak_size2, peak_size) 226 227 # clear_traces() must reset traced memory counters 228 tracemalloc.clear_traces() 229 self.assertEqual(tracemalloc.get_traced_memory(), (0, 0)) 230 231 # allocate another object 232 obj, obj_traceback = allocate_bytes(obj_size) 233 size, peak_size = tracemalloc.get_traced_memory() 234 self.assertGreaterEqual(size, obj_size) 235 236 # stop() also resets traced memory counters 237 tracemalloc.stop() 238 self.assertEqual(tracemalloc.get_traced_memory(), (0, 0)) 239 240 def test_clear_traces(self): 241 obj, obj_traceback = allocate_bytes(123) 242 traceback = tracemalloc.get_object_traceback(obj) 243 self.assertIsNotNone(traceback) 244 245 tracemalloc.clear_traces() 246 traceback2 = tracemalloc.get_object_traceback(obj) 247 self.assertIsNone(traceback2) 248 249 def test_is_tracing(self): 250 tracemalloc.stop() 251 self.assertFalse(tracemalloc.is_tracing()) 252 253 tracemalloc.start() 254 self.assertTrue(tracemalloc.is_tracing()) 255 256 def test_snapshot(self): 257 obj, source = allocate_bytes(123) 258 259 # take a snapshot 260 snapshot = tracemalloc.take_snapshot() 261 262 # write on disk 263 snapshot.dump(support.TESTFN) 264 self.addCleanup(support.unlink, support.TESTFN) 265 266 # load from disk 267 snapshot2 = tracemalloc.Snapshot.load(support.TESTFN) 268 self.assertEqual(snapshot2.traces, snapshot.traces) 269 270 # tracemalloc must be tracing memory allocations to take a snapshot 271 tracemalloc.stop() 272 with self.assertRaises(RuntimeError) as cm: 273 tracemalloc.take_snapshot() 274 self.assertEqual(str(cm.exception), 275 "the tracemalloc module must be tracing memory " 276 "allocations to take a snapshot") 277 278 def test_snapshot_save_attr(self): 279 # take a snapshot with a new attribute 280 snapshot = tracemalloc.take_snapshot() 281 snapshot.test_attr = "new" 282 snapshot.dump(support.TESTFN) 283 self.addCleanup(support.unlink, support.TESTFN) 284 285 # load() should recreate the attribute 286 snapshot2 = tracemalloc.Snapshot.load(support.TESTFN) 287 self.assertEqual(snapshot2.test_attr, "new") 288 289 def fork_child(self): 290 if not tracemalloc.is_tracing(): 291 return 2 292 293 obj_size = 12345 294 obj, obj_traceback = allocate_bytes(obj_size) 295 traceback = tracemalloc.get_object_traceback(obj) 296 if traceback is None: 297 return 3 298 299 # everything is fine 300 return 0 301 302 @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork()') 303 def test_fork(self): 304 # check that tracemalloc is still working after fork 305 pid = os.fork() 306 if not pid: 307 # child 308 exitcode = 1 309 try: 310 exitcode = self.fork_child() 311 finally: 312 os._exit(exitcode) 313 else: 314 pid2, status = os.waitpid(pid, 0) 315 self.assertTrue(os.WIFEXITED(status)) 316 exitcode = os.WEXITSTATUS(status) 317 self.assertEqual(exitcode, 0) 318 319 320class TestSnapshot(unittest.TestCase): 321 maxDiff = 4000 322 323 def test_create_snapshot(self): 324 raw_traces = [(0, 5, (('a.py', 2),))] 325 326 with contextlib.ExitStack() as stack: 327 stack.enter_context(patch.object(tracemalloc, 'is_tracing', 328 return_value=True)) 329 stack.enter_context(patch.object(tracemalloc, 'get_traceback_limit', 330 return_value=5)) 331 stack.enter_context(patch.object(tracemalloc, '_get_traces', 332 return_value=raw_traces)) 333 334 snapshot = tracemalloc.take_snapshot() 335 self.assertEqual(snapshot.traceback_limit, 5) 336 self.assertEqual(len(snapshot.traces), 1) 337 trace = snapshot.traces[0] 338 self.assertEqual(trace.size, 5) 339 self.assertEqual(len(trace.traceback), 1) 340 self.assertEqual(trace.traceback[0].filename, 'a.py') 341 self.assertEqual(trace.traceback[0].lineno, 2) 342 343 def test_filter_traces(self): 344 snapshot, snapshot2 = create_snapshots() 345 filter1 = tracemalloc.Filter(False, "b.py") 346 filter2 = tracemalloc.Filter(True, "a.py", 2) 347 filter3 = tracemalloc.Filter(True, "a.py", 5) 348 349 original_traces = list(snapshot.traces._traces) 350 351 # exclude b.py 352 snapshot3 = snapshot.filter_traces((filter1,)) 353 self.assertEqual(snapshot3.traces._traces, [ 354 (0, 10, (('a.py', 2), ('b.py', 4))), 355 (0, 10, (('a.py', 2), ('b.py', 4))), 356 (0, 10, (('a.py', 2), ('b.py', 4))), 357 (1, 2, (('a.py', 5), ('b.py', 4))), 358 (3, 7, (('<unknown>', 0),)), 359 ]) 360 361 # filter_traces() must not touch the original snapshot 362 self.assertEqual(snapshot.traces._traces, original_traces) 363 364 # only include two lines of a.py 365 snapshot4 = snapshot3.filter_traces((filter2, filter3)) 366 self.assertEqual(snapshot4.traces._traces, [ 367 (0, 10, (('a.py', 2), ('b.py', 4))), 368 (0, 10, (('a.py', 2), ('b.py', 4))), 369 (0, 10, (('a.py', 2), ('b.py', 4))), 370 (1, 2, (('a.py', 5), ('b.py', 4))), 371 ]) 372 373 # No filter: just duplicate the snapshot 374 snapshot5 = snapshot.filter_traces(()) 375 self.assertIsNot(snapshot5, snapshot) 376 self.assertIsNot(snapshot5.traces, snapshot.traces) 377 self.assertEqual(snapshot5.traces, snapshot.traces) 378 379 self.assertRaises(TypeError, snapshot.filter_traces, filter1) 380 381 def test_filter_traces_domain(self): 382 snapshot, snapshot2 = create_snapshots() 383 filter1 = tracemalloc.Filter(False, "a.py", domain=1) 384 filter2 = tracemalloc.Filter(True, "a.py", domain=1) 385 386 original_traces = list(snapshot.traces._traces) 387 388 # exclude a.py of domain 1 389 snapshot3 = snapshot.filter_traces((filter1,)) 390 self.assertEqual(snapshot3.traces._traces, [ 391 (0, 10, (('a.py', 2), ('b.py', 4))), 392 (0, 10, (('a.py', 2), ('b.py', 4))), 393 (0, 10, (('a.py', 2), ('b.py', 4))), 394 (2, 66, (('b.py', 1),)), 395 (3, 7, (('<unknown>', 0),)), 396 ]) 397 398 # include domain 1 399 snapshot3 = snapshot.filter_traces((filter1,)) 400 self.assertEqual(snapshot3.traces._traces, [ 401 (0, 10, (('a.py', 2), ('b.py', 4))), 402 (0, 10, (('a.py', 2), ('b.py', 4))), 403 (0, 10, (('a.py', 2), ('b.py', 4))), 404 (2, 66, (('b.py', 1),)), 405 (3, 7, (('<unknown>', 0),)), 406 ]) 407 408 def test_filter_traces_domain_filter(self): 409 snapshot, snapshot2 = create_snapshots() 410 filter1 = tracemalloc.DomainFilter(False, domain=3) 411 filter2 = tracemalloc.DomainFilter(True, domain=3) 412 413 # exclude domain 2 414 snapshot3 = snapshot.filter_traces((filter1,)) 415 self.assertEqual(snapshot3.traces._traces, [ 416 (0, 10, (('a.py', 2), ('b.py', 4))), 417 (0, 10, (('a.py', 2), ('b.py', 4))), 418 (0, 10, (('a.py', 2), ('b.py', 4))), 419 (1, 2, (('a.py', 5), ('b.py', 4))), 420 (2, 66, (('b.py', 1),)), 421 ]) 422 423 # include domain 2 424 snapshot3 = snapshot.filter_traces((filter2,)) 425 self.assertEqual(snapshot3.traces._traces, [ 426 (3, 7, (('<unknown>', 0),)), 427 ]) 428 429 def test_snapshot_group_by_line(self): 430 snapshot, snapshot2 = create_snapshots() 431 tb_0 = traceback_lineno('<unknown>', 0) 432 tb_a_2 = traceback_lineno('a.py', 2) 433 tb_a_5 = traceback_lineno('a.py', 5) 434 tb_b_1 = traceback_lineno('b.py', 1) 435 tb_c_578 = traceback_lineno('c.py', 578) 436 437 # stats per file and line 438 stats1 = snapshot.statistics('lineno') 439 self.assertEqual(stats1, [ 440 tracemalloc.Statistic(tb_b_1, 66, 1), 441 tracemalloc.Statistic(tb_a_2, 30, 3), 442 tracemalloc.Statistic(tb_0, 7, 1), 443 tracemalloc.Statistic(tb_a_5, 2, 1), 444 ]) 445 446 # stats per file and line (2) 447 stats2 = snapshot2.statistics('lineno') 448 self.assertEqual(stats2, [ 449 tracemalloc.Statistic(tb_a_5, 5002, 2), 450 tracemalloc.Statistic(tb_c_578, 400, 1), 451 tracemalloc.Statistic(tb_a_2, 30, 3), 452 ]) 453 454 # stats diff per file and line 455 statistics = snapshot2.compare_to(snapshot, 'lineno') 456 self.assertEqual(statistics, [ 457 tracemalloc.StatisticDiff(tb_a_5, 5002, 5000, 2, 1), 458 tracemalloc.StatisticDiff(tb_c_578, 400, 400, 1, 1), 459 tracemalloc.StatisticDiff(tb_b_1, 0, -66, 0, -1), 460 tracemalloc.StatisticDiff(tb_0, 0, -7, 0, -1), 461 tracemalloc.StatisticDiff(tb_a_2, 30, 0, 3, 0), 462 ]) 463 464 def test_snapshot_group_by_file(self): 465 snapshot, snapshot2 = create_snapshots() 466 tb_0 = traceback_filename('<unknown>') 467 tb_a = traceback_filename('a.py') 468 tb_b = traceback_filename('b.py') 469 tb_c = traceback_filename('c.py') 470 471 # stats per file 472 stats1 = snapshot.statistics('filename') 473 self.assertEqual(stats1, [ 474 tracemalloc.Statistic(tb_b, 66, 1), 475 tracemalloc.Statistic(tb_a, 32, 4), 476 tracemalloc.Statistic(tb_0, 7, 1), 477 ]) 478 479 # stats per file (2) 480 stats2 = snapshot2.statistics('filename') 481 self.assertEqual(stats2, [ 482 tracemalloc.Statistic(tb_a, 5032, 5), 483 tracemalloc.Statistic(tb_c, 400, 1), 484 ]) 485 486 # stats diff per file 487 diff = snapshot2.compare_to(snapshot, 'filename') 488 self.assertEqual(diff, [ 489 tracemalloc.StatisticDiff(tb_a, 5032, 5000, 5, 1), 490 tracemalloc.StatisticDiff(tb_c, 400, 400, 1, 1), 491 tracemalloc.StatisticDiff(tb_b, 0, -66, 0, -1), 492 tracemalloc.StatisticDiff(tb_0, 0, -7, 0, -1), 493 ]) 494 495 def test_snapshot_group_by_traceback(self): 496 snapshot, snapshot2 = create_snapshots() 497 498 # stats per file 499 tb1 = traceback(('a.py', 2), ('b.py', 4)) 500 tb2 = traceback(('a.py', 5), ('b.py', 4)) 501 tb3 = traceback(('b.py', 1)) 502 tb4 = traceback(('<unknown>', 0)) 503 stats1 = snapshot.statistics('traceback') 504 self.assertEqual(stats1, [ 505 tracemalloc.Statistic(tb3, 66, 1), 506 tracemalloc.Statistic(tb1, 30, 3), 507 tracemalloc.Statistic(tb4, 7, 1), 508 tracemalloc.Statistic(tb2, 2, 1), 509 ]) 510 511 # stats per file (2) 512 tb5 = traceback(('c.py', 578)) 513 stats2 = snapshot2.statistics('traceback') 514 self.assertEqual(stats2, [ 515 tracemalloc.Statistic(tb2, 5002, 2), 516 tracemalloc.Statistic(tb5, 400, 1), 517 tracemalloc.Statistic(tb1, 30, 3), 518 ]) 519 520 # stats diff per file 521 diff = snapshot2.compare_to(snapshot, 'traceback') 522 self.assertEqual(diff, [ 523 tracemalloc.StatisticDiff(tb2, 5002, 5000, 2, 1), 524 tracemalloc.StatisticDiff(tb5, 400, 400, 1, 1), 525 tracemalloc.StatisticDiff(tb3, 0, -66, 0, -1), 526 tracemalloc.StatisticDiff(tb4, 0, -7, 0, -1), 527 tracemalloc.StatisticDiff(tb1, 30, 0, 3, 0), 528 ]) 529 530 self.assertRaises(ValueError, 531 snapshot.statistics, 'traceback', cumulative=True) 532 533 def test_snapshot_group_by_cumulative(self): 534 snapshot, snapshot2 = create_snapshots() 535 tb_0 = traceback_filename('<unknown>') 536 tb_a = traceback_filename('a.py') 537 tb_b = traceback_filename('b.py') 538 tb_a_2 = traceback_lineno('a.py', 2) 539 tb_a_5 = traceback_lineno('a.py', 5) 540 tb_b_1 = traceback_lineno('b.py', 1) 541 tb_b_4 = traceback_lineno('b.py', 4) 542 543 # per file 544 stats = snapshot.statistics('filename', True) 545 self.assertEqual(stats, [ 546 tracemalloc.Statistic(tb_b, 98, 5), 547 tracemalloc.Statistic(tb_a, 32, 4), 548 tracemalloc.Statistic(tb_0, 7, 1), 549 ]) 550 551 # per line 552 stats = snapshot.statistics('lineno', True) 553 self.assertEqual(stats, [ 554 tracemalloc.Statistic(tb_b_1, 66, 1), 555 tracemalloc.Statistic(tb_b_4, 32, 4), 556 tracemalloc.Statistic(tb_a_2, 30, 3), 557 tracemalloc.Statistic(tb_0, 7, 1), 558 tracemalloc.Statistic(tb_a_5, 2, 1), 559 ]) 560 561 def test_trace_format(self): 562 snapshot, snapshot2 = create_snapshots() 563 trace = snapshot.traces[0] 564 self.assertEqual(str(trace), 'b.py:4: 10 B') 565 traceback = trace.traceback 566 self.assertEqual(str(traceback), 'b.py:4') 567 frame = traceback[0] 568 self.assertEqual(str(frame), 'b.py:4') 569 570 def test_statistic_format(self): 571 snapshot, snapshot2 = create_snapshots() 572 stats = snapshot.statistics('lineno') 573 stat = stats[0] 574 self.assertEqual(str(stat), 575 'b.py:1: size=66 B, count=1, average=66 B') 576 577 def test_statistic_diff_format(self): 578 snapshot, snapshot2 = create_snapshots() 579 stats = snapshot2.compare_to(snapshot, 'lineno') 580 stat = stats[0] 581 self.assertEqual(str(stat), 582 'a.py:5: size=5002 B (+5000 B), count=2 (+1), average=2501 B') 583 584 def test_slices(self): 585 snapshot, snapshot2 = create_snapshots() 586 self.assertEqual(snapshot.traces[:2], 587 (snapshot.traces[0], snapshot.traces[1])) 588 589 traceback = snapshot.traces[0].traceback 590 self.assertEqual(traceback[:2], 591 (traceback[0], traceback[1])) 592 593 def test_format_traceback(self): 594 snapshot, snapshot2 = create_snapshots() 595 def getline(filename, lineno): 596 return ' <%s, %s>' % (filename, lineno) 597 with unittest.mock.patch('tracemalloc.linecache.getline', 598 side_effect=getline): 599 tb = snapshot.traces[0].traceback 600 self.assertEqual(tb.format(), 601 [' File "b.py", line 4', 602 ' <b.py, 4>', 603 ' File "a.py", line 2', 604 ' <a.py, 2>']) 605 606 self.assertEqual(tb.format(limit=1), 607 [' File "a.py", line 2', 608 ' <a.py, 2>']) 609 610 self.assertEqual(tb.format(limit=-1), 611 [' File "b.py", line 4', 612 ' <b.py, 4>']) 613 614 self.assertEqual(tb.format(most_recent_first=True), 615 [' File "a.py", line 2', 616 ' <a.py, 2>', 617 ' File "b.py", line 4', 618 ' <b.py, 4>']) 619 620 self.assertEqual(tb.format(limit=1, most_recent_first=True), 621 [' File "a.py", line 2', 622 ' <a.py, 2>']) 623 624 self.assertEqual(tb.format(limit=-1, most_recent_first=True), 625 [' File "b.py", line 4', 626 ' <b.py, 4>']) 627 628 629class TestFilters(unittest.TestCase): 630 maxDiff = 2048 631 632 def test_filter_attributes(self): 633 # test default values 634 f = tracemalloc.Filter(True, "abc") 635 self.assertEqual(f.inclusive, True) 636 self.assertEqual(f.filename_pattern, "abc") 637 self.assertIsNone(f.lineno) 638 self.assertEqual(f.all_frames, False) 639 640 # test custom values 641 f = tracemalloc.Filter(False, "test.py", 123, True) 642 self.assertEqual(f.inclusive, False) 643 self.assertEqual(f.filename_pattern, "test.py") 644 self.assertEqual(f.lineno, 123) 645 self.assertEqual(f.all_frames, True) 646 647 # parameters passed by keyword 648 f = tracemalloc.Filter(inclusive=False, filename_pattern="test.py", lineno=123, all_frames=True) 649 self.assertEqual(f.inclusive, False) 650 self.assertEqual(f.filename_pattern, "test.py") 651 self.assertEqual(f.lineno, 123) 652 self.assertEqual(f.all_frames, True) 653 654 # read-only attribute 655 self.assertRaises(AttributeError, setattr, f, "filename_pattern", "abc") 656 657 def test_filter_match(self): 658 # filter without line number 659 f = tracemalloc.Filter(True, "abc") 660 self.assertTrue(f._match_frame("abc", 0)) 661 self.assertTrue(f._match_frame("abc", 5)) 662 self.assertTrue(f._match_frame("abc", 10)) 663 self.assertFalse(f._match_frame("12356", 0)) 664 self.assertFalse(f._match_frame("12356", 5)) 665 self.assertFalse(f._match_frame("12356", 10)) 666 667 f = tracemalloc.Filter(False, "abc") 668 self.assertFalse(f._match_frame("abc", 0)) 669 self.assertFalse(f._match_frame("abc", 5)) 670 self.assertFalse(f._match_frame("abc", 10)) 671 self.assertTrue(f._match_frame("12356", 0)) 672 self.assertTrue(f._match_frame("12356", 5)) 673 self.assertTrue(f._match_frame("12356", 10)) 674 675 # filter with line number > 0 676 f = tracemalloc.Filter(True, "abc", 5) 677 self.assertFalse(f._match_frame("abc", 0)) 678 self.assertTrue(f._match_frame("abc", 5)) 679 self.assertFalse(f._match_frame("abc", 10)) 680 self.assertFalse(f._match_frame("12356", 0)) 681 self.assertFalse(f._match_frame("12356", 5)) 682 self.assertFalse(f._match_frame("12356", 10)) 683 684 f = tracemalloc.Filter(False, "abc", 5) 685 self.assertTrue(f._match_frame("abc", 0)) 686 self.assertFalse(f._match_frame("abc", 5)) 687 self.assertTrue(f._match_frame("abc", 10)) 688 self.assertTrue(f._match_frame("12356", 0)) 689 self.assertTrue(f._match_frame("12356", 5)) 690 self.assertTrue(f._match_frame("12356", 10)) 691 692 # filter with line number 0 693 f = tracemalloc.Filter(True, "abc", 0) 694 self.assertTrue(f._match_frame("abc", 0)) 695 self.assertFalse(f._match_frame("abc", 5)) 696 self.assertFalse(f._match_frame("abc", 10)) 697 self.assertFalse(f._match_frame("12356", 0)) 698 self.assertFalse(f._match_frame("12356", 5)) 699 self.assertFalse(f._match_frame("12356", 10)) 700 701 f = tracemalloc.Filter(False, "abc", 0) 702 self.assertFalse(f._match_frame("abc", 0)) 703 self.assertTrue(f._match_frame("abc", 5)) 704 self.assertTrue(f._match_frame("abc", 10)) 705 self.assertTrue(f._match_frame("12356", 0)) 706 self.assertTrue(f._match_frame("12356", 5)) 707 self.assertTrue(f._match_frame("12356", 10)) 708 709 def test_filter_match_filename(self): 710 def fnmatch(inclusive, filename, pattern): 711 f = tracemalloc.Filter(inclusive, pattern) 712 return f._match_frame(filename, 0) 713 714 self.assertTrue(fnmatch(True, "abc", "abc")) 715 self.assertFalse(fnmatch(True, "12356", "abc")) 716 self.assertFalse(fnmatch(True, "<unknown>", "abc")) 717 718 self.assertFalse(fnmatch(False, "abc", "abc")) 719 self.assertTrue(fnmatch(False, "12356", "abc")) 720 self.assertTrue(fnmatch(False, "<unknown>", "abc")) 721 722 def test_filter_match_filename_joker(self): 723 def fnmatch(filename, pattern): 724 filter = tracemalloc.Filter(True, pattern) 725 return filter._match_frame(filename, 0) 726 727 # empty string 728 self.assertFalse(fnmatch('abc', '')) 729 self.assertFalse(fnmatch('', 'abc')) 730 self.assertTrue(fnmatch('', '')) 731 self.assertTrue(fnmatch('', '*')) 732 733 # no * 734 self.assertTrue(fnmatch('abc', 'abc')) 735 self.assertFalse(fnmatch('abc', 'abcd')) 736 self.assertFalse(fnmatch('abc', 'def')) 737 738 # a* 739 self.assertTrue(fnmatch('abc', 'a*')) 740 self.assertTrue(fnmatch('abc', 'abc*')) 741 self.assertFalse(fnmatch('abc', 'b*')) 742 self.assertFalse(fnmatch('abc', 'abcd*')) 743 744 # a*b 745 self.assertTrue(fnmatch('abc', 'a*c')) 746 self.assertTrue(fnmatch('abcdcx', 'a*cx')) 747 self.assertFalse(fnmatch('abb', 'a*c')) 748 self.assertFalse(fnmatch('abcdce', 'a*cx')) 749 750 # a*b*c 751 self.assertTrue(fnmatch('abcde', 'a*c*e')) 752 self.assertTrue(fnmatch('abcbdefeg', 'a*bd*eg')) 753 self.assertFalse(fnmatch('abcdd', 'a*c*e')) 754 self.assertFalse(fnmatch('abcbdefef', 'a*bd*eg')) 755 756 # replace .pyc suffix with .py 757 self.assertTrue(fnmatch('a.pyc', 'a.py')) 758 self.assertTrue(fnmatch('a.py', 'a.pyc')) 759 760 if os.name == 'nt': 761 # case insensitive 762 self.assertTrue(fnmatch('aBC', 'ABc')) 763 self.assertTrue(fnmatch('aBcDe', 'Ab*dE')) 764 765 self.assertTrue(fnmatch('a.pyc', 'a.PY')) 766 self.assertTrue(fnmatch('a.py', 'a.PYC')) 767 else: 768 # case sensitive 769 self.assertFalse(fnmatch('aBC', 'ABc')) 770 self.assertFalse(fnmatch('aBcDe', 'Ab*dE')) 771 772 self.assertFalse(fnmatch('a.pyc', 'a.PY')) 773 self.assertFalse(fnmatch('a.py', 'a.PYC')) 774 775 if os.name == 'nt': 776 # normalize alternate separator "/" to the standard separator "\" 777 self.assertTrue(fnmatch(r'a/b', r'a\b')) 778 self.assertTrue(fnmatch(r'a\b', r'a/b')) 779 self.assertTrue(fnmatch(r'a/b\c', r'a\b/c')) 780 self.assertTrue(fnmatch(r'a/b/c', r'a\b\c')) 781 else: 782 # there is no alternate separator 783 self.assertFalse(fnmatch(r'a/b', r'a\b')) 784 self.assertFalse(fnmatch(r'a\b', r'a/b')) 785 self.assertFalse(fnmatch(r'a/b\c', r'a\b/c')) 786 self.assertFalse(fnmatch(r'a/b/c', r'a\b\c')) 787 788 # as of 3.5, .pyo is no longer munged to .py 789 self.assertFalse(fnmatch('a.pyo', 'a.py')) 790 791 def test_filter_match_trace(self): 792 t1 = (("a.py", 2), ("b.py", 3)) 793 t2 = (("b.py", 4), ("b.py", 5)) 794 t3 = (("c.py", 5), ('<unknown>', 0)) 795 unknown = (('<unknown>', 0),) 796 797 f = tracemalloc.Filter(True, "b.py", all_frames=True) 798 self.assertTrue(f._match_traceback(t1)) 799 self.assertTrue(f._match_traceback(t2)) 800 self.assertFalse(f._match_traceback(t3)) 801 self.assertFalse(f._match_traceback(unknown)) 802 803 f = tracemalloc.Filter(True, "b.py", all_frames=False) 804 self.assertFalse(f._match_traceback(t1)) 805 self.assertTrue(f._match_traceback(t2)) 806 self.assertFalse(f._match_traceback(t3)) 807 self.assertFalse(f._match_traceback(unknown)) 808 809 f = tracemalloc.Filter(False, "b.py", all_frames=True) 810 self.assertFalse(f._match_traceback(t1)) 811 self.assertFalse(f._match_traceback(t2)) 812 self.assertTrue(f._match_traceback(t3)) 813 self.assertTrue(f._match_traceback(unknown)) 814 815 f = tracemalloc.Filter(False, "b.py", all_frames=False) 816 self.assertTrue(f._match_traceback(t1)) 817 self.assertFalse(f._match_traceback(t2)) 818 self.assertTrue(f._match_traceback(t3)) 819 self.assertTrue(f._match_traceback(unknown)) 820 821 f = tracemalloc.Filter(False, "<unknown>", all_frames=False) 822 self.assertTrue(f._match_traceback(t1)) 823 self.assertTrue(f._match_traceback(t2)) 824 self.assertTrue(f._match_traceback(t3)) 825 self.assertFalse(f._match_traceback(unknown)) 826 827 f = tracemalloc.Filter(True, "<unknown>", all_frames=True) 828 self.assertFalse(f._match_traceback(t1)) 829 self.assertFalse(f._match_traceback(t2)) 830 self.assertTrue(f._match_traceback(t3)) 831 self.assertTrue(f._match_traceback(unknown)) 832 833 f = tracemalloc.Filter(False, "<unknown>", all_frames=True) 834 self.assertTrue(f._match_traceback(t1)) 835 self.assertTrue(f._match_traceback(t2)) 836 self.assertFalse(f._match_traceback(t3)) 837 self.assertFalse(f._match_traceback(unknown)) 838 839 840class TestCommandLine(unittest.TestCase): 841 def test_env_var_disabled_by_default(self): 842 # not tracing by default 843 code = 'import tracemalloc; print(tracemalloc.is_tracing())' 844 ok, stdout, stderr = assert_python_ok('-c', code) 845 stdout = stdout.rstrip() 846 self.assertEqual(stdout, b'False') 847 848 @unittest.skipIf(interpreter_requires_environment(), 849 'Cannot run -E tests when PYTHON env vars are required.') 850 def test_env_var_ignored_with_E(self): 851 """PYTHON* environment variables must be ignored when -E is present.""" 852 code = 'import tracemalloc; print(tracemalloc.is_tracing())' 853 ok, stdout, stderr = assert_python_ok('-E', '-c', code, PYTHONTRACEMALLOC='1') 854 stdout = stdout.rstrip() 855 self.assertEqual(stdout, b'False') 856 857 def test_env_var_disabled(self): 858 # tracing at startup 859 code = 'import tracemalloc; print(tracemalloc.is_tracing())' 860 ok, stdout, stderr = assert_python_ok('-c', code, PYTHONTRACEMALLOC='0') 861 stdout = stdout.rstrip() 862 self.assertEqual(stdout, b'False') 863 864 def test_env_var_enabled_at_startup(self): 865 # tracing at startup 866 code = 'import tracemalloc; print(tracemalloc.is_tracing())' 867 ok, stdout, stderr = assert_python_ok('-c', code, PYTHONTRACEMALLOC='1') 868 stdout = stdout.rstrip() 869 self.assertEqual(stdout, b'True') 870 871 def test_env_limit(self): 872 # start and set the number of frames 873 code = 'import tracemalloc; print(tracemalloc.get_traceback_limit())' 874 ok, stdout, stderr = assert_python_ok('-c', code, PYTHONTRACEMALLOC='10') 875 stdout = stdout.rstrip() 876 self.assertEqual(stdout, b'10') 877 878 def check_env_var_invalid(self, nframe): 879 with support.SuppressCrashReport(): 880 ok, stdout, stderr = assert_python_failure( 881 '-c', 'pass', 882 PYTHONTRACEMALLOC=str(nframe)) 883 884 if b'ValueError: the number of frames must be in range' in stderr: 885 return 886 if b'PYTHONTRACEMALLOC: invalid number of frames' in stderr: 887 return 888 self.fail(f"unexpected output: {stderr!a}") 889 890 891 def test_env_var_invalid(self): 892 for nframe in INVALID_NFRAME: 893 with self.subTest(nframe=nframe): 894 self.check_env_var_invalid(nframe) 895 896 def test_sys_xoptions(self): 897 for xoptions, nframe in ( 898 ('tracemalloc', 1), 899 ('tracemalloc=1', 1), 900 ('tracemalloc=15', 15), 901 ): 902 with self.subTest(xoptions=xoptions, nframe=nframe): 903 code = 'import tracemalloc; print(tracemalloc.get_traceback_limit())' 904 ok, stdout, stderr = assert_python_ok('-X', xoptions, '-c', code) 905 stdout = stdout.rstrip() 906 self.assertEqual(stdout, str(nframe).encode('ascii')) 907 908 def check_sys_xoptions_invalid(self, nframe): 909 args = ('-X', 'tracemalloc=%s' % nframe, '-c', 'pass') 910 with support.SuppressCrashReport(): 911 ok, stdout, stderr = assert_python_failure(*args) 912 913 if b'ValueError: the number of frames must be in range' in stderr: 914 return 915 if b'-X tracemalloc=NFRAME: invalid number of frames' in stderr: 916 return 917 self.fail(f"unexpected output: {stderr!a}") 918 919 def test_sys_xoptions_invalid(self): 920 for nframe in INVALID_NFRAME: 921 with self.subTest(nframe=nframe): 922 self.check_sys_xoptions_invalid(nframe) 923 924 @unittest.skipIf(_testcapi is None, 'need _testcapi') 925 def test_pymem_alloc0(self): 926 # Issue #21639: Check that PyMem_Malloc(0) with tracemalloc enabled 927 # does not crash. 928 code = 'import _testcapi; _testcapi.test_pymem_alloc0(); 1' 929 assert_python_ok('-X', 'tracemalloc', '-c', code) 930 931 932@unittest.skipIf(_testcapi is None, 'need _testcapi') 933class TestCAPI(unittest.TestCase): 934 maxDiff = 80 * 20 935 936 def setUp(self): 937 if tracemalloc.is_tracing(): 938 self.skipTest("tracemalloc must be stopped before the test") 939 940 self.domain = 5 941 self.size = 123 942 self.obj = allocate_bytes(self.size)[0] 943 944 # for the type "object", id(obj) is the address of its memory block. 945 # This type is not tracked by the garbage collector 946 self.ptr = id(self.obj) 947 948 def tearDown(self): 949 tracemalloc.stop() 950 951 def get_traceback(self): 952 frames = _testcapi.tracemalloc_get_traceback(self.domain, self.ptr) 953 if frames is not None: 954 return tracemalloc.Traceback(frames) 955 else: 956 return None 957 958 def track(self, release_gil=False, nframe=1): 959 frames = get_frames(nframe, 1) 960 _testcapi.tracemalloc_track(self.domain, self.ptr, self.size, 961 release_gil) 962 return frames 963 964 def untrack(self): 965 _testcapi.tracemalloc_untrack(self.domain, self.ptr) 966 967 def get_traced_memory(self): 968 # Get the traced size in the domain 969 snapshot = tracemalloc.take_snapshot() 970 domain_filter = tracemalloc.DomainFilter(True, self.domain) 971 snapshot = snapshot.filter_traces([domain_filter]) 972 return sum(trace.size for trace in snapshot.traces) 973 974 def check_track(self, release_gil): 975 nframe = 5 976 tracemalloc.start(nframe) 977 978 size = tracemalloc.get_traced_memory()[0] 979 980 frames = self.track(release_gil, nframe) 981 self.assertEqual(self.get_traceback(), 982 tracemalloc.Traceback(frames)) 983 984 self.assertEqual(self.get_traced_memory(), self.size) 985 986 def test_track(self): 987 self.check_track(False) 988 989 def test_track_without_gil(self): 990 # check that calling _PyTraceMalloc_Track() without holding the GIL 991 # works too 992 self.check_track(True) 993 994 def test_track_already_tracked(self): 995 nframe = 5 996 tracemalloc.start(nframe) 997 998 # track a first time 999 self.track() 1000 1001 # calling _PyTraceMalloc_Track() must remove the old trace and add 1002 # a new trace with the new traceback 1003 frames = self.track(nframe=nframe) 1004 self.assertEqual(self.get_traceback(), 1005 tracemalloc.Traceback(frames)) 1006 1007 def test_untrack(self): 1008 tracemalloc.start() 1009 1010 self.track() 1011 self.assertIsNotNone(self.get_traceback()) 1012 self.assertEqual(self.get_traced_memory(), self.size) 1013 1014 # untrack must remove the trace 1015 self.untrack() 1016 self.assertIsNone(self.get_traceback()) 1017 self.assertEqual(self.get_traced_memory(), 0) 1018 1019 # calling _PyTraceMalloc_Untrack() multiple times must not crash 1020 self.untrack() 1021 self.untrack() 1022 1023 def test_stop_track(self): 1024 tracemalloc.start() 1025 tracemalloc.stop() 1026 1027 with self.assertRaises(RuntimeError): 1028 self.track() 1029 self.assertIsNone(self.get_traceback()) 1030 1031 def test_stop_untrack(self): 1032 tracemalloc.start() 1033 self.track() 1034 1035 tracemalloc.stop() 1036 with self.assertRaises(RuntimeError): 1037 self.untrack() 1038 1039 1040def test_main(): 1041 support.run_unittest( 1042 TestTracemallocEnabled, 1043 TestSnapshot, 1044 TestFilters, 1045 TestCommandLine, 1046 TestCAPI, 1047 ) 1048 1049if __name__ == "__main__": 1050 test_main() 1051