1import gc
2import sys
3import doctest
4import unittest
5import collections
6import weakref
7import operator
8import contextlib
9import copy
10import threading
11import time
12import random
13
14from test import support
15from test.support import script_helper, ALWAYS_EQ
16from test.support import gc_collect
17
18# Used in ReferencesTestCase.test_ref_created_during_del() .
19ref_from_del = None
20
21# Used by FinalizeTestCase as a global that may be replaced by None
22# when the interpreter shuts down.
23_global_var = 'foobar'
24
25class C:
26    def method(self):
27        pass
28
29
30class Callable:
31    bar = None
32
33    def __call__(self, x):
34        self.bar = x
35
36
37def create_function():
38    def f(): pass
39    return f
40
41def create_bound_method():
42    return C().method
43
44
45class Object:
46    def __init__(self, arg):
47        self.arg = arg
48    def __repr__(self):
49        return "<Object %r>" % self.arg
50    def __eq__(self, other):
51        if isinstance(other, Object):
52            return self.arg == other.arg
53        return NotImplemented
54    def __lt__(self, other):
55        if isinstance(other, Object):
56            return self.arg < other.arg
57        return NotImplemented
58    def __hash__(self):
59        return hash(self.arg)
60    def some_method(self):
61        return 4
62    def other_method(self):
63        return 5
64
65
66class RefCycle:
67    def __init__(self):
68        self.cycle = self
69
70
71class TestBase(unittest.TestCase):
72
73    def setUp(self):
74        self.cbcalled = 0
75
76    def callback(self, ref):
77        self.cbcalled += 1
78
79
80@contextlib.contextmanager
81def collect_in_thread(period=0.0001):
82    """
83    Ensure GC collections happen in a different thread, at a high frequency.
84    """
85    please_stop = False
86
87    def collect():
88        while not please_stop:
89            time.sleep(period)
90            gc.collect()
91
92    with support.disable_gc():
93        t = threading.Thread(target=collect)
94        t.start()
95        try:
96            yield
97        finally:
98            please_stop = True
99            t.join()
100
101
102class ReferencesTestCase(TestBase):
103
104    def test_basic_ref(self):
105        self.check_basic_ref(C)
106        self.check_basic_ref(create_function)
107        self.check_basic_ref(create_bound_method)
108
109        # Just make sure the tp_repr handler doesn't raise an exception.
110        # Live reference:
111        o = C()
112        wr = weakref.ref(o)
113        repr(wr)
114        # Dead reference:
115        del o
116        repr(wr)
117
118    def test_basic_callback(self):
119        self.check_basic_callback(C)
120        self.check_basic_callback(create_function)
121        self.check_basic_callback(create_bound_method)
122
123    @support.cpython_only
124    def test_cfunction(self):
125        import _testcapi
126        create_cfunction = _testcapi.create_cfunction
127        f = create_cfunction()
128        wr = weakref.ref(f)
129        self.assertIs(wr(), f)
130        del f
131        self.assertIsNone(wr())
132        self.check_basic_ref(create_cfunction)
133        self.check_basic_callback(create_cfunction)
134
135    def test_multiple_callbacks(self):
136        o = C()
137        ref1 = weakref.ref(o, self.callback)
138        ref2 = weakref.ref(o, self.callback)
139        del o
140        gc_collect()  # For PyPy or other GCs.
141        self.assertIsNone(ref1(), "expected reference to be invalidated")
142        self.assertIsNone(ref2(), "expected reference to be invalidated")
143        self.assertEqual(self.cbcalled, 2,
144                     "callback not called the right number of times")
145
146    def test_multiple_selfref_callbacks(self):
147        # Make sure all references are invalidated before callbacks are called
148        #
149        # What's important here is that we're using the first
150        # reference in the callback invoked on the second reference
151        # (the most recently created ref is cleaned up first).  This
152        # tests that all references to the object are invalidated
153        # before any of the callbacks are invoked, so that we only
154        # have one invocation of _weakref.c:cleanup_helper() active
155        # for a particular object at a time.
156        #
157        def callback(object, self=self):
158            self.ref()
159        c = C()
160        self.ref = weakref.ref(c, callback)
161        ref1 = weakref.ref(c, callback)
162        del c
163
164    def test_constructor_kwargs(self):
165        c = C()
166        self.assertRaises(TypeError, weakref.ref, c, callback=None)
167
168    def test_proxy_ref(self):
169        o = C()
170        o.bar = 1
171        ref1 = weakref.proxy(o, self.callback)
172        ref2 = weakref.proxy(o, self.callback)
173        del o
174        gc_collect()  # For PyPy or other GCs.
175
176        def check(proxy):
177            proxy.bar
178
179        self.assertRaises(ReferenceError, check, ref1)
180        self.assertRaises(ReferenceError, check, ref2)
181        ref3 = weakref.proxy(C())
182        gc_collect()  # For PyPy or other GCs.
183        self.assertRaises(ReferenceError, bool, ref3)
184        self.assertEqual(self.cbcalled, 2)
185
186    def check_basic_ref(self, factory):
187        o = factory()
188        ref = weakref.ref(o)
189        self.assertIsNotNone(ref(),
190                     "weak reference to live object should be live")
191        o2 = ref()
192        self.assertIs(o, o2,
193                     "<ref>() should return original object if live")
194
195    def check_basic_callback(self, factory):
196        self.cbcalled = 0
197        o = factory()
198        ref = weakref.ref(o, self.callback)
199        del o
200        gc_collect()  # For PyPy or other GCs.
201        self.assertEqual(self.cbcalled, 1,
202                     "callback did not properly set 'cbcalled'")
203        self.assertIsNone(ref(),
204                     "ref2 should be dead after deleting object reference")
205
206    def test_ref_reuse(self):
207        o = C()
208        ref1 = weakref.ref(o)
209        # create a proxy to make sure that there's an intervening creation
210        # between these two; it should make no difference
211        proxy = weakref.proxy(o)
212        ref2 = weakref.ref(o)
213        self.assertIs(ref1, ref2,
214                     "reference object w/out callback should be re-used")
215
216        o = C()
217        proxy = weakref.proxy(o)
218        ref1 = weakref.ref(o)
219        ref2 = weakref.ref(o)
220        self.assertIs(ref1, ref2,
221                     "reference object w/out callback should be re-used")
222        self.assertEqual(weakref.getweakrefcount(o), 2,
223                     "wrong weak ref count for object")
224        del proxy
225        gc_collect()  # For PyPy or other GCs.
226        self.assertEqual(weakref.getweakrefcount(o), 1,
227                     "wrong weak ref count for object after deleting proxy")
228
229    def test_proxy_reuse(self):
230        o = C()
231        proxy1 = weakref.proxy(o)
232        ref = weakref.ref(o)
233        proxy2 = weakref.proxy(o)
234        self.assertIs(proxy1, proxy2,
235                     "proxy object w/out callback should have been re-used")
236
237    def test_basic_proxy(self):
238        o = C()
239        self.check_proxy(o, weakref.proxy(o))
240
241        L = collections.UserList()
242        p = weakref.proxy(L)
243        self.assertFalse(p, "proxy for empty UserList should be false")
244        p.append(12)
245        self.assertEqual(len(L), 1)
246        self.assertTrue(p, "proxy for non-empty UserList should be true")
247        p[:] = [2, 3]
248        self.assertEqual(len(L), 2)
249        self.assertEqual(len(p), 2)
250        self.assertIn(3, p, "proxy didn't support __contains__() properly")
251        p[1] = 5
252        self.assertEqual(L[1], 5)
253        self.assertEqual(p[1], 5)
254        L2 = collections.UserList(L)
255        p2 = weakref.proxy(L2)
256        self.assertEqual(p, p2)
257        ## self.assertEqual(repr(L2), repr(p2))
258        L3 = collections.UserList(range(10))
259        p3 = weakref.proxy(L3)
260        self.assertEqual(L3[:], p3[:])
261        self.assertEqual(L3[5:], p3[5:])
262        self.assertEqual(L3[:5], p3[:5])
263        self.assertEqual(L3[2:5], p3[2:5])
264
265    def test_proxy_unicode(self):
266        # See bug 5037
267        class C(object):
268            def __str__(self):
269                return "string"
270            def __bytes__(self):
271                return b"bytes"
272        instance = C()
273        self.assertIn("__bytes__", dir(weakref.proxy(instance)))
274        self.assertEqual(bytes(weakref.proxy(instance)), b"bytes")
275
276    def test_proxy_index(self):
277        class C:
278            def __index__(self):
279                return 10
280        o = C()
281        p = weakref.proxy(o)
282        self.assertEqual(operator.index(p), 10)
283
284    def test_proxy_div(self):
285        class C:
286            def __floordiv__(self, other):
287                return 42
288            def __ifloordiv__(self, other):
289                return 21
290        o = C()
291        p = weakref.proxy(o)
292        self.assertEqual(p // 5, 42)
293        p //= 5
294        self.assertEqual(p, 21)
295
296    def test_proxy_matmul(self):
297        class C:
298            def __matmul__(self, other):
299                return 1729
300            def __rmatmul__(self, other):
301                return -163
302            def __imatmul__(self, other):
303                return 561
304        o = C()
305        p = weakref.proxy(o)
306        self.assertEqual(p @ 5, 1729)
307        self.assertEqual(5 @ p, -163)
308        p @= 5
309        self.assertEqual(p, 561)
310
311    # The PyWeakref_* C API is documented as allowing either NULL or
312    # None as the value for the callback, where either means "no
313    # callback".  The "no callback" ref and proxy objects are supposed
314    # to be shared so long as they exist by all callers so long as
315    # they are active.  In Python 2.3.3 and earlier, this guarantee
316    # was not honored, and was broken in different ways for
317    # PyWeakref_NewRef() and PyWeakref_NewProxy().  (Two tests.)
318
319    def test_shared_ref_without_callback(self):
320        self.check_shared_without_callback(weakref.ref)
321
322    def test_shared_proxy_without_callback(self):
323        self.check_shared_without_callback(weakref.proxy)
324
325    def check_shared_without_callback(self, makeref):
326        o = Object(1)
327        p1 = makeref(o, None)
328        p2 = makeref(o, None)
329        self.assertIs(p1, p2, "both callbacks were None in the C API")
330        del p1, p2
331        p1 = makeref(o)
332        p2 = makeref(o, None)
333        self.assertIs(p1, p2, "callbacks were NULL, None in the C API")
334        del p1, p2
335        p1 = makeref(o)
336        p2 = makeref(o)
337        self.assertIs(p1, p2, "both callbacks were NULL in the C API")
338        del p1, p2
339        p1 = makeref(o, None)
340        p2 = makeref(o)
341        self.assertIs(p1, p2, "callbacks were None, NULL in the C API")
342
343    def test_callable_proxy(self):
344        o = Callable()
345        ref1 = weakref.proxy(o)
346
347        self.check_proxy(o, ref1)
348
349        self.assertIs(type(ref1), weakref.CallableProxyType,
350                     "proxy is not of callable type")
351        ref1('twinkies!')
352        self.assertEqual(o.bar, 'twinkies!',
353                     "call through proxy not passed through to original")
354        ref1(x='Splat.')
355        self.assertEqual(o.bar, 'Splat.',
356                     "call through proxy not passed through to original")
357
358        # expect due to too few args
359        self.assertRaises(TypeError, ref1)
360
361        # expect due to too many args
362        self.assertRaises(TypeError, ref1, 1, 2, 3)
363
364    def check_proxy(self, o, proxy):
365        o.foo = 1
366        self.assertEqual(proxy.foo, 1,
367                     "proxy does not reflect attribute addition")
368        o.foo = 2
369        self.assertEqual(proxy.foo, 2,
370                     "proxy does not reflect attribute modification")
371        del o.foo
372        self.assertFalse(hasattr(proxy, 'foo'),
373                     "proxy does not reflect attribute removal")
374
375        proxy.foo = 1
376        self.assertEqual(o.foo, 1,
377                     "object does not reflect attribute addition via proxy")
378        proxy.foo = 2
379        self.assertEqual(o.foo, 2,
380            "object does not reflect attribute modification via proxy")
381        del proxy.foo
382        self.assertFalse(hasattr(o, 'foo'),
383                     "object does not reflect attribute removal via proxy")
384
385    def test_proxy_deletion(self):
386        # Test clearing of SF bug #762891
387        class Foo:
388            result = None
389            def __delitem__(self, accessor):
390                self.result = accessor
391        g = Foo()
392        f = weakref.proxy(g)
393        del f[0]
394        self.assertEqual(f.result, 0)
395
396    def test_proxy_bool(self):
397        # Test clearing of SF bug #1170766
398        class List(list): pass
399        lyst = List()
400        self.assertEqual(bool(weakref.proxy(lyst)), bool(lyst))
401
402    def test_proxy_iter(self):
403        # Test fails with a debug build of the interpreter
404        # (see bpo-38395).
405
406        obj = None
407
408        class MyObj:
409            def __iter__(self):
410                nonlocal obj
411                del obj
412                return NotImplemented
413
414        obj = MyObj()
415        p = weakref.proxy(obj)
416        with self.assertRaises(TypeError):
417            # "blech" in p calls MyObj.__iter__ through the proxy,
418            # without keeping a reference to the real object, so it
419            # can be killed in the middle of the call
420            "blech" in p
421
422    def test_proxy_next(self):
423        arr = [4, 5, 6]
424        def iterator_func():
425            yield from arr
426        it = iterator_func()
427
428        class IteratesWeakly:
429            def __iter__(self):
430                return weakref.proxy(it)
431
432        weak_it = IteratesWeakly()
433
434        # Calls proxy.__next__
435        self.assertEqual(list(weak_it), [4, 5, 6])
436
437    def test_proxy_bad_next(self):
438        # bpo-44720: PyIter_Next() shouldn't be called if the reference
439        # isn't an iterator.
440
441        not_an_iterator = lambda: 0
442
443        class A:
444            def __iter__(self):
445                return weakref.proxy(not_an_iterator)
446        a = A()
447
448        msg = "Weakref proxy referenced a non-iterator"
449        with self.assertRaisesRegex(TypeError, msg):
450            list(a)
451
452    def test_proxy_reversed(self):
453        class MyObj:
454            def __len__(self):
455                return 3
456            def __reversed__(self):
457                return iter('cba')
458
459        obj = MyObj()
460        self.assertEqual("".join(reversed(weakref.proxy(obj))), "cba")
461
462    def test_proxy_hash(self):
463        class MyObj:
464            def __hash__(self):
465                return 42
466
467        obj = MyObj()
468        with self.assertRaises(TypeError):
469            hash(weakref.proxy(obj))
470
471        class MyObj:
472            __hash__ = None
473
474        obj = MyObj()
475        with self.assertRaises(TypeError):
476            hash(weakref.proxy(obj))
477
478    def test_getweakrefcount(self):
479        o = C()
480        ref1 = weakref.ref(o)
481        ref2 = weakref.ref(o, self.callback)
482        self.assertEqual(weakref.getweakrefcount(o), 2,
483                     "got wrong number of weak reference objects")
484
485        proxy1 = weakref.proxy(o)
486        proxy2 = weakref.proxy(o, self.callback)
487        self.assertEqual(weakref.getweakrefcount(o), 4,
488                     "got wrong number of weak reference objects")
489
490        del ref1, ref2, proxy1, proxy2
491        gc_collect()  # For PyPy or other GCs.
492        self.assertEqual(weakref.getweakrefcount(o), 0,
493                     "weak reference objects not unlinked from"
494                     " referent when discarded.")
495
496        # assumes ints do not support weakrefs
497        self.assertEqual(weakref.getweakrefcount(1), 0,
498                     "got wrong number of weak reference objects for int")
499
500    def test_getweakrefs(self):
501        o = C()
502        ref1 = weakref.ref(o, self.callback)
503        ref2 = weakref.ref(o, self.callback)
504        del ref1
505        gc_collect()  # For PyPy or other GCs.
506        self.assertEqual(weakref.getweakrefs(o), [ref2],
507                     "list of refs does not match")
508
509        o = C()
510        ref1 = weakref.ref(o, self.callback)
511        ref2 = weakref.ref(o, self.callback)
512        del ref2
513        gc_collect()  # For PyPy or other GCs.
514        self.assertEqual(weakref.getweakrefs(o), [ref1],
515                     "list of refs does not match")
516
517        del ref1
518        gc_collect()  # For PyPy or other GCs.
519        self.assertEqual(weakref.getweakrefs(o), [],
520                     "list of refs not cleared")
521
522        # assumes ints do not support weakrefs
523        self.assertEqual(weakref.getweakrefs(1), [],
524                     "list of refs does not match for int")
525
526    def test_newstyle_number_ops(self):
527        class F(float):
528            pass
529        f = F(2.0)
530        p = weakref.proxy(f)
531        self.assertEqual(p + 1.0, 3.0)
532        self.assertEqual(1.0 + p, 3.0)  # this used to SEGV
533
534    def test_callbacks_protected(self):
535        # Callbacks protected from already-set exceptions?
536        # Regression test for SF bug #478534.
537        class BogusError(Exception):
538            pass
539        data = {}
540        def remove(k):
541            del data[k]
542        def encapsulate():
543            f = lambda : ()
544            data[weakref.ref(f, remove)] = None
545            raise BogusError
546        try:
547            encapsulate()
548        except BogusError:
549            pass
550        else:
551            self.fail("exception not properly restored")
552        try:
553            encapsulate()
554        except BogusError:
555            pass
556        else:
557            self.fail("exception not properly restored")
558
559    def test_sf_bug_840829(self):
560        # "weakref callbacks and gc corrupt memory"
561        # subtype_dealloc erroneously exposed a new-style instance
562        # already in the process of getting deallocated to gc,
563        # causing double-deallocation if the instance had a weakref
564        # callback that triggered gc.
565        # If the bug exists, there probably won't be an obvious symptom
566        # in a release build.  In a debug build, a segfault will occur
567        # when the second attempt to remove the instance from the "list
568        # of all objects" occurs.
569
570        import gc
571
572        class C(object):
573            pass
574
575        c = C()
576        wr = weakref.ref(c, lambda ignore: gc.collect())
577        del c
578
579        # There endeth the first part.  It gets worse.
580        del wr
581
582        c1 = C()
583        c1.i = C()
584        wr = weakref.ref(c1.i, lambda ignore: gc.collect())
585
586        c2 = C()
587        c2.c1 = c1
588        del c1  # still alive because c2 points to it
589
590        # Now when subtype_dealloc gets called on c2, it's not enough just
591        # that c2 is immune from gc while the weakref callbacks associated
592        # with c2 execute (there are none in this 2nd half of the test, btw).
593        # subtype_dealloc goes on to call the base classes' deallocs too,
594        # so any gc triggered by weakref callbacks associated with anything
595        # torn down by a base class dealloc can also trigger double
596        # deallocation of c2.
597        del c2
598
599    def test_callback_in_cycle_1(self):
600        import gc
601
602        class J(object):
603            pass
604
605        class II(object):
606            def acallback(self, ignore):
607                self.J
608
609        I = II()
610        I.J = J
611        I.wr = weakref.ref(J, I.acallback)
612
613        # Now J and II are each in a self-cycle (as all new-style class
614        # objects are, since their __mro__ points back to them).  I holds
615        # both a weak reference (I.wr) and a strong reference (I.J) to class
616        # J.  I is also in a cycle (I.wr points to a weakref that references
617        # I.acallback).  When we del these three, they all become trash, but
618        # the cycles prevent any of them from getting cleaned up immediately.
619        # Instead they have to wait for cyclic gc to deduce that they're
620        # trash.
621        #
622        # gc used to call tp_clear on all of them, and the order in which
623        # it does that is pretty accidental.  The exact order in which we
624        # built up these things manages to provoke gc into running tp_clear
625        # in just the right order (I last).  Calling tp_clear on II leaves
626        # behind an insane class object (its __mro__ becomes NULL).  Calling
627        # tp_clear on J breaks its self-cycle, but J doesn't get deleted
628        # just then because of the strong reference from I.J.  Calling
629        # tp_clear on I starts to clear I's __dict__, and just happens to
630        # clear I.J first -- I.wr is still intact.  That removes the last
631        # reference to J, which triggers the weakref callback.  The callback
632        # tries to do "self.J", and instances of new-style classes look up
633        # attributes ("J") in the class dict first.  The class (II) wants to
634        # search II.__mro__, but that's NULL.   The result was a segfault in
635        # a release build, and an assert failure in a debug build.
636        del I, J, II
637        gc.collect()
638
639    def test_callback_in_cycle_2(self):
640        import gc
641
642        # This is just like test_callback_in_cycle_1, except that II is an
643        # old-style class.  The symptom is different then:  an instance of an
644        # old-style class looks in its own __dict__ first.  'J' happens to
645        # get cleared from I.__dict__ before 'wr', and 'J' was never in II's
646        # __dict__, so the attribute isn't found.  The difference is that
647        # the old-style II doesn't have a NULL __mro__ (it doesn't have any
648        # __mro__), so no segfault occurs.  Instead it got:
649        #    test_callback_in_cycle_2 (__main__.ReferencesTestCase) ...
650        #    Exception exceptions.AttributeError:
651        #   "II instance has no attribute 'J'" in <bound method II.acallback
652        #       of <?.II instance at 0x00B9B4B8>> ignored
653
654        class J(object):
655            pass
656
657        class II:
658            def acallback(self, ignore):
659                self.J
660
661        I = II()
662        I.J = J
663        I.wr = weakref.ref(J, I.acallback)
664
665        del I, J, II
666        gc.collect()
667
668    def test_callback_in_cycle_3(self):
669        import gc
670
671        # This one broke the first patch that fixed the last two.  In this
672        # case, the objects reachable from the callback aren't also reachable
673        # from the object (c1) *triggering* the callback:  you can get to
674        # c1 from c2, but not vice-versa.  The result was that c2's __dict__
675        # got tp_clear'ed by the time the c2.cb callback got invoked.
676
677        class C:
678            def cb(self, ignore):
679                self.me
680                self.c1
681                self.wr
682
683        c1, c2 = C(), C()
684
685        c2.me = c2
686        c2.c1 = c1
687        c2.wr = weakref.ref(c1, c2.cb)
688
689        del c1, c2
690        gc.collect()
691
692    def test_callback_in_cycle_4(self):
693        import gc
694
695        # Like test_callback_in_cycle_3, except c2 and c1 have different
696        # classes.  c2's class (C) isn't reachable from c1 then, so protecting
697        # objects reachable from the dying object (c1) isn't enough to stop
698        # c2's class (C) from getting tp_clear'ed before c2.cb is invoked.
699        # The result was a segfault (C.__mro__ was NULL when the callback
700        # tried to look up self.me).
701
702        class C(object):
703            def cb(self, ignore):
704                self.me
705                self.c1
706                self.wr
707
708        class D:
709            pass
710
711        c1, c2 = D(), C()
712
713        c2.me = c2
714        c2.c1 = c1
715        c2.wr = weakref.ref(c1, c2.cb)
716
717        del c1, c2, C, D
718        gc.collect()
719
720    def test_callback_in_cycle_resurrection(self):
721        import gc
722
723        # Do something nasty in a weakref callback:  resurrect objects
724        # from dead cycles.  For this to be attempted, the weakref and
725        # its callback must also be part of the cyclic trash (else the
726        # objects reachable via the callback couldn't be in cyclic trash
727        # to begin with -- the callback would act like an external root).
728        # But gc clears trash weakrefs with callbacks early now, which
729        # disables the callbacks, so the callbacks shouldn't get called
730        # at all (and so nothing actually gets resurrected).
731
732        alist = []
733        class C(object):
734            def __init__(self, value):
735                self.attribute = value
736
737            def acallback(self, ignore):
738                alist.append(self.c)
739
740        c1, c2 = C(1), C(2)
741        c1.c = c2
742        c2.c = c1
743        c1.wr = weakref.ref(c2, c1.acallback)
744        c2.wr = weakref.ref(c1, c2.acallback)
745
746        def C_went_away(ignore):
747            alist.append("C went away")
748        wr = weakref.ref(C, C_went_away)
749
750        del c1, c2, C   # make them all trash
751        self.assertEqual(alist, [])  # del isn't enough to reclaim anything
752
753        gc.collect()
754        # c1.wr and c2.wr were part of the cyclic trash, so should have
755        # been cleared without their callbacks executing.  OTOH, the weakref
756        # to C is bound to a function local (wr), and wasn't trash, so that
757        # callback should have been invoked when C went away.
758        self.assertEqual(alist, ["C went away"])
759        # The remaining weakref should be dead now (its callback ran).
760        self.assertEqual(wr(), None)
761
762        del alist[:]
763        gc.collect()
764        self.assertEqual(alist, [])
765
766    def test_callbacks_on_callback(self):
767        import gc
768
769        # Set up weakref callbacks *on* weakref callbacks.
770        alist = []
771        def safe_callback(ignore):
772            alist.append("safe_callback called")
773
774        class C(object):
775            def cb(self, ignore):
776                alist.append("cb called")
777
778        c, d = C(), C()
779        c.other = d
780        d.other = c
781        callback = c.cb
782        c.wr = weakref.ref(d, callback)     # this won't trigger
783        d.wr = weakref.ref(callback, d.cb)  # ditto
784        external_wr = weakref.ref(callback, safe_callback)  # but this will
785        self.assertIs(external_wr(), callback)
786
787        # The weakrefs attached to c and d should get cleared, so that
788        # C.cb is never called.  But external_wr isn't part of the cyclic
789        # trash, and no cyclic trash is reachable from it, so safe_callback
790        # should get invoked when the bound method object callback (c.cb)
791        # -- which is itself a callback, and also part of the cyclic trash --
792        # gets reclaimed at the end of gc.
793
794        del callback, c, d, C
795        self.assertEqual(alist, [])  # del isn't enough to clean up cycles
796        gc.collect()
797        self.assertEqual(alist, ["safe_callback called"])
798        self.assertEqual(external_wr(), None)
799
800        del alist[:]
801        gc.collect()
802        self.assertEqual(alist, [])
803
804    def test_gc_during_ref_creation(self):
805        self.check_gc_during_creation(weakref.ref)
806
807    def test_gc_during_proxy_creation(self):
808        self.check_gc_during_creation(weakref.proxy)
809
810    def check_gc_during_creation(self, makeref):
811        thresholds = gc.get_threshold()
812        gc.set_threshold(1, 1, 1)
813        gc.collect()
814        class A:
815            pass
816
817        def callback(*args):
818            pass
819
820        referenced = A()
821
822        a = A()
823        a.a = a
824        a.wr = makeref(referenced)
825
826        try:
827            # now make sure the object and the ref get labeled as
828            # cyclic trash:
829            a = A()
830            weakref.ref(referenced, callback)
831
832        finally:
833            gc.set_threshold(*thresholds)
834
835    def test_ref_created_during_del(self):
836        # Bug #1377858
837        # A weakref created in an object's __del__() would crash the
838        # interpreter when the weakref was cleaned up since it would refer to
839        # non-existent memory.  This test should not segfault the interpreter.
840        class Target(object):
841            def __del__(self):
842                global ref_from_del
843                ref_from_del = weakref.ref(self)
844
845        w = Target()
846
847    def test_init(self):
848        # Issue 3634
849        # <weakref to class>.__init__() doesn't check errors correctly
850        r = weakref.ref(Exception)
851        self.assertRaises(TypeError, r.__init__, 0, 0, 0, 0, 0)
852        # No exception should be raised here
853        gc.collect()
854
855    def test_classes(self):
856        # Check that classes are weakrefable.
857        class A(object):
858            pass
859        l = []
860        weakref.ref(int)
861        a = weakref.ref(A, l.append)
862        A = None
863        gc.collect()
864        self.assertEqual(a(), None)
865        self.assertEqual(l, [a])
866
867    def test_equality(self):
868        # Alive weakrefs defer equality testing to their underlying object.
869        x = Object(1)
870        y = Object(1)
871        z = Object(2)
872        a = weakref.ref(x)
873        b = weakref.ref(y)
874        c = weakref.ref(z)
875        d = weakref.ref(x)
876        # Note how we directly test the operators here, to stress both
877        # __eq__ and __ne__.
878        self.assertTrue(a == b)
879        self.assertFalse(a != b)
880        self.assertFalse(a == c)
881        self.assertTrue(a != c)
882        self.assertTrue(a == d)
883        self.assertFalse(a != d)
884        self.assertFalse(a == x)
885        self.assertTrue(a != x)
886        self.assertTrue(a == ALWAYS_EQ)
887        self.assertFalse(a != ALWAYS_EQ)
888        del x, y, z
889        gc.collect()
890        for r in a, b, c:
891            # Sanity check
892            self.assertIs(r(), None)
893        # Dead weakrefs compare by identity: whether `a` and `d` are the
894        # same weakref object is an implementation detail, since they pointed
895        # to the same original object and didn't have a callback.
896        # (see issue #16453).
897        self.assertFalse(a == b)
898        self.assertTrue(a != b)
899        self.assertFalse(a == c)
900        self.assertTrue(a != c)
901        self.assertEqual(a == d, a is d)
902        self.assertEqual(a != d, a is not d)
903
904    def test_ordering(self):
905        # weakrefs cannot be ordered, even if the underlying objects can.
906        ops = [operator.lt, operator.gt, operator.le, operator.ge]
907        x = Object(1)
908        y = Object(1)
909        a = weakref.ref(x)
910        b = weakref.ref(y)
911        for op in ops:
912            self.assertRaises(TypeError, op, a, b)
913        # Same when dead.
914        del x, y
915        gc.collect()
916        for op in ops:
917            self.assertRaises(TypeError, op, a, b)
918
919    def test_hashing(self):
920        # Alive weakrefs hash the same as the underlying object
921        x = Object(42)
922        y = Object(42)
923        a = weakref.ref(x)
924        b = weakref.ref(y)
925        self.assertEqual(hash(a), hash(42))
926        del x, y
927        gc.collect()
928        # Dead weakrefs:
929        # - retain their hash is they were hashed when alive;
930        # - otherwise, cannot be hashed.
931        self.assertEqual(hash(a), hash(42))
932        self.assertRaises(TypeError, hash, b)
933
934    def test_trashcan_16602(self):
935        # Issue #16602: when a weakref's target was part of a long
936        # deallocation chain, the trashcan mechanism could delay clearing
937        # of the weakref and make the target object visible from outside
938        # code even though its refcount had dropped to 0.  A crash ensued.
939        class C:
940            def __init__(self, parent):
941                if not parent:
942                    return
943                wself = weakref.ref(self)
944                def cb(wparent):
945                    o = wself()
946                self.wparent = weakref.ref(parent, cb)
947
948        d = weakref.WeakKeyDictionary()
949        root = c = C(None)
950        for n in range(100):
951            d[c] = c = C(c)
952        del root
953        gc.collect()
954
955    def test_callback_attribute(self):
956        x = Object(1)
957        callback = lambda ref: None
958        ref1 = weakref.ref(x, callback)
959        self.assertIs(ref1.__callback__, callback)
960
961        ref2 = weakref.ref(x)
962        self.assertIsNone(ref2.__callback__)
963
964    def test_callback_attribute_after_deletion(self):
965        x = Object(1)
966        ref = weakref.ref(x, self.callback)
967        self.assertIsNotNone(ref.__callback__)
968        del x
969        support.gc_collect()
970        self.assertIsNone(ref.__callback__)
971
972    def test_set_callback_attribute(self):
973        x = Object(1)
974        callback = lambda ref: None
975        ref1 = weakref.ref(x, callback)
976        with self.assertRaises(AttributeError):
977            ref1.__callback__ = lambda ref: None
978
979    def test_callback_gcs(self):
980        class ObjectWithDel(Object):
981            def __del__(self): pass
982        x = ObjectWithDel(1)
983        ref1 = weakref.ref(x, lambda ref: support.gc_collect())
984        del x
985        support.gc_collect()
986
987
988class SubclassableWeakrefTestCase(TestBase):
989
990    def test_subclass_refs(self):
991        class MyRef(weakref.ref):
992            def __init__(self, ob, callback=None, value=42):
993                self.value = value
994                super().__init__(ob, callback)
995            def __call__(self):
996                self.called = True
997                return super().__call__()
998        o = Object("foo")
999        mr = MyRef(o, value=24)
1000        self.assertIs(mr(), o)
1001        self.assertTrue(mr.called)
1002        self.assertEqual(mr.value, 24)
1003        del o
1004        gc_collect()  # For PyPy or other GCs.
1005        self.assertIsNone(mr())
1006        self.assertTrue(mr.called)
1007
1008    def test_subclass_refs_dont_replace_standard_refs(self):
1009        class MyRef(weakref.ref):
1010            pass
1011        o = Object(42)
1012        r1 = MyRef(o)
1013        r2 = weakref.ref(o)
1014        self.assertIsNot(r1, r2)
1015        self.assertEqual(weakref.getweakrefs(o), [r2, r1])
1016        self.assertEqual(weakref.getweakrefcount(o), 2)
1017        r3 = MyRef(o)
1018        self.assertEqual(weakref.getweakrefcount(o), 3)
1019        refs = weakref.getweakrefs(o)
1020        self.assertEqual(len(refs), 3)
1021        self.assertIs(r2, refs[0])
1022        self.assertIn(r1, refs[1:])
1023        self.assertIn(r3, refs[1:])
1024
1025    def test_subclass_refs_dont_conflate_callbacks(self):
1026        class MyRef(weakref.ref):
1027            pass
1028        o = Object(42)
1029        r1 = MyRef(o, id)
1030        r2 = MyRef(o, str)
1031        self.assertIsNot(r1, r2)
1032        refs = weakref.getweakrefs(o)
1033        self.assertIn(r1, refs)
1034        self.assertIn(r2, refs)
1035
1036    def test_subclass_refs_with_slots(self):
1037        class MyRef(weakref.ref):
1038            __slots__ = "slot1", "slot2"
1039            def __new__(type, ob, callback, slot1, slot2):
1040                return weakref.ref.__new__(type, ob, callback)
1041            def __init__(self, ob, callback, slot1, slot2):
1042                self.slot1 = slot1
1043                self.slot2 = slot2
1044            def meth(self):
1045                return self.slot1 + self.slot2
1046        o = Object(42)
1047        r = MyRef(o, None, "abc", "def")
1048        self.assertEqual(r.slot1, "abc")
1049        self.assertEqual(r.slot2, "def")
1050        self.assertEqual(r.meth(), "abcdef")
1051        self.assertFalse(hasattr(r, "__dict__"))
1052
1053    def test_subclass_refs_with_cycle(self):
1054        """Confirm https://bugs.python.org/issue3100 is fixed."""
1055        # An instance of a weakref subclass can have attributes.
1056        # If such a weakref holds the only strong reference to the object,
1057        # deleting the weakref will delete the object. In this case,
1058        # the callback must not be called, because the ref object is
1059        # being deleted.
1060        class MyRef(weakref.ref):
1061            pass
1062
1063        # Use a local callback, for "regrtest -R::"
1064        # to detect refcounting problems
1065        def callback(w):
1066            self.cbcalled += 1
1067
1068        o = C()
1069        r1 = MyRef(o, callback)
1070        r1.o = o
1071        del o
1072
1073        del r1 # Used to crash here
1074
1075        self.assertEqual(self.cbcalled, 0)
1076
1077        # Same test, with two weakrefs to the same object
1078        # (since code paths are different)
1079        o = C()
1080        r1 = MyRef(o, callback)
1081        r2 = MyRef(o, callback)
1082        r1.r = r2
1083        r2.o = o
1084        del o
1085        del r2
1086
1087        del r1 # Used to crash here
1088
1089        self.assertEqual(self.cbcalled, 0)
1090
1091
1092class WeakMethodTestCase(unittest.TestCase):
1093
1094    def _subclass(self):
1095        """Return an Object subclass overriding `some_method`."""
1096        class C(Object):
1097            def some_method(self):
1098                return 6
1099        return C
1100
1101    def test_alive(self):
1102        o = Object(1)
1103        r = weakref.WeakMethod(o.some_method)
1104        self.assertIsInstance(r, weakref.ReferenceType)
1105        self.assertIsInstance(r(), type(o.some_method))
1106        self.assertIs(r().__self__, o)
1107        self.assertIs(r().__func__, o.some_method.__func__)
1108        self.assertEqual(r()(), 4)
1109
1110    def test_object_dead(self):
1111        o = Object(1)
1112        r = weakref.WeakMethod(o.some_method)
1113        del o
1114        gc.collect()
1115        self.assertIs(r(), None)
1116
1117    def test_method_dead(self):
1118        C = self._subclass()
1119        o = C(1)
1120        r = weakref.WeakMethod(o.some_method)
1121        del C.some_method
1122        gc.collect()
1123        self.assertIs(r(), None)
1124
1125    def test_callback_when_object_dead(self):
1126        # Test callback behaviour when object dies first.
1127        C = self._subclass()
1128        calls = []
1129        def cb(arg):
1130            calls.append(arg)
1131        o = C(1)
1132        r = weakref.WeakMethod(o.some_method, cb)
1133        del o
1134        gc.collect()
1135        self.assertEqual(calls, [r])
1136        # Callback is only called once.
1137        C.some_method = Object.some_method
1138        gc.collect()
1139        self.assertEqual(calls, [r])
1140
1141    def test_callback_when_method_dead(self):
1142        # Test callback behaviour when method dies first.
1143        C = self._subclass()
1144        calls = []
1145        def cb(arg):
1146            calls.append(arg)
1147        o = C(1)
1148        r = weakref.WeakMethod(o.some_method, cb)
1149        del C.some_method
1150        gc.collect()
1151        self.assertEqual(calls, [r])
1152        # Callback is only called once.
1153        del o
1154        gc.collect()
1155        self.assertEqual(calls, [r])
1156
1157    @support.cpython_only
1158    def test_no_cycles(self):
1159        # A WeakMethod doesn't create any reference cycle to itself.
1160        o = Object(1)
1161        def cb(_):
1162            pass
1163        r = weakref.WeakMethod(o.some_method, cb)
1164        wr = weakref.ref(r)
1165        del r
1166        self.assertIs(wr(), None)
1167
1168    def test_equality(self):
1169        def _eq(a, b):
1170            self.assertTrue(a == b)
1171            self.assertFalse(a != b)
1172        def _ne(a, b):
1173            self.assertTrue(a != b)
1174            self.assertFalse(a == b)
1175        x = Object(1)
1176        y = Object(1)
1177        a = weakref.WeakMethod(x.some_method)
1178        b = weakref.WeakMethod(y.some_method)
1179        c = weakref.WeakMethod(x.other_method)
1180        d = weakref.WeakMethod(y.other_method)
1181        # Objects equal, same method
1182        _eq(a, b)
1183        _eq(c, d)
1184        # Objects equal, different method
1185        _ne(a, c)
1186        _ne(a, d)
1187        _ne(b, c)
1188        _ne(b, d)
1189        # Objects unequal, same or different method
1190        z = Object(2)
1191        e = weakref.WeakMethod(z.some_method)
1192        f = weakref.WeakMethod(z.other_method)
1193        _ne(a, e)
1194        _ne(a, f)
1195        _ne(b, e)
1196        _ne(b, f)
1197        # Compare with different types
1198        _ne(a, x.some_method)
1199        _eq(a, ALWAYS_EQ)
1200        del x, y, z
1201        gc.collect()
1202        # Dead WeakMethods compare by identity
1203        refs = a, b, c, d, e, f
1204        for q in refs:
1205            for r in refs:
1206                self.assertEqual(q == r, q is r)
1207                self.assertEqual(q != r, q is not r)
1208
1209    def test_hashing(self):
1210        # Alive WeakMethods are hashable if the underlying object is
1211        # hashable.
1212        x = Object(1)
1213        y = Object(1)
1214        a = weakref.WeakMethod(x.some_method)
1215        b = weakref.WeakMethod(y.some_method)
1216        c = weakref.WeakMethod(y.other_method)
1217        # Since WeakMethod objects are equal, the hashes should be equal.
1218        self.assertEqual(hash(a), hash(b))
1219        ha = hash(a)
1220        # Dead WeakMethods retain their old hash value
1221        del x, y
1222        gc.collect()
1223        self.assertEqual(hash(a), ha)
1224        self.assertEqual(hash(b), ha)
1225        # If it wasn't hashed when alive, a dead WeakMethod cannot be hashed.
1226        self.assertRaises(TypeError, hash, c)
1227
1228
1229class MappingTestCase(TestBase):
1230
1231    COUNT = 10
1232
1233    def check_len_cycles(self, dict_type, cons):
1234        N = 20
1235        items = [RefCycle() for i in range(N)]
1236        dct = dict_type(cons(o) for o in items)
1237        # Keep an iterator alive
1238        it = dct.items()
1239        try:
1240            next(it)
1241        except StopIteration:
1242            pass
1243        del items
1244        gc.collect()
1245        n1 = len(dct)
1246        del it
1247        gc.collect()
1248        n2 = len(dct)
1249        # one item may be kept alive inside the iterator
1250        self.assertIn(n1, (0, 1))
1251        self.assertEqual(n2, 0)
1252
1253    def test_weak_keyed_len_cycles(self):
1254        self.check_len_cycles(weakref.WeakKeyDictionary, lambda k: (k, 1))
1255
1256    def test_weak_valued_len_cycles(self):
1257        self.check_len_cycles(weakref.WeakValueDictionary, lambda k: (1, k))
1258
1259    def check_len_race(self, dict_type, cons):
1260        # Extended sanity checks for len() in the face of cyclic collection
1261        self.addCleanup(gc.set_threshold, *gc.get_threshold())
1262        for th in range(1, 100):
1263            N = 20
1264            gc.collect(0)
1265            gc.set_threshold(th, th, th)
1266            items = [RefCycle() for i in range(N)]
1267            dct = dict_type(cons(o) for o in items)
1268            del items
1269            # All items will be collected at next garbage collection pass
1270            it = dct.items()
1271            try:
1272                next(it)
1273            except StopIteration:
1274                pass
1275            n1 = len(dct)
1276            del it
1277            n2 = len(dct)
1278            self.assertGreaterEqual(n1, 0)
1279            self.assertLessEqual(n1, N)
1280            self.assertGreaterEqual(n2, 0)
1281            self.assertLessEqual(n2, n1)
1282
1283    def test_weak_keyed_len_race(self):
1284        self.check_len_race(weakref.WeakKeyDictionary, lambda k: (k, 1))
1285
1286    def test_weak_valued_len_race(self):
1287        self.check_len_race(weakref.WeakValueDictionary, lambda k: (1, k))
1288
1289    def test_weak_values(self):
1290        #
1291        #  This exercises d.copy(), d.items(), d[], del d[], len(d).
1292        #
1293        dict, objects = self.make_weak_valued_dict()
1294        for o in objects:
1295            self.assertEqual(weakref.getweakrefcount(o), 1)
1296            self.assertIs(o, dict[o.arg],
1297                         "wrong object returned by weak dict!")
1298        items1 = list(dict.items())
1299        items2 = list(dict.copy().items())
1300        items1.sort()
1301        items2.sort()
1302        self.assertEqual(items1, items2,
1303                     "cloning of weak-valued dictionary did not work!")
1304        del items1, items2
1305        self.assertEqual(len(dict), self.COUNT)
1306        del objects[0]
1307        gc_collect()  # For PyPy or other GCs.
1308        self.assertEqual(len(dict), self.COUNT - 1,
1309                     "deleting object did not cause dictionary update")
1310        del objects, o
1311        gc_collect()  # For PyPy or other GCs.
1312        self.assertEqual(len(dict), 0,
1313                     "deleting the values did not clear the dictionary")
1314        # regression on SF bug #447152:
1315        dict = weakref.WeakValueDictionary()
1316        self.assertRaises(KeyError, dict.__getitem__, 1)
1317        dict[2] = C()
1318        gc_collect()  # For PyPy or other GCs.
1319        self.assertRaises(KeyError, dict.__getitem__, 2)
1320
1321    def test_weak_keys(self):
1322        #
1323        #  This exercises d.copy(), d.items(), d[] = v, d[], del d[],
1324        #  len(d), k in d.
1325        #
1326        dict, objects = self.make_weak_keyed_dict()
1327        for o in objects:
1328            self.assertEqual(weakref.getweakrefcount(o), 1,
1329                         "wrong number of weak references to %r!" % o)
1330            self.assertIs(o.arg, dict[o],
1331                         "wrong object returned by weak dict!")
1332        items1 = dict.items()
1333        items2 = dict.copy().items()
1334        self.assertEqual(set(items1), set(items2),
1335                     "cloning of weak-keyed dictionary did not work!")
1336        del items1, items2
1337        self.assertEqual(len(dict), self.COUNT)
1338        del objects[0]
1339        gc_collect()  # For PyPy or other GCs.
1340        self.assertEqual(len(dict), (self.COUNT - 1),
1341                     "deleting object did not cause dictionary update")
1342        del objects, o
1343        gc_collect()  # For PyPy or other GCs.
1344        self.assertEqual(len(dict), 0,
1345                     "deleting the keys did not clear the dictionary")
1346        o = Object(42)
1347        dict[o] = "What is the meaning of the universe?"
1348        self.assertIn(o, dict)
1349        self.assertNotIn(34, dict)
1350
1351    def test_weak_keyed_iters(self):
1352        dict, objects = self.make_weak_keyed_dict()
1353        self.check_iters(dict)
1354
1355        # Test keyrefs()
1356        refs = dict.keyrefs()
1357        self.assertEqual(len(refs), len(objects))
1358        objects2 = list(objects)
1359        for wr in refs:
1360            ob = wr()
1361            self.assertIn(ob, dict)
1362            self.assertIn(ob, dict)
1363            self.assertEqual(ob.arg, dict[ob])
1364            objects2.remove(ob)
1365        self.assertEqual(len(objects2), 0)
1366
1367        # Test iterkeyrefs()
1368        objects2 = list(objects)
1369        self.assertEqual(len(list(dict.keyrefs())), len(objects))
1370        for wr in dict.keyrefs():
1371            ob = wr()
1372            self.assertIn(ob, dict)
1373            self.assertIn(ob, dict)
1374            self.assertEqual(ob.arg, dict[ob])
1375            objects2.remove(ob)
1376        self.assertEqual(len(objects2), 0)
1377
1378    def test_weak_valued_iters(self):
1379        dict, objects = self.make_weak_valued_dict()
1380        self.check_iters(dict)
1381
1382        # Test valuerefs()
1383        refs = dict.valuerefs()
1384        self.assertEqual(len(refs), len(objects))
1385        objects2 = list(objects)
1386        for wr in refs:
1387            ob = wr()
1388            self.assertEqual(ob, dict[ob.arg])
1389            self.assertEqual(ob.arg, dict[ob.arg].arg)
1390            objects2.remove(ob)
1391        self.assertEqual(len(objects2), 0)
1392
1393        # Test itervaluerefs()
1394        objects2 = list(objects)
1395        self.assertEqual(len(list(dict.itervaluerefs())), len(objects))
1396        for wr in dict.itervaluerefs():
1397            ob = wr()
1398            self.assertEqual(ob, dict[ob.arg])
1399            self.assertEqual(ob.arg, dict[ob.arg].arg)
1400            objects2.remove(ob)
1401        self.assertEqual(len(objects2), 0)
1402
1403    def check_iters(self, dict):
1404        # item iterator:
1405        items = list(dict.items())
1406        for item in dict.items():
1407            items.remove(item)
1408        self.assertFalse(items, "items() did not touch all items")
1409
1410        # key iterator, via __iter__():
1411        keys = list(dict.keys())
1412        for k in dict:
1413            keys.remove(k)
1414        self.assertFalse(keys, "__iter__() did not touch all keys")
1415
1416        # key iterator, via iterkeys():
1417        keys = list(dict.keys())
1418        for k in dict.keys():
1419            keys.remove(k)
1420        self.assertFalse(keys, "iterkeys() did not touch all keys")
1421
1422        # value iterator:
1423        values = list(dict.values())
1424        for v in dict.values():
1425            values.remove(v)
1426        self.assertFalse(values,
1427                     "itervalues() did not touch all values")
1428
1429    def check_weak_destroy_while_iterating(self, dict, objects, iter_name):
1430        n = len(dict)
1431        it = iter(getattr(dict, iter_name)())
1432        next(it)             # Trigger internal iteration
1433        # Destroy an object
1434        del objects[-1]
1435        gc.collect()    # just in case
1436        # We have removed either the first consumed object, or another one
1437        self.assertIn(len(list(it)), [len(objects), len(objects) - 1])
1438        del it
1439        # The removal has been committed
1440        self.assertEqual(len(dict), n - 1)
1441
1442    def check_weak_destroy_and_mutate_while_iterating(self, dict, testcontext):
1443        # Check that we can explicitly mutate the weak dict without
1444        # interfering with delayed removal.
1445        # `testcontext` should create an iterator, destroy one of the
1446        # weakref'ed objects and then return a new key/value pair corresponding
1447        # to the destroyed object.
1448        with testcontext() as (k, v):
1449            self.assertNotIn(k, dict)
1450        with testcontext() as (k, v):
1451            self.assertRaises(KeyError, dict.__delitem__, k)
1452        self.assertNotIn(k, dict)
1453        with testcontext() as (k, v):
1454            self.assertRaises(KeyError, dict.pop, k)
1455        self.assertNotIn(k, dict)
1456        with testcontext() as (k, v):
1457            dict[k] = v
1458        self.assertEqual(dict[k], v)
1459        ddict = copy.copy(dict)
1460        with testcontext() as (k, v):
1461            dict.update(ddict)
1462        self.assertEqual(dict, ddict)
1463        with testcontext() as (k, v):
1464            dict.clear()
1465        self.assertEqual(len(dict), 0)
1466
1467    def check_weak_del_and_len_while_iterating(self, dict, testcontext):
1468        # Check that len() works when both iterating and removing keys
1469        # explicitly through various means (.pop(), .clear()...), while
1470        # implicit mutation is deferred because an iterator is alive.
1471        # (each call to testcontext() should schedule one item for removal
1472        #  for this test to work properly)
1473        o = Object(123456)
1474        with testcontext():
1475            n = len(dict)
1476            # Since underlying dict is ordered, first item is popped
1477            dict.pop(next(dict.keys()))
1478            self.assertEqual(len(dict), n - 1)
1479            dict[o] = o
1480            self.assertEqual(len(dict), n)
1481        # last item in objects is removed from dict in context shutdown
1482        with testcontext():
1483            self.assertEqual(len(dict), n - 1)
1484            # Then, (o, o) is popped
1485            dict.popitem()
1486            self.assertEqual(len(dict), n - 2)
1487        with testcontext():
1488            self.assertEqual(len(dict), n - 3)
1489            del dict[next(dict.keys())]
1490            self.assertEqual(len(dict), n - 4)
1491        with testcontext():
1492            self.assertEqual(len(dict), n - 5)
1493            dict.popitem()
1494            self.assertEqual(len(dict), n - 6)
1495        with testcontext():
1496            dict.clear()
1497            self.assertEqual(len(dict), 0)
1498        self.assertEqual(len(dict), 0)
1499
1500    def test_weak_keys_destroy_while_iterating(self):
1501        # Issue #7105: iterators shouldn't crash when a key is implicitly removed
1502        dict, objects = self.make_weak_keyed_dict()
1503        self.check_weak_destroy_while_iterating(dict, objects, 'keys')
1504        self.check_weak_destroy_while_iterating(dict, objects, 'items')
1505        self.check_weak_destroy_while_iterating(dict, objects, 'values')
1506        self.check_weak_destroy_while_iterating(dict, objects, 'keyrefs')
1507        dict, objects = self.make_weak_keyed_dict()
1508        @contextlib.contextmanager
1509        def testcontext():
1510            try:
1511                it = iter(dict.items())
1512                next(it)
1513                # Schedule a key/value for removal and recreate it
1514                v = objects.pop().arg
1515                gc.collect()      # just in case
1516                yield Object(v), v
1517            finally:
1518                it = None           # should commit all removals
1519                gc.collect()
1520        self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
1521        # Issue #21173: len() fragile when keys are both implicitly and
1522        # explicitly removed.
1523        dict, objects = self.make_weak_keyed_dict()
1524        self.check_weak_del_and_len_while_iterating(dict, testcontext)
1525
1526    def test_weak_values_destroy_while_iterating(self):
1527        # Issue #7105: iterators shouldn't crash when a key is implicitly removed
1528        dict, objects = self.make_weak_valued_dict()
1529        self.check_weak_destroy_while_iterating(dict, objects, 'keys')
1530        self.check_weak_destroy_while_iterating(dict, objects, 'items')
1531        self.check_weak_destroy_while_iterating(dict, objects, 'values')
1532        self.check_weak_destroy_while_iterating(dict, objects, 'itervaluerefs')
1533        self.check_weak_destroy_while_iterating(dict, objects, 'valuerefs')
1534        dict, objects = self.make_weak_valued_dict()
1535        @contextlib.contextmanager
1536        def testcontext():
1537            try:
1538                it = iter(dict.items())
1539                next(it)
1540                # Schedule a key/value for removal and recreate it
1541                k = objects.pop().arg
1542                gc.collect()      # just in case
1543                yield k, Object(k)
1544            finally:
1545                it = None           # should commit all removals
1546                gc.collect()
1547        self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
1548        dict, objects = self.make_weak_valued_dict()
1549        self.check_weak_del_and_len_while_iterating(dict, testcontext)
1550
1551    def test_make_weak_keyed_dict_from_dict(self):
1552        o = Object(3)
1553        dict = weakref.WeakKeyDictionary({o:364})
1554        self.assertEqual(dict[o], 364)
1555
1556    def test_make_weak_keyed_dict_from_weak_keyed_dict(self):
1557        o = Object(3)
1558        dict = weakref.WeakKeyDictionary({o:364})
1559        dict2 = weakref.WeakKeyDictionary(dict)
1560        self.assertEqual(dict[o], 364)
1561
1562    def make_weak_keyed_dict(self):
1563        dict = weakref.WeakKeyDictionary()
1564        objects = list(map(Object, range(self.COUNT)))
1565        for o in objects:
1566            dict[o] = o.arg
1567        return dict, objects
1568
1569    def test_make_weak_valued_dict_from_dict(self):
1570        o = Object(3)
1571        dict = weakref.WeakValueDictionary({364:o})
1572        self.assertEqual(dict[364], o)
1573
1574    def test_make_weak_valued_dict_from_weak_valued_dict(self):
1575        o = Object(3)
1576        dict = weakref.WeakValueDictionary({364:o})
1577        dict2 = weakref.WeakValueDictionary(dict)
1578        self.assertEqual(dict[364], o)
1579
1580    def test_make_weak_valued_dict_misc(self):
1581        # errors
1582        self.assertRaises(TypeError, weakref.WeakValueDictionary.__init__)
1583        self.assertRaises(TypeError, weakref.WeakValueDictionary, {}, {})
1584        self.assertRaises(TypeError, weakref.WeakValueDictionary, (), ())
1585        # special keyword arguments
1586        o = Object(3)
1587        for kw in 'self', 'dict', 'other', 'iterable':
1588            d = weakref.WeakValueDictionary(**{kw: o})
1589            self.assertEqual(list(d.keys()), [kw])
1590            self.assertEqual(d[kw], o)
1591
1592    def make_weak_valued_dict(self):
1593        dict = weakref.WeakValueDictionary()
1594        objects = list(map(Object, range(self.COUNT)))
1595        for o in objects:
1596            dict[o.arg] = o
1597        return dict, objects
1598
1599    def check_popitem(self, klass, key1, value1, key2, value2):
1600        weakdict = klass()
1601        weakdict[key1] = value1
1602        weakdict[key2] = value2
1603        self.assertEqual(len(weakdict), 2)
1604        k, v = weakdict.popitem()
1605        self.assertEqual(len(weakdict), 1)
1606        if k is key1:
1607            self.assertIs(v, value1)
1608        else:
1609            self.assertIs(v, value2)
1610        k, v = weakdict.popitem()
1611        self.assertEqual(len(weakdict), 0)
1612        if k is key1:
1613            self.assertIs(v, value1)
1614        else:
1615            self.assertIs(v, value2)
1616
1617    def test_weak_valued_dict_popitem(self):
1618        self.check_popitem(weakref.WeakValueDictionary,
1619                           "key1", C(), "key2", C())
1620
1621    def test_weak_keyed_dict_popitem(self):
1622        self.check_popitem(weakref.WeakKeyDictionary,
1623                           C(), "value 1", C(), "value 2")
1624
1625    def check_setdefault(self, klass, key, value1, value2):
1626        self.assertIsNot(value1, value2,
1627                     "invalid test"
1628                     " -- value parameters must be distinct objects")
1629        weakdict = klass()
1630        o = weakdict.setdefault(key, value1)
1631        self.assertIs(o, value1)
1632        self.assertIn(key, weakdict)
1633        self.assertIs(weakdict.get(key), value1)
1634        self.assertIs(weakdict[key], value1)
1635
1636        o = weakdict.setdefault(key, value2)
1637        self.assertIs(o, value1)
1638        self.assertIn(key, weakdict)
1639        self.assertIs(weakdict.get(key), value1)
1640        self.assertIs(weakdict[key], value1)
1641
1642    def test_weak_valued_dict_setdefault(self):
1643        self.check_setdefault(weakref.WeakValueDictionary,
1644                              "key", C(), C())
1645
1646    def test_weak_keyed_dict_setdefault(self):
1647        self.check_setdefault(weakref.WeakKeyDictionary,
1648                              C(), "value 1", "value 2")
1649
1650    def check_update(self, klass, dict):
1651        #
1652        #  This exercises d.update(), len(d), d.keys(), k in d,
1653        #  d.get(), d[].
1654        #
1655        weakdict = klass()
1656        weakdict.update(dict)
1657        self.assertEqual(len(weakdict), len(dict))
1658        for k in weakdict.keys():
1659            self.assertIn(k, dict, "mysterious new key appeared in weak dict")
1660            v = dict.get(k)
1661            self.assertIs(v, weakdict[k])
1662            self.assertIs(v, weakdict.get(k))
1663        for k in dict.keys():
1664            self.assertIn(k, weakdict, "original key disappeared in weak dict")
1665            v = dict[k]
1666            self.assertIs(v, weakdict[k])
1667            self.assertIs(v, weakdict.get(k))
1668
1669    def test_weak_valued_dict_update(self):
1670        self.check_update(weakref.WeakValueDictionary,
1671                          {1: C(), 'a': C(), C(): C()})
1672        # errors
1673        self.assertRaises(TypeError, weakref.WeakValueDictionary.update)
1674        d = weakref.WeakValueDictionary()
1675        self.assertRaises(TypeError, d.update, {}, {})
1676        self.assertRaises(TypeError, d.update, (), ())
1677        self.assertEqual(list(d.keys()), [])
1678        # special keyword arguments
1679        o = Object(3)
1680        for kw in 'self', 'dict', 'other', 'iterable':
1681            d = weakref.WeakValueDictionary()
1682            d.update(**{kw: o})
1683            self.assertEqual(list(d.keys()), [kw])
1684            self.assertEqual(d[kw], o)
1685
1686    def test_weak_valued_union_operators(self):
1687        a = C()
1688        b = C()
1689        c = C()
1690        wvd1 = weakref.WeakValueDictionary({1: a})
1691        wvd2 = weakref.WeakValueDictionary({1: b, 2: a})
1692        wvd3 = wvd1.copy()
1693        d1 = {1: c, 3: b}
1694        pairs = [(5, c), (6, b)]
1695
1696        tmp1 = wvd1 | wvd2 # Between two WeakValueDictionaries
1697        self.assertEqual(dict(tmp1), dict(wvd1) | dict(wvd2))
1698        self.assertIs(type(tmp1), weakref.WeakValueDictionary)
1699        wvd1 |= wvd2
1700        self.assertEqual(wvd1, tmp1)
1701
1702        tmp2 = wvd2 | d1 # Between WeakValueDictionary and mapping
1703        self.assertEqual(dict(tmp2), dict(wvd2) | d1)
1704        self.assertIs(type(tmp2), weakref.WeakValueDictionary)
1705        wvd2 |= d1
1706        self.assertEqual(wvd2, tmp2)
1707
1708        tmp3 = wvd3.copy() # Between WeakValueDictionary and iterable key, value
1709        tmp3 |= pairs
1710        self.assertEqual(dict(tmp3), dict(wvd3) | dict(pairs))
1711        self.assertIs(type(tmp3), weakref.WeakValueDictionary)
1712
1713        tmp4 = d1 | wvd3 # Testing .__ror__
1714        self.assertEqual(dict(tmp4), d1 | dict(wvd3))
1715        self.assertIs(type(tmp4), weakref.WeakValueDictionary)
1716
1717        del a
1718        self.assertNotIn(2, tmp1)
1719        self.assertNotIn(2, tmp2)
1720        self.assertNotIn(1, tmp3)
1721        self.assertNotIn(1, tmp4)
1722
1723    def test_weak_keyed_dict_update(self):
1724        self.check_update(weakref.WeakKeyDictionary,
1725                          {C(): 1, C(): 2, C(): 3})
1726
1727    def test_weak_keyed_delitem(self):
1728        d = weakref.WeakKeyDictionary()
1729        o1 = Object('1')
1730        o2 = Object('2')
1731        d[o1] = 'something'
1732        d[o2] = 'something'
1733        self.assertEqual(len(d), 2)
1734        del d[o1]
1735        self.assertEqual(len(d), 1)
1736        self.assertEqual(list(d.keys()), [o2])
1737
1738    def test_weak_keyed_union_operators(self):
1739        o1 = C()
1740        o2 = C()
1741        o3 = C()
1742        wkd1 = weakref.WeakKeyDictionary({o1: 1, o2: 2})
1743        wkd2 = weakref.WeakKeyDictionary({o3: 3, o1: 4})
1744        wkd3 = wkd1.copy()
1745        d1 = {o2: '5', o3: '6'}
1746        pairs = [(o2, 7), (o3, 8)]
1747
1748        tmp1 = wkd1 | wkd2 # Between two WeakKeyDictionaries
1749        self.assertEqual(dict(tmp1), dict(wkd1) | dict(wkd2))
1750        self.assertIs(type(tmp1), weakref.WeakKeyDictionary)
1751        wkd1 |= wkd2
1752        self.assertEqual(wkd1, tmp1)
1753
1754        tmp2 = wkd2 | d1 # Between WeakKeyDictionary and mapping
1755        self.assertEqual(dict(tmp2), dict(wkd2) | d1)
1756        self.assertIs(type(tmp2), weakref.WeakKeyDictionary)
1757        wkd2 |= d1
1758        self.assertEqual(wkd2, tmp2)
1759
1760        tmp3 = wkd3.copy() # Between WeakKeyDictionary and iterable key, value
1761        tmp3 |= pairs
1762        self.assertEqual(dict(tmp3), dict(wkd3) | dict(pairs))
1763        self.assertIs(type(tmp3), weakref.WeakKeyDictionary)
1764
1765        tmp4 = d1 | wkd3 # Testing .__ror__
1766        self.assertEqual(dict(tmp4), d1 | dict(wkd3))
1767        self.assertIs(type(tmp4), weakref.WeakKeyDictionary)
1768
1769        del o1
1770        self.assertNotIn(4, tmp1.values())
1771        self.assertNotIn(4, tmp2.values())
1772        self.assertNotIn(1, tmp3.values())
1773        self.assertNotIn(1, tmp4.values())
1774
1775    def test_weak_valued_delitem(self):
1776        d = weakref.WeakValueDictionary()
1777        o1 = Object('1')
1778        o2 = Object('2')
1779        d['something'] = o1
1780        d['something else'] = o2
1781        self.assertEqual(len(d), 2)
1782        del d['something']
1783        self.assertEqual(len(d), 1)
1784        self.assertEqual(list(d.items()), [('something else', o2)])
1785
1786    def test_weak_keyed_bad_delitem(self):
1787        d = weakref.WeakKeyDictionary()
1788        o = Object('1')
1789        # An attempt to delete an object that isn't there should raise
1790        # KeyError.  It didn't before 2.3.
1791        self.assertRaises(KeyError, d.__delitem__, o)
1792        self.assertRaises(KeyError, d.__getitem__, o)
1793
1794        # If a key isn't of a weakly referencable type, __getitem__ and
1795        # __setitem__ raise TypeError.  __delitem__ should too.
1796        self.assertRaises(TypeError, d.__delitem__,  13)
1797        self.assertRaises(TypeError, d.__getitem__,  13)
1798        self.assertRaises(TypeError, d.__setitem__,  13, 13)
1799
1800    def test_weak_keyed_cascading_deletes(self):
1801        # SF bug 742860.  For some reason, before 2.3 __delitem__ iterated
1802        # over the keys via self.data.iterkeys().  If things vanished from
1803        # the dict during this (or got added), that caused a RuntimeError.
1804
1805        d = weakref.WeakKeyDictionary()
1806        mutate = False
1807
1808        class C(object):
1809            def __init__(self, i):
1810                self.value = i
1811            def __hash__(self):
1812                return hash(self.value)
1813            def __eq__(self, other):
1814                if mutate:
1815                    # Side effect that mutates the dict, by removing the
1816                    # last strong reference to a key.
1817                    del objs[-1]
1818                return self.value == other.value
1819
1820        objs = [C(i) for i in range(4)]
1821        for o in objs:
1822            d[o] = o.value
1823        del o   # now the only strong references to keys are in objs
1824        # Find the order in which iterkeys sees the keys.
1825        objs = list(d.keys())
1826        # Reverse it, so that the iteration implementation of __delitem__
1827        # has to keep looping to find the first object we delete.
1828        objs.reverse()
1829
1830        # Turn on mutation in C.__eq__.  The first time through the loop,
1831        # under the iterkeys() business the first comparison will delete
1832        # the last item iterkeys() would see, and that causes a
1833        #     RuntimeError: dictionary changed size during iteration
1834        # when the iterkeys() loop goes around to try comparing the next
1835        # key.  After this was fixed, it just deletes the last object *our*
1836        # "for o in obj" loop would have gotten to.
1837        mutate = True
1838        count = 0
1839        for o in objs:
1840            count += 1
1841            del d[o]
1842        gc_collect()  # For PyPy or other GCs.
1843        self.assertEqual(len(d), 0)
1844        self.assertEqual(count, 2)
1845
1846    def test_make_weak_valued_dict_repr(self):
1847        dict = weakref.WeakValueDictionary()
1848        self.assertRegex(repr(dict), '<WeakValueDictionary at 0x.*>')
1849
1850    def test_make_weak_keyed_dict_repr(self):
1851        dict = weakref.WeakKeyDictionary()
1852        self.assertRegex(repr(dict), '<WeakKeyDictionary at 0x.*>')
1853
1854    def test_threaded_weak_valued_setdefault(self):
1855        d = weakref.WeakValueDictionary()
1856        with collect_in_thread():
1857            for i in range(100000):
1858                x = d.setdefault(10, RefCycle())
1859                self.assertIsNot(x, None)  # we never put None in there!
1860                del x
1861
1862    def test_threaded_weak_valued_pop(self):
1863        d = weakref.WeakValueDictionary()
1864        with collect_in_thread():
1865            for i in range(100000):
1866                d[10] = RefCycle()
1867                x = d.pop(10, 10)
1868                self.assertIsNot(x, None)  # we never put None in there!
1869
1870    def test_threaded_weak_valued_consistency(self):
1871        # Issue #28427: old keys should not remove new values from
1872        # WeakValueDictionary when collecting from another thread.
1873        d = weakref.WeakValueDictionary()
1874        with collect_in_thread():
1875            for i in range(200000):
1876                o = RefCycle()
1877                d[10] = o
1878                # o is still alive, so the dict can't be empty
1879                self.assertEqual(len(d), 1)
1880                o = None  # lose ref
1881
1882    def check_threaded_weak_dict_copy(self, type_, deepcopy):
1883        # `type_` should be either WeakKeyDictionary or WeakValueDictionary.
1884        # `deepcopy` should be either True or False.
1885        exc = []
1886
1887        class DummyKey:
1888            def __init__(self, ctr):
1889                self.ctr = ctr
1890
1891        class DummyValue:
1892            def __init__(self, ctr):
1893                self.ctr = ctr
1894
1895        def dict_copy(d, exc):
1896            try:
1897                if deepcopy is True:
1898                    _ = copy.deepcopy(d)
1899                else:
1900                    _ = d.copy()
1901            except Exception as ex:
1902                exc.append(ex)
1903
1904        def pop_and_collect(lst):
1905            gc_ctr = 0
1906            while lst:
1907                i = random.randint(0, len(lst) - 1)
1908                gc_ctr += 1
1909                lst.pop(i)
1910                if gc_ctr % 10000 == 0:
1911                    gc.collect()  # just in case
1912
1913        self.assertIn(type_, (weakref.WeakKeyDictionary, weakref.WeakValueDictionary))
1914
1915        d = type_()
1916        keys = []
1917        values = []
1918        # Initialize d with many entries
1919        for i in range(70000):
1920            k, v = DummyKey(i), DummyValue(i)
1921            keys.append(k)
1922            values.append(v)
1923            d[k] = v
1924            del k
1925            del v
1926
1927        t_copy = threading.Thread(target=dict_copy, args=(d, exc,))
1928        if type_ is weakref.WeakKeyDictionary:
1929            t_collect = threading.Thread(target=pop_and_collect, args=(keys,))
1930        else:  # weakref.WeakValueDictionary
1931            t_collect = threading.Thread(target=pop_and_collect, args=(values,))
1932
1933        t_copy.start()
1934        t_collect.start()
1935
1936        t_copy.join()
1937        t_collect.join()
1938
1939        # Test exceptions
1940        if exc:
1941            raise exc[0]
1942
1943    def test_threaded_weak_key_dict_copy(self):
1944        # Issue #35615: Weakref keys or values getting GC'ed during dict
1945        # copying should not result in a crash.
1946        self.check_threaded_weak_dict_copy(weakref.WeakKeyDictionary, False)
1947
1948    def test_threaded_weak_key_dict_deepcopy(self):
1949        # Issue #35615: Weakref keys or values getting GC'ed during dict
1950        # copying should not result in a crash.
1951        self.check_threaded_weak_dict_copy(weakref.WeakKeyDictionary, True)
1952
1953    def test_threaded_weak_value_dict_copy(self):
1954        # Issue #35615: Weakref keys or values getting GC'ed during dict
1955        # copying should not result in a crash.
1956        self.check_threaded_weak_dict_copy(weakref.WeakValueDictionary, False)
1957
1958    def test_threaded_weak_value_dict_deepcopy(self):
1959        # Issue #35615: Weakref keys or values getting GC'ed during dict
1960        # copying should not result in a crash.
1961        self.check_threaded_weak_dict_copy(weakref.WeakValueDictionary, True)
1962
1963    @support.cpython_only
1964    def test_remove_closure(self):
1965        d = weakref.WeakValueDictionary()
1966        self.assertIsNone(d._remove.__closure__)
1967
1968
1969from test import mapping_tests
1970
1971class WeakValueDictionaryTestCase(mapping_tests.BasicTestMappingProtocol):
1972    """Check that WeakValueDictionary conforms to the mapping protocol"""
1973    __ref = {"key1":Object(1), "key2":Object(2), "key3":Object(3)}
1974    type2test = weakref.WeakValueDictionary
1975    def _reference(self):
1976        return self.__ref.copy()
1977
1978class WeakKeyDictionaryTestCase(mapping_tests.BasicTestMappingProtocol):
1979    """Check that WeakKeyDictionary conforms to the mapping protocol"""
1980    __ref = {Object("key1"):1, Object("key2"):2, Object("key3"):3}
1981    type2test = weakref.WeakKeyDictionary
1982    def _reference(self):
1983        return self.__ref.copy()
1984
1985
1986class FinalizeTestCase(unittest.TestCase):
1987
1988    class A:
1989        pass
1990
1991    def _collect_if_necessary(self):
1992        # we create no ref-cycles so in CPython no gc should be needed
1993        if sys.implementation.name != 'cpython':
1994            support.gc_collect()
1995
1996    def test_finalize(self):
1997        def add(x,y,z):
1998            res.append(x + y + z)
1999            return x + y + z
2000
2001        a = self.A()
2002
2003        res = []
2004        f = weakref.finalize(a, add, 67, 43, z=89)
2005        self.assertEqual(f.alive, True)
2006        self.assertEqual(f.peek(), (a, add, (67,43), {'z':89}))
2007        self.assertEqual(f(), 199)
2008        self.assertEqual(f(), None)
2009        self.assertEqual(f(), None)
2010        self.assertEqual(f.peek(), None)
2011        self.assertEqual(f.detach(), None)
2012        self.assertEqual(f.alive, False)
2013        self.assertEqual(res, [199])
2014
2015        res = []
2016        f = weakref.finalize(a, add, 67, 43, 89)
2017        self.assertEqual(f.peek(), (a, add, (67,43,89), {}))
2018        self.assertEqual(f.detach(), (a, add, (67,43,89), {}))
2019        self.assertEqual(f(), None)
2020        self.assertEqual(f(), None)
2021        self.assertEqual(f.peek(), None)
2022        self.assertEqual(f.detach(), None)
2023        self.assertEqual(f.alive, False)
2024        self.assertEqual(res, [])
2025
2026        res = []
2027        f = weakref.finalize(a, add, x=67, y=43, z=89)
2028        del a
2029        self._collect_if_necessary()
2030        self.assertEqual(f(), None)
2031        self.assertEqual(f(), None)
2032        self.assertEqual(f.peek(), None)
2033        self.assertEqual(f.detach(), None)
2034        self.assertEqual(f.alive, False)
2035        self.assertEqual(res, [199])
2036
2037    def test_arg_errors(self):
2038        def fin(*args, **kwargs):
2039            res.append((args, kwargs))
2040
2041        a = self.A()
2042
2043        res = []
2044        f = weakref.finalize(a, fin, 1, 2, func=3, obj=4)
2045        self.assertEqual(f.peek(), (a, fin, (1, 2), {'func': 3, 'obj': 4}))
2046        f()
2047        self.assertEqual(res, [((1, 2), {'func': 3, 'obj': 4})])
2048
2049        with self.assertRaises(TypeError):
2050            weakref.finalize(a, func=fin, arg=1)
2051        with self.assertRaises(TypeError):
2052            weakref.finalize(obj=a, func=fin, arg=1)
2053        self.assertRaises(TypeError, weakref.finalize, a)
2054        self.assertRaises(TypeError, weakref.finalize)
2055
2056    def test_order(self):
2057        a = self.A()
2058        res = []
2059
2060        f1 = weakref.finalize(a, res.append, 'f1')
2061        f2 = weakref.finalize(a, res.append, 'f2')
2062        f3 = weakref.finalize(a, res.append, 'f3')
2063        f4 = weakref.finalize(a, res.append, 'f4')
2064        f5 = weakref.finalize(a, res.append, 'f5')
2065
2066        # make sure finalizers can keep themselves alive
2067        del f1, f4
2068
2069        self.assertTrue(f2.alive)
2070        self.assertTrue(f3.alive)
2071        self.assertTrue(f5.alive)
2072
2073        self.assertTrue(f5.detach())
2074        self.assertFalse(f5.alive)
2075
2076        f5()                       # nothing because previously unregistered
2077        res.append('A')
2078        f3()                       # => res.append('f3')
2079        self.assertFalse(f3.alive)
2080        res.append('B')
2081        f3()                       # nothing because previously called
2082        res.append('C')
2083        del a
2084        self._collect_if_necessary()
2085                                   # => res.append('f4')
2086                                   # => res.append('f2')
2087                                   # => res.append('f1')
2088        self.assertFalse(f2.alive)
2089        res.append('D')
2090        f2()                       # nothing because previously called by gc
2091
2092        expected = ['A', 'f3', 'B', 'C', 'f4', 'f2', 'f1', 'D']
2093        self.assertEqual(res, expected)
2094
2095    def test_all_freed(self):
2096        # we want a weakrefable subclass of weakref.finalize
2097        class MyFinalizer(weakref.finalize):
2098            pass
2099
2100        a = self.A()
2101        res = []
2102        def callback():
2103            res.append(123)
2104        f = MyFinalizer(a, callback)
2105
2106        wr_callback = weakref.ref(callback)
2107        wr_f = weakref.ref(f)
2108        del callback, f
2109
2110        self.assertIsNotNone(wr_callback())
2111        self.assertIsNotNone(wr_f())
2112
2113        del a
2114        self._collect_if_necessary()
2115
2116        self.assertIsNone(wr_callback())
2117        self.assertIsNone(wr_f())
2118        self.assertEqual(res, [123])
2119
2120    @classmethod
2121    def run_in_child(cls):
2122        def error():
2123            # Create an atexit finalizer from inside a finalizer called
2124            # at exit.  This should be the next to be run.
2125            g1 = weakref.finalize(cls, print, 'g1')
2126            print('f3 error')
2127            1/0
2128
2129        # cls should stay alive till atexit callbacks run
2130        f1 = weakref.finalize(cls, print, 'f1', _global_var)
2131        f2 = weakref.finalize(cls, print, 'f2', _global_var)
2132        f3 = weakref.finalize(cls, error)
2133        f4 = weakref.finalize(cls, print, 'f4', _global_var)
2134
2135        assert f1.atexit == True
2136        f2.atexit = False
2137        assert f3.atexit == True
2138        assert f4.atexit == True
2139
2140    def test_atexit(self):
2141        prog = ('from test.test_weakref import FinalizeTestCase;'+
2142                'FinalizeTestCase.run_in_child()')
2143        rc, out, err = script_helper.assert_python_ok('-c', prog)
2144        out = out.decode('ascii').splitlines()
2145        self.assertEqual(out, ['f4 foobar', 'f3 error', 'g1', 'f1 foobar'])
2146        self.assertTrue(b'ZeroDivisionError' in err)
2147
2148
2149libreftest = """ Doctest for examples in the library reference: weakref.rst
2150
2151>>> from test.support import gc_collect
2152>>> import weakref
2153>>> class Dict(dict):
2154...     pass
2155...
2156>>> obj = Dict(red=1, green=2, blue=3)   # this object is weak referencable
2157>>> r = weakref.ref(obj)
2158>>> print(r() is obj)
2159True
2160
2161>>> import weakref
2162>>> class Object:
2163...     pass
2164...
2165>>> o = Object()
2166>>> r = weakref.ref(o)
2167>>> o2 = r()
2168>>> o is o2
2169True
2170>>> del o, o2
2171>>> gc_collect()  # For PyPy or other GCs.
2172>>> print(r())
2173None
2174
2175>>> import weakref
2176>>> class ExtendedRef(weakref.ref):
2177...     def __init__(self, ob, callback=None, **annotations):
2178...         super().__init__(ob, callback)
2179...         self.__counter = 0
2180...         for k, v in annotations.items():
2181...             setattr(self, k, v)
2182...     def __call__(self):
2183...         '''Return a pair containing the referent and the number of
2184...         times the reference has been called.
2185...         '''
2186...         ob = super().__call__()
2187...         if ob is not None:
2188...             self.__counter += 1
2189...             ob = (ob, self.__counter)
2190...         return ob
2191...
2192>>> class A:   # not in docs from here, just testing the ExtendedRef
2193...     pass
2194...
2195>>> a = A()
2196>>> r = ExtendedRef(a, foo=1, bar="baz")
2197>>> r.foo
21981
2199>>> r.bar
2200'baz'
2201>>> r()[1]
22021
2203>>> r()[1]
22042
2205>>> r()[0] is a
2206True
2207
2208
2209>>> import weakref
2210>>> _id2obj_dict = weakref.WeakValueDictionary()
2211>>> def remember(obj):
2212...     oid = id(obj)
2213...     _id2obj_dict[oid] = obj
2214...     return oid
2215...
2216>>> def id2obj(oid):
2217...     return _id2obj_dict[oid]
2218...
2219>>> a = A()             # from here, just testing
2220>>> a_id = remember(a)
2221>>> id2obj(a_id) is a
2222True
2223>>> del a
2224>>> gc_collect()  # For PyPy or other GCs.
2225>>> try:
2226...     id2obj(a_id)
2227... except KeyError:
2228...     print('OK')
2229... else:
2230...     print('WeakValueDictionary error')
2231OK
2232
2233"""
2234
2235__test__ = {'libreftest' : libreftest}
2236
2237def load_tests(loader, tests, pattern):
2238    tests.addTest(doctest.DocTestSuite())
2239    return tests
2240
2241
2242if __name__ == "__main__":
2243    unittest.main()
2244