1import contextlib
2import os
3import sys
4import tracemalloc
5import unittest
6from unittest.mock import patch
7from test.support.script_helper import (assert_python_ok, assert_python_failure,
8                                        interpreter_requires_environment)
9from test import support
10
11try:
12    import _testcapi
13except ImportError:
14    _testcapi = None
15
16
17EMPTY_STRING_SIZE = sys.getsizeof(b'')
18INVALID_NFRAME = (-1, 2**30)
19
20
21def get_frames(nframe, lineno_delta):
22    frames = []
23    frame = sys._getframe(1)
24    for index in range(nframe):
25        code = frame.f_code
26        lineno = frame.f_lineno + lineno_delta
27        frames.append((code.co_filename, lineno))
28        lineno_delta = 0
29        frame = frame.f_back
30        if frame is None:
31            break
32    return tuple(frames)
33
34def allocate_bytes(size):
35    nframe = tracemalloc.get_traceback_limit()
36    bytes_len = (size - EMPTY_STRING_SIZE)
37    frames = get_frames(nframe, 1)
38    data = b'x' * bytes_len
39    return data, tracemalloc.Traceback(frames)
40
41def create_snapshots():
42    traceback_limit = 2
43
44    # _tracemalloc._get_traces() returns a list of (domain, size,
45    # traceback_frames) tuples. traceback_frames is a tuple of (filename,
46    # line_number) tuples.
47    raw_traces = [
48        (0, 10, (('a.py', 2), ('b.py', 4))),
49        (0, 10, (('a.py', 2), ('b.py', 4))),
50        (0, 10, (('a.py', 2), ('b.py', 4))),
51
52        (1, 2, (('a.py', 5), ('b.py', 4))),
53
54        (2, 66, (('b.py', 1),)),
55
56        (3, 7, (('<unknown>', 0),)),
57    ]
58    snapshot = tracemalloc.Snapshot(raw_traces, traceback_limit)
59
60    raw_traces2 = [
61        (0, 10, (('a.py', 2), ('b.py', 4))),
62        (0, 10, (('a.py', 2), ('b.py', 4))),
63        (0, 10, (('a.py', 2), ('b.py', 4))),
64
65        (2, 2, (('a.py', 5), ('b.py', 4))),
66        (2, 5000, (('a.py', 5), ('b.py', 4))),
67
68        (4, 400, (('c.py', 578),)),
69    ]
70    snapshot2 = tracemalloc.Snapshot(raw_traces2, traceback_limit)
71
72    return (snapshot, snapshot2)
73
74def frame(filename, lineno):
75    return tracemalloc._Frame((filename, lineno))
76
77def traceback(*frames):
78    return tracemalloc.Traceback(frames)
79
80def traceback_lineno(filename, lineno):
81    return traceback((filename, lineno))
82
83def traceback_filename(filename):
84    return traceback_lineno(filename, 0)
85
86
87class TestTracemallocEnabled(unittest.TestCase):
88    def setUp(self):
89        if tracemalloc.is_tracing():
90            self.skipTest("tracemalloc must be stopped before the test")
91
92        tracemalloc.start(1)
93
94    def tearDown(self):
95        tracemalloc.stop()
96
97    def test_get_tracemalloc_memory(self):
98        data = [allocate_bytes(123) for count in range(1000)]
99        size = tracemalloc.get_tracemalloc_memory()
100        self.assertGreaterEqual(size, 0)
101
102        tracemalloc.clear_traces()
103        size2 = tracemalloc.get_tracemalloc_memory()
104        self.assertGreaterEqual(size2, 0)
105        self.assertLessEqual(size2, size)
106
107    def test_get_object_traceback(self):
108        tracemalloc.clear_traces()
109        obj_size = 12345
110        obj, obj_traceback = allocate_bytes(obj_size)
111        traceback = tracemalloc.get_object_traceback(obj)
112        self.assertEqual(traceback, obj_traceback)
113
114    def test_new_reference(self):
115        tracemalloc.clear_traces()
116        # gc.collect() indirectly calls PyList_ClearFreeList()
117        support.gc_collect()
118
119        # Create a list and "destroy it": put it in the PyListObject free list
120        obj = []
121        obj = None
122
123        # Create a list which should reuse the previously created empty list
124        obj = []
125
126        nframe = tracemalloc.get_traceback_limit()
127        frames = get_frames(nframe, -3)
128        obj_traceback = tracemalloc.Traceback(frames)
129
130        traceback = tracemalloc.get_object_traceback(obj)
131        self.assertIsNotNone(traceback)
132        self.assertEqual(traceback, obj_traceback)
133
134    def test_set_traceback_limit(self):
135        obj_size = 10
136
137        tracemalloc.stop()
138        self.assertRaises(ValueError, tracemalloc.start, -1)
139
140        tracemalloc.stop()
141        tracemalloc.start(10)
142        obj2, obj2_traceback = allocate_bytes(obj_size)
143        traceback = tracemalloc.get_object_traceback(obj2)
144        self.assertEqual(len(traceback), 10)
145        self.assertEqual(traceback, obj2_traceback)
146
147        tracemalloc.stop()
148        tracemalloc.start(1)
149        obj, obj_traceback = allocate_bytes(obj_size)
150        traceback = tracemalloc.get_object_traceback(obj)
151        self.assertEqual(len(traceback), 1)
152        self.assertEqual(traceback, obj_traceback)
153
154    def find_trace(self, traces, traceback):
155        for trace in traces:
156            if trace[2] == traceback._frames:
157                return trace
158
159        self.fail("trace not found")
160
161    def test_get_traces(self):
162        tracemalloc.clear_traces()
163        obj_size = 12345
164        obj, obj_traceback = allocate_bytes(obj_size)
165
166        traces = tracemalloc._get_traces()
167        trace = self.find_trace(traces, obj_traceback)
168
169        self.assertIsInstance(trace, tuple)
170        domain, size, traceback = trace
171        self.assertEqual(size, obj_size)
172        self.assertEqual(traceback, obj_traceback._frames)
173
174        tracemalloc.stop()
175        self.assertEqual(tracemalloc._get_traces(), [])
176
177    def test_get_traces_intern_traceback(self):
178        # dummy wrappers to get more useful and identical frames in the traceback
179        def allocate_bytes2(size):
180            return allocate_bytes(size)
181        def allocate_bytes3(size):
182            return allocate_bytes2(size)
183        def allocate_bytes4(size):
184            return allocate_bytes3(size)
185
186        # Ensure that two identical tracebacks are not duplicated
187        tracemalloc.stop()
188        tracemalloc.start(4)
189        obj_size = 123
190        obj1, obj1_traceback = allocate_bytes4(obj_size)
191        obj2, obj2_traceback = allocate_bytes4(obj_size)
192
193        traces = tracemalloc._get_traces()
194
195        obj1_traceback._frames = tuple(reversed(obj1_traceback._frames))
196        obj2_traceback._frames = tuple(reversed(obj2_traceback._frames))
197
198        trace1 = self.find_trace(traces, obj1_traceback)
199        trace2 = self.find_trace(traces, obj2_traceback)
200        domain1, size1, traceback1 = trace1
201        domain2, size2, traceback2 = trace2
202        self.assertIs(traceback2, traceback1)
203
204    def test_get_traced_memory(self):
205        # Python allocates some internals objects, so the test must tolerate
206        # a small difference between the expected size and the real usage
207        max_error = 2048
208
209        # allocate one object
210        obj_size = 1024 * 1024
211        tracemalloc.clear_traces()
212        obj, obj_traceback = allocate_bytes(obj_size)
213        size, peak_size = tracemalloc.get_traced_memory()
214        self.assertGreaterEqual(size, obj_size)
215        self.assertGreaterEqual(peak_size, size)
216
217        self.assertLessEqual(size - obj_size, max_error)
218        self.assertLessEqual(peak_size - size, max_error)
219
220        # destroy the object
221        obj = None
222        size2, peak_size2 = tracemalloc.get_traced_memory()
223        self.assertLess(size2, size)
224        self.assertGreaterEqual(size - size2, obj_size - max_error)
225        self.assertGreaterEqual(peak_size2, peak_size)
226
227        # clear_traces() must reset traced memory counters
228        tracemalloc.clear_traces()
229        self.assertEqual(tracemalloc.get_traced_memory(), (0, 0))
230
231        # allocate another object
232        obj, obj_traceback = allocate_bytes(obj_size)
233        size, peak_size = tracemalloc.get_traced_memory()
234        self.assertGreaterEqual(size, obj_size)
235
236        # stop() also resets traced memory counters
237        tracemalloc.stop()
238        self.assertEqual(tracemalloc.get_traced_memory(), (0, 0))
239
240    def test_clear_traces(self):
241        obj, obj_traceback = allocate_bytes(123)
242        traceback = tracemalloc.get_object_traceback(obj)
243        self.assertIsNotNone(traceback)
244
245        tracemalloc.clear_traces()
246        traceback2 = tracemalloc.get_object_traceback(obj)
247        self.assertIsNone(traceback2)
248
249    def test_is_tracing(self):
250        tracemalloc.stop()
251        self.assertFalse(tracemalloc.is_tracing())
252
253        tracemalloc.start()
254        self.assertTrue(tracemalloc.is_tracing())
255
256    def test_snapshot(self):
257        obj, source = allocate_bytes(123)
258
259        # take a snapshot
260        snapshot = tracemalloc.take_snapshot()
261
262        # write on disk
263        snapshot.dump(support.TESTFN)
264        self.addCleanup(support.unlink, support.TESTFN)
265
266        # load from disk
267        snapshot2 = tracemalloc.Snapshot.load(support.TESTFN)
268        self.assertEqual(snapshot2.traces, snapshot.traces)
269
270        # tracemalloc must be tracing memory allocations to take a snapshot
271        tracemalloc.stop()
272        with self.assertRaises(RuntimeError) as cm:
273            tracemalloc.take_snapshot()
274        self.assertEqual(str(cm.exception),
275                         "the tracemalloc module must be tracing memory "
276                         "allocations to take a snapshot")
277
278    def test_snapshot_save_attr(self):
279        # take a snapshot with a new attribute
280        snapshot = tracemalloc.take_snapshot()
281        snapshot.test_attr = "new"
282        snapshot.dump(support.TESTFN)
283        self.addCleanup(support.unlink, support.TESTFN)
284
285        # load() should recreate the attribute
286        snapshot2 = tracemalloc.Snapshot.load(support.TESTFN)
287        self.assertEqual(snapshot2.test_attr, "new")
288
289    def fork_child(self):
290        if not tracemalloc.is_tracing():
291            return 2
292
293        obj_size = 12345
294        obj, obj_traceback = allocate_bytes(obj_size)
295        traceback = tracemalloc.get_object_traceback(obj)
296        if traceback is None:
297            return 3
298
299        # everything is fine
300        return 0
301
302    @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork()')
303    def test_fork(self):
304        # check that tracemalloc is still working after fork
305        pid = os.fork()
306        if not pid:
307            # child
308            exitcode = 1
309            try:
310                exitcode = self.fork_child()
311            finally:
312                os._exit(exitcode)
313        else:
314            pid2, status = os.waitpid(pid, 0)
315            self.assertTrue(os.WIFEXITED(status))
316            exitcode = os.WEXITSTATUS(status)
317            self.assertEqual(exitcode, 0)
318
319
320class TestSnapshot(unittest.TestCase):
321    maxDiff = 4000
322
323    def test_create_snapshot(self):
324        raw_traces = [(0, 5, (('a.py', 2),))]
325
326        with contextlib.ExitStack() as stack:
327            stack.enter_context(patch.object(tracemalloc, 'is_tracing',
328                                             return_value=True))
329            stack.enter_context(patch.object(tracemalloc, 'get_traceback_limit',
330                                             return_value=5))
331            stack.enter_context(patch.object(tracemalloc, '_get_traces',
332                                             return_value=raw_traces))
333
334            snapshot = tracemalloc.take_snapshot()
335            self.assertEqual(snapshot.traceback_limit, 5)
336            self.assertEqual(len(snapshot.traces), 1)
337            trace = snapshot.traces[0]
338            self.assertEqual(trace.size, 5)
339            self.assertEqual(len(trace.traceback), 1)
340            self.assertEqual(trace.traceback[0].filename, 'a.py')
341            self.assertEqual(trace.traceback[0].lineno, 2)
342
343    def test_filter_traces(self):
344        snapshot, snapshot2 = create_snapshots()
345        filter1 = tracemalloc.Filter(False, "b.py")
346        filter2 = tracemalloc.Filter(True, "a.py", 2)
347        filter3 = tracemalloc.Filter(True, "a.py", 5)
348
349        original_traces = list(snapshot.traces._traces)
350
351        # exclude b.py
352        snapshot3 = snapshot.filter_traces((filter1,))
353        self.assertEqual(snapshot3.traces._traces, [
354            (0, 10, (('a.py', 2), ('b.py', 4))),
355            (0, 10, (('a.py', 2), ('b.py', 4))),
356            (0, 10, (('a.py', 2), ('b.py', 4))),
357            (1, 2, (('a.py', 5), ('b.py', 4))),
358            (3, 7, (('<unknown>', 0),)),
359        ])
360
361        # filter_traces() must not touch the original snapshot
362        self.assertEqual(snapshot.traces._traces, original_traces)
363
364        # only include two lines of a.py
365        snapshot4 = snapshot3.filter_traces((filter2, filter3))
366        self.assertEqual(snapshot4.traces._traces, [
367            (0, 10, (('a.py', 2), ('b.py', 4))),
368            (0, 10, (('a.py', 2), ('b.py', 4))),
369            (0, 10, (('a.py', 2), ('b.py', 4))),
370            (1, 2, (('a.py', 5), ('b.py', 4))),
371        ])
372
373        # No filter: just duplicate the snapshot
374        snapshot5 = snapshot.filter_traces(())
375        self.assertIsNot(snapshot5, snapshot)
376        self.assertIsNot(snapshot5.traces, snapshot.traces)
377        self.assertEqual(snapshot5.traces, snapshot.traces)
378
379        self.assertRaises(TypeError, snapshot.filter_traces, filter1)
380
381    def test_filter_traces_domain(self):
382        snapshot, snapshot2 = create_snapshots()
383        filter1 = tracemalloc.Filter(False, "a.py", domain=1)
384        filter2 = tracemalloc.Filter(True, "a.py", domain=1)
385
386        original_traces = list(snapshot.traces._traces)
387
388        # exclude a.py of domain 1
389        snapshot3 = snapshot.filter_traces((filter1,))
390        self.assertEqual(snapshot3.traces._traces, [
391            (0, 10, (('a.py', 2), ('b.py', 4))),
392            (0, 10, (('a.py', 2), ('b.py', 4))),
393            (0, 10, (('a.py', 2), ('b.py', 4))),
394            (2, 66, (('b.py', 1),)),
395            (3, 7, (('<unknown>', 0),)),
396        ])
397
398        # include domain 1
399        snapshot3 = snapshot.filter_traces((filter1,))
400        self.assertEqual(snapshot3.traces._traces, [
401            (0, 10, (('a.py', 2), ('b.py', 4))),
402            (0, 10, (('a.py', 2), ('b.py', 4))),
403            (0, 10, (('a.py', 2), ('b.py', 4))),
404            (2, 66, (('b.py', 1),)),
405            (3, 7, (('<unknown>', 0),)),
406        ])
407
408    def test_filter_traces_domain_filter(self):
409        snapshot, snapshot2 = create_snapshots()
410        filter1 = tracemalloc.DomainFilter(False, domain=3)
411        filter2 = tracemalloc.DomainFilter(True, domain=3)
412
413        # exclude domain 2
414        snapshot3 = snapshot.filter_traces((filter1,))
415        self.assertEqual(snapshot3.traces._traces, [
416            (0, 10, (('a.py', 2), ('b.py', 4))),
417            (0, 10, (('a.py', 2), ('b.py', 4))),
418            (0, 10, (('a.py', 2), ('b.py', 4))),
419            (1, 2, (('a.py', 5), ('b.py', 4))),
420            (2, 66, (('b.py', 1),)),
421        ])
422
423        # include domain 2
424        snapshot3 = snapshot.filter_traces((filter2,))
425        self.assertEqual(snapshot3.traces._traces, [
426            (3, 7, (('<unknown>', 0),)),
427        ])
428
429    def test_snapshot_group_by_line(self):
430        snapshot, snapshot2 = create_snapshots()
431        tb_0 = traceback_lineno('<unknown>', 0)
432        tb_a_2 = traceback_lineno('a.py', 2)
433        tb_a_5 = traceback_lineno('a.py', 5)
434        tb_b_1 = traceback_lineno('b.py', 1)
435        tb_c_578 = traceback_lineno('c.py', 578)
436
437        # stats per file and line
438        stats1 = snapshot.statistics('lineno')
439        self.assertEqual(stats1, [
440            tracemalloc.Statistic(tb_b_1, 66, 1),
441            tracemalloc.Statistic(tb_a_2, 30, 3),
442            tracemalloc.Statistic(tb_0, 7, 1),
443            tracemalloc.Statistic(tb_a_5, 2, 1),
444        ])
445
446        # stats per file and line (2)
447        stats2 = snapshot2.statistics('lineno')
448        self.assertEqual(stats2, [
449            tracemalloc.Statistic(tb_a_5, 5002, 2),
450            tracemalloc.Statistic(tb_c_578, 400, 1),
451            tracemalloc.Statistic(tb_a_2, 30, 3),
452        ])
453
454        # stats diff per file and line
455        statistics = snapshot2.compare_to(snapshot, 'lineno')
456        self.assertEqual(statistics, [
457            tracemalloc.StatisticDiff(tb_a_5, 5002, 5000, 2, 1),
458            tracemalloc.StatisticDiff(tb_c_578, 400, 400, 1, 1),
459            tracemalloc.StatisticDiff(tb_b_1, 0, -66, 0, -1),
460            tracemalloc.StatisticDiff(tb_0, 0, -7, 0, -1),
461            tracemalloc.StatisticDiff(tb_a_2, 30, 0, 3, 0),
462        ])
463
464    def test_snapshot_group_by_file(self):
465        snapshot, snapshot2 = create_snapshots()
466        tb_0 = traceback_filename('<unknown>')
467        tb_a = traceback_filename('a.py')
468        tb_b = traceback_filename('b.py')
469        tb_c = traceback_filename('c.py')
470
471        # stats per file
472        stats1 = snapshot.statistics('filename')
473        self.assertEqual(stats1, [
474            tracemalloc.Statistic(tb_b, 66, 1),
475            tracemalloc.Statistic(tb_a, 32, 4),
476            tracemalloc.Statistic(tb_0, 7, 1),
477        ])
478
479        # stats per file (2)
480        stats2 = snapshot2.statistics('filename')
481        self.assertEqual(stats2, [
482            tracemalloc.Statistic(tb_a, 5032, 5),
483            tracemalloc.Statistic(tb_c, 400, 1),
484        ])
485
486        # stats diff per file
487        diff = snapshot2.compare_to(snapshot, 'filename')
488        self.assertEqual(diff, [
489            tracemalloc.StatisticDiff(tb_a, 5032, 5000, 5, 1),
490            tracemalloc.StatisticDiff(tb_c, 400, 400, 1, 1),
491            tracemalloc.StatisticDiff(tb_b, 0, -66, 0, -1),
492            tracemalloc.StatisticDiff(tb_0, 0, -7, 0, -1),
493        ])
494
495    def test_snapshot_group_by_traceback(self):
496        snapshot, snapshot2 = create_snapshots()
497
498        # stats per file
499        tb1 = traceback(('a.py', 2), ('b.py', 4))
500        tb2 = traceback(('a.py', 5), ('b.py', 4))
501        tb3 = traceback(('b.py', 1))
502        tb4 = traceback(('<unknown>', 0))
503        stats1 = snapshot.statistics('traceback')
504        self.assertEqual(stats1, [
505            tracemalloc.Statistic(tb3, 66, 1),
506            tracemalloc.Statistic(tb1, 30, 3),
507            tracemalloc.Statistic(tb4, 7, 1),
508            tracemalloc.Statistic(tb2, 2, 1),
509        ])
510
511        # stats per file (2)
512        tb5 = traceback(('c.py', 578))
513        stats2 = snapshot2.statistics('traceback')
514        self.assertEqual(stats2, [
515            tracemalloc.Statistic(tb2, 5002, 2),
516            tracemalloc.Statistic(tb5, 400, 1),
517            tracemalloc.Statistic(tb1, 30, 3),
518        ])
519
520        # stats diff per file
521        diff = snapshot2.compare_to(snapshot, 'traceback')
522        self.assertEqual(diff, [
523            tracemalloc.StatisticDiff(tb2, 5002, 5000, 2, 1),
524            tracemalloc.StatisticDiff(tb5, 400, 400, 1, 1),
525            tracemalloc.StatisticDiff(tb3, 0, -66, 0, -1),
526            tracemalloc.StatisticDiff(tb4, 0, -7, 0, -1),
527            tracemalloc.StatisticDiff(tb1, 30, 0, 3, 0),
528        ])
529
530        self.assertRaises(ValueError,
531                          snapshot.statistics, 'traceback', cumulative=True)
532
533    def test_snapshot_group_by_cumulative(self):
534        snapshot, snapshot2 = create_snapshots()
535        tb_0 = traceback_filename('<unknown>')
536        tb_a = traceback_filename('a.py')
537        tb_b = traceback_filename('b.py')
538        tb_a_2 = traceback_lineno('a.py', 2)
539        tb_a_5 = traceback_lineno('a.py', 5)
540        tb_b_1 = traceback_lineno('b.py', 1)
541        tb_b_4 = traceback_lineno('b.py', 4)
542
543        # per file
544        stats = snapshot.statistics('filename', True)
545        self.assertEqual(stats, [
546            tracemalloc.Statistic(tb_b, 98, 5),
547            tracemalloc.Statistic(tb_a, 32, 4),
548            tracemalloc.Statistic(tb_0, 7, 1),
549        ])
550
551        # per line
552        stats = snapshot.statistics('lineno', True)
553        self.assertEqual(stats, [
554            tracemalloc.Statistic(tb_b_1, 66, 1),
555            tracemalloc.Statistic(tb_b_4, 32, 4),
556            tracemalloc.Statistic(tb_a_2, 30, 3),
557            tracemalloc.Statistic(tb_0, 7, 1),
558            tracemalloc.Statistic(tb_a_5, 2, 1),
559        ])
560
561    def test_trace_format(self):
562        snapshot, snapshot2 = create_snapshots()
563        trace = snapshot.traces[0]
564        self.assertEqual(str(trace), 'b.py:4: 10 B')
565        traceback = trace.traceback
566        self.assertEqual(str(traceback), 'b.py:4')
567        frame = traceback[0]
568        self.assertEqual(str(frame), 'b.py:4')
569
570    def test_statistic_format(self):
571        snapshot, snapshot2 = create_snapshots()
572        stats = snapshot.statistics('lineno')
573        stat = stats[0]
574        self.assertEqual(str(stat),
575                         'b.py:1: size=66 B, count=1, average=66 B')
576
577    def test_statistic_diff_format(self):
578        snapshot, snapshot2 = create_snapshots()
579        stats = snapshot2.compare_to(snapshot, 'lineno')
580        stat = stats[0]
581        self.assertEqual(str(stat),
582                         'a.py:5: size=5002 B (+5000 B), count=2 (+1), average=2501 B')
583
584    def test_slices(self):
585        snapshot, snapshot2 = create_snapshots()
586        self.assertEqual(snapshot.traces[:2],
587                         (snapshot.traces[0], snapshot.traces[1]))
588
589        traceback = snapshot.traces[0].traceback
590        self.assertEqual(traceback[:2],
591                         (traceback[0], traceback[1]))
592
593    def test_format_traceback(self):
594        snapshot, snapshot2 = create_snapshots()
595        def getline(filename, lineno):
596            return '  <%s, %s>' % (filename, lineno)
597        with unittest.mock.patch('tracemalloc.linecache.getline',
598                                 side_effect=getline):
599            tb = snapshot.traces[0].traceback
600            self.assertEqual(tb.format(),
601                             ['  File "b.py", line 4',
602                              '    <b.py, 4>',
603                              '  File "a.py", line 2',
604                              '    <a.py, 2>'])
605
606            self.assertEqual(tb.format(limit=1),
607                             ['  File "a.py", line 2',
608                              '    <a.py, 2>'])
609
610            self.assertEqual(tb.format(limit=-1),
611                             ['  File "b.py", line 4',
612                              '    <b.py, 4>'])
613
614            self.assertEqual(tb.format(most_recent_first=True),
615                             ['  File "a.py", line 2',
616                              '    <a.py, 2>',
617                              '  File "b.py", line 4',
618                              '    <b.py, 4>'])
619
620            self.assertEqual(tb.format(limit=1, most_recent_first=True),
621                             ['  File "a.py", line 2',
622                              '    <a.py, 2>'])
623
624            self.assertEqual(tb.format(limit=-1, most_recent_first=True),
625                             ['  File "b.py", line 4',
626                              '    <b.py, 4>'])
627
628
629class TestFilters(unittest.TestCase):
630    maxDiff = 2048
631
632    def test_filter_attributes(self):
633        # test default values
634        f = tracemalloc.Filter(True, "abc")
635        self.assertEqual(f.inclusive, True)
636        self.assertEqual(f.filename_pattern, "abc")
637        self.assertIsNone(f.lineno)
638        self.assertEqual(f.all_frames, False)
639
640        # test custom values
641        f = tracemalloc.Filter(False, "test.py", 123, True)
642        self.assertEqual(f.inclusive, False)
643        self.assertEqual(f.filename_pattern, "test.py")
644        self.assertEqual(f.lineno, 123)
645        self.assertEqual(f.all_frames, True)
646
647        # parameters passed by keyword
648        f = tracemalloc.Filter(inclusive=False, filename_pattern="test.py", lineno=123, all_frames=True)
649        self.assertEqual(f.inclusive, False)
650        self.assertEqual(f.filename_pattern, "test.py")
651        self.assertEqual(f.lineno, 123)
652        self.assertEqual(f.all_frames, True)
653
654        # read-only attribute
655        self.assertRaises(AttributeError, setattr, f, "filename_pattern", "abc")
656
657    def test_filter_match(self):
658        # filter without line number
659        f = tracemalloc.Filter(True, "abc")
660        self.assertTrue(f._match_frame("abc", 0))
661        self.assertTrue(f._match_frame("abc", 5))
662        self.assertTrue(f._match_frame("abc", 10))
663        self.assertFalse(f._match_frame("12356", 0))
664        self.assertFalse(f._match_frame("12356", 5))
665        self.assertFalse(f._match_frame("12356", 10))
666
667        f = tracemalloc.Filter(False, "abc")
668        self.assertFalse(f._match_frame("abc", 0))
669        self.assertFalse(f._match_frame("abc", 5))
670        self.assertFalse(f._match_frame("abc", 10))
671        self.assertTrue(f._match_frame("12356", 0))
672        self.assertTrue(f._match_frame("12356", 5))
673        self.assertTrue(f._match_frame("12356", 10))
674
675        # filter with line number > 0
676        f = tracemalloc.Filter(True, "abc", 5)
677        self.assertFalse(f._match_frame("abc", 0))
678        self.assertTrue(f._match_frame("abc", 5))
679        self.assertFalse(f._match_frame("abc", 10))
680        self.assertFalse(f._match_frame("12356", 0))
681        self.assertFalse(f._match_frame("12356", 5))
682        self.assertFalse(f._match_frame("12356", 10))
683
684        f = tracemalloc.Filter(False, "abc", 5)
685        self.assertTrue(f._match_frame("abc", 0))
686        self.assertFalse(f._match_frame("abc", 5))
687        self.assertTrue(f._match_frame("abc", 10))
688        self.assertTrue(f._match_frame("12356", 0))
689        self.assertTrue(f._match_frame("12356", 5))
690        self.assertTrue(f._match_frame("12356", 10))
691
692        # filter with line number 0
693        f = tracemalloc.Filter(True, "abc", 0)
694        self.assertTrue(f._match_frame("abc", 0))
695        self.assertFalse(f._match_frame("abc", 5))
696        self.assertFalse(f._match_frame("abc", 10))
697        self.assertFalse(f._match_frame("12356", 0))
698        self.assertFalse(f._match_frame("12356", 5))
699        self.assertFalse(f._match_frame("12356", 10))
700
701        f = tracemalloc.Filter(False, "abc", 0)
702        self.assertFalse(f._match_frame("abc", 0))
703        self.assertTrue(f._match_frame("abc", 5))
704        self.assertTrue(f._match_frame("abc", 10))
705        self.assertTrue(f._match_frame("12356", 0))
706        self.assertTrue(f._match_frame("12356", 5))
707        self.assertTrue(f._match_frame("12356", 10))
708
709    def test_filter_match_filename(self):
710        def fnmatch(inclusive, filename, pattern):
711            f = tracemalloc.Filter(inclusive, pattern)
712            return f._match_frame(filename, 0)
713
714        self.assertTrue(fnmatch(True, "abc", "abc"))
715        self.assertFalse(fnmatch(True, "12356", "abc"))
716        self.assertFalse(fnmatch(True, "<unknown>", "abc"))
717
718        self.assertFalse(fnmatch(False, "abc", "abc"))
719        self.assertTrue(fnmatch(False, "12356", "abc"))
720        self.assertTrue(fnmatch(False, "<unknown>", "abc"))
721
722    def test_filter_match_filename_joker(self):
723        def fnmatch(filename, pattern):
724            filter = tracemalloc.Filter(True, pattern)
725            return filter._match_frame(filename, 0)
726
727        # empty string
728        self.assertFalse(fnmatch('abc', ''))
729        self.assertFalse(fnmatch('', 'abc'))
730        self.assertTrue(fnmatch('', ''))
731        self.assertTrue(fnmatch('', '*'))
732
733        # no *
734        self.assertTrue(fnmatch('abc', 'abc'))
735        self.assertFalse(fnmatch('abc', 'abcd'))
736        self.assertFalse(fnmatch('abc', 'def'))
737
738        # a*
739        self.assertTrue(fnmatch('abc', 'a*'))
740        self.assertTrue(fnmatch('abc', 'abc*'))
741        self.assertFalse(fnmatch('abc', 'b*'))
742        self.assertFalse(fnmatch('abc', 'abcd*'))
743
744        # a*b
745        self.assertTrue(fnmatch('abc', 'a*c'))
746        self.assertTrue(fnmatch('abcdcx', 'a*cx'))
747        self.assertFalse(fnmatch('abb', 'a*c'))
748        self.assertFalse(fnmatch('abcdce', 'a*cx'))
749
750        # a*b*c
751        self.assertTrue(fnmatch('abcde', 'a*c*e'))
752        self.assertTrue(fnmatch('abcbdefeg', 'a*bd*eg'))
753        self.assertFalse(fnmatch('abcdd', 'a*c*e'))
754        self.assertFalse(fnmatch('abcbdefef', 'a*bd*eg'))
755
756        # replace .pyc suffix with .py
757        self.assertTrue(fnmatch('a.pyc', 'a.py'))
758        self.assertTrue(fnmatch('a.py', 'a.pyc'))
759
760        if os.name == 'nt':
761            # case insensitive
762            self.assertTrue(fnmatch('aBC', 'ABc'))
763            self.assertTrue(fnmatch('aBcDe', 'Ab*dE'))
764
765            self.assertTrue(fnmatch('a.pyc', 'a.PY'))
766            self.assertTrue(fnmatch('a.py', 'a.PYC'))
767        else:
768            # case sensitive
769            self.assertFalse(fnmatch('aBC', 'ABc'))
770            self.assertFalse(fnmatch('aBcDe', 'Ab*dE'))
771
772            self.assertFalse(fnmatch('a.pyc', 'a.PY'))
773            self.assertFalse(fnmatch('a.py', 'a.PYC'))
774
775        if os.name == 'nt':
776            # normalize alternate separator "/" to the standard separator "\"
777            self.assertTrue(fnmatch(r'a/b', r'a\b'))
778            self.assertTrue(fnmatch(r'a\b', r'a/b'))
779            self.assertTrue(fnmatch(r'a/b\c', r'a\b/c'))
780            self.assertTrue(fnmatch(r'a/b/c', r'a\b\c'))
781        else:
782            # there is no alternate separator
783            self.assertFalse(fnmatch(r'a/b', r'a\b'))
784            self.assertFalse(fnmatch(r'a\b', r'a/b'))
785            self.assertFalse(fnmatch(r'a/b\c', r'a\b/c'))
786            self.assertFalse(fnmatch(r'a/b/c', r'a\b\c'))
787
788        # as of 3.5, .pyo is no longer munged to .py
789        self.assertFalse(fnmatch('a.pyo', 'a.py'))
790
791    def test_filter_match_trace(self):
792        t1 = (("a.py", 2), ("b.py", 3))
793        t2 = (("b.py", 4), ("b.py", 5))
794        t3 = (("c.py", 5), ('<unknown>', 0))
795        unknown = (('<unknown>', 0),)
796
797        f = tracemalloc.Filter(True, "b.py", all_frames=True)
798        self.assertTrue(f._match_traceback(t1))
799        self.assertTrue(f._match_traceback(t2))
800        self.assertFalse(f._match_traceback(t3))
801        self.assertFalse(f._match_traceback(unknown))
802
803        f = tracemalloc.Filter(True, "b.py", all_frames=False)
804        self.assertFalse(f._match_traceback(t1))
805        self.assertTrue(f._match_traceback(t2))
806        self.assertFalse(f._match_traceback(t3))
807        self.assertFalse(f._match_traceback(unknown))
808
809        f = tracemalloc.Filter(False, "b.py", all_frames=True)
810        self.assertFalse(f._match_traceback(t1))
811        self.assertFalse(f._match_traceback(t2))
812        self.assertTrue(f._match_traceback(t3))
813        self.assertTrue(f._match_traceback(unknown))
814
815        f = tracemalloc.Filter(False, "b.py", all_frames=False)
816        self.assertTrue(f._match_traceback(t1))
817        self.assertFalse(f._match_traceback(t2))
818        self.assertTrue(f._match_traceback(t3))
819        self.assertTrue(f._match_traceback(unknown))
820
821        f = tracemalloc.Filter(False, "<unknown>", all_frames=False)
822        self.assertTrue(f._match_traceback(t1))
823        self.assertTrue(f._match_traceback(t2))
824        self.assertTrue(f._match_traceback(t3))
825        self.assertFalse(f._match_traceback(unknown))
826
827        f = tracemalloc.Filter(True, "<unknown>", all_frames=True)
828        self.assertFalse(f._match_traceback(t1))
829        self.assertFalse(f._match_traceback(t2))
830        self.assertTrue(f._match_traceback(t3))
831        self.assertTrue(f._match_traceback(unknown))
832
833        f = tracemalloc.Filter(False, "<unknown>", all_frames=True)
834        self.assertTrue(f._match_traceback(t1))
835        self.assertTrue(f._match_traceback(t2))
836        self.assertFalse(f._match_traceback(t3))
837        self.assertFalse(f._match_traceback(unknown))
838
839
840class TestCommandLine(unittest.TestCase):
841    def test_env_var_disabled_by_default(self):
842        # not tracing by default
843        code = 'import tracemalloc; print(tracemalloc.is_tracing())'
844        ok, stdout, stderr = assert_python_ok('-c', code)
845        stdout = stdout.rstrip()
846        self.assertEqual(stdout, b'False')
847
848    @unittest.skipIf(interpreter_requires_environment(),
849                     'Cannot run -E tests when PYTHON env vars are required.')
850    def test_env_var_ignored_with_E(self):
851        """PYTHON* environment variables must be ignored when -E is present."""
852        code = 'import tracemalloc; print(tracemalloc.is_tracing())'
853        ok, stdout, stderr = assert_python_ok('-E', '-c', code, PYTHONTRACEMALLOC='1')
854        stdout = stdout.rstrip()
855        self.assertEqual(stdout, b'False')
856
857    def test_env_var_disabled(self):
858        # tracing at startup
859        code = 'import tracemalloc; print(tracemalloc.is_tracing())'
860        ok, stdout, stderr = assert_python_ok('-c', code, PYTHONTRACEMALLOC='0')
861        stdout = stdout.rstrip()
862        self.assertEqual(stdout, b'False')
863
864    def test_env_var_enabled_at_startup(self):
865        # tracing at startup
866        code = 'import tracemalloc; print(tracemalloc.is_tracing())'
867        ok, stdout, stderr = assert_python_ok('-c', code, PYTHONTRACEMALLOC='1')
868        stdout = stdout.rstrip()
869        self.assertEqual(stdout, b'True')
870
871    def test_env_limit(self):
872        # start and set the number of frames
873        code = 'import tracemalloc; print(tracemalloc.get_traceback_limit())'
874        ok, stdout, stderr = assert_python_ok('-c', code, PYTHONTRACEMALLOC='10')
875        stdout = stdout.rstrip()
876        self.assertEqual(stdout, b'10')
877
878    def check_env_var_invalid(self, nframe):
879        with support.SuppressCrashReport():
880            ok, stdout, stderr = assert_python_failure(
881                '-c', 'pass',
882                PYTHONTRACEMALLOC=str(nframe))
883
884        if b'ValueError: the number of frames must be in range' in stderr:
885            return
886        if b'PYTHONTRACEMALLOC: invalid number of frames' in stderr:
887            return
888        self.fail(f"unexpected output: {stderr!a}")
889
890
891    def test_env_var_invalid(self):
892        for nframe in INVALID_NFRAME:
893            with self.subTest(nframe=nframe):
894                self.check_env_var_invalid(nframe)
895
896    def test_sys_xoptions(self):
897        for xoptions, nframe in (
898            ('tracemalloc', 1),
899            ('tracemalloc=1', 1),
900            ('tracemalloc=15', 15),
901        ):
902            with self.subTest(xoptions=xoptions, nframe=nframe):
903                code = 'import tracemalloc; print(tracemalloc.get_traceback_limit())'
904                ok, stdout, stderr = assert_python_ok('-X', xoptions, '-c', code)
905                stdout = stdout.rstrip()
906                self.assertEqual(stdout, str(nframe).encode('ascii'))
907
908    def check_sys_xoptions_invalid(self, nframe):
909        args = ('-X', 'tracemalloc=%s' % nframe, '-c', 'pass')
910        with support.SuppressCrashReport():
911            ok, stdout, stderr = assert_python_failure(*args)
912
913        if b'ValueError: the number of frames must be in range' in stderr:
914            return
915        if b'-X tracemalloc=NFRAME: invalid number of frames' in stderr:
916            return
917        self.fail(f"unexpected output: {stderr!a}")
918
919    def test_sys_xoptions_invalid(self):
920        for nframe in INVALID_NFRAME:
921            with self.subTest(nframe=nframe):
922                self.check_sys_xoptions_invalid(nframe)
923
924    @unittest.skipIf(_testcapi is None, 'need _testcapi')
925    def test_pymem_alloc0(self):
926        # Issue #21639: Check that PyMem_Malloc(0) with tracemalloc enabled
927        # does not crash.
928        code = 'import _testcapi; _testcapi.test_pymem_alloc0(); 1'
929        assert_python_ok('-X', 'tracemalloc', '-c', code)
930
931
932@unittest.skipIf(_testcapi is None, 'need _testcapi')
933class TestCAPI(unittest.TestCase):
934    maxDiff = 80 * 20
935
936    def setUp(self):
937        if tracemalloc.is_tracing():
938            self.skipTest("tracemalloc must be stopped before the test")
939
940        self.domain = 5
941        self.size = 123
942        self.obj = allocate_bytes(self.size)[0]
943
944        # for the type "object", id(obj) is the address of its memory block.
945        # This type is not tracked by the garbage collector
946        self.ptr = id(self.obj)
947
948    def tearDown(self):
949        tracemalloc.stop()
950
951    def get_traceback(self):
952        frames = _testcapi.tracemalloc_get_traceback(self.domain, self.ptr)
953        if frames is not None:
954            return tracemalloc.Traceback(frames)
955        else:
956            return None
957
958    def track(self, release_gil=False, nframe=1):
959        frames = get_frames(nframe, 1)
960        _testcapi.tracemalloc_track(self.domain, self.ptr, self.size,
961                                    release_gil)
962        return frames
963
964    def untrack(self):
965        _testcapi.tracemalloc_untrack(self.domain, self.ptr)
966
967    def get_traced_memory(self):
968        # Get the traced size in the domain
969        snapshot = tracemalloc.take_snapshot()
970        domain_filter = tracemalloc.DomainFilter(True, self.domain)
971        snapshot = snapshot.filter_traces([domain_filter])
972        return sum(trace.size for trace in snapshot.traces)
973
974    def check_track(self, release_gil):
975        nframe = 5
976        tracemalloc.start(nframe)
977
978        size = tracemalloc.get_traced_memory()[0]
979
980        frames = self.track(release_gil, nframe)
981        self.assertEqual(self.get_traceback(),
982                         tracemalloc.Traceback(frames))
983
984        self.assertEqual(self.get_traced_memory(), self.size)
985
986    def test_track(self):
987        self.check_track(False)
988
989    def test_track_without_gil(self):
990        # check that calling _PyTraceMalloc_Track() without holding the GIL
991        # works too
992        self.check_track(True)
993
994    def test_track_already_tracked(self):
995        nframe = 5
996        tracemalloc.start(nframe)
997
998        # track a first time
999        self.track()
1000
1001        # calling _PyTraceMalloc_Track() must remove the old trace and add
1002        # a new trace with the new traceback
1003        frames = self.track(nframe=nframe)
1004        self.assertEqual(self.get_traceback(),
1005                         tracemalloc.Traceback(frames))
1006
1007    def test_untrack(self):
1008        tracemalloc.start()
1009
1010        self.track()
1011        self.assertIsNotNone(self.get_traceback())
1012        self.assertEqual(self.get_traced_memory(), self.size)
1013
1014        # untrack must remove the trace
1015        self.untrack()
1016        self.assertIsNone(self.get_traceback())
1017        self.assertEqual(self.get_traced_memory(), 0)
1018
1019        # calling _PyTraceMalloc_Untrack() multiple times must not crash
1020        self.untrack()
1021        self.untrack()
1022
1023    def test_stop_track(self):
1024        tracemalloc.start()
1025        tracemalloc.stop()
1026
1027        with self.assertRaises(RuntimeError):
1028            self.track()
1029        self.assertIsNone(self.get_traceback())
1030
1031    def test_stop_untrack(self):
1032        tracemalloc.start()
1033        self.track()
1034
1035        tracemalloc.stop()
1036        with self.assertRaises(RuntimeError):
1037            self.untrack()
1038
1039
1040def test_main():
1041    support.run_unittest(
1042        TestTracemallocEnabled,
1043        TestSnapshot,
1044        TestFilters,
1045        TestCommandLine,
1046        TestCAPI,
1047    )
1048
1049if __name__ == "__main__":
1050    test_main()
1051