1import contextlib 2import os 3import sys 4import tracemalloc 5import unittest 6from unittest.mock import patch 7from test.support.script_helper import (assert_python_ok, assert_python_failure, 8 interpreter_requires_environment) 9from test import support 10from test.support import os_helper 11 12try: 13 import _testcapi 14except ImportError: 15 _testcapi = None 16 17 18EMPTY_STRING_SIZE = sys.getsizeof(b'') 19INVALID_NFRAME = (-1, 2**30) 20 21 22def get_frames(nframe, lineno_delta): 23 frames = [] 24 frame = sys._getframe(1) 25 for index in range(nframe): 26 code = frame.f_code 27 lineno = frame.f_lineno + lineno_delta 28 frames.append((code.co_filename, lineno)) 29 lineno_delta = 0 30 frame = frame.f_back 31 if frame is None: 32 break 33 return tuple(frames) 34 35def allocate_bytes(size): 36 nframe = tracemalloc.get_traceback_limit() 37 bytes_len = (size - EMPTY_STRING_SIZE) 38 frames = get_frames(nframe, 1) 39 data = b'x' * bytes_len 40 return data, tracemalloc.Traceback(frames, min(len(frames), nframe)) 41 42def create_snapshots(): 43 traceback_limit = 2 44 45 # _tracemalloc._get_traces() returns a list of (domain, size, 46 # traceback_frames) tuples. traceback_frames is a tuple of (filename, 47 # line_number) tuples. 48 raw_traces = [ 49 (0, 10, (('a.py', 2), ('b.py', 4)), 3), 50 (0, 10, (('a.py', 2), ('b.py', 4)), 3), 51 (0, 10, (('a.py', 2), ('b.py', 4)), 3), 52 53 (1, 2, (('a.py', 5), ('b.py', 4)), 3), 54 55 (2, 66, (('b.py', 1),), 1), 56 57 (3, 7, (('<unknown>', 0),), 1), 58 ] 59 snapshot = tracemalloc.Snapshot(raw_traces, traceback_limit) 60 61 raw_traces2 = [ 62 (0, 10, (('a.py', 2), ('b.py', 4)), 3), 63 (0, 10, (('a.py', 2), ('b.py', 4)), 3), 64 (0, 10, (('a.py', 2), ('b.py', 4)), 3), 65 66 (2, 2, (('a.py', 5), ('b.py', 4)), 3), 67 (2, 5000, (('a.py', 5), ('b.py', 4)), 3), 68 69 (4, 400, (('c.py', 578),), 1), 70 ] 71 snapshot2 = tracemalloc.Snapshot(raw_traces2, traceback_limit) 72 73 return (snapshot, snapshot2) 74 75def frame(filename, lineno): 76 return tracemalloc._Frame((filename, lineno)) 77 78def traceback(*frames): 79 return tracemalloc.Traceback(frames) 80 81def traceback_lineno(filename, lineno): 82 return traceback((filename, lineno)) 83 84def traceback_filename(filename): 85 return traceback_lineno(filename, 0) 86 87 88class TestTraceback(unittest.TestCase): 89 def test_repr(self): 90 def get_repr(*args) -> str: 91 return repr(tracemalloc.Traceback(*args)) 92 93 self.assertEqual(get_repr(()), "<Traceback ()>") 94 self.assertEqual(get_repr((), 0), "<Traceback () total_nframe=0>") 95 96 frames = (("f1", 1), ("f2", 2)) 97 exp_repr_frames = ( 98 "(<Frame filename='f2' lineno=2>," 99 " <Frame filename='f1' lineno=1>)" 100 ) 101 self.assertEqual(get_repr(frames), 102 f"<Traceback {exp_repr_frames}>") 103 self.assertEqual(get_repr(frames, 2), 104 f"<Traceback {exp_repr_frames} total_nframe=2>") 105 106 107class TestTracemallocEnabled(unittest.TestCase): 108 def setUp(self): 109 if tracemalloc.is_tracing(): 110 self.skipTest("tracemalloc must be stopped before the test") 111 112 tracemalloc.start(1) 113 114 def tearDown(self): 115 tracemalloc.stop() 116 117 def test_get_tracemalloc_memory(self): 118 data = [allocate_bytes(123) for count in range(1000)] 119 size = tracemalloc.get_tracemalloc_memory() 120 self.assertGreaterEqual(size, 0) 121 122 tracemalloc.clear_traces() 123 size2 = tracemalloc.get_tracemalloc_memory() 124 self.assertGreaterEqual(size2, 0) 125 self.assertLessEqual(size2, size) 126 127 def test_get_object_traceback(self): 128 tracemalloc.clear_traces() 129 obj_size = 12345 130 obj, obj_traceback = allocate_bytes(obj_size) 131 traceback = tracemalloc.get_object_traceback(obj) 132 self.assertEqual(traceback, obj_traceback) 133 134 def test_new_reference(self): 135 tracemalloc.clear_traces() 136 # gc.collect() indirectly calls PyList_ClearFreeList() 137 support.gc_collect() 138 139 # Create a list and "destroy it": put it in the PyListObject free list 140 obj = [] 141 obj = None 142 143 # Create a list which should reuse the previously created empty list 144 obj = [] 145 146 nframe = tracemalloc.get_traceback_limit() 147 frames = get_frames(nframe, -3) 148 obj_traceback = tracemalloc.Traceback(frames, min(len(frames), nframe)) 149 150 traceback = tracemalloc.get_object_traceback(obj) 151 self.assertIsNotNone(traceback) 152 self.assertEqual(traceback, obj_traceback) 153 154 def test_set_traceback_limit(self): 155 obj_size = 10 156 157 tracemalloc.stop() 158 self.assertRaises(ValueError, tracemalloc.start, -1) 159 160 tracemalloc.stop() 161 tracemalloc.start(10) 162 obj2, obj2_traceback = allocate_bytes(obj_size) 163 traceback = tracemalloc.get_object_traceback(obj2) 164 self.assertEqual(len(traceback), 10) 165 self.assertEqual(traceback, obj2_traceback) 166 167 tracemalloc.stop() 168 tracemalloc.start(1) 169 obj, obj_traceback = allocate_bytes(obj_size) 170 traceback = tracemalloc.get_object_traceback(obj) 171 self.assertEqual(len(traceback), 1) 172 self.assertEqual(traceback, obj_traceback) 173 174 def find_trace(self, traces, traceback): 175 for trace in traces: 176 if trace[2] == traceback._frames: 177 return trace 178 179 self.fail("trace not found") 180 181 def test_get_traces(self): 182 tracemalloc.clear_traces() 183 obj_size = 12345 184 obj, obj_traceback = allocate_bytes(obj_size) 185 186 traces = tracemalloc._get_traces() 187 trace = self.find_trace(traces, obj_traceback) 188 189 self.assertIsInstance(trace, tuple) 190 domain, size, traceback, length = trace 191 self.assertEqual(size, obj_size) 192 self.assertEqual(traceback, obj_traceback._frames) 193 194 tracemalloc.stop() 195 self.assertEqual(tracemalloc._get_traces(), []) 196 197 def test_get_traces_intern_traceback(self): 198 # dummy wrappers to get more useful and identical frames in the traceback 199 def allocate_bytes2(size): 200 return allocate_bytes(size) 201 def allocate_bytes3(size): 202 return allocate_bytes2(size) 203 def allocate_bytes4(size): 204 return allocate_bytes3(size) 205 206 # Ensure that two identical tracebacks are not duplicated 207 tracemalloc.stop() 208 tracemalloc.start(4) 209 obj_size = 123 210 obj1, obj1_traceback = allocate_bytes4(obj_size) 211 obj2, obj2_traceback = allocate_bytes4(obj_size) 212 213 traces = tracemalloc._get_traces() 214 215 obj1_traceback._frames = tuple(reversed(obj1_traceback._frames)) 216 obj2_traceback._frames = tuple(reversed(obj2_traceback._frames)) 217 218 trace1 = self.find_trace(traces, obj1_traceback) 219 trace2 = self.find_trace(traces, obj2_traceback) 220 domain1, size1, traceback1, length1 = trace1 221 domain2, size2, traceback2, length2 = trace2 222 self.assertIs(traceback2, traceback1) 223 224 def test_get_traced_memory(self): 225 # Python allocates some internals objects, so the test must tolerate 226 # a small difference between the expected size and the real usage 227 max_error = 2048 228 229 # allocate one object 230 obj_size = 1024 * 1024 231 tracemalloc.clear_traces() 232 obj, obj_traceback = allocate_bytes(obj_size) 233 size, peak_size = tracemalloc.get_traced_memory() 234 self.assertGreaterEqual(size, obj_size) 235 self.assertGreaterEqual(peak_size, size) 236 237 self.assertLessEqual(size - obj_size, max_error) 238 self.assertLessEqual(peak_size - size, max_error) 239 240 # destroy the object 241 obj = None 242 size2, peak_size2 = tracemalloc.get_traced_memory() 243 self.assertLess(size2, size) 244 self.assertGreaterEqual(size - size2, obj_size - max_error) 245 self.assertGreaterEqual(peak_size2, peak_size) 246 247 # clear_traces() must reset traced memory counters 248 tracemalloc.clear_traces() 249 self.assertEqual(tracemalloc.get_traced_memory(), (0, 0)) 250 251 # allocate another object 252 obj, obj_traceback = allocate_bytes(obj_size) 253 size, peak_size = tracemalloc.get_traced_memory() 254 self.assertGreaterEqual(size, obj_size) 255 256 # stop() also resets traced memory counters 257 tracemalloc.stop() 258 self.assertEqual(tracemalloc.get_traced_memory(), (0, 0)) 259 260 def test_clear_traces(self): 261 obj, obj_traceback = allocate_bytes(123) 262 traceback = tracemalloc.get_object_traceback(obj) 263 self.assertIsNotNone(traceback) 264 265 tracemalloc.clear_traces() 266 traceback2 = tracemalloc.get_object_traceback(obj) 267 self.assertIsNone(traceback2) 268 269 def test_reset_peak(self): 270 # Python allocates some internals objects, so the test must tolerate 271 # a small difference between the expected size and the real usage 272 tracemalloc.clear_traces() 273 274 # Example: allocate a large piece of memory, temporarily 275 large_sum = sum(list(range(100000))) 276 size1, peak1 = tracemalloc.get_traced_memory() 277 278 # reset_peak() resets peak to traced memory: peak2 < peak1 279 tracemalloc.reset_peak() 280 size2, peak2 = tracemalloc.get_traced_memory() 281 self.assertGreaterEqual(peak2, size2) 282 self.assertLess(peak2, peak1) 283 284 # check that peak continue to be updated if new memory is allocated: 285 # peak3 > peak2 286 obj_size = 1024 * 1024 287 obj, obj_traceback = allocate_bytes(obj_size) 288 size3, peak3 = tracemalloc.get_traced_memory() 289 self.assertGreaterEqual(peak3, size3) 290 self.assertGreater(peak3, peak2) 291 self.assertGreaterEqual(peak3 - peak2, obj_size) 292 293 def test_is_tracing(self): 294 tracemalloc.stop() 295 self.assertFalse(tracemalloc.is_tracing()) 296 297 tracemalloc.start() 298 self.assertTrue(tracemalloc.is_tracing()) 299 300 def test_snapshot(self): 301 obj, source = allocate_bytes(123) 302 303 # take a snapshot 304 snapshot = tracemalloc.take_snapshot() 305 306 # This can vary 307 self.assertGreater(snapshot.traces[1].traceback.total_nframe, 10) 308 309 # write on disk 310 snapshot.dump(os_helper.TESTFN) 311 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 312 313 # load from disk 314 snapshot2 = tracemalloc.Snapshot.load(os_helper.TESTFN) 315 self.assertEqual(snapshot2.traces, snapshot.traces) 316 317 # tracemalloc must be tracing memory allocations to take a snapshot 318 tracemalloc.stop() 319 with self.assertRaises(RuntimeError) as cm: 320 tracemalloc.take_snapshot() 321 self.assertEqual(str(cm.exception), 322 "the tracemalloc module must be tracing memory " 323 "allocations to take a snapshot") 324 325 def test_snapshot_save_attr(self): 326 # take a snapshot with a new attribute 327 snapshot = tracemalloc.take_snapshot() 328 snapshot.test_attr = "new" 329 snapshot.dump(os_helper.TESTFN) 330 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 331 332 # load() should recreate the attribute 333 snapshot2 = tracemalloc.Snapshot.load(os_helper.TESTFN) 334 self.assertEqual(snapshot2.test_attr, "new") 335 336 def fork_child(self): 337 if not tracemalloc.is_tracing(): 338 return 2 339 340 obj_size = 12345 341 obj, obj_traceback = allocate_bytes(obj_size) 342 traceback = tracemalloc.get_object_traceback(obj) 343 if traceback is None: 344 return 3 345 346 # everything is fine 347 return 0 348 349 @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork()') 350 def test_fork(self): 351 # check that tracemalloc is still working after fork 352 pid = os.fork() 353 if not pid: 354 # child 355 exitcode = 1 356 try: 357 exitcode = self.fork_child() 358 finally: 359 os._exit(exitcode) 360 else: 361 support.wait_process(pid, exitcode=0) 362 363 364class TestSnapshot(unittest.TestCase): 365 maxDiff = 4000 366 367 def test_create_snapshot(self): 368 raw_traces = [(0, 5, (('a.py', 2),), 10)] 369 370 with contextlib.ExitStack() as stack: 371 stack.enter_context(patch.object(tracemalloc, 'is_tracing', 372 return_value=True)) 373 stack.enter_context(patch.object(tracemalloc, 'get_traceback_limit', 374 return_value=5)) 375 stack.enter_context(patch.object(tracemalloc, '_get_traces', 376 return_value=raw_traces)) 377 378 snapshot = tracemalloc.take_snapshot() 379 self.assertEqual(snapshot.traceback_limit, 5) 380 self.assertEqual(len(snapshot.traces), 1) 381 trace = snapshot.traces[0] 382 self.assertEqual(trace.size, 5) 383 self.assertEqual(trace.traceback.total_nframe, 10) 384 self.assertEqual(len(trace.traceback), 1) 385 self.assertEqual(trace.traceback[0].filename, 'a.py') 386 self.assertEqual(trace.traceback[0].lineno, 2) 387 388 def test_filter_traces(self): 389 snapshot, snapshot2 = create_snapshots() 390 filter1 = tracemalloc.Filter(False, "b.py") 391 filter2 = tracemalloc.Filter(True, "a.py", 2) 392 filter3 = tracemalloc.Filter(True, "a.py", 5) 393 394 original_traces = list(snapshot.traces._traces) 395 396 # exclude b.py 397 snapshot3 = snapshot.filter_traces((filter1,)) 398 self.assertEqual(snapshot3.traces._traces, [ 399 (0, 10, (('a.py', 2), ('b.py', 4)), 3), 400 (0, 10, (('a.py', 2), ('b.py', 4)), 3), 401 (0, 10, (('a.py', 2), ('b.py', 4)), 3), 402 (1, 2, (('a.py', 5), ('b.py', 4)), 3), 403 (3, 7, (('<unknown>', 0),), 1), 404 ]) 405 406 # filter_traces() must not touch the original snapshot 407 self.assertEqual(snapshot.traces._traces, original_traces) 408 409 # only include two lines of a.py 410 snapshot4 = snapshot3.filter_traces((filter2, filter3)) 411 self.assertEqual(snapshot4.traces._traces, [ 412 (0, 10, (('a.py', 2), ('b.py', 4)), 3), 413 (0, 10, (('a.py', 2), ('b.py', 4)), 3), 414 (0, 10, (('a.py', 2), ('b.py', 4)), 3), 415 (1, 2, (('a.py', 5), ('b.py', 4)), 3), 416 ]) 417 418 # No filter: just duplicate the snapshot 419 snapshot5 = snapshot.filter_traces(()) 420 self.assertIsNot(snapshot5, snapshot) 421 self.assertIsNot(snapshot5.traces, snapshot.traces) 422 self.assertEqual(snapshot5.traces, snapshot.traces) 423 424 self.assertRaises(TypeError, snapshot.filter_traces, filter1) 425 426 def test_filter_traces_domain(self): 427 snapshot, snapshot2 = create_snapshots() 428 filter1 = tracemalloc.Filter(False, "a.py", domain=1) 429 filter2 = tracemalloc.Filter(True, "a.py", domain=1) 430 431 original_traces = list(snapshot.traces._traces) 432 433 # exclude a.py of domain 1 434 snapshot3 = snapshot.filter_traces((filter1,)) 435 self.assertEqual(snapshot3.traces._traces, [ 436 (0, 10, (('a.py', 2), ('b.py', 4)), 3), 437 (0, 10, (('a.py', 2), ('b.py', 4)), 3), 438 (0, 10, (('a.py', 2), ('b.py', 4)), 3), 439 (2, 66, (('b.py', 1),), 1), 440 (3, 7, (('<unknown>', 0),), 1), 441 ]) 442 443 # include domain 1 444 snapshot3 = snapshot.filter_traces((filter1,)) 445 self.assertEqual(snapshot3.traces._traces, [ 446 (0, 10, (('a.py', 2), ('b.py', 4)), 3), 447 (0, 10, (('a.py', 2), ('b.py', 4)), 3), 448 (0, 10, (('a.py', 2), ('b.py', 4)), 3), 449 (2, 66, (('b.py', 1),), 1), 450 (3, 7, (('<unknown>', 0),), 1), 451 ]) 452 453 def test_filter_traces_domain_filter(self): 454 snapshot, snapshot2 = create_snapshots() 455 filter1 = tracemalloc.DomainFilter(False, domain=3) 456 filter2 = tracemalloc.DomainFilter(True, domain=3) 457 458 # exclude domain 2 459 snapshot3 = snapshot.filter_traces((filter1,)) 460 self.assertEqual(snapshot3.traces._traces, [ 461 (0, 10, (('a.py', 2), ('b.py', 4)), 3), 462 (0, 10, (('a.py', 2), ('b.py', 4)), 3), 463 (0, 10, (('a.py', 2), ('b.py', 4)), 3), 464 (1, 2, (('a.py', 5), ('b.py', 4)), 3), 465 (2, 66, (('b.py', 1),), 1), 466 ]) 467 468 # include domain 2 469 snapshot3 = snapshot.filter_traces((filter2,)) 470 self.assertEqual(snapshot3.traces._traces, [ 471 (3, 7, (('<unknown>', 0),), 1), 472 ]) 473 474 def test_snapshot_group_by_line(self): 475 snapshot, snapshot2 = create_snapshots() 476 tb_0 = traceback_lineno('<unknown>', 0) 477 tb_a_2 = traceback_lineno('a.py', 2) 478 tb_a_5 = traceback_lineno('a.py', 5) 479 tb_b_1 = traceback_lineno('b.py', 1) 480 tb_c_578 = traceback_lineno('c.py', 578) 481 482 # stats per file and line 483 stats1 = snapshot.statistics('lineno') 484 self.assertEqual(stats1, [ 485 tracemalloc.Statistic(tb_b_1, 66, 1), 486 tracemalloc.Statistic(tb_a_2, 30, 3), 487 tracemalloc.Statistic(tb_0, 7, 1), 488 tracemalloc.Statistic(tb_a_5, 2, 1), 489 ]) 490 491 # stats per file and line (2) 492 stats2 = snapshot2.statistics('lineno') 493 self.assertEqual(stats2, [ 494 tracemalloc.Statistic(tb_a_5, 5002, 2), 495 tracemalloc.Statistic(tb_c_578, 400, 1), 496 tracemalloc.Statistic(tb_a_2, 30, 3), 497 ]) 498 499 # stats diff per file and line 500 statistics = snapshot2.compare_to(snapshot, 'lineno') 501 self.assertEqual(statistics, [ 502 tracemalloc.StatisticDiff(tb_a_5, 5002, 5000, 2, 1), 503 tracemalloc.StatisticDiff(tb_c_578, 400, 400, 1, 1), 504 tracemalloc.StatisticDiff(tb_b_1, 0, -66, 0, -1), 505 tracemalloc.StatisticDiff(tb_0, 0, -7, 0, -1), 506 tracemalloc.StatisticDiff(tb_a_2, 30, 0, 3, 0), 507 ]) 508 509 def test_snapshot_group_by_file(self): 510 snapshot, snapshot2 = create_snapshots() 511 tb_0 = traceback_filename('<unknown>') 512 tb_a = traceback_filename('a.py') 513 tb_b = traceback_filename('b.py') 514 tb_c = traceback_filename('c.py') 515 516 # stats per file 517 stats1 = snapshot.statistics('filename') 518 self.assertEqual(stats1, [ 519 tracemalloc.Statistic(tb_b, 66, 1), 520 tracemalloc.Statistic(tb_a, 32, 4), 521 tracemalloc.Statistic(tb_0, 7, 1), 522 ]) 523 524 # stats per file (2) 525 stats2 = snapshot2.statistics('filename') 526 self.assertEqual(stats2, [ 527 tracemalloc.Statistic(tb_a, 5032, 5), 528 tracemalloc.Statistic(tb_c, 400, 1), 529 ]) 530 531 # stats diff per file 532 diff = snapshot2.compare_to(snapshot, 'filename') 533 self.assertEqual(diff, [ 534 tracemalloc.StatisticDiff(tb_a, 5032, 5000, 5, 1), 535 tracemalloc.StatisticDiff(tb_c, 400, 400, 1, 1), 536 tracemalloc.StatisticDiff(tb_b, 0, -66, 0, -1), 537 tracemalloc.StatisticDiff(tb_0, 0, -7, 0, -1), 538 ]) 539 540 def test_snapshot_group_by_traceback(self): 541 snapshot, snapshot2 = create_snapshots() 542 543 # stats per file 544 tb1 = traceback(('a.py', 2), ('b.py', 4)) 545 tb2 = traceback(('a.py', 5), ('b.py', 4)) 546 tb3 = traceback(('b.py', 1)) 547 tb4 = traceback(('<unknown>', 0)) 548 stats1 = snapshot.statistics('traceback') 549 self.assertEqual(stats1, [ 550 tracemalloc.Statistic(tb3, 66, 1), 551 tracemalloc.Statistic(tb1, 30, 3), 552 tracemalloc.Statistic(tb4, 7, 1), 553 tracemalloc.Statistic(tb2, 2, 1), 554 ]) 555 556 # stats per file (2) 557 tb5 = traceback(('c.py', 578)) 558 stats2 = snapshot2.statistics('traceback') 559 self.assertEqual(stats2, [ 560 tracemalloc.Statistic(tb2, 5002, 2), 561 tracemalloc.Statistic(tb5, 400, 1), 562 tracemalloc.Statistic(tb1, 30, 3), 563 ]) 564 565 # stats diff per file 566 diff = snapshot2.compare_to(snapshot, 'traceback') 567 self.assertEqual(diff, [ 568 tracemalloc.StatisticDiff(tb2, 5002, 5000, 2, 1), 569 tracemalloc.StatisticDiff(tb5, 400, 400, 1, 1), 570 tracemalloc.StatisticDiff(tb3, 0, -66, 0, -1), 571 tracemalloc.StatisticDiff(tb4, 0, -7, 0, -1), 572 tracemalloc.StatisticDiff(tb1, 30, 0, 3, 0), 573 ]) 574 575 self.assertRaises(ValueError, 576 snapshot.statistics, 'traceback', cumulative=True) 577 578 def test_snapshot_group_by_cumulative(self): 579 snapshot, snapshot2 = create_snapshots() 580 tb_0 = traceback_filename('<unknown>') 581 tb_a = traceback_filename('a.py') 582 tb_b = traceback_filename('b.py') 583 tb_a_2 = traceback_lineno('a.py', 2) 584 tb_a_5 = traceback_lineno('a.py', 5) 585 tb_b_1 = traceback_lineno('b.py', 1) 586 tb_b_4 = traceback_lineno('b.py', 4) 587 588 # per file 589 stats = snapshot.statistics('filename', True) 590 self.assertEqual(stats, [ 591 tracemalloc.Statistic(tb_b, 98, 5), 592 tracemalloc.Statistic(tb_a, 32, 4), 593 tracemalloc.Statistic(tb_0, 7, 1), 594 ]) 595 596 # per line 597 stats = snapshot.statistics('lineno', True) 598 self.assertEqual(stats, [ 599 tracemalloc.Statistic(tb_b_1, 66, 1), 600 tracemalloc.Statistic(tb_b_4, 32, 4), 601 tracemalloc.Statistic(tb_a_2, 30, 3), 602 tracemalloc.Statistic(tb_0, 7, 1), 603 tracemalloc.Statistic(tb_a_5, 2, 1), 604 ]) 605 606 def test_trace_format(self): 607 snapshot, snapshot2 = create_snapshots() 608 trace = snapshot.traces[0] 609 self.assertEqual(str(trace), 'b.py:4: 10 B') 610 traceback = trace.traceback 611 self.assertEqual(str(traceback), 'b.py:4') 612 frame = traceback[0] 613 self.assertEqual(str(frame), 'b.py:4') 614 615 def test_statistic_format(self): 616 snapshot, snapshot2 = create_snapshots() 617 stats = snapshot.statistics('lineno') 618 stat = stats[0] 619 self.assertEqual(str(stat), 620 'b.py:1: size=66 B, count=1, average=66 B') 621 622 def test_statistic_diff_format(self): 623 snapshot, snapshot2 = create_snapshots() 624 stats = snapshot2.compare_to(snapshot, 'lineno') 625 stat = stats[0] 626 self.assertEqual(str(stat), 627 'a.py:5: size=5002 B (+5000 B), count=2 (+1), average=2501 B') 628 629 def test_slices(self): 630 snapshot, snapshot2 = create_snapshots() 631 self.assertEqual(snapshot.traces[:2], 632 (snapshot.traces[0], snapshot.traces[1])) 633 634 traceback = snapshot.traces[0].traceback 635 self.assertEqual(traceback[:2], 636 (traceback[0], traceback[1])) 637 638 def test_format_traceback(self): 639 snapshot, snapshot2 = create_snapshots() 640 def getline(filename, lineno): 641 return ' <%s, %s>' % (filename, lineno) 642 with unittest.mock.patch('tracemalloc.linecache.getline', 643 side_effect=getline): 644 tb = snapshot.traces[0].traceback 645 self.assertEqual(tb.format(), 646 [' File "b.py", line 4', 647 ' <b.py, 4>', 648 ' File "a.py", line 2', 649 ' <a.py, 2>']) 650 651 self.assertEqual(tb.format(limit=1), 652 [' File "a.py", line 2', 653 ' <a.py, 2>']) 654 655 self.assertEqual(tb.format(limit=-1), 656 [' File "b.py", line 4', 657 ' <b.py, 4>']) 658 659 self.assertEqual(tb.format(most_recent_first=True), 660 [' File "a.py", line 2', 661 ' <a.py, 2>', 662 ' File "b.py", line 4', 663 ' <b.py, 4>']) 664 665 self.assertEqual(tb.format(limit=1, most_recent_first=True), 666 [' File "a.py", line 2', 667 ' <a.py, 2>']) 668 669 self.assertEqual(tb.format(limit=-1, most_recent_first=True), 670 [' File "b.py", line 4', 671 ' <b.py, 4>']) 672 673 674class TestFilters(unittest.TestCase): 675 maxDiff = 2048 676 677 def test_filter_attributes(self): 678 # test default values 679 f = tracemalloc.Filter(True, "abc") 680 self.assertEqual(f.inclusive, True) 681 self.assertEqual(f.filename_pattern, "abc") 682 self.assertIsNone(f.lineno) 683 self.assertEqual(f.all_frames, False) 684 685 # test custom values 686 f = tracemalloc.Filter(False, "test.py", 123, True) 687 self.assertEqual(f.inclusive, False) 688 self.assertEqual(f.filename_pattern, "test.py") 689 self.assertEqual(f.lineno, 123) 690 self.assertEqual(f.all_frames, True) 691 692 # parameters passed by keyword 693 f = tracemalloc.Filter(inclusive=False, filename_pattern="test.py", lineno=123, all_frames=True) 694 self.assertEqual(f.inclusive, False) 695 self.assertEqual(f.filename_pattern, "test.py") 696 self.assertEqual(f.lineno, 123) 697 self.assertEqual(f.all_frames, True) 698 699 # read-only attribute 700 self.assertRaises(AttributeError, setattr, f, "filename_pattern", "abc") 701 702 def test_filter_match(self): 703 # filter without line number 704 f = tracemalloc.Filter(True, "abc") 705 self.assertTrue(f._match_frame("abc", 0)) 706 self.assertTrue(f._match_frame("abc", 5)) 707 self.assertTrue(f._match_frame("abc", 10)) 708 self.assertFalse(f._match_frame("12356", 0)) 709 self.assertFalse(f._match_frame("12356", 5)) 710 self.assertFalse(f._match_frame("12356", 10)) 711 712 f = tracemalloc.Filter(False, "abc") 713 self.assertFalse(f._match_frame("abc", 0)) 714 self.assertFalse(f._match_frame("abc", 5)) 715 self.assertFalse(f._match_frame("abc", 10)) 716 self.assertTrue(f._match_frame("12356", 0)) 717 self.assertTrue(f._match_frame("12356", 5)) 718 self.assertTrue(f._match_frame("12356", 10)) 719 720 # filter with line number > 0 721 f = tracemalloc.Filter(True, "abc", 5) 722 self.assertFalse(f._match_frame("abc", 0)) 723 self.assertTrue(f._match_frame("abc", 5)) 724 self.assertFalse(f._match_frame("abc", 10)) 725 self.assertFalse(f._match_frame("12356", 0)) 726 self.assertFalse(f._match_frame("12356", 5)) 727 self.assertFalse(f._match_frame("12356", 10)) 728 729 f = tracemalloc.Filter(False, "abc", 5) 730 self.assertTrue(f._match_frame("abc", 0)) 731 self.assertFalse(f._match_frame("abc", 5)) 732 self.assertTrue(f._match_frame("abc", 10)) 733 self.assertTrue(f._match_frame("12356", 0)) 734 self.assertTrue(f._match_frame("12356", 5)) 735 self.assertTrue(f._match_frame("12356", 10)) 736 737 # filter with line number 0 738 f = tracemalloc.Filter(True, "abc", 0) 739 self.assertTrue(f._match_frame("abc", 0)) 740 self.assertFalse(f._match_frame("abc", 5)) 741 self.assertFalse(f._match_frame("abc", 10)) 742 self.assertFalse(f._match_frame("12356", 0)) 743 self.assertFalse(f._match_frame("12356", 5)) 744 self.assertFalse(f._match_frame("12356", 10)) 745 746 f = tracemalloc.Filter(False, "abc", 0) 747 self.assertFalse(f._match_frame("abc", 0)) 748 self.assertTrue(f._match_frame("abc", 5)) 749 self.assertTrue(f._match_frame("abc", 10)) 750 self.assertTrue(f._match_frame("12356", 0)) 751 self.assertTrue(f._match_frame("12356", 5)) 752 self.assertTrue(f._match_frame("12356", 10)) 753 754 def test_filter_match_filename(self): 755 def fnmatch(inclusive, filename, pattern): 756 f = tracemalloc.Filter(inclusive, pattern) 757 return f._match_frame(filename, 0) 758 759 self.assertTrue(fnmatch(True, "abc", "abc")) 760 self.assertFalse(fnmatch(True, "12356", "abc")) 761 self.assertFalse(fnmatch(True, "<unknown>", "abc")) 762 763 self.assertFalse(fnmatch(False, "abc", "abc")) 764 self.assertTrue(fnmatch(False, "12356", "abc")) 765 self.assertTrue(fnmatch(False, "<unknown>", "abc")) 766 767 def test_filter_match_filename_joker(self): 768 def fnmatch(filename, pattern): 769 filter = tracemalloc.Filter(True, pattern) 770 return filter._match_frame(filename, 0) 771 772 # empty string 773 self.assertFalse(fnmatch('abc', '')) 774 self.assertFalse(fnmatch('', 'abc')) 775 self.assertTrue(fnmatch('', '')) 776 self.assertTrue(fnmatch('', '*')) 777 778 # no * 779 self.assertTrue(fnmatch('abc', 'abc')) 780 self.assertFalse(fnmatch('abc', 'abcd')) 781 self.assertFalse(fnmatch('abc', 'def')) 782 783 # a* 784 self.assertTrue(fnmatch('abc', 'a*')) 785 self.assertTrue(fnmatch('abc', 'abc*')) 786 self.assertFalse(fnmatch('abc', 'b*')) 787 self.assertFalse(fnmatch('abc', 'abcd*')) 788 789 # a*b 790 self.assertTrue(fnmatch('abc', 'a*c')) 791 self.assertTrue(fnmatch('abcdcx', 'a*cx')) 792 self.assertFalse(fnmatch('abb', 'a*c')) 793 self.assertFalse(fnmatch('abcdce', 'a*cx')) 794 795 # a*b*c 796 self.assertTrue(fnmatch('abcde', 'a*c*e')) 797 self.assertTrue(fnmatch('abcbdefeg', 'a*bd*eg')) 798 self.assertFalse(fnmatch('abcdd', 'a*c*e')) 799 self.assertFalse(fnmatch('abcbdefef', 'a*bd*eg')) 800 801 # replace .pyc suffix with .py 802 self.assertTrue(fnmatch('a.pyc', 'a.py')) 803 self.assertTrue(fnmatch('a.py', 'a.pyc')) 804 805 if os.name == 'nt': 806 # case insensitive 807 self.assertTrue(fnmatch('aBC', 'ABc')) 808 self.assertTrue(fnmatch('aBcDe', 'Ab*dE')) 809 810 self.assertTrue(fnmatch('a.pyc', 'a.PY')) 811 self.assertTrue(fnmatch('a.py', 'a.PYC')) 812 else: 813 # case sensitive 814 self.assertFalse(fnmatch('aBC', 'ABc')) 815 self.assertFalse(fnmatch('aBcDe', 'Ab*dE')) 816 817 self.assertFalse(fnmatch('a.pyc', 'a.PY')) 818 self.assertFalse(fnmatch('a.py', 'a.PYC')) 819 820 if os.name == 'nt': 821 # normalize alternate separator "/" to the standard separator "\" 822 self.assertTrue(fnmatch(r'a/b', r'a\b')) 823 self.assertTrue(fnmatch(r'a\b', r'a/b')) 824 self.assertTrue(fnmatch(r'a/b\c', r'a\b/c')) 825 self.assertTrue(fnmatch(r'a/b/c', r'a\b\c')) 826 else: 827 # there is no alternate separator 828 self.assertFalse(fnmatch(r'a/b', r'a\b')) 829 self.assertFalse(fnmatch(r'a\b', r'a/b')) 830 self.assertFalse(fnmatch(r'a/b\c', r'a\b/c')) 831 self.assertFalse(fnmatch(r'a/b/c', r'a\b\c')) 832 833 # as of 3.5, .pyo is no longer munged to .py 834 self.assertFalse(fnmatch('a.pyo', 'a.py')) 835 836 def test_filter_match_trace(self): 837 t1 = (("a.py", 2), ("b.py", 3)) 838 t2 = (("b.py", 4), ("b.py", 5)) 839 t3 = (("c.py", 5), ('<unknown>', 0)) 840 unknown = (('<unknown>', 0),) 841 842 f = tracemalloc.Filter(True, "b.py", all_frames=True) 843 self.assertTrue(f._match_traceback(t1)) 844 self.assertTrue(f._match_traceback(t2)) 845 self.assertFalse(f._match_traceback(t3)) 846 self.assertFalse(f._match_traceback(unknown)) 847 848 f = tracemalloc.Filter(True, "b.py", all_frames=False) 849 self.assertFalse(f._match_traceback(t1)) 850 self.assertTrue(f._match_traceback(t2)) 851 self.assertFalse(f._match_traceback(t3)) 852 self.assertFalse(f._match_traceback(unknown)) 853 854 f = tracemalloc.Filter(False, "b.py", all_frames=True) 855 self.assertFalse(f._match_traceback(t1)) 856 self.assertFalse(f._match_traceback(t2)) 857 self.assertTrue(f._match_traceback(t3)) 858 self.assertTrue(f._match_traceback(unknown)) 859 860 f = tracemalloc.Filter(False, "b.py", all_frames=False) 861 self.assertTrue(f._match_traceback(t1)) 862 self.assertFalse(f._match_traceback(t2)) 863 self.assertTrue(f._match_traceback(t3)) 864 self.assertTrue(f._match_traceback(unknown)) 865 866 f = tracemalloc.Filter(False, "<unknown>", all_frames=False) 867 self.assertTrue(f._match_traceback(t1)) 868 self.assertTrue(f._match_traceback(t2)) 869 self.assertTrue(f._match_traceback(t3)) 870 self.assertFalse(f._match_traceback(unknown)) 871 872 f = tracemalloc.Filter(True, "<unknown>", all_frames=True) 873 self.assertFalse(f._match_traceback(t1)) 874 self.assertFalse(f._match_traceback(t2)) 875 self.assertTrue(f._match_traceback(t3)) 876 self.assertTrue(f._match_traceback(unknown)) 877 878 f = tracemalloc.Filter(False, "<unknown>", all_frames=True) 879 self.assertTrue(f._match_traceback(t1)) 880 self.assertTrue(f._match_traceback(t2)) 881 self.assertFalse(f._match_traceback(t3)) 882 self.assertFalse(f._match_traceback(unknown)) 883 884 885class TestCommandLine(unittest.TestCase): 886 def test_env_var_disabled_by_default(self): 887 # not tracing by default 888 code = 'import tracemalloc; print(tracemalloc.is_tracing())' 889 ok, stdout, stderr = assert_python_ok('-c', code) 890 stdout = stdout.rstrip() 891 self.assertEqual(stdout, b'False') 892 893 @unittest.skipIf(interpreter_requires_environment(), 894 'Cannot run -E tests when PYTHON env vars are required.') 895 def test_env_var_ignored_with_E(self): 896 """PYTHON* environment variables must be ignored when -E is present.""" 897 code = 'import tracemalloc; print(tracemalloc.is_tracing())' 898 ok, stdout, stderr = assert_python_ok('-E', '-c', code, PYTHONTRACEMALLOC='1') 899 stdout = stdout.rstrip() 900 self.assertEqual(stdout, b'False') 901 902 def test_env_var_disabled(self): 903 # tracing at startup 904 code = 'import tracemalloc; print(tracemalloc.is_tracing())' 905 ok, stdout, stderr = assert_python_ok('-c', code, PYTHONTRACEMALLOC='0') 906 stdout = stdout.rstrip() 907 self.assertEqual(stdout, b'False') 908 909 def test_env_var_enabled_at_startup(self): 910 # tracing at startup 911 code = 'import tracemalloc; print(tracemalloc.is_tracing())' 912 ok, stdout, stderr = assert_python_ok('-c', code, PYTHONTRACEMALLOC='1') 913 stdout = stdout.rstrip() 914 self.assertEqual(stdout, b'True') 915 916 def test_env_limit(self): 917 # start and set the number of frames 918 code = 'import tracemalloc; print(tracemalloc.get_traceback_limit())' 919 ok, stdout, stderr = assert_python_ok('-c', code, PYTHONTRACEMALLOC='10') 920 stdout = stdout.rstrip() 921 self.assertEqual(stdout, b'10') 922 923 def check_env_var_invalid(self, nframe): 924 with support.SuppressCrashReport(): 925 ok, stdout, stderr = assert_python_failure( 926 '-c', 'pass', 927 PYTHONTRACEMALLOC=str(nframe)) 928 929 if b'ValueError: the number of frames must be in range' in stderr: 930 return 931 if b'PYTHONTRACEMALLOC: invalid number of frames' in stderr: 932 return 933 self.fail(f"unexpected output: {stderr!a}") 934 935 936 def test_env_var_invalid(self): 937 for nframe in INVALID_NFRAME: 938 with self.subTest(nframe=nframe): 939 self.check_env_var_invalid(nframe) 940 941 def test_sys_xoptions(self): 942 for xoptions, nframe in ( 943 ('tracemalloc', 1), 944 ('tracemalloc=1', 1), 945 ('tracemalloc=15', 15), 946 ): 947 with self.subTest(xoptions=xoptions, nframe=nframe): 948 code = 'import tracemalloc; print(tracemalloc.get_traceback_limit())' 949 ok, stdout, stderr = assert_python_ok('-X', xoptions, '-c', code) 950 stdout = stdout.rstrip() 951 self.assertEqual(stdout, str(nframe).encode('ascii')) 952 953 def check_sys_xoptions_invalid(self, nframe): 954 args = ('-X', 'tracemalloc=%s' % nframe, '-c', 'pass') 955 with support.SuppressCrashReport(): 956 ok, stdout, stderr = assert_python_failure(*args) 957 958 if b'ValueError: the number of frames must be in range' in stderr: 959 return 960 if b'-X tracemalloc=NFRAME: invalid number of frames' in stderr: 961 return 962 self.fail(f"unexpected output: {stderr!a}") 963 964 def test_sys_xoptions_invalid(self): 965 for nframe in INVALID_NFRAME: 966 with self.subTest(nframe=nframe): 967 self.check_sys_xoptions_invalid(nframe) 968 969 @unittest.skipIf(_testcapi is None, 'need _testcapi') 970 def test_pymem_alloc0(self): 971 # Issue #21639: Check that PyMem_Malloc(0) with tracemalloc enabled 972 # does not crash. 973 code = 'import _testcapi; _testcapi.test_pymem_alloc0(); 1' 974 assert_python_ok('-X', 'tracemalloc', '-c', code) 975 976 977@unittest.skipIf(_testcapi is None, 'need _testcapi') 978class TestCAPI(unittest.TestCase): 979 maxDiff = 80 * 20 980 981 def setUp(self): 982 if tracemalloc.is_tracing(): 983 self.skipTest("tracemalloc must be stopped before the test") 984 985 self.domain = 5 986 self.size = 123 987 self.obj = allocate_bytes(self.size)[0] 988 989 # for the type "object", id(obj) is the address of its memory block. 990 # This type is not tracked by the garbage collector 991 self.ptr = id(self.obj) 992 993 def tearDown(self): 994 tracemalloc.stop() 995 996 def get_traceback(self): 997 frames = _testcapi.tracemalloc_get_traceback(self.domain, self.ptr) 998 if frames is not None: 999 return tracemalloc.Traceback(frames) 1000 else: 1001 return None 1002 1003 def track(self, release_gil=False, nframe=1): 1004 frames = get_frames(nframe, 1) 1005 _testcapi.tracemalloc_track(self.domain, self.ptr, self.size, 1006 release_gil) 1007 return frames 1008 1009 def untrack(self): 1010 _testcapi.tracemalloc_untrack(self.domain, self.ptr) 1011 1012 def get_traced_memory(self): 1013 # Get the traced size in the domain 1014 snapshot = tracemalloc.take_snapshot() 1015 domain_filter = tracemalloc.DomainFilter(True, self.domain) 1016 snapshot = snapshot.filter_traces([domain_filter]) 1017 return sum(trace.size for trace in snapshot.traces) 1018 1019 def check_track(self, release_gil): 1020 nframe = 5 1021 tracemalloc.start(nframe) 1022 1023 size = tracemalloc.get_traced_memory()[0] 1024 1025 frames = self.track(release_gil, nframe) 1026 self.assertEqual(self.get_traceback(), 1027 tracemalloc.Traceback(frames)) 1028 1029 self.assertEqual(self.get_traced_memory(), self.size) 1030 1031 def test_track(self): 1032 self.check_track(False) 1033 1034 def test_track_without_gil(self): 1035 # check that calling _PyTraceMalloc_Track() without holding the GIL 1036 # works too 1037 self.check_track(True) 1038 1039 def test_track_already_tracked(self): 1040 nframe = 5 1041 tracemalloc.start(nframe) 1042 1043 # track a first time 1044 self.track() 1045 1046 # calling _PyTraceMalloc_Track() must remove the old trace and add 1047 # a new trace with the new traceback 1048 frames = self.track(nframe=nframe) 1049 self.assertEqual(self.get_traceback(), 1050 tracemalloc.Traceback(frames)) 1051 1052 def test_untrack(self): 1053 tracemalloc.start() 1054 1055 self.track() 1056 self.assertIsNotNone(self.get_traceback()) 1057 self.assertEqual(self.get_traced_memory(), self.size) 1058 1059 # untrack must remove the trace 1060 self.untrack() 1061 self.assertIsNone(self.get_traceback()) 1062 self.assertEqual(self.get_traced_memory(), 0) 1063 1064 # calling _PyTraceMalloc_Untrack() multiple times must not crash 1065 self.untrack() 1066 self.untrack() 1067 1068 def test_stop_track(self): 1069 tracemalloc.start() 1070 tracemalloc.stop() 1071 1072 with self.assertRaises(RuntimeError): 1073 self.track() 1074 self.assertIsNone(self.get_traceback()) 1075 1076 def test_stop_untrack(self): 1077 tracemalloc.start() 1078 self.track() 1079 1080 tracemalloc.stop() 1081 with self.assertRaises(RuntimeError): 1082 self.untrack() 1083 1084 1085if __name__ == "__main__": 1086 unittest.main() 1087