1import gc 2import sys 3import doctest 4import unittest 5import collections 6import weakref 7import operator 8import contextlib 9import copy 10import threading 11import time 12import random 13 14from test import support 15from test.support import script_helper, ALWAYS_EQ 16from test.support import gc_collect 17 18# Used in ReferencesTestCase.test_ref_created_during_del() . 19ref_from_del = None 20 21# Used by FinalizeTestCase as a global that may be replaced by None 22# when the interpreter shuts down. 23_global_var = 'foobar' 24 25class C: 26 def method(self): 27 pass 28 29 30class Callable: 31 bar = None 32 33 def __call__(self, x): 34 self.bar = x 35 36 37def create_function(): 38 def f(): pass 39 return f 40 41def create_bound_method(): 42 return C().method 43 44 45class Object: 46 def __init__(self, arg): 47 self.arg = arg 48 def __repr__(self): 49 return "<Object %r>" % self.arg 50 def __eq__(self, other): 51 if isinstance(other, Object): 52 return self.arg == other.arg 53 return NotImplemented 54 def __lt__(self, other): 55 if isinstance(other, Object): 56 return self.arg < other.arg 57 return NotImplemented 58 def __hash__(self): 59 return hash(self.arg) 60 def some_method(self): 61 return 4 62 def other_method(self): 63 return 5 64 65 66class RefCycle: 67 def __init__(self): 68 self.cycle = self 69 70 71class TestBase(unittest.TestCase): 72 73 def setUp(self): 74 self.cbcalled = 0 75 76 def callback(self, ref): 77 self.cbcalled += 1 78 79 80@contextlib.contextmanager 81def collect_in_thread(period=0.0001): 82 """ 83 Ensure GC collections happen in a different thread, at a high frequency. 
84 """ 85 please_stop = False 86 87 def collect(): 88 while not please_stop: 89 time.sleep(period) 90 gc.collect() 91 92 with support.disable_gc(): 93 t = threading.Thread(target=collect) 94 t.start() 95 try: 96 yield 97 finally: 98 please_stop = True 99 t.join() 100 101 102class ReferencesTestCase(TestBase): 103 104 def test_basic_ref(self): 105 self.check_basic_ref(C) 106 self.check_basic_ref(create_function) 107 self.check_basic_ref(create_bound_method) 108 109 # Just make sure the tp_repr handler doesn't raise an exception. 110 # Live reference: 111 o = C() 112 wr = weakref.ref(o) 113 repr(wr) 114 # Dead reference: 115 del o 116 repr(wr) 117 118 def test_basic_callback(self): 119 self.check_basic_callback(C) 120 self.check_basic_callback(create_function) 121 self.check_basic_callback(create_bound_method) 122 123 @support.cpython_only 124 def test_cfunction(self): 125 import _testcapi 126 create_cfunction = _testcapi.create_cfunction 127 f = create_cfunction() 128 wr = weakref.ref(f) 129 self.assertIs(wr(), f) 130 del f 131 self.assertIsNone(wr()) 132 self.check_basic_ref(create_cfunction) 133 self.check_basic_callback(create_cfunction) 134 135 def test_multiple_callbacks(self): 136 o = C() 137 ref1 = weakref.ref(o, self.callback) 138 ref2 = weakref.ref(o, self.callback) 139 del o 140 gc_collect() # For PyPy or other GCs. 141 self.assertIsNone(ref1(), "expected reference to be invalidated") 142 self.assertIsNone(ref2(), "expected reference to be invalidated") 143 self.assertEqual(self.cbcalled, 2, 144 "callback not called the right number of times") 145 146 def test_multiple_selfref_callbacks(self): 147 # Make sure all references are invalidated before callbacks are called 148 # 149 # What's important here is that we're using the first 150 # reference in the callback invoked on the second reference 151 # (the most recently created ref is cleaned up first). 
This 152 # tests that all references to the object are invalidated 153 # before any of the callbacks are invoked, so that we only 154 # have one invocation of _weakref.c:cleanup_helper() active 155 # for a particular object at a time. 156 # 157 def callback(object, self=self): 158 self.ref() 159 c = C() 160 self.ref = weakref.ref(c, callback) 161 ref1 = weakref.ref(c, callback) 162 del c 163 164 def test_constructor_kwargs(self): 165 c = C() 166 self.assertRaises(TypeError, weakref.ref, c, callback=None) 167 168 def test_proxy_ref(self): 169 o = C() 170 o.bar = 1 171 ref1 = weakref.proxy(o, self.callback) 172 ref2 = weakref.proxy(o, self.callback) 173 del o 174 gc_collect() # For PyPy or other GCs. 175 176 def check(proxy): 177 proxy.bar 178 179 self.assertRaises(ReferenceError, check, ref1) 180 self.assertRaises(ReferenceError, check, ref2) 181 ref3 = weakref.proxy(C()) 182 gc_collect() # For PyPy or other GCs. 183 self.assertRaises(ReferenceError, bool, ref3) 184 self.assertEqual(self.cbcalled, 2) 185 186 def check_basic_ref(self, factory): 187 o = factory() 188 ref = weakref.ref(o) 189 self.assertIsNotNone(ref(), 190 "weak reference to live object should be live") 191 o2 = ref() 192 self.assertIs(o, o2, 193 "<ref>() should return original object if live") 194 195 def check_basic_callback(self, factory): 196 self.cbcalled = 0 197 o = factory() 198 ref = weakref.ref(o, self.callback) 199 del o 200 gc_collect() # For PyPy or other GCs. 
201 self.assertEqual(self.cbcalled, 1, 202 "callback did not properly set 'cbcalled'") 203 self.assertIsNone(ref(), 204 "ref2 should be dead after deleting object reference") 205 206 def test_ref_reuse(self): 207 o = C() 208 ref1 = weakref.ref(o) 209 # create a proxy to make sure that there's an intervening creation 210 # between these two; it should make no difference 211 proxy = weakref.proxy(o) 212 ref2 = weakref.ref(o) 213 self.assertIs(ref1, ref2, 214 "reference object w/out callback should be re-used") 215 216 o = C() 217 proxy = weakref.proxy(o) 218 ref1 = weakref.ref(o) 219 ref2 = weakref.ref(o) 220 self.assertIs(ref1, ref2, 221 "reference object w/out callback should be re-used") 222 self.assertEqual(weakref.getweakrefcount(o), 2, 223 "wrong weak ref count for object") 224 del proxy 225 gc_collect() # For PyPy or other GCs. 226 self.assertEqual(weakref.getweakrefcount(o), 1, 227 "wrong weak ref count for object after deleting proxy") 228 229 def test_proxy_reuse(self): 230 o = C() 231 proxy1 = weakref.proxy(o) 232 ref = weakref.ref(o) 233 proxy2 = weakref.proxy(o) 234 self.assertIs(proxy1, proxy2, 235 "proxy object w/out callback should have been re-used") 236 237 def test_basic_proxy(self): 238 o = C() 239 self.check_proxy(o, weakref.proxy(o)) 240 241 L = collections.UserList() 242 p = weakref.proxy(L) 243 self.assertFalse(p, "proxy for empty UserList should be false") 244 p.append(12) 245 self.assertEqual(len(L), 1) 246 self.assertTrue(p, "proxy for non-empty UserList should be true") 247 p[:] = [2, 3] 248 self.assertEqual(len(L), 2) 249 self.assertEqual(len(p), 2) 250 self.assertIn(3, p, "proxy didn't support __contains__() properly") 251 p[1] = 5 252 self.assertEqual(L[1], 5) 253 self.assertEqual(p[1], 5) 254 L2 = collections.UserList(L) 255 p2 = weakref.proxy(L2) 256 self.assertEqual(p, p2) 257 ## self.assertEqual(repr(L2), repr(p2)) 258 L3 = collections.UserList(range(10)) 259 p3 = weakref.proxy(L3) 260 self.assertEqual(L3[:], p3[:]) 261 
self.assertEqual(L3[5:], p3[5:]) 262 self.assertEqual(L3[:5], p3[:5]) 263 self.assertEqual(L3[2:5], p3[2:5]) 264 265 def test_proxy_unicode(self): 266 # See bug 5037 267 class C(object): 268 def __str__(self): 269 return "string" 270 def __bytes__(self): 271 return b"bytes" 272 instance = C() 273 self.assertIn("__bytes__", dir(weakref.proxy(instance))) 274 self.assertEqual(bytes(weakref.proxy(instance)), b"bytes") 275 276 def test_proxy_index(self): 277 class C: 278 def __index__(self): 279 return 10 280 o = C() 281 p = weakref.proxy(o) 282 self.assertEqual(operator.index(p), 10) 283 284 def test_proxy_div(self): 285 class C: 286 def __floordiv__(self, other): 287 return 42 288 def __ifloordiv__(self, other): 289 return 21 290 o = C() 291 p = weakref.proxy(o) 292 self.assertEqual(p // 5, 42) 293 p //= 5 294 self.assertEqual(p, 21) 295 296 def test_proxy_matmul(self): 297 class C: 298 def __matmul__(self, other): 299 return 1729 300 def __rmatmul__(self, other): 301 return -163 302 def __imatmul__(self, other): 303 return 561 304 o = C() 305 p = weakref.proxy(o) 306 self.assertEqual(p @ 5, 1729) 307 self.assertEqual(5 @ p, -163) 308 p @= 5 309 self.assertEqual(p, 561) 310 311 # The PyWeakref_* C API is documented as allowing either NULL or 312 # None as the value for the callback, where either means "no 313 # callback". The "no callback" ref and proxy objects are supposed 314 # to be shared so long as they exist by all callers so long as 315 # they are active. In Python 2.3.3 and earlier, this guarantee 316 # was not honored, and was broken in different ways for 317 # PyWeakref_NewRef() and PyWeakref_NewProxy(). (Two tests.) 
318 319 def test_shared_ref_without_callback(self): 320 self.check_shared_without_callback(weakref.ref) 321 322 def test_shared_proxy_without_callback(self): 323 self.check_shared_without_callback(weakref.proxy) 324 325 def check_shared_without_callback(self, makeref): 326 o = Object(1) 327 p1 = makeref(o, None) 328 p2 = makeref(o, None) 329 self.assertIs(p1, p2, "both callbacks were None in the C API") 330 del p1, p2 331 p1 = makeref(o) 332 p2 = makeref(o, None) 333 self.assertIs(p1, p2, "callbacks were NULL, None in the C API") 334 del p1, p2 335 p1 = makeref(o) 336 p2 = makeref(o) 337 self.assertIs(p1, p2, "both callbacks were NULL in the C API") 338 del p1, p2 339 p1 = makeref(o, None) 340 p2 = makeref(o) 341 self.assertIs(p1, p2, "callbacks were None, NULL in the C API") 342 343 def test_callable_proxy(self): 344 o = Callable() 345 ref1 = weakref.proxy(o) 346 347 self.check_proxy(o, ref1) 348 349 self.assertIs(type(ref1), weakref.CallableProxyType, 350 "proxy is not of callable type") 351 ref1('twinkies!') 352 self.assertEqual(o.bar, 'twinkies!', 353 "call through proxy not passed through to original") 354 ref1(x='Splat.') 355 self.assertEqual(o.bar, 'Splat.', 356 "call through proxy not passed through to original") 357 358 # expect due to too few args 359 self.assertRaises(TypeError, ref1) 360 361 # expect due to too many args 362 self.assertRaises(TypeError, ref1, 1, 2, 3) 363 364 def check_proxy(self, o, proxy): 365 o.foo = 1 366 self.assertEqual(proxy.foo, 1, 367 "proxy does not reflect attribute addition") 368 o.foo = 2 369 self.assertEqual(proxy.foo, 2, 370 "proxy does not reflect attribute modification") 371 del o.foo 372 self.assertFalse(hasattr(proxy, 'foo'), 373 "proxy does not reflect attribute removal") 374 375 proxy.foo = 1 376 self.assertEqual(o.foo, 1, 377 "object does not reflect attribute addition via proxy") 378 proxy.foo = 2 379 self.assertEqual(o.foo, 2, 380 "object does not reflect attribute modification via proxy") 381 del proxy.foo 382 
self.assertFalse(hasattr(o, 'foo'), 383 "object does not reflect attribute removal via proxy") 384 385 def test_proxy_deletion(self): 386 # Test clearing of SF bug #762891 387 class Foo: 388 result = None 389 def __delitem__(self, accessor): 390 self.result = accessor 391 g = Foo() 392 f = weakref.proxy(g) 393 del f[0] 394 self.assertEqual(f.result, 0) 395 396 def test_proxy_bool(self): 397 # Test clearing of SF bug #1170766 398 class List(list): pass 399 lyst = List() 400 self.assertEqual(bool(weakref.proxy(lyst)), bool(lyst)) 401 402 def test_proxy_iter(self): 403 # Test fails with a debug build of the interpreter 404 # (see bpo-38395). 405 406 obj = None 407 408 class MyObj: 409 def __iter__(self): 410 nonlocal obj 411 del obj 412 return NotImplemented 413 414 obj = MyObj() 415 p = weakref.proxy(obj) 416 with self.assertRaises(TypeError): 417 # "blech" in p calls MyObj.__iter__ through the proxy, 418 # without keeping a reference to the real object, so it 419 # can be killed in the middle of the call 420 "blech" in p 421 422 def test_proxy_next(self): 423 arr = [4, 5, 6] 424 def iterator_func(): 425 yield from arr 426 it = iterator_func() 427 428 class IteratesWeakly: 429 def __iter__(self): 430 return weakref.proxy(it) 431 432 weak_it = IteratesWeakly() 433 434 # Calls proxy.__next__ 435 self.assertEqual(list(weak_it), [4, 5, 6]) 436 437 def test_proxy_bad_next(self): 438 # bpo-44720: PyIter_Next() shouldn't be called if the reference 439 # isn't an iterator. 
440 441 not_an_iterator = lambda: 0 442 443 class A: 444 def __iter__(self): 445 return weakref.proxy(not_an_iterator) 446 a = A() 447 448 msg = "Weakref proxy referenced a non-iterator" 449 with self.assertRaisesRegex(TypeError, msg): 450 list(a) 451 452 def test_proxy_reversed(self): 453 class MyObj: 454 def __len__(self): 455 return 3 456 def __reversed__(self): 457 return iter('cba') 458 459 obj = MyObj() 460 self.assertEqual("".join(reversed(weakref.proxy(obj))), "cba") 461 462 def test_proxy_hash(self): 463 class MyObj: 464 def __hash__(self): 465 return 42 466 467 obj = MyObj() 468 with self.assertRaises(TypeError): 469 hash(weakref.proxy(obj)) 470 471 class MyObj: 472 __hash__ = None 473 474 obj = MyObj() 475 with self.assertRaises(TypeError): 476 hash(weakref.proxy(obj)) 477 478 def test_getweakrefcount(self): 479 o = C() 480 ref1 = weakref.ref(o) 481 ref2 = weakref.ref(o, self.callback) 482 self.assertEqual(weakref.getweakrefcount(o), 2, 483 "got wrong number of weak reference objects") 484 485 proxy1 = weakref.proxy(o) 486 proxy2 = weakref.proxy(o, self.callback) 487 self.assertEqual(weakref.getweakrefcount(o), 4, 488 "got wrong number of weak reference objects") 489 490 del ref1, ref2, proxy1, proxy2 491 gc_collect() # For PyPy or other GCs. 492 self.assertEqual(weakref.getweakrefcount(o), 0, 493 "weak reference objects not unlinked from" 494 " referent when discarded.") 495 496 # assumes ints do not support weakrefs 497 self.assertEqual(weakref.getweakrefcount(1), 0, 498 "got wrong number of weak reference objects for int") 499 500 def test_getweakrefs(self): 501 o = C() 502 ref1 = weakref.ref(o, self.callback) 503 ref2 = weakref.ref(o, self.callback) 504 del ref1 505 gc_collect() # For PyPy or other GCs. 506 self.assertEqual(weakref.getweakrefs(o), [ref2], 507 "list of refs does not match") 508 509 o = C() 510 ref1 = weakref.ref(o, self.callback) 511 ref2 = weakref.ref(o, self.callback) 512 del ref2 513 gc_collect() # For PyPy or other GCs. 
514 self.assertEqual(weakref.getweakrefs(o), [ref1], 515 "list of refs does not match") 516 517 del ref1 518 gc_collect() # For PyPy or other GCs. 519 self.assertEqual(weakref.getweakrefs(o), [], 520 "list of refs not cleared") 521 522 # assumes ints do not support weakrefs 523 self.assertEqual(weakref.getweakrefs(1), [], 524 "list of refs does not match for int") 525 526 def test_newstyle_number_ops(self): 527 class F(float): 528 pass 529 f = F(2.0) 530 p = weakref.proxy(f) 531 self.assertEqual(p + 1.0, 3.0) 532 self.assertEqual(1.0 + p, 3.0) # this used to SEGV 533 534 def test_callbacks_protected(self): 535 # Callbacks protected from already-set exceptions? 536 # Regression test for SF bug #478534. 537 class BogusError(Exception): 538 pass 539 data = {} 540 def remove(k): 541 del data[k] 542 def encapsulate(): 543 f = lambda : () 544 data[weakref.ref(f, remove)] = None 545 raise BogusError 546 try: 547 encapsulate() 548 except BogusError: 549 pass 550 else: 551 self.fail("exception not properly restored") 552 try: 553 encapsulate() 554 except BogusError: 555 pass 556 else: 557 self.fail("exception not properly restored") 558 559 def test_sf_bug_840829(self): 560 # "weakref callbacks and gc corrupt memory" 561 # subtype_dealloc erroneously exposed a new-style instance 562 # already in the process of getting deallocated to gc, 563 # causing double-deallocation if the instance had a weakref 564 # callback that triggered gc. 565 # If the bug exists, there probably won't be an obvious symptom 566 # in a release build. In a debug build, a segfault will occur 567 # when the second attempt to remove the instance from the "list 568 # of all objects" occurs. 569 570 import gc 571 572 class C(object): 573 pass 574 575 c = C() 576 wr = weakref.ref(c, lambda ignore: gc.collect()) 577 del c 578 579 # There endeth the first part. It gets worse. 
580 del wr 581 582 c1 = C() 583 c1.i = C() 584 wr = weakref.ref(c1.i, lambda ignore: gc.collect()) 585 586 c2 = C() 587 c2.c1 = c1 588 del c1 # still alive because c2 points to it 589 590 # Now when subtype_dealloc gets called on c2, it's not enough just 591 # that c2 is immune from gc while the weakref callbacks associated 592 # with c2 execute (there are none in this 2nd half of the test, btw). 593 # subtype_dealloc goes on to call the base classes' deallocs too, 594 # so any gc triggered by weakref callbacks associated with anything 595 # torn down by a base class dealloc can also trigger double 596 # deallocation of c2. 597 del c2 598 599 def test_callback_in_cycle_1(self): 600 import gc 601 602 class J(object): 603 pass 604 605 class II(object): 606 def acallback(self, ignore): 607 self.J 608 609 I = II() 610 I.J = J 611 I.wr = weakref.ref(J, I.acallback) 612 613 # Now J and II are each in a self-cycle (as all new-style class 614 # objects are, since their __mro__ points back to them). I holds 615 # both a weak reference (I.wr) and a strong reference (I.J) to class 616 # J. I is also in a cycle (I.wr points to a weakref that references 617 # I.acallback). When we del these three, they all become trash, but 618 # the cycles prevent any of them from getting cleaned up immediately. 619 # Instead they have to wait for cyclic gc to deduce that they're 620 # trash. 621 # 622 # gc used to call tp_clear on all of them, and the order in which 623 # it does that is pretty accidental. The exact order in which we 624 # built up these things manages to provoke gc into running tp_clear 625 # in just the right order (I last). Calling tp_clear on II leaves 626 # behind an insane class object (its __mro__ becomes NULL). Calling 627 # tp_clear on J breaks its self-cycle, but J doesn't get deleted 628 # just then because of the strong reference from I.J. Calling 629 # tp_clear on I starts to clear I's __dict__, and just happens to 630 # clear I.J first -- I.wr is still intact. 
That removes the last 631 # reference to J, which triggers the weakref callback. The callback 632 # tries to do "self.J", and instances of new-style classes look up 633 # attributes ("J") in the class dict first. The class (II) wants to 634 # search II.__mro__, but that's NULL. The result was a segfault in 635 # a release build, and an assert failure in a debug build. 636 del I, J, II 637 gc.collect() 638 639 def test_callback_in_cycle_2(self): 640 import gc 641 642 # This is just like test_callback_in_cycle_1, except that II is an 643 # old-style class. The symptom is different then: an instance of an 644 # old-style class looks in its own __dict__ first. 'J' happens to 645 # get cleared from I.__dict__ before 'wr', and 'J' was never in II's 646 # __dict__, so the attribute isn't found. The difference is that 647 # the old-style II doesn't have a NULL __mro__ (it doesn't have any 648 # __mro__), so no segfault occurs. Instead it got: 649 # test_callback_in_cycle_2 (__main__.ReferencesTestCase) ... 650 # Exception exceptions.AttributeError: 651 # "II instance has no attribute 'J'" in <bound method II.acallback 652 # of <?.II instance at 0x00B9B4B8>> ignored 653 654 class J(object): 655 pass 656 657 class II: 658 def acallback(self, ignore): 659 self.J 660 661 I = II() 662 I.J = J 663 I.wr = weakref.ref(J, I.acallback) 664 665 del I, J, II 666 gc.collect() 667 668 def test_callback_in_cycle_3(self): 669 import gc 670 671 # This one broke the first patch that fixed the last two. In this 672 # case, the objects reachable from the callback aren't also reachable 673 # from the object (c1) *triggering* the callback: you can get to 674 # c1 from c2, but not vice-versa. The result was that c2's __dict__ 675 # got tp_clear'ed by the time the c2.cb callback got invoked. 
676 677 class C: 678 def cb(self, ignore): 679 self.me 680 self.c1 681 self.wr 682 683 c1, c2 = C(), C() 684 685 c2.me = c2 686 c2.c1 = c1 687 c2.wr = weakref.ref(c1, c2.cb) 688 689 del c1, c2 690 gc.collect() 691 692 def test_callback_in_cycle_4(self): 693 import gc 694 695 # Like test_callback_in_cycle_3, except c2 and c1 have different 696 # classes. c2's class (C) isn't reachable from c1 then, so protecting 697 # objects reachable from the dying object (c1) isn't enough to stop 698 # c2's class (C) from getting tp_clear'ed before c2.cb is invoked. 699 # The result was a segfault (C.__mro__ was NULL when the callback 700 # tried to look up self.me). 701 702 class C(object): 703 def cb(self, ignore): 704 self.me 705 self.c1 706 self.wr 707 708 class D: 709 pass 710 711 c1, c2 = D(), C() 712 713 c2.me = c2 714 c2.c1 = c1 715 c2.wr = weakref.ref(c1, c2.cb) 716 717 del c1, c2, C, D 718 gc.collect() 719 720 def test_callback_in_cycle_resurrection(self): 721 import gc 722 723 # Do something nasty in a weakref callback: resurrect objects 724 # from dead cycles. For this to be attempted, the weakref and 725 # its callback must also be part of the cyclic trash (else the 726 # objects reachable via the callback couldn't be in cyclic trash 727 # to begin with -- the callback would act like an external root). 728 # But gc clears trash weakrefs with callbacks early now, which 729 # disables the callbacks, so the callbacks shouldn't get called 730 # at all (and so nothing actually gets resurrected). 
731 732 alist = [] 733 class C(object): 734 def __init__(self, value): 735 self.attribute = value 736 737 def acallback(self, ignore): 738 alist.append(self.c) 739 740 c1, c2 = C(1), C(2) 741 c1.c = c2 742 c2.c = c1 743 c1.wr = weakref.ref(c2, c1.acallback) 744 c2.wr = weakref.ref(c1, c2.acallback) 745 746 def C_went_away(ignore): 747 alist.append("C went away") 748 wr = weakref.ref(C, C_went_away) 749 750 del c1, c2, C # make them all trash 751 self.assertEqual(alist, []) # del isn't enough to reclaim anything 752 753 gc.collect() 754 # c1.wr and c2.wr were part of the cyclic trash, so should have 755 # been cleared without their callbacks executing. OTOH, the weakref 756 # to C is bound to a function local (wr), and wasn't trash, so that 757 # callback should have been invoked when C went away. 758 self.assertEqual(alist, ["C went away"]) 759 # The remaining weakref should be dead now (its callback ran). 760 self.assertEqual(wr(), None) 761 762 del alist[:] 763 gc.collect() 764 self.assertEqual(alist, []) 765 766 def test_callbacks_on_callback(self): 767 import gc 768 769 # Set up weakref callbacks *on* weakref callbacks. 770 alist = [] 771 def safe_callback(ignore): 772 alist.append("safe_callback called") 773 774 class C(object): 775 def cb(self, ignore): 776 alist.append("cb called") 777 778 c, d = C(), C() 779 c.other = d 780 d.other = c 781 callback = c.cb 782 c.wr = weakref.ref(d, callback) # this won't trigger 783 d.wr = weakref.ref(callback, d.cb) # ditto 784 external_wr = weakref.ref(callback, safe_callback) # but this will 785 self.assertIs(external_wr(), callback) 786 787 # The weakrefs attached to c and d should get cleared, so that 788 # C.cb is never called. But external_wr isn't part of the cyclic 789 # trash, and no cyclic trash is reachable from it, so safe_callback 790 # should get invoked when the bound method object callback (c.cb) 791 # -- which is itself a callback, and also part of the cyclic trash -- 792 # gets reclaimed at the end of gc. 
793 794 del callback, c, d, C 795 self.assertEqual(alist, []) # del isn't enough to clean up cycles 796 gc.collect() 797 self.assertEqual(alist, ["safe_callback called"]) 798 self.assertEqual(external_wr(), None) 799 800 del alist[:] 801 gc.collect() 802 self.assertEqual(alist, []) 803 804 def test_gc_during_ref_creation(self): 805 self.check_gc_during_creation(weakref.ref) 806 807 def test_gc_during_proxy_creation(self): 808 self.check_gc_during_creation(weakref.proxy) 809 810 def check_gc_during_creation(self, makeref): 811 thresholds = gc.get_threshold() 812 gc.set_threshold(1, 1, 1) 813 gc.collect() 814 class A: 815 pass 816 817 def callback(*args): 818 pass 819 820 referenced = A() 821 822 a = A() 823 a.a = a 824 a.wr = makeref(referenced) 825 826 try: 827 # now make sure the object and the ref get labeled as 828 # cyclic trash: 829 a = A() 830 weakref.ref(referenced, callback) 831 832 finally: 833 gc.set_threshold(*thresholds) 834 835 def test_ref_created_during_del(self): 836 # Bug #1377858 837 # A weakref created in an object's __del__() would crash the 838 # interpreter when the weakref was cleaned up since it would refer to 839 # non-existent memory. This test should not segfault the interpreter. 840 class Target(object): 841 def __del__(self): 842 global ref_from_del 843 ref_from_del = weakref.ref(self) 844 845 w = Target() 846 847 def test_init(self): 848 # Issue 3634 849 # <weakref to class>.__init__() doesn't check errors correctly 850 r = weakref.ref(Exception) 851 self.assertRaises(TypeError, r.__init__, 0, 0, 0, 0, 0) 852 # No exception should be raised here 853 gc.collect() 854 855 def test_classes(self): 856 # Check that classes are weakrefable. 857 class A(object): 858 pass 859 l = [] 860 weakref.ref(int) 861 a = weakref.ref(A, l.append) 862 A = None 863 gc.collect() 864 self.assertEqual(a(), None) 865 self.assertEqual(l, [a]) 866 867 def test_equality(self): 868 # Alive weakrefs defer equality testing to their underlying object. 
869 x = Object(1) 870 y = Object(1) 871 z = Object(2) 872 a = weakref.ref(x) 873 b = weakref.ref(y) 874 c = weakref.ref(z) 875 d = weakref.ref(x) 876 # Note how we directly test the operators here, to stress both 877 # __eq__ and __ne__. 878 self.assertTrue(a == b) 879 self.assertFalse(a != b) 880 self.assertFalse(a == c) 881 self.assertTrue(a != c) 882 self.assertTrue(a == d) 883 self.assertFalse(a != d) 884 self.assertFalse(a == x) 885 self.assertTrue(a != x) 886 self.assertTrue(a == ALWAYS_EQ) 887 self.assertFalse(a != ALWAYS_EQ) 888 del x, y, z 889 gc.collect() 890 for r in a, b, c: 891 # Sanity check 892 self.assertIs(r(), None) 893 # Dead weakrefs compare by identity: whether `a` and `d` are the 894 # same weakref object is an implementation detail, since they pointed 895 # to the same original object and didn't have a callback. 896 # (see issue #16453). 897 self.assertFalse(a == b) 898 self.assertTrue(a != b) 899 self.assertFalse(a == c) 900 self.assertTrue(a != c) 901 self.assertEqual(a == d, a is d) 902 self.assertEqual(a != d, a is not d) 903 904 def test_ordering(self): 905 # weakrefs cannot be ordered, even if the underlying objects can. 906 ops = [operator.lt, operator.gt, operator.le, operator.ge] 907 x = Object(1) 908 y = Object(1) 909 a = weakref.ref(x) 910 b = weakref.ref(y) 911 for op in ops: 912 self.assertRaises(TypeError, op, a, b) 913 # Same when dead. 914 del x, y 915 gc.collect() 916 for op in ops: 917 self.assertRaises(TypeError, op, a, b) 918 919 def test_hashing(self): 920 # Alive weakrefs hash the same as the underlying object 921 x = Object(42) 922 y = Object(42) 923 a = weakref.ref(x) 924 b = weakref.ref(y) 925 self.assertEqual(hash(a), hash(42)) 926 del x, y 927 gc.collect() 928 # Dead weakrefs: 929 # - retain their hash is they were hashed when alive; 930 # - otherwise, cannot be hashed. 
931 self.assertEqual(hash(a), hash(42)) 932 self.assertRaises(TypeError, hash, b) 933 934 def test_trashcan_16602(self): 935 # Issue #16602: when a weakref's target was part of a long 936 # deallocation chain, the trashcan mechanism could delay clearing 937 # of the weakref and make the target object visible from outside 938 # code even though its refcount had dropped to 0. A crash ensued. 939 class C: 940 def __init__(self, parent): 941 if not parent: 942 return 943 wself = weakref.ref(self) 944 def cb(wparent): 945 o = wself() 946 self.wparent = weakref.ref(parent, cb) 947 948 d = weakref.WeakKeyDictionary() 949 root = c = C(None) 950 for n in range(100): 951 d[c] = c = C(c) 952 del root 953 gc.collect() 954 955 def test_callback_attribute(self): 956 x = Object(1) 957 callback = lambda ref: None 958 ref1 = weakref.ref(x, callback) 959 self.assertIs(ref1.__callback__, callback) 960 961 ref2 = weakref.ref(x) 962 self.assertIsNone(ref2.__callback__) 963 964 def test_callback_attribute_after_deletion(self): 965 x = Object(1) 966 ref = weakref.ref(x, self.callback) 967 self.assertIsNotNone(ref.__callback__) 968 del x 969 support.gc_collect() 970 self.assertIsNone(ref.__callback__) 971 972 def test_set_callback_attribute(self): 973 x = Object(1) 974 callback = lambda ref: None 975 ref1 = weakref.ref(x, callback) 976 with self.assertRaises(AttributeError): 977 ref1.__callback__ = lambda ref: None 978 979 def test_callback_gcs(self): 980 class ObjectWithDel(Object): 981 def __del__(self): pass 982 x = ObjectWithDel(1) 983 ref1 = weakref.ref(x, lambda ref: support.gc_collect()) 984 del x 985 support.gc_collect() 986 987 988class SubclassableWeakrefTestCase(TestBase): 989 990 def test_subclass_refs(self): 991 class MyRef(weakref.ref): 992 def __init__(self, ob, callback=None, value=42): 993 self.value = value 994 super().__init__(ob, callback) 995 def __call__(self): 996 self.called = True 997 return super().__call__() 998 o = Object("foo") 999 mr = MyRef(o, value=24) 1000 
self.assertIs(mr(), o) 1001 self.assertTrue(mr.called) 1002 self.assertEqual(mr.value, 24) 1003 del o 1004 gc_collect() # For PyPy or other GCs. 1005 self.assertIsNone(mr()) 1006 self.assertTrue(mr.called) 1007 1008 def test_subclass_refs_dont_replace_standard_refs(self): 1009 class MyRef(weakref.ref): 1010 pass 1011 o = Object(42) 1012 r1 = MyRef(o) 1013 r2 = weakref.ref(o) 1014 self.assertIsNot(r1, r2) 1015 self.assertEqual(weakref.getweakrefs(o), [r2, r1]) 1016 self.assertEqual(weakref.getweakrefcount(o), 2) 1017 r3 = MyRef(o) 1018 self.assertEqual(weakref.getweakrefcount(o), 3) 1019 refs = weakref.getweakrefs(o) 1020 self.assertEqual(len(refs), 3) 1021 self.assertIs(r2, refs[0]) 1022 self.assertIn(r1, refs[1:]) 1023 self.assertIn(r3, refs[1:]) 1024 1025 def test_subclass_refs_dont_conflate_callbacks(self): 1026 class MyRef(weakref.ref): 1027 pass 1028 o = Object(42) 1029 r1 = MyRef(o, id) 1030 r2 = MyRef(o, str) 1031 self.assertIsNot(r1, r2) 1032 refs = weakref.getweakrefs(o) 1033 self.assertIn(r1, refs) 1034 self.assertIn(r2, refs) 1035 1036 def test_subclass_refs_with_slots(self): 1037 class MyRef(weakref.ref): 1038 __slots__ = "slot1", "slot2" 1039 def __new__(type, ob, callback, slot1, slot2): 1040 return weakref.ref.__new__(type, ob, callback) 1041 def __init__(self, ob, callback, slot1, slot2): 1042 self.slot1 = slot1 1043 self.slot2 = slot2 1044 def meth(self): 1045 return self.slot1 + self.slot2 1046 o = Object(42) 1047 r = MyRef(o, None, "abc", "def") 1048 self.assertEqual(r.slot1, "abc") 1049 self.assertEqual(r.slot2, "def") 1050 self.assertEqual(r.meth(), "abcdef") 1051 self.assertFalse(hasattr(r, "__dict__")) 1052 1053 def test_subclass_refs_with_cycle(self): 1054 """Confirm https://bugs.python.org/issue3100 is fixed.""" 1055 # An instance of a weakref subclass can have attributes. 1056 # If such a weakref holds the only strong reference to the object, 1057 # deleting the weakref will delete the object. 
class WeakMethodTestCase(unittest.TestCase):
    # Exercises weakref.WeakMethod: liveness of the rebuilt bound method,
    # callback delivery, equality and hashing.

    def _subclass(self):
        """Return an Object subclass overriding `some_method`."""
        # Named differently from the module-level `C` to avoid shadowing it.
        class Derived(Object):
            def some_method(self):
                return 6
        return Derived

    def test_alive(self):
        # While the instance is alive, calling the WeakMethod yields a bound
        # method with the same __self__ and __func__ as the original.
        o = Object(1)
        r = weakref.WeakMethod(o.some_method)
        self.assertIsInstance(r, weakref.ReferenceType)
        self.assertIsInstance(r(), type(o.some_method))
        self.assertIs(r().__self__, o)
        self.assertIs(r().__func__, o.some_method.__func__)
        self.assertEqual(r()(), 4)

    def test_object_dead(self):
        # Once the instance is gone the WeakMethod resolves to None.
        o = Object(1)
        r = weakref.WeakMethod(o.some_method)
        del o
        gc.collect()
        self.assertIsNone(r())

    def test_method_dead(self):
        # Removing the function from the class also kills the WeakMethod,
        # even though the instance is still alive.
        klass = self._subclass()
        o = klass(1)
        r = weakref.WeakMethod(o.some_method)
        del klass.some_method
        gc.collect()
        self.assertIsNone(r())

    def test_callback_when_object_dead(self):
        # Test callback behaviour when object dies first.
        klass = self._subclass()
        calls = []
        def cb(arg):
            calls.append(arg)
        o = klass(1)
        r = weakref.WeakMethod(o.some_method, cb)
        del o
        gc.collect()
        self.assertEqual(calls, [r])
        # Callback is only called once: replacing the overriding function
        # afterwards must not trigger it a second time.
        klass.some_method = Object.some_method
        gc.collect()
        self.assertEqual(calls, [r])

    def test_callback_when_method_dead(self):
        # Test callback behaviour when method dies first.
        klass = self._subclass()
        calls = []
        def cb(arg):
            calls.append(arg)
        o = klass(1)
        r = weakref.WeakMethod(o.some_method, cb)
        del klass.some_method
        gc.collect()
        self.assertEqual(calls, [r])
        # Callback is only called once: the instance dying afterwards must
        # not trigger it a second time.
        del o
        gc.collect()
        self.assertEqual(calls, [r])

    @support.cpython_only
    def test_no_cycles(self):
        # A WeakMethod doesn't create any reference cycle to itself, so
        # dropping the last name frees it without a gc pass.
        o = Object(1)
        def cb(_):
            pass
        r = weakref.WeakMethod(o.some_method, cb)
        wr = weakref.ref(r)
        del r
        self.assertIsNone(wr())

    def test_equality(self):
        # Check both == and != in each direction of the expectation.
        def _eq(a, b):
            self.assertTrue(a == b)
            self.assertFalse(a != b)
        def _ne(a, b):
            self.assertTrue(a != b)
            self.assertFalse(a == b)
        x = Object(1)
        y = Object(1)
        a = weakref.WeakMethod(x.some_method)
        b = weakref.WeakMethod(y.some_method)
        c = weakref.WeakMethod(x.other_method)
        d = weakref.WeakMethod(y.other_method)
        # Objects equal, same method
        _eq(a, b)
        _eq(c, d)
        # Objects equal, different method
        _ne(a, c)
        _ne(a, d)
        _ne(b, c)
        _ne(b, d)
        # Objects unequal, same or different method
        z = Object(2)
        e = weakref.WeakMethod(z.some_method)
        f = weakref.WeakMethod(z.other_method)
        _ne(a, e)
        _ne(a, f)
        _ne(b, e)
        _ne(b, f)
        # Compare with different types
        _ne(a, x.some_method)
        _eq(a, ALWAYS_EQ)
        del x, y, z
        gc.collect()
        # Dead WeakMethods compare by identity
        refs = a, b, c, d, e, f
        for q in refs:
            for r in refs:
                self.assertEqual(q == r, q is r)
                self.assertEqual(q != r, q is not r)

    def test_hashing(self):
        # Alive WeakMethods are hashable if the underlying object is
        # hashable.
        x = Object(1)
        y = Object(1)
        a = weakref.WeakMethod(x.some_method)
        b = weakref.WeakMethod(y.some_method)
        c = weakref.WeakMethod(y.other_method)
        # Since WeakMethod objects are equal, the hashes should be equal.
        self.assertEqual(hash(a), hash(b))
        ha = hash(a)
        # Dead WeakMethods retain their old hash value
        del x, y
        gc.collect()
        self.assertEqual(hash(a), ha)
        self.assertEqual(hash(b), ha)
        # If it wasn't hashed when alive, a dead WeakMethod cannot be hashed.
        self.assertRaises(TypeError, hash, c)
self.assertGreaterEqual(n1, 0) 1279 self.assertLessEqual(n1, N) 1280 self.assertGreaterEqual(n2, 0) 1281 self.assertLessEqual(n2, n1) 1282 1283 def test_weak_keyed_len_race(self): 1284 self.check_len_race(weakref.WeakKeyDictionary, lambda k: (k, 1)) 1285 1286 def test_weak_valued_len_race(self): 1287 self.check_len_race(weakref.WeakValueDictionary, lambda k: (1, k)) 1288 1289 def test_weak_values(self): 1290 # 1291 # This exercises d.copy(), d.items(), d[], del d[], len(d). 1292 # 1293 dict, objects = self.make_weak_valued_dict() 1294 for o in objects: 1295 self.assertEqual(weakref.getweakrefcount(o), 1) 1296 self.assertIs(o, dict[o.arg], 1297 "wrong object returned by weak dict!") 1298 items1 = list(dict.items()) 1299 items2 = list(dict.copy().items()) 1300 items1.sort() 1301 items2.sort() 1302 self.assertEqual(items1, items2, 1303 "cloning of weak-valued dictionary did not work!") 1304 del items1, items2 1305 self.assertEqual(len(dict), self.COUNT) 1306 del objects[0] 1307 gc_collect() # For PyPy or other GCs. 1308 self.assertEqual(len(dict), self.COUNT - 1, 1309 "deleting object did not cause dictionary update") 1310 del objects, o 1311 gc_collect() # For PyPy or other GCs. 1312 self.assertEqual(len(dict), 0, 1313 "deleting the values did not clear the dictionary") 1314 # regression on SF bug #447152: 1315 dict = weakref.WeakValueDictionary() 1316 self.assertRaises(KeyError, dict.__getitem__, 1) 1317 dict[2] = C() 1318 gc_collect() # For PyPy or other GCs. 1319 self.assertRaises(KeyError, dict.__getitem__, 2) 1320 1321 def test_weak_keys(self): 1322 # 1323 # This exercises d.copy(), d.items(), d[] = v, d[], del d[], 1324 # len(d), k in d. 1325 # 1326 dict, objects = self.make_weak_keyed_dict() 1327 for o in objects: 1328 self.assertEqual(weakref.getweakrefcount(o), 1, 1329 "wrong number of weak references to %r!" 
% o) 1330 self.assertIs(o.arg, dict[o], 1331 "wrong object returned by weak dict!") 1332 items1 = dict.items() 1333 items2 = dict.copy().items() 1334 self.assertEqual(set(items1), set(items2), 1335 "cloning of weak-keyed dictionary did not work!") 1336 del items1, items2 1337 self.assertEqual(len(dict), self.COUNT) 1338 del objects[0] 1339 gc_collect() # For PyPy or other GCs. 1340 self.assertEqual(len(dict), (self.COUNT - 1), 1341 "deleting object did not cause dictionary update") 1342 del objects, o 1343 gc_collect() # For PyPy or other GCs. 1344 self.assertEqual(len(dict), 0, 1345 "deleting the keys did not clear the dictionary") 1346 o = Object(42) 1347 dict[o] = "What is the meaning of the universe?" 1348 self.assertIn(o, dict) 1349 self.assertNotIn(34, dict) 1350 1351 def test_weak_keyed_iters(self): 1352 dict, objects = self.make_weak_keyed_dict() 1353 self.check_iters(dict) 1354 1355 # Test keyrefs() 1356 refs = dict.keyrefs() 1357 self.assertEqual(len(refs), len(objects)) 1358 objects2 = list(objects) 1359 for wr in refs: 1360 ob = wr() 1361 self.assertIn(ob, dict) 1362 self.assertIn(ob, dict) 1363 self.assertEqual(ob.arg, dict[ob]) 1364 objects2.remove(ob) 1365 self.assertEqual(len(objects2), 0) 1366 1367 # Test iterkeyrefs() 1368 objects2 = list(objects) 1369 self.assertEqual(len(list(dict.keyrefs())), len(objects)) 1370 for wr in dict.keyrefs(): 1371 ob = wr() 1372 self.assertIn(ob, dict) 1373 self.assertIn(ob, dict) 1374 self.assertEqual(ob.arg, dict[ob]) 1375 objects2.remove(ob) 1376 self.assertEqual(len(objects2), 0) 1377 1378 def test_weak_valued_iters(self): 1379 dict, objects = self.make_weak_valued_dict() 1380 self.check_iters(dict) 1381 1382 # Test valuerefs() 1383 refs = dict.valuerefs() 1384 self.assertEqual(len(refs), len(objects)) 1385 objects2 = list(objects) 1386 for wr in refs: 1387 ob = wr() 1388 self.assertEqual(ob, dict[ob.arg]) 1389 self.assertEqual(ob.arg, dict[ob.arg].arg) 1390 objects2.remove(ob) 1391 
self.assertEqual(len(objects2), 0) 1392 1393 # Test itervaluerefs() 1394 objects2 = list(objects) 1395 self.assertEqual(len(list(dict.itervaluerefs())), len(objects)) 1396 for wr in dict.itervaluerefs(): 1397 ob = wr() 1398 self.assertEqual(ob, dict[ob.arg]) 1399 self.assertEqual(ob.arg, dict[ob.arg].arg) 1400 objects2.remove(ob) 1401 self.assertEqual(len(objects2), 0) 1402 1403 def check_iters(self, dict): 1404 # item iterator: 1405 items = list(dict.items()) 1406 for item in dict.items(): 1407 items.remove(item) 1408 self.assertFalse(items, "items() did not touch all items") 1409 1410 # key iterator, via __iter__(): 1411 keys = list(dict.keys()) 1412 for k in dict: 1413 keys.remove(k) 1414 self.assertFalse(keys, "__iter__() did not touch all keys") 1415 1416 # key iterator, via iterkeys(): 1417 keys = list(dict.keys()) 1418 for k in dict.keys(): 1419 keys.remove(k) 1420 self.assertFalse(keys, "iterkeys() did not touch all keys") 1421 1422 # value iterator: 1423 values = list(dict.values()) 1424 for v in dict.values(): 1425 values.remove(v) 1426 self.assertFalse(values, 1427 "itervalues() did not touch all values") 1428 1429 def check_weak_destroy_while_iterating(self, dict, objects, iter_name): 1430 n = len(dict) 1431 it = iter(getattr(dict, iter_name)()) 1432 next(it) # Trigger internal iteration 1433 # Destroy an object 1434 del objects[-1] 1435 gc.collect() # just in case 1436 # We have removed either the first consumed object, or another one 1437 self.assertIn(len(list(it)), [len(objects), len(objects) - 1]) 1438 del it 1439 # The removal has been committed 1440 self.assertEqual(len(dict), n - 1) 1441 1442 def check_weak_destroy_and_mutate_while_iterating(self, dict, testcontext): 1443 # Check that we can explicitly mutate the weak dict without 1444 # interfering with delayed removal. 1445 # `testcontext` should create an iterator, destroy one of the 1446 # weakref'ed objects and then return a new key/value pair corresponding 1447 # to the destroyed object. 
    def test_weak_keys_destroy_while_iterating(self):
        # Issue #7105: iterators shouldn't crash when a key is implicitly removed
        dict, objects = self.make_weak_keyed_dict()
        # Run the destroy-while-iterating check once per iteration API; all
        # four calls share the same dict/objects pair.
        self.check_weak_destroy_while_iterating(dict, objects, 'keys')
        self.check_weak_destroy_while_iterating(dict, objects, 'items')
        self.check_weak_destroy_while_iterating(dict, objects, 'values')
        self.check_weak_destroy_while_iterating(dict, objects, 'keyrefs')
        dict, objects = self.make_weak_keyed_dict()
        # `testcontext` looks up `dict` and `objects` by name each time it
        # runs, so rebinding those names further down makes the same
        # context manager operate on the fresh pair.
        @contextlib.contextmanager
        def testcontext():
            try:
                # Keep a started items() iterator alive for the duration of
                # the `with` body.
                it = iter(dict.items())
                next(it)
                # Schedule a key/value for removal and recreate it
                v = objects.pop().arg
                gc.collect()      # just in case
                # Hand the caller a new key equal to the removed one,
                # together with its value.
                yield Object(v), v
            finally:
                it = None           # should commit all removals
                gc.collect()
        self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
        # Issue #21173: len() fragile when keys are both implicitly and
        # explicitly removed.
        dict, objects = self.make_weak_keyed_dict()
        self.check_weak_del_and_len_while_iterating(dict, testcontext)
self.make_weak_valued_dict() 1549 self.check_weak_del_and_len_while_iterating(dict, testcontext) 1550 1551 def test_make_weak_keyed_dict_from_dict(self): 1552 o = Object(3) 1553 dict = weakref.WeakKeyDictionary({o:364}) 1554 self.assertEqual(dict[o], 364) 1555 1556 def test_make_weak_keyed_dict_from_weak_keyed_dict(self): 1557 o = Object(3) 1558 dict = weakref.WeakKeyDictionary({o:364}) 1559 dict2 = weakref.WeakKeyDictionary(dict) 1560 self.assertEqual(dict[o], 364) 1561 1562 def make_weak_keyed_dict(self): 1563 dict = weakref.WeakKeyDictionary() 1564 objects = list(map(Object, range(self.COUNT))) 1565 for o in objects: 1566 dict[o] = o.arg 1567 return dict, objects 1568 1569 def test_make_weak_valued_dict_from_dict(self): 1570 o = Object(3) 1571 dict = weakref.WeakValueDictionary({364:o}) 1572 self.assertEqual(dict[364], o) 1573 1574 def test_make_weak_valued_dict_from_weak_valued_dict(self): 1575 o = Object(3) 1576 dict = weakref.WeakValueDictionary({364:o}) 1577 dict2 = weakref.WeakValueDictionary(dict) 1578 self.assertEqual(dict[364], o) 1579 1580 def test_make_weak_valued_dict_misc(self): 1581 # errors 1582 self.assertRaises(TypeError, weakref.WeakValueDictionary.__init__) 1583 self.assertRaises(TypeError, weakref.WeakValueDictionary, {}, {}) 1584 self.assertRaises(TypeError, weakref.WeakValueDictionary, (), ()) 1585 # special keyword arguments 1586 o = Object(3) 1587 for kw in 'self', 'dict', 'other', 'iterable': 1588 d = weakref.WeakValueDictionary(**{kw: o}) 1589 self.assertEqual(list(d.keys()), [kw]) 1590 self.assertEqual(d[kw], o) 1591 1592 def make_weak_valued_dict(self): 1593 dict = weakref.WeakValueDictionary() 1594 objects = list(map(Object, range(self.COUNT))) 1595 for o in objects: 1596 dict[o.arg] = o 1597 return dict, objects 1598 1599 def check_popitem(self, klass, key1, value1, key2, value2): 1600 weakdict = klass() 1601 weakdict[key1] = value1 1602 weakdict[key2] = value2 1603 self.assertEqual(len(weakdict), 2) 1604 k, v = weakdict.popitem() 
1605 self.assertEqual(len(weakdict), 1) 1606 if k is key1: 1607 self.assertIs(v, value1) 1608 else: 1609 self.assertIs(v, value2) 1610 k, v = weakdict.popitem() 1611 self.assertEqual(len(weakdict), 0) 1612 if k is key1: 1613 self.assertIs(v, value1) 1614 else: 1615 self.assertIs(v, value2) 1616 1617 def test_weak_valued_dict_popitem(self): 1618 self.check_popitem(weakref.WeakValueDictionary, 1619 "key1", C(), "key2", C()) 1620 1621 def test_weak_keyed_dict_popitem(self): 1622 self.check_popitem(weakref.WeakKeyDictionary, 1623 C(), "value 1", C(), "value 2") 1624 1625 def check_setdefault(self, klass, key, value1, value2): 1626 self.assertIsNot(value1, value2, 1627 "invalid test" 1628 " -- value parameters must be distinct objects") 1629 weakdict = klass() 1630 o = weakdict.setdefault(key, value1) 1631 self.assertIs(o, value1) 1632 self.assertIn(key, weakdict) 1633 self.assertIs(weakdict.get(key), value1) 1634 self.assertIs(weakdict[key], value1) 1635 1636 o = weakdict.setdefault(key, value2) 1637 self.assertIs(o, value1) 1638 self.assertIn(key, weakdict) 1639 self.assertIs(weakdict.get(key), value1) 1640 self.assertIs(weakdict[key], value1) 1641 1642 def test_weak_valued_dict_setdefault(self): 1643 self.check_setdefault(weakref.WeakValueDictionary, 1644 "key", C(), C()) 1645 1646 def test_weak_keyed_dict_setdefault(self): 1647 self.check_setdefault(weakref.WeakKeyDictionary, 1648 C(), "value 1", "value 2") 1649 1650 def check_update(self, klass, dict): 1651 # 1652 # This exercises d.update(), len(d), d.keys(), k in d, 1653 # d.get(), d[]. 
1654 # 1655 weakdict = klass() 1656 weakdict.update(dict) 1657 self.assertEqual(len(weakdict), len(dict)) 1658 for k in weakdict.keys(): 1659 self.assertIn(k, dict, "mysterious new key appeared in weak dict") 1660 v = dict.get(k) 1661 self.assertIs(v, weakdict[k]) 1662 self.assertIs(v, weakdict.get(k)) 1663 for k in dict.keys(): 1664 self.assertIn(k, weakdict, "original key disappeared in weak dict") 1665 v = dict[k] 1666 self.assertIs(v, weakdict[k]) 1667 self.assertIs(v, weakdict.get(k)) 1668 1669 def test_weak_valued_dict_update(self): 1670 self.check_update(weakref.WeakValueDictionary, 1671 {1: C(), 'a': C(), C(): C()}) 1672 # errors 1673 self.assertRaises(TypeError, weakref.WeakValueDictionary.update) 1674 d = weakref.WeakValueDictionary() 1675 self.assertRaises(TypeError, d.update, {}, {}) 1676 self.assertRaises(TypeError, d.update, (), ()) 1677 self.assertEqual(list(d.keys()), []) 1678 # special keyword arguments 1679 o = Object(3) 1680 for kw in 'self', 'dict', 'other', 'iterable': 1681 d = weakref.WeakValueDictionary() 1682 d.update(**{kw: o}) 1683 self.assertEqual(list(d.keys()), [kw]) 1684 self.assertEqual(d[kw], o) 1685 1686 def test_weak_valued_union_operators(self): 1687 a = C() 1688 b = C() 1689 c = C() 1690 wvd1 = weakref.WeakValueDictionary({1: a}) 1691 wvd2 = weakref.WeakValueDictionary({1: b, 2: a}) 1692 wvd3 = wvd1.copy() 1693 d1 = {1: c, 3: b} 1694 pairs = [(5, c), (6, b)] 1695 1696 tmp1 = wvd1 | wvd2 # Between two WeakValueDictionaries 1697 self.assertEqual(dict(tmp1), dict(wvd1) | dict(wvd2)) 1698 self.assertIs(type(tmp1), weakref.WeakValueDictionary) 1699 wvd1 |= wvd2 1700 self.assertEqual(wvd1, tmp1) 1701 1702 tmp2 = wvd2 | d1 # Between WeakValueDictionary and mapping 1703 self.assertEqual(dict(tmp2), dict(wvd2) | d1) 1704 self.assertIs(type(tmp2), weakref.WeakValueDictionary) 1705 wvd2 |= d1 1706 self.assertEqual(wvd2, tmp2) 1707 1708 tmp3 = wvd3.copy() # Between WeakValueDictionary and iterable key, value 1709 tmp3 |= pairs 1710 
self.assertEqual(dict(tmp3), dict(wvd3) | dict(pairs)) 1711 self.assertIs(type(tmp3), weakref.WeakValueDictionary) 1712 1713 tmp4 = d1 | wvd3 # Testing .__ror__ 1714 self.assertEqual(dict(tmp4), d1 | dict(wvd3)) 1715 self.assertIs(type(tmp4), weakref.WeakValueDictionary) 1716 1717 del a 1718 self.assertNotIn(2, tmp1) 1719 self.assertNotIn(2, tmp2) 1720 self.assertNotIn(1, tmp3) 1721 self.assertNotIn(1, tmp4) 1722 1723 def test_weak_keyed_dict_update(self): 1724 self.check_update(weakref.WeakKeyDictionary, 1725 {C(): 1, C(): 2, C(): 3}) 1726 1727 def test_weak_keyed_delitem(self): 1728 d = weakref.WeakKeyDictionary() 1729 o1 = Object('1') 1730 o2 = Object('2') 1731 d[o1] = 'something' 1732 d[o2] = 'something' 1733 self.assertEqual(len(d), 2) 1734 del d[o1] 1735 self.assertEqual(len(d), 1) 1736 self.assertEqual(list(d.keys()), [o2]) 1737 1738 def test_weak_keyed_union_operators(self): 1739 o1 = C() 1740 o2 = C() 1741 o3 = C() 1742 wkd1 = weakref.WeakKeyDictionary({o1: 1, o2: 2}) 1743 wkd2 = weakref.WeakKeyDictionary({o3: 3, o1: 4}) 1744 wkd3 = wkd1.copy() 1745 d1 = {o2: '5', o3: '6'} 1746 pairs = [(o2, 7), (o3, 8)] 1747 1748 tmp1 = wkd1 | wkd2 # Between two WeakKeyDictionaries 1749 self.assertEqual(dict(tmp1), dict(wkd1) | dict(wkd2)) 1750 self.assertIs(type(tmp1), weakref.WeakKeyDictionary) 1751 wkd1 |= wkd2 1752 self.assertEqual(wkd1, tmp1) 1753 1754 tmp2 = wkd2 | d1 # Between WeakKeyDictionary and mapping 1755 self.assertEqual(dict(tmp2), dict(wkd2) | d1) 1756 self.assertIs(type(tmp2), weakref.WeakKeyDictionary) 1757 wkd2 |= d1 1758 self.assertEqual(wkd2, tmp2) 1759 1760 tmp3 = wkd3.copy() # Between WeakKeyDictionary and iterable key, value 1761 tmp3 |= pairs 1762 self.assertEqual(dict(tmp3), dict(wkd3) | dict(pairs)) 1763 self.assertIs(type(tmp3), weakref.WeakKeyDictionary) 1764 1765 tmp4 = d1 | wkd3 # Testing .__ror__ 1766 self.assertEqual(dict(tmp4), d1 | dict(wkd3)) 1767 self.assertIs(type(tmp4), weakref.WeakKeyDictionary) 1768 1769 del o1 1770 
self.assertNotIn(4, tmp1.values()) 1771 self.assertNotIn(4, tmp2.values()) 1772 self.assertNotIn(1, tmp3.values()) 1773 self.assertNotIn(1, tmp4.values()) 1774 1775 def test_weak_valued_delitem(self): 1776 d = weakref.WeakValueDictionary() 1777 o1 = Object('1') 1778 o2 = Object('2') 1779 d['something'] = o1 1780 d['something else'] = o2 1781 self.assertEqual(len(d), 2) 1782 del d['something'] 1783 self.assertEqual(len(d), 1) 1784 self.assertEqual(list(d.items()), [('something else', o2)]) 1785 1786 def test_weak_keyed_bad_delitem(self): 1787 d = weakref.WeakKeyDictionary() 1788 o = Object('1') 1789 # An attempt to delete an object that isn't there should raise 1790 # KeyError. It didn't before 2.3. 1791 self.assertRaises(KeyError, d.__delitem__, o) 1792 self.assertRaises(KeyError, d.__getitem__, o) 1793 1794 # If a key isn't of a weakly referencable type, __getitem__ and 1795 # __setitem__ raise TypeError. __delitem__ should too. 1796 self.assertRaises(TypeError, d.__delitem__, 13) 1797 self.assertRaises(TypeError, d.__getitem__, 13) 1798 self.assertRaises(TypeError, d.__setitem__, 13, 13) 1799 1800 def test_weak_keyed_cascading_deletes(self): 1801 # SF bug 742860. For some reason, before 2.3 __delitem__ iterated 1802 # over the keys via self.data.iterkeys(). If things vanished from 1803 # the dict during this (or got added), that caused a RuntimeError. 1804 1805 d = weakref.WeakKeyDictionary() 1806 mutate = False 1807 1808 class C(object): 1809 def __init__(self, i): 1810 self.value = i 1811 def __hash__(self): 1812 return hash(self.value) 1813 def __eq__(self, other): 1814 if mutate: 1815 # Side effect that mutates the dict, by removing the 1816 # last strong reference to a key. 1817 del objs[-1] 1818 return self.value == other.value 1819 1820 objs = [C(i) for i in range(4)] 1821 for o in objs: 1822 d[o] = o.value 1823 del o # now the only strong references to keys are in objs 1824 # Find the order in which iterkeys sees the keys. 
1825 objs = list(d.keys()) 1826 # Reverse it, so that the iteration implementation of __delitem__ 1827 # has to keep looping to find the first object we delete. 1828 objs.reverse() 1829 1830 # Turn on mutation in C.__eq__. The first time through the loop, 1831 # under the iterkeys() business the first comparison will delete 1832 # the last item iterkeys() would see, and that causes a 1833 # RuntimeError: dictionary changed size during iteration 1834 # when the iterkeys() loop goes around to try comparing the next 1835 # key. After this was fixed, it just deletes the last object *our* 1836 # "for o in obj" loop would have gotten to. 1837 mutate = True 1838 count = 0 1839 for o in objs: 1840 count += 1 1841 del d[o] 1842 gc_collect() # For PyPy or other GCs. 1843 self.assertEqual(len(d), 0) 1844 self.assertEqual(count, 2) 1845 1846 def test_make_weak_valued_dict_repr(self): 1847 dict = weakref.WeakValueDictionary() 1848 self.assertRegex(repr(dict), '<WeakValueDictionary at 0x.*>') 1849 1850 def test_make_weak_keyed_dict_repr(self): 1851 dict = weakref.WeakKeyDictionary() 1852 self.assertRegex(repr(dict), '<WeakKeyDictionary at 0x.*>') 1853 1854 def test_threaded_weak_valued_setdefault(self): 1855 d = weakref.WeakValueDictionary() 1856 with collect_in_thread(): 1857 for i in range(100000): 1858 x = d.setdefault(10, RefCycle()) 1859 self.assertIsNot(x, None) # we never put None in there! 1860 del x 1861 1862 def test_threaded_weak_valued_pop(self): 1863 d = weakref.WeakValueDictionary() 1864 with collect_in_thread(): 1865 for i in range(100000): 1866 d[10] = RefCycle() 1867 x = d.pop(10, 10) 1868 self.assertIsNot(x, None) # we never put None in there! 1869 1870 def test_threaded_weak_valued_consistency(self): 1871 # Issue #28427: old keys should not remove new values from 1872 # WeakValueDictionary when collecting from another thread. 
    def check_threaded_weak_dict_copy(self, type_, deepcopy):
        # Copy a large weak dictionary in one thread while another thread
        # drops the strong references to its keys (or values), and fail if
        # the copy raises.
        #
        # `type_` should be either WeakKeyDictionary or WeakValueDictionary.
        # `deepcopy` should be either True or False.
        exc = []  # exceptions captured inside the copying thread

        class DummyKey:
            def __init__(self, ctr):
                self.ctr = ctr

        class DummyValue:
            def __init__(self, ctr):
                self.ctr = ctr

        def dict_copy(d, exc):
            # Thread target: perform the copy and record any exception so
            # the main thread can re-raise it after join().
            try:
                if deepcopy is True:
                    _ = copy.deepcopy(d)
                else:
                    _ = d.copy()
            except Exception as ex:
                exc.append(ex)

        def pop_and_collect(lst):
            # Thread target: remove entries from `lst` at random positions
            # until it is empty, forcing a collection every 10000 removals.
            gc_ctr = 0
            while lst:
                i = random.randint(0, len(lst) - 1)
                gc_ctr += 1
                lst.pop(i)
                if gc_ctr % 10000 == 0:
                    gc.collect()  # just in case

        self.assertIn(type_, (weakref.WeakKeyDictionary, weakref.WeakValueDictionary))

        d = type_()
        keys = []
        values = []
        # Initialize d with many entries
        for i in range(70000):
            k, v = DummyKey(i), DummyValue(i)
            keys.append(k)
            values.append(v)
            d[k] = v
            # Drop the loop-local names so `keys` and `values` hold the only
            # strong references created here.
            del k
            del v

        t_copy = threading.Thread(target=dict_copy, args=(d, exc,))
        # Empty whichever list holds the weakly-referenced side of the
        # dictionary under test.
        if type_ is weakref.WeakKeyDictionary:
            t_collect = threading.Thread(target=pop_and_collect, args=(keys,))
        else:  # weakref.WeakValueDictionary
            t_collect = threading.Thread(target=pop_and_collect, args=(values,))

        t_copy.start()
        t_collect.start()

        t_copy.join()
        t_collect.join()

        # Test exceptions
        if exc:
            raise exc[0]
class WeakValueDictionaryTestCase(mapping_tests.BasicTestMappingProtocol):
    """Run the generic mapping-protocol tests against WeakValueDictionary."""

    type2test = weakref.WeakValueDictionary

    # Held at class level so the Object values stay strongly referenced
    # for as long as the class exists.
    __ref = dict(key1=Object(1), key2=Object(2), key3=Object(3))

    def _reference(self):
        # Return a fresh copy so callers cannot mutate the shared reference
        # mapping.
        return dict(self.__ref)
res.append(x + y + z) 1999 return x + y + z 2000 2001 a = self.A() 2002 2003 res = [] 2004 f = weakref.finalize(a, add, 67, 43, z=89) 2005 self.assertEqual(f.alive, True) 2006 self.assertEqual(f.peek(), (a, add, (67,43), {'z':89})) 2007 self.assertEqual(f(), 199) 2008 self.assertEqual(f(), None) 2009 self.assertEqual(f(), None) 2010 self.assertEqual(f.peek(), None) 2011 self.assertEqual(f.detach(), None) 2012 self.assertEqual(f.alive, False) 2013 self.assertEqual(res, [199]) 2014 2015 res = [] 2016 f = weakref.finalize(a, add, 67, 43, 89) 2017 self.assertEqual(f.peek(), (a, add, (67,43,89), {})) 2018 self.assertEqual(f.detach(), (a, add, (67,43,89), {})) 2019 self.assertEqual(f(), None) 2020 self.assertEqual(f(), None) 2021 self.assertEqual(f.peek(), None) 2022 self.assertEqual(f.detach(), None) 2023 self.assertEqual(f.alive, False) 2024 self.assertEqual(res, []) 2025 2026 res = [] 2027 f = weakref.finalize(a, add, x=67, y=43, z=89) 2028 del a 2029 self._collect_if_necessary() 2030 self.assertEqual(f(), None) 2031 self.assertEqual(f(), None) 2032 self.assertEqual(f.peek(), None) 2033 self.assertEqual(f.detach(), None) 2034 self.assertEqual(f.alive, False) 2035 self.assertEqual(res, [199]) 2036 2037 def test_arg_errors(self): 2038 def fin(*args, **kwargs): 2039 res.append((args, kwargs)) 2040 2041 a = self.A() 2042 2043 res = [] 2044 f = weakref.finalize(a, fin, 1, 2, func=3, obj=4) 2045 self.assertEqual(f.peek(), (a, fin, (1, 2), {'func': 3, 'obj': 4})) 2046 f() 2047 self.assertEqual(res, [((1, 2), {'func': 3, 'obj': 4})]) 2048 2049 with self.assertRaises(TypeError): 2050 weakref.finalize(a, func=fin, arg=1) 2051 with self.assertRaises(TypeError): 2052 weakref.finalize(obj=a, func=fin, arg=1) 2053 self.assertRaises(TypeError, weakref.finalize, a) 2054 self.assertRaises(TypeError, weakref.finalize) 2055 2056 def test_order(self): 2057 a = self.A() 2058 res = [] 2059 2060 f1 = weakref.finalize(a, res.append, 'f1') 2061 f2 = weakref.finalize(a, res.append, 'f2') 2062 
    def test_all_freed(self):
        # Check that a finalizer and its callback stay alive without any
        # local references while the tracked object lives, and that both
        # are released once the object dies and the callback has run.

        # we want a weakrefable subclass of weakref.finalize
        class MyFinalizer(weakref.finalize):
            pass

        a = self.A()
        res = []
        def callback():
            res.append(123)
        f = MyFinalizer(a, callback)

        # Watch both objects through weak references, then drop the only
        # local names bound to them.
        wr_callback = weakref.ref(callback)
        wr_f = weakref.ref(f)
        del callback, f

        # Both must still be reachable while `a` is alive.
        self.assertIsNotNone(wr_callback())
        self.assertIsNotNone(wr_f())

        del a
        self._collect_if_necessary()

        # With `a` gone, the callback has run exactly once and nothing
        # keeps the finalizer or the callback alive any more.
        self.assertIsNone(wr_callback())
        self.assertIsNone(wr_f())
        self.assertEqual(res, [123])
2125 g1 = weakref.finalize(cls, print, 'g1') 2126 print('f3 error') 2127 1/0 2128 2129 # cls should stay alive till atexit callbacks run 2130 f1 = weakref.finalize(cls, print, 'f1', _global_var) 2131 f2 = weakref.finalize(cls, print, 'f2', _global_var) 2132 f3 = weakref.finalize(cls, error) 2133 f4 = weakref.finalize(cls, print, 'f4', _global_var) 2134 2135 assert f1.atexit == True 2136 f2.atexit = False 2137 assert f3.atexit == True 2138 assert f4.atexit == True 2139 2140 def test_atexit(self): 2141 prog = ('from test.test_weakref import FinalizeTestCase;'+ 2142 'FinalizeTestCase.run_in_child()') 2143 rc, out, err = script_helper.assert_python_ok('-c', prog) 2144 out = out.decode('ascii').splitlines() 2145 self.assertEqual(out, ['f4 foobar', 'f3 error', 'g1', 'f1 foobar']) 2146 self.assertTrue(b'ZeroDivisionError' in err) 2147 2148 2149libreftest = """ Doctest for examples in the library reference: weakref.rst 2150 2151>>> from test.support import gc_collect 2152>>> import weakref 2153>>> class Dict(dict): 2154... pass 2155... 2156>>> obj = Dict(red=1, green=2, blue=3) # this object is weak referencable 2157>>> r = weakref.ref(obj) 2158>>> print(r() is obj) 2159True 2160 2161>>> import weakref 2162>>> class Object: 2163... pass 2164... 2165>>> o = Object() 2166>>> r = weakref.ref(o) 2167>>> o2 = r() 2168>>> o is o2 2169True 2170>>> del o, o2 2171>>> gc_collect() # For PyPy or other GCs. 2172>>> print(r()) 2173None 2174 2175>>> import weakref 2176>>> class ExtendedRef(weakref.ref): 2177... def __init__(self, ob, callback=None, **annotations): 2178... super().__init__(ob, callback) 2179... self.__counter = 0 2180... for k, v in annotations.items(): 2181... setattr(self, k, v) 2182... def __call__(self): 2183... '''Return a pair containing the referent and the number of 2184... times the reference has been called. 2185... ''' 2186... ob = super().__call__() 2187... if ob is not None: 2188... self.__counter += 1 2189... ob = (ob, self.__counter) 2190... 
return ob 2191... 2192>>> class A: # not in docs from here, just testing the ExtendedRef 2193... pass 2194... 2195>>> a = A() 2196>>> r = ExtendedRef(a, foo=1, bar="baz") 2197>>> r.foo 21981 2199>>> r.bar 2200'baz' 2201>>> r()[1] 22021 2203>>> r()[1] 22042 2205>>> r()[0] is a 2206True 2207 2208 2209>>> import weakref 2210>>> _id2obj_dict = weakref.WeakValueDictionary() 2211>>> def remember(obj): 2212... oid = id(obj) 2213... _id2obj_dict[oid] = obj 2214... return oid 2215... 2216>>> def id2obj(oid): 2217... return _id2obj_dict[oid] 2218... 2219>>> a = A() # from here, just testing 2220>>> a_id = remember(a) 2221>>> id2obj(a_id) is a 2222True 2223>>> del a 2224>>> gc_collect() # For PyPy or other GCs. 2225>>> try: 2226... id2obj(a_id) 2227... except KeyError: 2228... print('OK') 2229... else: 2230... print('WeakValueDictionary error') 2231OK 2232 2233""" 2234 2235__test__ = {'libreftest' : libreftest} 2236 2237def load_tests(loader, tests, pattern): 2238 tests.addTest(doctest.DocTestSuite()) 2239 return tests 2240 2241 2242if __name__ == "__main__": 2243 unittest.main() 2244