1import contextlib
2import os
3import sys
4import tracemalloc
5import unittest
6from unittest.mock import patch
7from test.support.script_helper import (assert_python_ok, assert_python_failure,
8                                        interpreter_requires_environment)
9from test import support
10from test.support import os_helper
11
12try:
13    import _testcapi
14except ImportError:
15    _testcapi = None
16
17
# Size reported by sys.getsizeof() for an empty bytes object.
# allocate_bytes() subtracts it from the requested size to compute the
# length of the payload it allocates.
EMPTY_STRING_SIZE = sys.getsizeof(b'')
# NOTE(review): presumably traceback limits that must be rejected as invalid;
# not used in the part of the file visible here -- confirm against its users.
INVALID_NFRAME = (-1, 2**30)
20
21
def get_frames(nframe, lineno_delta):
    """Return up to *nframe* (filename, lineno) pairs describing the caller.

    The walk starts at the frame that called this function and follows
    f_back outwards.  *lineno_delta* is added to the line number of the
    first (innermost) frame only; the outer frames are reported unchanged.
    """
    collected = []
    # Must be called directly from this function body: depth 1 is the caller.
    current = sys._getframe(1)
    for _ in range(nframe):
        collected.append(
            (current.f_code.co_filename, current.f_lineno + lineno_delta))
        # The adjustment applies to the innermost frame only.
        lineno_delta = 0
        current = current.f_back
        if current is None:
            # Reached the outermost frame before collecting nframe entries.
            break
    return tuple(collected)
34
def allocate_bytes(size):
    """Allocate a bytes object and describe where it was allocated.

    The payload length is *size* minus EMPTY_STRING_SIZE.  Returns a
    (data, traceback) pair where *traceback* is a tracemalloc.Traceback built
    from the current call stack, truncated to the current traceback limit.
    """
    nframe = tracemalloc.get_traceback_limit()
    bytes_len = (size - EMPTY_STRING_SIZE)
    # The delta of 1 makes the innermost frame refer to the line right below
    # this call, i.e. the allocation itself: keep these two lines adjacent.
    frames = get_frames(nframe, 1)
    data = b'x' * bytes_len
    return data, tracemalloc.Traceback(frames, min(len(frames), nframe))
41
def create_snapshots():
    """Return two hand-written tracemalloc snapshots used as test fixtures.

    Both snapshots are created with a traceback limit of 2.
    """
    limit = 2

    # Every raw trace is a (domain, size, traceback_frames, total_nframe)
    # tuple, where traceback_frames is a tuple of (filename, line_number)
    # tuples.
    frames_a2 = (('a.py', 2), ('b.py', 4))
    frames_a5 = (('a.py', 5), ('b.py', 4))

    # Three identical traces open both snapshots.
    first = [(0, 10, frames_a2, 3)] * 3
    first += [
        (1, 2, frames_a5, 3),
        (2, 66, (('b.py', 1),), 1),
        (3, 7, (('<unknown>', 0),), 1),
    ]

    second = [(0, 10, frames_a2, 3)] * 3
    second += [
        (2, 2, frames_a5, 3),
        (2, 5000, frames_a5, 3),
        (4, 400, (('c.py', 578),), 1),
    ]

    return (tracemalloc.Snapshot(first, limit),
            tracemalloc.Snapshot(second, limit))
74
75def frame(filename, lineno):
76    return tracemalloc._Frame((filename, lineno))
77
def traceback(*frames):
    """Build a tracemalloc.Traceback from (filename, lineno) pairs."""
    built = tracemalloc.Traceback(frames)
    return built
80
def traceback_lineno(filename, lineno):
    """Return a single-frame Traceback for *filename* at line *lineno*."""
    single_frame = ((filename, lineno),)
    return tracemalloc.Traceback(single_frame)
83
def traceback_filename(filename):
    """Return a single-frame Traceback for *filename* with line number 0."""
    single_frame = ((filename, 0),)
    return tracemalloc.Traceback(single_frame)
86
87
class TestTraceback(unittest.TestCase):
    def test_repr(self):
        # repr() lists the frames and appends total_nframe only when a
        # second constructor argument is given.
        two_frames = (("f1", 1), ("f2", 2))
        frames_repr = (
            "(<Frame filename='f2' lineno=2>,"
            " <Frame filename='f1' lineno=1>)"
        )

        # (constructor arguments, expected repr)
        cases = [
            (((),), "<Traceback ()>"),
            (((), 0), "<Traceback () total_nframe=0>"),
            ((two_frames,), f"<Traceback {frames_repr}>"),
            ((two_frames, 2), f"<Traceback {frames_repr} total_nframe=2>"),
        ]
        for ctor_args, expected in cases:
            actual = repr(tracemalloc.Traceback(*ctor_args))
            self.assertEqual(actual, expected)
105
106
class TestTracemallocEnabled(unittest.TestCase):
    # Tests run while tracemalloc is tracing.  setUp() starts tracing with a
    # traceback limit of 1 frame and tearDown() stops it again; tests that
    # need another limit stop and restart tracing themselves.

    def setUp(self):
        if tracemalloc.is_tracing():
            self.skipTest("tracemalloc must be stopped before the test")

        tracemalloc.start(1)

    def tearDown(self):
        tracemalloc.stop()

    def test_get_tracemalloc_memory(self):
        # 'data' keeps the allocated objects alive while the size is read
        data = [allocate_bytes(123) for count in range(1000)]
        size = tracemalloc.get_tracemalloc_memory()
        self.assertGreaterEqual(size, 0)

        # clearing the traces must not increase the memory used by the module
        tracemalloc.clear_traces()
        size2 = tracemalloc.get_tracemalloc_memory()
        self.assertGreaterEqual(size2, 0)
        self.assertLessEqual(size2, size)

    def test_get_object_traceback(self):
        tracemalloc.clear_traces()
        obj_size = 12345
        obj, obj_traceback = allocate_bytes(obj_size)
        traceback = tracemalloc.get_object_traceback(obj)
        self.assertEqual(traceback, obj_traceback)

    def test_new_reference(self):
        tracemalloc.clear_traces()
        # gc.collect() indirectly calls PyList_ClearFreeList()
        support.gc_collect()

        # Create a list and "destroy it": put it in the PyListObject free list
        obj = []
        obj = None

        # Create a list which should reuse the previously created empty list.
        # The -3 delta passed to get_frames() below points back at this
        # assignment: do not insert or remove lines between the two.
        obj = []

        nframe = tracemalloc.get_traceback_limit()
        frames = get_frames(nframe, -3)
        obj_traceback = tracemalloc.Traceback(frames, min(len(frames), nframe))

        traceback = tracemalloc.get_object_traceback(obj)
        self.assertIsNotNone(traceback)
        self.assertEqual(traceback, obj_traceback)

    def test_set_traceback_limit(self):
        obj_size = 10

        # a negative limit is rejected
        tracemalloc.stop()
        self.assertRaises(ValueError, tracemalloc.start, -1)

        # limit of 10 frames
        tracemalloc.stop()
        tracemalloc.start(10)
        obj2, obj2_traceback = allocate_bytes(obj_size)
        traceback = tracemalloc.get_object_traceback(obj2)
        self.assertEqual(len(traceback), 10)
        self.assertEqual(traceback, obj2_traceback)

        # limit of 1 frame
        tracemalloc.stop()
        tracemalloc.start(1)
        obj, obj_traceback = allocate_bytes(obj_size)
        traceback = tracemalloc.get_object_traceback(obj)
        self.assertEqual(len(traceback), 1)
        self.assertEqual(traceback, obj_traceback)

    def find_trace(self, traces, traceback):
        # Return the first raw trace whose frames (third field) equal the
        # frames of 'traceback'; fail the test if there is none.
        for trace in traces:
            if trace[2] == traceback._frames:
                return trace

        self.fail("trace not found")

    def test_get_traces(self):
        tracemalloc.clear_traces()
        obj_size = 12345
        obj, obj_traceback = allocate_bytes(obj_size)

        traces = tracemalloc._get_traces()
        trace = self.find_trace(traces, obj_traceback)

        self.assertIsInstance(trace, tuple)
        domain, size, traceback, length = trace
        self.assertEqual(size, obj_size)
        self.assertEqual(traceback, obj_traceback._frames)

        # no trace is reported once tracing is stopped
        tracemalloc.stop()
        self.assertEqual(tracemalloc._get_traces(), [])

    def test_get_traces_intern_traceback(self):
        # dummy wrappers to get more useful and identical frames in the traceback
        def allocate_bytes2(size):
            return allocate_bytes(size)
        def allocate_bytes3(size):
            return allocate_bytes2(size)
        def allocate_bytes4(size):
            return allocate_bytes3(size)

        # Ensure that two identical tracebacks are not duplicated
        tracemalloc.stop()
        tracemalloc.start(4)
        obj_size = 123
        obj1, obj1_traceback = allocate_bytes4(obj_size)
        obj2, obj2_traceback = allocate_bytes4(obj_size)

        traces = tracemalloc._get_traces()

        # flip the frame order of the expected tracebacks before looking
        # them up in the raw traces
        obj1_traceback._frames = tuple(reversed(obj1_traceback._frames))
        obj2_traceback._frames = tuple(reversed(obj2_traceback._frames))

        trace1 = self.find_trace(traces, obj1_traceback)
        trace2 = self.find_trace(traces, obj2_traceback)
        domain1, size1, traceback1, length1 = trace1
        domain2, size2, traceback2, length2 = trace2
        self.assertIs(traceback2, traceback1)

    def test_get_traced_memory(self):
        # Python allocates some internals objects, so the test must tolerate
        # a small difference between the expected size and the real usage
        max_error = 2048

        # allocate one object
        obj_size = 1024 * 1024
        tracemalloc.clear_traces()
        obj, obj_traceback = allocate_bytes(obj_size)
        size, peak_size = tracemalloc.get_traced_memory()
        self.assertGreaterEqual(size, obj_size)
        self.assertGreaterEqual(peak_size, size)

        self.assertLessEqual(size - obj_size, max_error)
        self.assertLessEqual(peak_size - size, max_error)

        # destroy the object
        obj = None
        size2, peak_size2 = tracemalloc.get_traced_memory()
        self.assertLess(size2, size)
        self.assertGreaterEqual(size - size2, obj_size - max_error)
        self.assertGreaterEqual(peak_size2, peak_size)

        # clear_traces() must reset traced memory counters
        tracemalloc.clear_traces()
        self.assertEqual(tracemalloc.get_traced_memory(), (0, 0))

        # allocate another object
        obj, obj_traceback = allocate_bytes(obj_size)
        size, peak_size = tracemalloc.get_traced_memory()
        self.assertGreaterEqual(size, obj_size)

        # stop() also resets traced memory counters
        tracemalloc.stop()
        self.assertEqual(tracemalloc.get_traced_memory(), (0, 0))

    def test_clear_traces(self):
        obj, obj_traceback = allocate_bytes(123)
        traceback = tracemalloc.get_object_traceback(obj)
        self.assertIsNotNone(traceback)

        # the object is still alive, but its trace must be gone
        tracemalloc.clear_traces()
        traceback2 = tracemalloc.get_object_traceback(obj)
        self.assertIsNone(traceback2)

    def test_reset_peak(self):
        # Drop the existing traces so that the peak measured below is
        # dominated by the temporary list.
        tracemalloc.clear_traces()

        # Example: allocate a large piece of memory, temporarily
        large_sum = sum(list(range(100000)))
        size1, peak1 = tracemalloc.get_traced_memory()

        # reset_peak() resets peak to traced memory: peak2 < peak1
        tracemalloc.reset_peak()
        size2, peak2 = tracemalloc.get_traced_memory()
        self.assertGreaterEqual(peak2, size2)
        self.assertLess(peak2, peak1)

        # check that peak continue to be updated if new memory is allocated:
        # peak3 > peak2
        obj_size = 1024 * 1024
        obj, obj_traceback = allocate_bytes(obj_size)
        size3, peak3 = tracemalloc.get_traced_memory()
        self.assertGreaterEqual(peak3, size3)
        self.assertGreater(peak3, peak2)
        self.assertGreaterEqual(peak3 - peak2, obj_size)

    def test_is_tracing(self):
        tracemalloc.stop()
        self.assertFalse(tracemalloc.is_tracing())

        tracemalloc.start()
        self.assertTrue(tracemalloc.is_tracing())

    def test_snapshot(self):
        obj, source = allocate_bytes(123)

        # take a snapshot
        snapshot = tracemalloc.take_snapshot()

        # This can vary
        self.assertGreater(snapshot.traces[1].traceback.total_nframe, 10)

        # write on disk; the cleanup is registered first so that a dump()
        # failing half-way does not leave the file behind
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        snapshot.dump(os_helper.TESTFN)

        # load from disk
        snapshot2 = tracemalloc.Snapshot.load(os_helper.TESTFN)
        self.assertEqual(snapshot2.traces, snapshot.traces)

        # tracemalloc must be tracing memory allocations to take a snapshot
        tracemalloc.stop()
        with self.assertRaises(RuntimeError) as cm:
            tracemalloc.take_snapshot()
        self.assertEqual(str(cm.exception),
                         "the tracemalloc module must be tracing memory "
                         "allocations to take a snapshot")

    def test_snapshot_save_attr(self):
        # take a snapshot with a new attribute
        snapshot = tracemalloc.take_snapshot()
        snapshot.test_attr = "new"
        # register the cleanup before writing, see test_snapshot()
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        snapshot.dump(os_helper.TESTFN)

        # load() should recreate the attribute
        snapshot2 = tracemalloc.Snapshot.load(os_helper.TESTFN)
        self.assertEqual(snapshot2.test_attr, "new")

    def fork_child(self):
        # Body of the forked child in test_fork(); the return value is used
        # as the exit code of the child process (0 means success).
        if not tracemalloc.is_tracing():
            return 2

        obj_size = 12345
        obj, obj_traceback = allocate_bytes(obj_size)
        traceback = tracemalloc.get_object_traceback(obj)
        if traceback is None:
            return 3

        # everything is fine
        return 0

    @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork()')
    def test_fork(self):
        # check that tracemalloc is still working after fork
        pid = os.fork()
        if not pid:
            # child: exit code 1 is reported if fork_child() raises
            exitcode = 1
            try:
                exitcode = self.fork_child()
            finally:
                os._exit(exitcode)
        else:
            support.wait_process(pid, exitcode=0)
362
363
class TestSnapshot(unittest.TestCase):
    # Tests of Snapshot filtering, grouping and formatting.  Most of them
    # work on the hand-written snapshots returned by create_snapshots().
    maxDiff = 4000

    def test_create_snapshot(self):
        raw_traces = [(0, 5, (('a.py', 2),), 10)]

        # take_snapshot() is fed with canned data instead of real traces
        with contextlib.ExitStack() as stack:
            stack.enter_context(patch.object(tracemalloc, 'is_tracing',
                                             return_value=True))
            stack.enter_context(patch.object(tracemalloc, 'get_traceback_limit',
                                             return_value=5))
            stack.enter_context(patch.object(tracemalloc, '_get_traces',
                                             return_value=raw_traces))

            snapshot = tracemalloc.take_snapshot()
            self.assertEqual(snapshot.traceback_limit, 5)
            self.assertEqual(len(snapshot.traces), 1)
            trace = snapshot.traces[0]
            self.assertEqual(trace.size, 5)
            self.assertEqual(trace.traceback.total_nframe, 10)
            self.assertEqual(len(trace.traceback), 1)
            self.assertEqual(trace.traceback[0].filename, 'a.py')
            self.assertEqual(trace.traceback[0].lineno, 2)

    def test_filter_traces(self):
        snapshot, snapshot2 = create_snapshots()
        filter1 = tracemalloc.Filter(False, "b.py")
        filter2 = tracemalloc.Filter(True, "a.py", 2)
        filter3 = tracemalloc.Filter(True, "a.py", 5)

        original_traces = list(snapshot.traces._traces)

        # exclude b.py
        snapshot3 = snapshot.filter_traces((filter1,))
        self.assertEqual(snapshot3.traces._traces, [
            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
            (1, 2, (('a.py', 5), ('b.py', 4)), 3),
            (3, 7, (('<unknown>', 0),), 1),
        ])

        # filter_traces() must not touch the original snapshot
        self.assertEqual(snapshot.traces._traces, original_traces)

        # only include two lines of a.py
        snapshot4 = snapshot3.filter_traces((filter2, filter3))
        self.assertEqual(snapshot4.traces._traces, [
            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
            (1, 2, (('a.py', 5), ('b.py', 4)), 3),
        ])

        # No filter: just duplicate the snapshot
        snapshot5 = snapshot.filter_traces(())
        self.assertIsNot(snapshot5, snapshot)
        self.assertIsNot(snapshot5.traces, snapshot.traces)
        self.assertEqual(snapshot5.traces, snapshot.traces)

        # a single filter instead of a sequence of filters is rejected
        self.assertRaises(TypeError, snapshot.filter_traces, filter1)

    def test_filter_traces_domain(self):
        snapshot, snapshot2 = create_snapshots()
        filter1 = tracemalloc.Filter(False, "a.py", domain=1)
        filter2 = tracemalloc.Filter(True, "a.py", domain=1)

        # exclude a.py of domain 1
        snapshot3 = snapshot.filter_traces((filter1,))
        self.assertEqual(snapshot3.traces._traces, [
            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
            (2, 66, (('b.py', 1),), 1),
            (3, 7, (('<unknown>', 0),), 1),
        ])

        # include a.py of domain 1: only the trace that both starts in a.py
        # and belongs to domain 1 is kept
        snapshot3 = snapshot.filter_traces((filter2,))
        self.assertEqual(snapshot3.traces._traces, [
            (1, 2, (('a.py', 5), ('b.py', 4)), 3),
        ])

    def test_filter_traces_domain_filter(self):
        snapshot, snapshot2 = create_snapshots()
        filter1 = tracemalloc.DomainFilter(False, domain=3)
        filter2 = tracemalloc.DomainFilter(True, domain=3)

        # exclude domain 3
        snapshot3 = snapshot.filter_traces((filter1,))
        self.assertEqual(snapshot3.traces._traces, [
            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
            (1, 2, (('a.py', 5), ('b.py', 4)), 3),
            (2, 66, (('b.py', 1),), 1),
        ])

        # include domain 3
        snapshot3 = snapshot.filter_traces((filter2,))
        self.assertEqual(snapshot3.traces._traces, [
            (3, 7, (('<unknown>', 0),), 1),
        ])

    def test_snapshot_group_by_line(self):
        snapshot, snapshot2 = create_snapshots()
        tb_0 = traceback_lineno('<unknown>', 0)
        tb_a_2 = traceback_lineno('a.py', 2)
        tb_a_5 = traceback_lineno('a.py', 5)
        tb_b_1 = traceback_lineno('b.py', 1)
        tb_c_578 = traceback_lineno('c.py', 578)

        # stats per file and line
        stats1 = snapshot.statistics('lineno')
        self.assertEqual(stats1, [
            tracemalloc.Statistic(tb_b_1, 66, 1),
            tracemalloc.Statistic(tb_a_2, 30, 3),
            tracemalloc.Statistic(tb_0, 7, 1),
            tracemalloc.Statistic(tb_a_5, 2, 1),
        ])

        # stats per file and line (2)
        stats2 = snapshot2.statistics('lineno')
        self.assertEqual(stats2, [
            tracemalloc.Statistic(tb_a_5, 5002, 2),
            tracemalloc.Statistic(tb_c_578, 400, 1),
            tracemalloc.Statistic(tb_a_2, 30, 3),
        ])

        # stats diff per file and line
        statistics = snapshot2.compare_to(snapshot, 'lineno')
        self.assertEqual(statistics, [
            tracemalloc.StatisticDiff(tb_a_5, 5002, 5000, 2, 1),
            tracemalloc.StatisticDiff(tb_c_578, 400, 400, 1, 1),
            tracemalloc.StatisticDiff(tb_b_1, 0, -66, 0, -1),
            tracemalloc.StatisticDiff(tb_0, 0, -7, 0, -1),
            tracemalloc.StatisticDiff(tb_a_2, 30, 0, 3, 0),
        ])

    def test_snapshot_group_by_file(self):
        snapshot, snapshot2 = create_snapshots()
        tb_0 = traceback_filename('<unknown>')
        tb_a = traceback_filename('a.py')
        tb_b = traceback_filename('b.py')
        tb_c = traceback_filename('c.py')

        # stats per file
        stats1 = snapshot.statistics('filename')
        self.assertEqual(stats1, [
            tracemalloc.Statistic(tb_b, 66, 1),
            tracemalloc.Statistic(tb_a, 32, 4),
            tracemalloc.Statistic(tb_0, 7, 1),
        ])

        # stats per file (2)
        stats2 = snapshot2.statistics('filename')
        self.assertEqual(stats2, [
            tracemalloc.Statistic(tb_a, 5032, 5),
            tracemalloc.Statistic(tb_c, 400, 1),
        ])

        # stats diff per file
        diff = snapshot2.compare_to(snapshot, 'filename')
        self.assertEqual(diff, [
            tracemalloc.StatisticDiff(tb_a, 5032, 5000, 5, 1),
            tracemalloc.StatisticDiff(tb_c, 400, 400, 1, 1),
            tracemalloc.StatisticDiff(tb_b, 0, -66, 0, -1),
            tracemalloc.StatisticDiff(tb_0, 0, -7, 0, -1),
        ])

    def test_snapshot_group_by_traceback(self):
        snapshot, snapshot2 = create_snapshots()

        # stats per traceback
        tb1 = traceback(('a.py', 2), ('b.py', 4))
        tb2 = traceback(('a.py', 5), ('b.py', 4))
        tb3 = traceback(('b.py', 1))
        tb4 = traceback(('<unknown>', 0))
        stats1 = snapshot.statistics('traceback')
        self.assertEqual(stats1, [
            tracemalloc.Statistic(tb3, 66, 1),
            tracemalloc.Statistic(tb1, 30, 3),
            tracemalloc.Statistic(tb4, 7, 1),
            tracemalloc.Statistic(tb2, 2, 1),
        ])

        # stats per traceback (2)
        tb5 = traceback(('c.py', 578))
        stats2 = snapshot2.statistics('traceback')
        self.assertEqual(stats2, [
            tracemalloc.Statistic(tb2, 5002, 2),
            tracemalloc.Statistic(tb5, 400, 1),
            tracemalloc.Statistic(tb1, 30, 3),
        ])

        # stats diff per traceback
        diff = snapshot2.compare_to(snapshot, 'traceback')
        self.assertEqual(diff, [
            tracemalloc.StatisticDiff(tb2, 5002, 5000, 2, 1),
            tracemalloc.StatisticDiff(tb5, 400, 400, 1, 1),
            tracemalloc.StatisticDiff(tb3, 0, -66, 0, -1),
            tracemalloc.StatisticDiff(tb4, 0, -7, 0, -1),
            tracemalloc.StatisticDiff(tb1, 30, 0, 3, 0),
        ])

        # cumulative statistics are not supported when grouping by traceback
        self.assertRaises(ValueError,
                          snapshot.statistics, 'traceback', cumulative=True)

    def test_snapshot_group_by_cumulative(self):
        snapshot, snapshot2 = create_snapshots()
        tb_0 = traceback_filename('<unknown>')
        tb_a = traceback_filename('a.py')
        tb_b = traceback_filename('b.py')
        tb_a_2 = traceback_lineno('a.py', 2)
        tb_a_5 = traceback_lineno('a.py', 5)
        tb_b_1 = traceback_lineno('b.py', 1)
        tb_b_4 = traceback_lineno('b.py', 4)

        # per file
        stats = snapshot.statistics('filename', True)
        self.assertEqual(stats, [
            tracemalloc.Statistic(tb_b, 98, 5),
            tracemalloc.Statistic(tb_a, 32, 4),
            tracemalloc.Statistic(tb_0, 7, 1),
        ])

        # per line
        stats = snapshot.statistics('lineno', True)
        self.assertEqual(stats, [
            tracemalloc.Statistic(tb_b_1, 66, 1),
            tracemalloc.Statistic(tb_b_4, 32, 4),
            tracemalloc.Statistic(tb_a_2, 30, 3),
            tracemalloc.Statistic(tb_0, 7, 1),
            tracemalloc.Statistic(tb_a_5, 2, 1),
        ])

    def test_trace_format(self):
        snapshot, snapshot2 = create_snapshots()
        trace = snapshot.traces[0]
        self.assertEqual(str(trace), 'b.py:4: 10 B')
        traceback = trace.traceback
        self.assertEqual(str(traceback), 'b.py:4')
        frame = traceback[0]
        self.assertEqual(str(frame), 'b.py:4')

    def test_statistic_format(self):
        snapshot, snapshot2 = create_snapshots()
        stats = snapshot.statistics('lineno')
        stat = stats[0]
        self.assertEqual(str(stat),
                         'b.py:1: size=66 B, count=1, average=66 B')

    def test_statistic_diff_format(self):
        snapshot, snapshot2 = create_snapshots()
        stats = snapshot2.compare_to(snapshot, 'lineno')
        stat = stats[0]
        self.assertEqual(str(stat),
                         'a.py:5: size=5002 B (+5000 B), count=2 (+1), average=2501 B')

    def test_slices(self):
        # slicing the traces or a traceback yields a tuple of the items
        snapshot, snapshot2 = create_snapshots()
        self.assertEqual(snapshot.traces[:2],
                         (snapshot.traces[0], snapshot.traces[1]))

        traceback = snapshot.traces[0].traceback
        self.assertEqual(traceback[:2],
                         (traceback[0], traceback[1]))

    def test_format_traceback(self):
        snapshot, snapshot2 = create_snapshots()
        # stand-in for linecache.getline(): the "source line" only echoes
        # the requested location
        def getline(filename, lineno):
            return '  <%s, %s>' % (filename, lineno)
        with patch('tracemalloc.linecache.getline',
                   side_effect=getline):
            tb = snapshot.traces[0].traceback
            self.assertEqual(tb.format(),
                             ['  File "b.py", line 4',
                              '    <b.py, 4>',
                              '  File "a.py", line 2',
                              '    <a.py, 2>'])

            self.assertEqual(tb.format(limit=1),
                             ['  File "a.py", line 2',
                              '    <a.py, 2>'])

            self.assertEqual(tb.format(limit=-1),
                             ['  File "b.py", line 4',
                              '    <b.py, 4>'])

            self.assertEqual(tb.format(most_recent_first=True),
                             ['  File "a.py", line 2',
                              '    <a.py, 2>',
                              '  File "b.py", line 4',
                              '    <b.py, 4>'])

            self.assertEqual(tb.format(limit=1, most_recent_first=True),
                             ['  File "a.py", line 2',
                              '    <a.py, 2>'])

            self.assertEqual(tb.format(limit=-1, most_recent_first=True),
                             ['  File "b.py", line 4',
                              '    <b.py, 4>'])
672
673
674class TestFilters(unittest.TestCase):
675    maxDiff = 2048
676
677    def test_filter_attributes(self):
678        # test default values
679        f = tracemalloc.Filter(True, "abc")
680        self.assertEqual(f.inclusive, True)
681        self.assertEqual(f.filename_pattern, "abc")
682        self.assertIsNone(f.lineno)
683        self.assertEqual(f.all_frames, False)
684
685        # test custom values
686        f = tracemalloc.Filter(False, "test.py", 123, True)
687        self.assertEqual(f.inclusive, False)
688        self.assertEqual(f.filename_pattern, "test.py")
689        self.assertEqual(f.lineno, 123)
690        self.assertEqual(f.all_frames, True)
691
692        # parameters passed by keyword
693        f = tracemalloc.Filter(inclusive=False, filename_pattern="test.py", lineno=123, all_frames=True)
694        self.assertEqual(f.inclusive, False)
695        self.assertEqual(f.filename_pattern, "test.py")
696        self.assertEqual(f.lineno, 123)
697        self.assertEqual(f.all_frames, True)
698
699        # read-only attribute
700        self.assertRaises(AttributeError, setattr, f, "filename_pattern", "abc")
701
702    def test_filter_match(self):
703        # filter without line number
704        f = tracemalloc.Filter(True, "abc")
705        self.assertTrue(f._match_frame("abc", 0))
706        self.assertTrue(f._match_frame("abc", 5))
707        self.assertTrue(f._match_frame("abc", 10))
708        self.assertFalse(f._match_frame("12356", 0))
709        self.assertFalse(f._match_frame("12356", 5))
710        self.assertFalse(f._match_frame("12356", 10))
711
712        f = tracemalloc.Filter(False, "abc")
713        self.assertFalse(f._match_frame("abc", 0))
714        self.assertFalse(f._match_frame("abc", 5))
715        self.assertFalse(f._match_frame("abc", 10))
716        self.assertTrue(f._match_frame("12356", 0))
717        self.assertTrue(f._match_frame("12356", 5))
718        self.assertTrue(f._match_frame("12356", 10))
719
720        # filter with line number > 0
721        f = tracemalloc.Filter(True, "abc", 5)
722        self.assertFalse(f._match_frame("abc", 0))
723        self.assertTrue(f._match_frame("abc", 5))
724        self.assertFalse(f._match_frame("abc", 10))
725        self.assertFalse(f._match_frame("12356", 0))
726        self.assertFalse(f._match_frame("12356", 5))
727        self.assertFalse(f._match_frame("12356", 10))
728
729        f = tracemalloc.Filter(False, "abc", 5)
730        self.assertTrue(f._match_frame("abc", 0))
731        self.assertFalse(f._match_frame("abc", 5))
732        self.assertTrue(f._match_frame("abc", 10))
733        self.assertTrue(f._match_frame("12356", 0))
734        self.assertTrue(f._match_frame("12356", 5))
735        self.assertTrue(f._match_frame("12356", 10))
736
737        # filter with line number 0
738        f = tracemalloc.Filter(True, "abc", 0)
739        self.assertTrue(f._match_frame("abc", 0))
740        self.assertFalse(f._match_frame("abc", 5))
741        self.assertFalse(f._match_frame("abc", 10))
742        self.assertFalse(f._match_frame("12356", 0))
743        self.assertFalse(f._match_frame("12356", 5))
744        self.assertFalse(f._match_frame("12356", 10))
745
746        f = tracemalloc.Filter(False, "abc", 0)
747        self.assertFalse(f._match_frame("abc", 0))
748        self.assertTrue(f._match_frame("abc", 5))
749        self.assertTrue(f._match_frame("abc", 10))
750        self.assertTrue(f._match_frame("12356", 0))
751        self.assertTrue(f._match_frame("12356", 5))
752        self.assertTrue(f._match_frame("12356", 10))
753
754    def test_filter_match_filename(self):
755        def fnmatch(inclusive, filename, pattern):
756            f = tracemalloc.Filter(inclusive, pattern)
757            return f._match_frame(filename, 0)
758
759        self.assertTrue(fnmatch(True, "abc", "abc"))
760        self.assertFalse(fnmatch(True, "12356", "abc"))
761        self.assertFalse(fnmatch(True, "<unknown>", "abc"))
762
763        self.assertFalse(fnmatch(False, "abc", "abc"))
764        self.assertTrue(fnmatch(False, "12356", "abc"))
765        self.assertTrue(fnmatch(False, "<unknown>", "abc"))
766
767    def test_filter_match_filename_joker(self):
768        def fnmatch(filename, pattern):
769            filter = tracemalloc.Filter(True, pattern)
770            return filter._match_frame(filename, 0)
771
772        # empty string
773        self.assertFalse(fnmatch('abc', ''))
774        self.assertFalse(fnmatch('', 'abc'))
775        self.assertTrue(fnmatch('', ''))
776        self.assertTrue(fnmatch('', '*'))
777
778        # no *
779        self.assertTrue(fnmatch('abc', 'abc'))
780        self.assertFalse(fnmatch('abc', 'abcd'))
781        self.assertFalse(fnmatch('abc', 'def'))
782
783        # a*
784        self.assertTrue(fnmatch('abc', 'a*'))
785        self.assertTrue(fnmatch('abc', 'abc*'))
786        self.assertFalse(fnmatch('abc', 'b*'))
787        self.assertFalse(fnmatch('abc', 'abcd*'))
788
789        # a*b
790        self.assertTrue(fnmatch('abc', 'a*c'))
791        self.assertTrue(fnmatch('abcdcx', 'a*cx'))
792        self.assertFalse(fnmatch('abb', 'a*c'))
793        self.assertFalse(fnmatch('abcdce', 'a*cx'))
794
795        # a*b*c
796        self.assertTrue(fnmatch('abcde', 'a*c*e'))
797        self.assertTrue(fnmatch('abcbdefeg', 'a*bd*eg'))
798        self.assertFalse(fnmatch('abcdd', 'a*c*e'))
799        self.assertFalse(fnmatch('abcbdefef', 'a*bd*eg'))
800
801        # replace .pyc suffix with .py
802        self.assertTrue(fnmatch('a.pyc', 'a.py'))
803        self.assertTrue(fnmatch('a.py', 'a.pyc'))
804
805        if os.name == 'nt':
806            # case insensitive
807            self.assertTrue(fnmatch('aBC', 'ABc'))
808            self.assertTrue(fnmatch('aBcDe', 'Ab*dE'))
809
810            self.assertTrue(fnmatch('a.pyc', 'a.PY'))
811            self.assertTrue(fnmatch('a.py', 'a.PYC'))
812        else:
813            # case sensitive
814            self.assertFalse(fnmatch('aBC', 'ABc'))
815            self.assertFalse(fnmatch('aBcDe', 'Ab*dE'))
816
817            self.assertFalse(fnmatch('a.pyc', 'a.PY'))
818            self.assertFalse(fnmatch('a.py', 'a.PYC'))
819
820        if os.name == 'nt':
821            # normalize alternate separator "/" to the standard separator "\"
822            self.assertTrue(fnmatch(r'a/b', r'a\b'))
823            self.assertTrue(fnmatch(r'a\b', r'a/b'))
824            self.assertTrue(fnmatch(r'a/b\c', r'a\b/c'))
825            self.assertTrue(fnmatch(r'a/b/c', r'a\b\c'))
826        else:
827            # there is no alternate separator
828            self.assertFalse(fnmatch(r'a/b', r'a\b'))
829            self.assertFalse(fnmatch(r'a\b', r'a/b'))
830            self.assertFalse(fnmatch(r'a/b\c', r'a\b/c'))
831            self.assertFalse(fnmatch(r'a/b/c', r'a\b\c'))
832
833        # as of 3.5, .pyo is no longer munged to .py
834        self.assertFalse(fnmatch('a.pyo', 'a.py'))
835
836    def test_filter_match_trace(self):
837        t1 = (("a.py", 2), ("b.py", 3))
838        t2 = (("b.py", 4), ("b.py", 5))
839        t3 = (("c.py", 5), ('<unknown>', 0))
840        unknown = (('<unknown>', 0),)
841
842        f = tracemalloc.Filter(True, "b.py", all_frames=True)
843        self.assertTrue(f._match_traceback(t1))
844        self.assertTrue(f._match_traceback(t2))
845        self.assertFalse(f._match_traceback(t3))
846        self.assertFalse(f._match_traceback(unknown))
847
848        f = tracemalloc.Filter(True, "b.py", all_frames=False)
849        self.assertFalse(f._match_traceback(t1))
850        self.assertTrue(f._match_traceback(t2))
851        self.assertFalse(f._match_traceback(t3))
852        self.assertFalse(f._match_traceback(unknown))
853
854        f = tracemalloc.Filter(False, "b.py", all_frames=True)
855        self.assertFalse(f._match_traceback(t1))
856        self.assertFalse(f._match_traceback(t2))
857        self.assertTrue(f._match_traceback(t3))
858        self.assertTrue(f._match_traceback(unknown))
859
860        f = tracemalloc.Filter(False, "b.py", all_frames=False)
861        self.assertTrue(f._match_traceback(t1))
862        self.assertFalse(f._match_traceback(t2))
863        self.assertTrue(f._match_traceback(t3))
864        self.assertTrue(f._match_traceback(unknown))
865
866        f = tracemalloc.Filter(False, "<unknown>", all_frames=False)
867        self.assertTrue(f._match_traceback(t1))
868        self.assertTrue(f._match_traceback(t2))
869        self.assertTrue(f._match_traceback(t3))
870        self.assertFalse(f._match_traceback(unknown))
871
872        f = tracemalloc.Filter(True, "<unknown>", all_frames=True)
873        self.assertFalse(f._match_traceback(t1))
874        self.assertFalse(f._match_traceback(t2))
875        self.assertTrue(f._match_traceback(t3))
876        self.assertTrue(f._match_traceback(unknown))
877
878        f = tracemalloc.Filter(False, "<unknown>", all_frames=True)
879        self.assertTrue(f._match_traceback(t1))
880        self.assertTrue(f._match_traceback(t2))
881        self.assertFalse(f._match_traceback(t3))
882        self.assertFalse(f._match_traceback(unknown))
883
884
class TestCommandLine(unittest.TestCase):
    """Check tracemalloc start-up options in a child interpreter.

    Every test spawns a Python subprocess and inspects the effect of the
    PYTHONTRACEMALLOC environment variable or of the -X tracemalloc option.
    """

    # One-liners executed by the child interpreter.
    _PRINT_IS_TRACING = 'import tracemalloc; print(tracemalloc.is_tracing())'
    _PRINT_LIMIT = (
        'import tracemalloc; print(tracemalloc.get_traceback_limit())')

    def _child_stdout(self, *args, **env_vars):
        # Run a child interpreter that must succeed and return its standard
        # output with trailing whitespace removed.
        _rc, stdout, _stderr = assert_python_ok(*args, **env_vars)
        return stdout.rstrip()

    def _check_invalid_nframe_output(self, stderr, option_error):
        # Fail unless stderr contains the ValueError message or the
        # option-specific error message.
        accepted = (
            b'ValueError: the number of frames must be in range',
            option_error,
        )
        if not any(message in stderr for message in accepted):
            self.fail(f"unexpected output: {stderr!a}")

    def test_env_var_disabled_by_default(self):
        # not tracing by default
        stdout = self._child_stdout('-c', self._PRINT_IS_TRACING)
        self.assertEqual(stdout, b'False')

    @unittest.skipIf(interpreter_requires_environment(),
                     'Cannot run -E tests when PYTHON env vars are required.')
    def test_env_var_ignored_with_E(self):
        """PYTHON* environment variables must be ignored when -E is present."""
        stdout = self._child_stdout('-E', '-c', self._PRINT_IS_TRACING,
                                    PYTHONTRACEMALLOC='1')
        self.assertEqual(stdout, b'False')

    def test_env_var_disabled(self):
        # PYTHONTRACEMALLOC=0: not tracing
        stdout = self._child_stdout('-c', self._PRINT_IS_TRACING,
                                    PYTHONTRACEMALLOC='0')
        self.assertEqual(stdout, b'False')

    def test_env_var_enabled_at_startup(self):
        # PYTHONTRACEMALLOC=1: tracing at startup
        stdout = self._child_stdout('-c', self._PRINT_IS_TRACING,
                                    PYTHONTRACEMALLOC='1')
        self.assertEqual(stdout, b'True')

    def test_env_limit(self):
        # start and set the number of frames
        stdout = self._child_stdout('-c', self._PRINT_LIMIT,
                                    PYTHONTRACEMALLOC='10')
        self.assertEqual(stdout, b'10')

    def check_env_var_invalid(self, nframe):
        # The child interpreter must fail and report the invalid number of
        # frames passed through PYTHONTRACEMALLOC.
        with support.SuppressCrashReport():
            _rc, _stdout, stderr = assert_python_failure(
                '-c', 'pass',
                PYTHONTRACEMALLOC=str(nframe))

        self._check_invalid_nframe_output(
            stderr, b'PYTHONTRACEMALLOC: invalid number of frames')

    def test_env_var_invalid(self):
        for nframe in INVALID_NFRAME:
            with self.subTest(nframe=nframe):
                self.check_env_var_invalid(nframe)

    def test_sys_xoptions(self):
        # -X option value -> traceback limit reported by the child
        expected_limits = {
            'tracemalloc': 1,
            'tracemalloc=1': 1,
            'tracemalloc=15': 15,
        }
        for xoptions, nframe in expected_limits.items():
            with self.subTest(xoptions=xoptions, nframe=nframe):
                stdout = self._child_stdout('-X', xoptions,
                                            '-c', self._PRINT_LIMIT)
                self.assertEqual(stdout, str(nframe).encode('ascii'))

    def check_sys_xoptions_invalid(self, nframe):
        # The child interpreter must fail and report the invalid number of
        # frames passed through -X tracemalloc=NFRAME.
        xoption = 'tracemalloc=%s' % nframe
        with support.SuppressCrashReport():
            _rc, _stdout, stderr = assert_python_failure(
                '-X', xoption, '-c', 'pass')

        self._check_invalid_nframe_output(
            stderr, b'-X tracemalloc=NFRAME: invalid number of frames')

    def test_sys_xoptions_invalid(self):
        for nframe in INVALID_NFRAME:
            with self.subTest(nframe=nframe):
                self.check_sys_xoptions_invalid(nframe)

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    def test_pymem_alloc0(self):
        # Issue #21639: Check that PyMem_Malloc(0) with tracemalloc enabled
        # does not crash.
        child_code = 'import _testcapi; _testcapi.test_pymem_alloc0(); 1'
        assert_python_ok('-X', 'tracemalloc', '-c', child_code)
975
976
@unittest.skipIf(_testcapi is None, 'need _testcapi')
class TestCAPI(unittest.TestCase):
    """Test tracking of a memory block through the _testcapi wrappers.

    The tests register the block at self.ptr in the domain self.domain with
    _testcapi.tracemalloc_track() / _testcapi.tracemalloc_untrack(), then
    inspect the result with _testcapi.tracemalloc_get_traceback() and with
    tracemalloc snapshots filtered on that domain.
    """
    maxDiff = 80 * 20

    def setUp(self):
        if tracemalloc.is_tracing():
            self.skipTest("tracemalloc must be stopped before the test")

        self.domain = 5
        self.size = 123
        # Keep a reference to the bytes object so that self.ptr stays valid
        # for the whole test.
        self.obj = allocate_bytes(self.size)[0]

        # for the type "object", id(obj) is the address of its memory block.
        # This type is not tracked by the garbage collector
        self.ptr = id(self.obj)

    def tearDown(self):
        tracemalloc.stop()

    def get_traceback(self):
        # Return the traceback recorded for (self.domain, self.ptr), or None
        # when the C API reports no trace for the block.
        frames = _testcapi.tracemalloc_get_traceback(self.domain, self.ptr)
        if frames is None:
            return None
        return tracemalloc.Traceback(frames)

    def track(self, release_gil=False, nframe=1):
        # Track the block and return the frames expected in its traceback.
        # get_frames() adds lineno_delta=1 to the current line number of this
        # function, so the expected traceback points at the line right after
        # the get_frames() call: the tracemalloc_track() call must start
        # there. Do not insert anything between the two statements.
        frames = get_frames(nframe, 1)
        _testcapi.tracemalloc_track(self.domain, self.ptr, self.size,
                                    release_gil)
        return frames

    def untrack(self):
        _testcapi.tracemalloc_untrack(self.domain, self.ptr)

    def get_traced_memory(self):
        # Get the traced size in the domain
        snapshot = tracemalloc.take_snapshot()
        domain_filter = tracemalloc.DomainFilter(True, self.domain)
        snapshot = snapshot.filter_traces([domain_filter])
        return sum(trace.size for trace in snapshot.traces)

    def check_track(self, release_gil):
        # Track the block with a traceback limit of 5 frames, then check
        # both the recorded traceback and the size traced in the domain.
        nframe = 5
        tracemalloc.start(nframe)

        frames = self.track(release_gil, nframe)
        self.assertEqual(self.get_traceback(),
                         tracemalloc.Traceback(frames))

        self.assertEqual(self.get_traced_memory(), self.size)

    def test_track(self):
        self.check_track(False)

    def test_track_without_gil(self):
        # check that calling _PyTraceMalloc_Track() without holding the GIL
        # works too
        self.check_track(True)

    def test_track_already_tracked(self):
        nframe = 5
        tracemalloc.start(nframe)

        # track a first time
        self.track()

        # calling _PyTraceMalloc_Track() must remove the old trace and add
        # a new trace with the new traceback
        frames = self.track(nframe=nframe)
        self.assertEqual(self.get_traceback(),
                         tracemalloc.Traceback(frames))

    def test_untrack(self):
        tracemalloc.start()

        self.track()
        self.assertIsNotNone(self.get_traceback())
        self.assertEqual(self.get_traced_memory(), self.size)

        # untrack must remove the trace
        self.untrack()
        self.assertIsNone(self.get_traceback())
        self.assertEqual(self.get_traced_memory(), 0)

        # calling _PyTraceMalloc_Untrack() multiple times must not crash
        self.untrack()
        self.untrack()

    def test_stop_track(self):
        tracemalloc.start()
        tracemalloc.stop()

        # tracking once tracemalloc is stopped must raise RuntimeError and
        # must not record any trace
        with self.assertRaises(RuntimeError):
            self.track()
        self.assertIsNone(self.get_traceback())

    def test_stop_untrack(self):
        tracemalloc.start()
        self.track()

        # untracking once tracemalloc is stopped must raise RuntimeError
        tracemalloc.stop()
        with self.assertRaises(RuntimeError):
            self.untrack()
1083
1084
# Run the tests of this module when the file is executed directly.
if __name__ == "__main__":
    unittest.main()
1087