1"""
2Tests for object finalization semantics, as outlined in PEP 442.
3"""
4
5import contextlib
6import gc
7import unittest
8import weakref
9
try:
    from _testcapi import with_tp_del
except ImportError:
    # The import from _testcapi failed: provide a stand-in decorator so the
    # module can still be imported.  The decorated class is discarded and
    # replaced by a placeholder that raises TypeError on instantiation.
    def with_tp_del(cls):
        class C(object):
            def __new__(cls, *args, **kwargs):
                raise TypeError('requires _testcapi.with_tp_del')
        return C
18
try:
    from _testcapi import without_gc
except ImportError:
    # The import from _testcapi failed: provide a stand-in decorator so the
    # module can still be imported.  The decorated class is discarded and
    # replaced by a placeholder that raises TypeError on instantiation.
    def without_gc(cls):
        class C:
            def __new__(cls, *args, **kwargs):
                raise TypeError('requires _testcapi.without_gc')
        return C
27
28from test import support
29
30
class NonGCSimpleBase:
    """
    Common ancestor of every object exercised by the finalization tests.

    Finalizer activity is recorded in the class-level lists below.  This
    class only ever mutates them in place, so subclasses that do not
    rebind them share the very same list objects.
    """

    # Objects kept alive by a resurrecting side effect.
    survivors = []
    # id() of every object whose __del__ ran while recording was active.
    del_calls = []
    # Bookkeeping list for tp_del finalizers; this class only clears it.
    tp_del_calls = []
    # Exceptions caught inside finalizers; test() re-raises the first one.
    errors = []

    # While true, __del__ returns without recording or checking anything.
    _cleaning = False

    __slots__ = ()

    @classmethod
    def _cleanup(cls):
        """
        Empty all the bookkeeping lists and run a full collection.
        """
        # Let go of survivors, errors and garbage before collecting.
        for bucket in (cls.survivors, cls.errors, gc.garbage):
            bucket.clear()
        gc.collect()
        # The call records are forgotten only after the collection ran.
        for bucket in (cls.del_calls, cls.tp_del_calls):
            bucket.clear()

    @classmethod
    @contextlib.contextmanager
    def test(cls):
        """
        Context manager wrapping the body of every finalization test.

        The body runs inside support.disable_gc() with empty call
        records.  If the body completes and a finalizer stored an
        exception in ``errors``, the first stored exception is raised.
        In all cases recording is switched off and _cleanup() is run
        on exit.
        """
        with support.disable_gc():
            for bucket in (cls.del_calls, cls.tp_del_calls):
                bucket.clear()
            NonGCSimpleBase._cleaning = False
            try:
                yield
                # Surface the first exception swallowed by a finalizer.
                recorded = cls.errors
                if recorded:
                    raise recorded[0]
            finally:
                NonGCSimpleBase._cleaning = True
                cls._cleanup()

    def check_sanity(self):
        """
        Hook verifying the object is not broken; a no-op here.
        """

    def __del__(self):
        """
        PEP 442 finalizer.  Unless cleanup is in progress, record the
        call, run the sanity check and then trigger the side effect.
        Any Exception raised on the way is stored in ``errors``.
        """
        try:
            if self._cleaning:
                return
            self.del_calls.append(id(self))
            self.check_sanity()
            self.side_effect()
        except Exception as exc:
            self.errors.append(exc)

    def side_effect(self):
        """
        Hook invoked by the finalizer; a no-op here.
        """
95
96
class SimpleBase(NonGCSimpleBase):
    """
    Base class remembering its own identity, so that a finalizer can
    check it is running on an intact object.
    """

    def __init__(self):
        # Stored once at construction and compared again in check_sanity().
        self.id_ = id(self)

    def check_sanity(self):
        """
        Assert that the identity stored by __init__ still matches.
        """
        assert id(self) == self.id_
104
105
# Finalizable object passed through the without_gc decorator.  test_non_gc
# asserts that its instances are not tracked by the garbage collector.
@without_gc
class NonGC(NonGCSimpleBase):
    __slots__ = ()
109
# Counterpart of NonGC whose finalizer side effect resurrects the object.
# test_non_gc_resurrect asserts its instances are not tracked by the GC.
@without_gc
class NonGCResurrector(NonGCSimpleBase):
    __slots__ = ()

    def side_effect(self):
        """
        Resurrect self by storing self in a class-wide list.
        """
        # The new strong reference in ``survivors`` keeps the object alive.
        self.survivors.append(self)
119
class Simple(SimpleBase):
    """
    Finalizable object that adds nothing to SimpleBase.
    """
122
# Can't inherit from NonGCResurrector, in case importing without_gc fails.
class SimpleResurrector(SimpleBase):
    # Finalizable object whose finalizer side effect brings it back to life.

    def side_effect(self):
        """
        Resurrect self by storing self in a class-wide list.
        """
        # The new strong reference in ``survivors`` keeps the object alive.
        self.survivors.append(self)
131
132
class TestBase:
    """
    Mixin with gc.garbage handling and assertion helpers shared by the
    test cases.  It relies on an assertEqual() method being supplied by
    the class it is combined with.
    """

    def setUp(self):
        # Start each test with an empty gc.garbage, while holding on to
        # whatever it contained beforehand.
        self.old_garbage = list(gc.garbage)
        del gc.garbage[:]

    def tearDown(self):
        # None of the tests here should put anything in gc.garbage
        try:
            self.assertEqual(gc.garbage, [])
        finally:
            # Release the saved garbage and collect, even on failure.
            del self.old_garbage
            gc.collect()

    def assert_del_calls(self, ids):
        """Check the recorded __del__ calls against *ids*, ignoring order."""
        recorded = sorted(SimpleBase.del_calls)
        self.assertEqual(recorded, sorted(ids))

    def assert_tp_del_calls(self, ids):
        """Check the recorded tp_del calls against *ids*, ignoring order."""
        recorded = sorted(SimpleBase.tp_del_calls)
        self.assertEqual(recorded, sorted(ids))

    def assert_survivors(self, ids):
        """Check the ids of resurrected objects against *ids*."""
        alive = sorted(map(id, SimpleBase.survivors))
        self.assertEqual(alive, sorted(ids))

    def assert_garbage(self, ids):
        """Check the ids of the objects in gc.garbage against *ids*."""
        uncollected = sorted(map(id, gc.garbage))
        self.assertEqual(uncollected, sorted(ids))

    def clear_survivors(self):
        """Drop the references keeping resurrected objects alive."""
        del SimpleBase.survivors[:]
161
162
class SimpleFinalizationTest(TestBase, unittest.TestCase):
    """
    Test finalization without refcycles.
    """

    def test_simple(self):
        # A plain object is finalized exactly once and then goes away.
        with SimpleBase.test():
            s = Simple()
            ids = [id(s)]
            wr = weakref.ref(s)
            # Drop the only strong reference held by the test.
            del s
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors([])
            self.assertIs(wr(), None)
            # A further collection must not record another __del__ call.
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors([])

    def test_simple_resurrect(self):
        # An object resurrected by __del__ stays reachable, weakref included.
        with SimpleBase.test():
            s = SimpleResurrector()
            ids = [id(s)]
            wr = weakref.ref(s)
            del s
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors(ids)
            self.assertIsNot(wr(), None)
            # Releasing the survivor must not record a second __del__ call,
            # and the object must not be resurrected again.
            self.clear_survivors()
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors([])
        # Checked once the context manager has run its cleanup.
        self.assertIs(wr(), None)

    @support.cpython_only
    def test_non_gc(self):
        # Same scenario as test_simple, with an object the GC does not track.
        with SimpleBase.test():
            s = NonGC()
            self.assertFalse(gc.is_tracked(s))
            ids = [id(s)]
            del s
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors([])
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors([])

    @support.cpython_only
    def test_non_gc_resurrect(self):
        # Resurrection of an object the GC does not track.
        with SimpleBase.test():
            s = NonGCResurrector()
            self.assertFalse(gc.is_tracked(s))
            ids = [id(s)]
            del s
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors(ids)
            # Unlike test_simple_resurrect, releasing the survivor is
            # expected to record a second __del__ call and to resurrect
            # the object once more.
            self.clear_survivors()
            gc.collect()
            self.assert_del_calls(ids * 2)
            self.assert_survivors(ids)
226
227
class SelfCycleBase:
    """
    Mixin turning an object into a one-element reference cycle.
    """

    def __init__(self):
        super().__init__()
        # The object refers to itself, which forms the cycle.
        self.ref = self

    def check_sanity(self):
        """
        Run the inherited checks, then verify the self-reference is intact.
        """
        super().check_sanity()
        assert self is self.ref
237
class SimpleSelfCycle(SelfCycleBase, Simple):
    """
    Self-referencing variant of Simple.
    """
240
class SelfCycleResurrector(SelfCycleBase, SimpleResurrector):
    """
    Self-referencing variant of SimpleResurrector.
    """
243
class SuicidalSelfCycle(SelfCycleBase, Simple):
    # Self-referencing object whose finalizer side effect removes the
    # self-reference set up by SelfCycleBase.__init__().

    def side_effect(self):
        """
        Explicitly break the reference cycle.
        """
        self.ref = None
251
252
class SelfCycleFinalizationTest(TestBase, unittest.TestCase):
    """
    Test finalization of an object having a single cyclic reference to
    itself.
    """

    def test_simple(self):
        # A self-referencing object is finalized exactly once by the
        # collection and then goes away.
        with SimpleBase.test():
            s = SimpleSelfCycle()
            ids = [id(s)]
            wr = weakref.ref(s)
            # Only the self-reference keeps the object alive from here on.
            del s
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors([])
            self.assertIs(wr(), None)
            # A further collection must not record another __del__ call.
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors([])

    def test_simple_resurrect(self):
        # Test that __del__ can resurrect the object being finalized.
        with SimpleBase.test():
            s = SelfCycleResurrector()
            ids = [id(s)]
            wr = weakref.ref(s)
            del s
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors(ids)
            # The weakref is expected to be dead although the object
            # itself survived.
            # XXX is this desirable?
            self.assertIs(wr(), None)
            # When trying to destroy the object a second time, __del__
            # isn't called anymore (and the object isn't resurrected).
            self.clear_survivors()
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors([])
            self.assertIs(wr(), None)

    def test_simple_suicide(self):
        # Test the GC is able to deal with an object that kills its last
        # reference during __del__.
        with SimpleBase.test():
            s = SuicidalSelfCycle()
            ids = [id(s)]
            wr = weakref.ref(s)
            del s
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors([])
            self.assertIs(wr(), None)
            # A further collection must not change any of the results.
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors([])
            self.assertIs(wr(), None)
309
310
class ChainedBase:
    """
    Mixin for objects linked to neighbours through ``left`` and ``right``
    attributes.
    """

    def chain(self, left):
        """
        Link *left* in front of self and mark self as not suicided.
        """
        self.suicided = False
        self.left = left
        left.right = self

    def check_sanity(self):
        """
        Run the inherited checks, then verify the links around self.

        A suicided node must have dropped both of its links.  Otherwise
        each neighbour must either point back at self or, when it has
        suicided itself, have dropped the link leading back to self.
        """
        super().check_sanity()
        if self.suicided:
            assert self.left is None
            assert self.right is None
            return
        prev_node = self.left
        expected = None if prev_node.suicided else self
        assert prev_node.right is expected
        next_node = self.right
        expected = None if next_node.suicided else self
        assert next_node.left is expected
334
class SimpleChained(ChainedBase, Simple):
    """
    Chainable variant of Simple.
    """
337
class ChainedResurrector(ChainedBase, SimpleResurrector):
    """
    Chainable variant of SimpleResurrector.
    """
340
class SuicidalChained(ChainedBase, Simple):
    # Chainable object whose finalizer side effect flags the node as
    # suicided and drops both of its links.

    def side_effect(self):
        """
        Explicitly break the reference cycle.
        """
        # ChainedBase.check_sanity() reads this flag on neighbouring nodes
        # to decide which link state to expect.
        self.suicided = True
        self.left = None
        self.right = None
350
351
class CycleChainFinalizationTest(TestBase, unittest.TestCase):
    """
    Test finalization of a cyclic chain.  These tests are similar in
    spirit to the self-cycle tests above, but the collectable object
    graph isn't trivial anymore.
    """

    def build_chain(self, classes):
        """
        Instantiate every class in *classes* and link the instances into
        a ring.  Return the list of nodes, in the order of *classes*.
        """
        nodes = [cls() for cls in classes]
        for i, node in enumerate(nodes):
            # For i == 0 this is nodes[-1], which closes the ring.
            node.chain(nodes[i - 1])
        return nodes

    def check_non_resurrecting_chain(self, classes):
        """
        Check that a ring built from *classes* is finalized once per node
        and entirely collected, with no survivors.
        """
        N = len(classes)
        with SimpleBase.test():
            nodes = self.build_chain(classes)
            ids = [id(s) for s in nodes]
            wrs = [weakref.ref(s) for s in nodes]
            # From here on the nodes only keep each other alive.
            del nodes
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors([])
            self.assertEqual([wr() for wr in wrs], [None] * N)
            # A further collection must not record more __del__ calls.
            gc.collect()
            self.assert_del_calls(ids)

    def check_resurrecting_chain(self, classes):
        """
        Check that in a ring built from *classes* every node is finalized
        once, that exactly the SimpleResurrector instances survive, and
        that releasing the survivors triggers no further finalization.
        """
        N = len(classes)
        with SimpleBase.test():
            nodes = self.build_chain(classes)
            ids = [id(s) for s in nodes]
            survivor_ids = [id(s) for s in nodes if isinstance(s, SimpleResurrector)]
            wrs = [weakref.ref(s) for s in nodes]
            del nodes
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors(survivor_ids)
            # All the weakrefs are expected to be dead, including those
            # to the resurrected nodes.
            # XXX desirable?
            self.assertEqual([wr() for wr in wrs], [None] * N)
            self.clear_survivors()
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors([])

    def test_homogenous(self):
        self.check_non_resurrecting_chain([SimpleChained] * 3)

    def test_homogenous_resurrect(self):
        self.check_resurrecting_chain([ChainedResurrector] * 3)

    def test_homogenous_suicidal(self):
        self.check_non_resurrecting_chain([SuicidalChained] * 3)

    def test_heterogenous_suicidal_one(self):
        self.check_non_resurrecting_chain([SuicidalChained, SimpleChained] * 2)

    def test_heterogenous_suicidal_two(self):
        self.check_non_resurrecting_chain(
            [SuicidalChained] * 2 + [SimpleChained] * 2)

    def test_heterogenous_resurrect_one(self):
        self.check_resurrecting_chain([ChainedResurrector, SimpleChained] * 2)

    def test_heterogenous_resurrect_two(self):
        self.check_resurrecting_chain(
            [ChainedResurrector, SimpleChained, SuicidalChained] * 2)

    def test_heterogenous_resurrect_three(self):
        self.check_resurrecting_chain(
            [ChainedResurrector] * 2 + [SimpleChained] * 2 + [SuicidalChained] * 2)
424
425
426# NOTE: the tp_del slot isn't automatically inherited, so we have to call
427# with_tp_del() for each instantiated class.
428
class LegacyBase(SimpleBase):
    """
    Base class for objects carrying both a __del__ method and a legacy
    __tp_del__ finalizer.
    """

    def __del__(self):
        """
        Record the call and run the sanity check, unless cleanup is in
        progress.  Any Exception raised is stored in ``errors``.
        """
        try:
            if self._cleaning:
                return
            # Do not invoke side_effect here, since we are now exercising
            # the tp_del slot.
            self.del_calls.append(id(self))
            self.check_sanity()
        except Exception as exc:
            self.errors.append(exc)

    def __tp_del__(self):
        """
        Legacy (pre-PEP 442) finalizer, mapped to a tp_del slot.  Unless
        cleanup is in progress, record the call, run the sanity check and
        trigger the side effect; any Exception is stored in ``errors``.
        """
        try:
            if self._cleaning:
                return
            self.tp_del_calls.append(id(self))
            self.check_sanity()
            self.side_effect()
        except Exception as exc:
            self.errors.append(exc)
452
@with_tp_del
class Legacy(LegacyBase):
    """
    LegacyBase subclass passed through the with_tp_del decorator.
    """
456
# LegacyBase subclass whose side effect resurrects the object.  In
# LegacyBase the side effect is invoked from __tp_del__, not from __del__.
@with_tp_del
class LegacyResurrector(LegacyBase):

    def side_effect(self):
        """
        Resurrect self by storing self in a class-wide list.
        """
        # The new strong reference in ``survivors`` keeps the object alive.
        self.survivors.append(self)
465
@with_tp_del
class LegacySelfCycle(SelfCycleBase, LegacyBase):
    """
    Self-referencing LegacyBase subclass passed through with_tp_del.
    """
469
470
@support.cpython_only
class LegacyFinalizationTest(TestBase, unittest.TestCase):
    """
    Test finalization of objects with a tp_del.
    """

    def tearDown(self):
        # These tests need to clean up a bit more, since they create
        # uncollectable objects.
        gc.garbage.clear()
        gc.collect()
        super().tearDown()

    def test_legacy(self):
        # Both __del__ and the tp_del finalizer are recorded exactly once,
        # and the object goes away.
        with SimpleBase.test():
            s = Legacy()
            ids = [id(s)]
            wr = weakref.ref(s)
            # Drop the only strong reference held by the test.
            del s
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_tp_del_calls(ids)
            self.assert_survivors([])
            self.assertIs(wr(), None)
            # A further collection must not record any more calls.
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_tp_del_calls(ids)

    def test_legacy_resurrect(self):
        # Resurrection performed from the tp_del finalizer.
        with SimpleBase.test():
            s = LegacyResurrector()
            ids = [id(s)]
            wr = weakref.ref(s)
            del s
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_tp_del_calls(ids)
            self.assert_survivors(ids)
            # weakrefs are cleared before tp_del is called.
            self.assertIs(wr(), None)
            # Releasing the survivor is expected to record a second tp_del
            # call (resurrecting the object again), but no second __del__.
            self.clear_survivors()
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_tp_del_calls(ids * 2)
            self.assert_survivors(ids)
        # Checked once the context manager has run its cleanup.
        self.assertIs(wr(), None)

    def test_legacy_self_cycle(self):
        # Self-cycles with legacy finalizers end up in gc.garbage.
        with SimpleBase.test():
            s = LegacySelfCycle()
            ids = [id(s)]
            wr = weakref.ref(s)
            del s
            gc.collect()
            # No finalizer is expected to have been recorded; the object
            # is expected in gc.garbage, still alive.
            self.assert_del_calls([])
            self.assert_tp_del_calls([])
            self.assert_survivors([])
            self.assert_garbage(ids)
            self.assertIsNot(wr(), None)
            # Break the cycle to allow collection
            gc.garbage[0].ref = None
        # Checked once the context manager has run its cleanup.
        self.assert_garbage([])
        self.assertIs(wr(), None)
535
536
# Run the tests when this module is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
539