1"""Thread module emulating a subset of Java's threading model.""" 2 3import os as _os 4import sys as _sys 5import _thread 6 7from time import monotonic as _time 8from traceback import format_exc as _format_exc 9from _weakrefset import WeakSet 10from itertools import islice as _islice, count as _count 11try: 12 from _collections import deque as _deque 13except ImportError: 14 from collections import deque as _deque 15 16# Note regarding PEP 8 compliant names 17# This threading model was originally inspired by Java, and inherited 18# the convention of camelCase function and method names from that 19# language. Those original names are not in any imminent danger of 20# being deprecated (even for Py3k),so this module provides them as an 21# alias for the PEP 8 compliant names 22# Note that using the new PEP 8 compliant names facilitates substitution 23# with the multiprocessing module, which doesn't provide the old 24# Java inspired names. 25 26__all__ = ['get_ident', 'active_count', 'Condition', 'current_thread', 27 'enumerate', 'main_thread', 'TIMEOUT_MAX', 28 'Event', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread', 29 'Barrier', 'BrokenBarrierError', 'Timer', 'ThreadError', 30 'setprofile', 'settrace', 'local', 'stack_size'] 31 32# Rename some stuff so "from threading import *" is safe 33_start_new_thread = _thread.start_new_thread 34_allocate_lock = _thread.allocate_lock 35_set_sentinel = _thread._set_sentinel 36get_ident = _thread.get_ident 37ThreadError = _thread.error 38try: 39 _CRLock = _thread.RLock 40except AttributeError: 41 _CRLock = None 42TIMEOUT_MAX = _thread.TIMEOUT_MAX 43del _thread 44 45 46# Support for profile and trace hooks 47 48_profile_hook = None 49_trace_hook = None 50 51def setprofile(func): 52 """Set a profile function for all threads started from the threading module. 53 54 The func will be passed to sys.setprofile() for each thread, before its 55 run() method is called. 56 57 """ 58 global _profile_hook 59 _profile_hook = func 60 61def settrace(func): 62 """Set a trace function for all threads started from the threading module. 63 64 The func will be passed to sys.settrace() for each thread, before its run() 65 method is called. 66 67 """ 68 global _trace_hook 69 _trace_hook = func 70 71# Synchronization classes 72 73Lock = _allocate_lock 74 75def RLock(*args, **kwargs): 76 """Factory function that returns a new reentrant lock. 77 78 A reentrant lock must be released by the thread that acquired it. Once a 79 thread has acquired a reentrant lock, the same thread may acquire it again 80 without blocking; the thread must release it once for each time it has 81 acquired it. 82 83 """ 84 if _CRLock is None: 85 return _PyRLock(*args, **kwargs) 86 return _CRLock(*args, **kwargs) 87 88class _RLock: 89 """This class implements reentrant lock objects. 90 91 A reentrant lock must be released by the thread that acquired it. Once a 92 thread has acquired a reentrant lock, the same thread may acquire it 93 again without blocking; the thread must release it once for each time it 94 has acquired it. 95 96 """ 97 98 def __init__(self): 99 self._block = _allocate_lock() 100 self._owner = None 101 self._count = 0 102 103 def __repr__(self): 104 owner = self._owner 105 try: 106 owner = _active[owner].name 107 except KeyError: 108 pass 109 return "<%s %s.%s object owner=%r count=%d at %s>" % ( 110 "locked" if self._block.locked() else "unlocked", 111 self.__class__.__module__, 112 self.__class__.__qualname__, 113 owner, 114 self._count, 115 hex(id(self)) 116 ) 117 118 def acquire(self, blocking=True, timeout=-1): 119 """Acquire a lock, blocking or non-blocking. 120 121 When invoked without arguments: if this thread already owns the lock, 122 increment the recursion level by one, and return immediately. Otherwise, 123 if another thread owns the lock, block until the lock is unlocked. Once 124 the lock is unlocked (not owned by any thread), then grab ownership, set 125 the recursion level to one, and return. If more than one thread is 126 blocked waiting until the lock is unlocked, only one at a time will be 127 able to grab ownership of the lock. There is no return value in this 128 case. 129 130 When invoked with the blocking argument set to true, do the same thing 131 as when called without arguments, and return true. 132 133 When invoked with the blocking argument set to false, do not block. If a 134 call without an argument would block, return false immediately; 135 otherwise, do the same thing as when called without arguments, and 136 return true. 137 138 When invoked with the floating-point timeout argument set to a positive 139 value, block for at most the number of seconds specified by timeout 140 and as long as the lock cannot be acquired. Return true if the lock has 141 been acquired, false if the timeout has elapsed. 142 143 """ 144 me = get_ident() 145 if self._owner == me: 146 self._count += 1 147 return 1 148 rc = self._block.acquire(blocking, timeout) 149 if rc: 150 self._owner = me 151 self._count = 1 152 return rc 153 154 __enter__ = acquire 155 156 def release(self): 157 """Release a lock, decrementing the recursion level. 158 159 If after the decrement it is zero, reset the lock to unlocked (not owned 160 by any thread), and if any other threads are blocked waiting for the 161 lock to become unlocked, allow exactly one of them to proceed. If after 162 the decrement the recursion level is still nonzero, the lock remains 163 locked and owned by the calling thread. 164 165 Only call this method when the calling thread owns the lock. A 166 RuntimeError is raised if this method is called when the lock is 167 unlocked. 168 169 There is no return value. 170 171 """ 172 if self._owner != get_ident(): 173 raise RuntimeError("cannot release un-acquired lock") 174 self._count = count = self._count - 1 175 if not count: 176 self._owner = None 177 self._block.release() 178 179 def __exit__(self, t, v, tb): 180 self.release() 181 182 # Internal methods used by condition variables 183 184 def _acquire_restore(self, state): 185 self._block.acquire() 186 self._count, self._owner = state 187 188 def _release_save(self): 189 if self._count == 0: 190 raise RuntimeError("cannot release un-acquired lock") 191 count = self._count 192 self._count = 0 193 owner = self._owner 194 self._owner = None 195 self._block.release() 196 return (count, owner) 197 198 def _is_owned(self): 199 return self._owner == get_ident() 200 201_PyRLock = _RLock 202 203 204class Condition: 205 """Class that implements a condition variable. 206 207 A condition variable allows one or more threads to wait until they are 208 notified by another thread. 209 210 If the lock argument is given and not None, it must be a Lock or RLock 211 object, and it is used as the underlying lock. Otherwise, a new RLock object 212 is created and used as the underlying lock. 213 214 """ 215 216 def __init__(self, lock=None): 217 if lock is None: 218 lock = RLock() 219 self._lock = lock 220 # Export the lock's acquire() and release() methods 221 self.acquire = lock.acquire 222 self.release = lock.release 223 # If the lock defines _release_save() and/or _acquire_restore(), 224 # these override the default implementations (which just call 225 # release() and acquire() on the lock). Ditto for _is_owned(). 226 try: 227 self._release_save = lock._release_save 228 except AttributeError: 229 pass 230 try: 231 self._acquire_restore = lock._acquire_restore 232 except AttributeError: 233 pass 234 try: 235 self._is_owned = lock._is_owned 236 except AttributeError: 237 pass 238 self._waiters = _deque() 239 240 def __enter__(self): 241 return self._lock.__enter__() 242 243 def __exit__(self, *args): 244 return self._lock.__exit__(*args) 245 246 def __repr__(self): 247 return "<Condition(%s, %d)>" % (self._lock, len(self._waiters)) 248 249 def _release_save(self): 250 self._lock.release() # No state to save 251 252 def _acquire_restore(self, x): 253 self._lock.acquire() # Ignore saved state 254 255 def _is_owned(self): 256 # Return True if lock is owned by current_thread. 257 # This method is called only if _lock doesn't have _is_owned(). 258 if self._lock.acquire(0): 259 self._lock.release() 260 return False 261 else: 262 return True 263 264 def wait(self, timeout=None): 265 """Wait until notified or until a timeout occurs. 266 267 If the calling thread has not acquired the lock when this method is 268 called, a RuntimeError is raised. 269 270 This method releases the underlying lock, and then blocks until it is 271 awakened by a notify() or notify_all() call for the same condition 272 variable in another thread, or until the optional timeout occurs. Once 273 awakened or timed out, it re-acquires the lock and returns. 274 275 When the timeout argument is present and not None, it should be a 276 floating point number specifying a timeout for the operation in seconds 277 (or fractions thereof). 278 279 When the underlying lock is an RLock, it is not released using its 280 release() method, since this may not actually unlock the lock when it 281 was acquired multiple times recursively. Instead, an internal interface 282 of the RLock class is used, which really unlocks it even when it has 283 been recursively acquired several times. Another internal interface is 284 then used to restore the recursion level when the lock is reacquired. 285 286 """ 287 if not self._is_owned(): 288 raise RuntimeError("cannot wait on un-acquired lock") 289 waiter = _allocate_lock() 290 waiter.acquire() 291 self._waiters.append(waiter) 292 saved_state = self._release_save() 293 gotit = False 294 try: # restore state no matter what (e.g., KeyboardInterrupt) 295 if timeout is None: 296 waiter.acquire() 297 gotit = True 298 else: 299 if timeout > 0: 300 gotit = waiter.acquire(True, timeout) 301 else: 302 gotit = waiter.acquire(False) 303 return gotit 304 finally: 305 self._acquire_restore(saved_state) 306 if not gotit: 307 try: 308 self._waiters.remove(waiter) 309 except ValueError: 310 pass 311 312 def wait_for(self, predicate, timeout=None): 313 """Wait until a condition evaluates to True. 314 315 predicate should be a callable which result will be interpreted as a 316 boolean value. A timeout may be provided giving the maximum time to 317 wait. 318 319 """ 320 endtime = None 321 waittime = timeout 322 result = predicate() 323 while not result: 324 if waittime is not None: 325 if endtime is None: 326 endtime = _time() + waittime 327 else: 328 waittime = endtime - _time() 329 if waittime <= 0: 330 break 331 self.wait(waittime) 332 result = predicate() 333 return result 334 335 def notify(self, n=1): 336 """Wake up one or more threads waiting on this condition, if any. 337 338 If the calling thread has not acquired the lock when this method is 339 called, a RuntimeError is raised. 340 341 This method wakes up at most n of the threads waiting for the condition 342 variable; it is a no-op if no threads are waiting. 343 344 """ 345 if not self._is_owned(): 346 raise RuntimeError("cannot notify on un-acquired lock") 347 all_waiters = self._waiters 348 waiters_to_notify = _deque(_islice(all_waiters, n)) 349 if not waiters_to_notify: 350 return 351 for waiter in waiters_to_notify: 352 waiter.release() 353 try: 354 all_waiters.remove(waiter) 355 except ValueError: 356 pass 357 358 def notify_all(self): 359 """Wake up all threads waiting on this condition. 360 361 If the calling thread has not acquired the lock when this method 362 is called, a RuntimeError is raised. 363 364 """ 365 self.notify(len(self._waiters)) 366 367 notifyAll = notify_all 368 369 370class Semaphore: 371 """This class implements semaphore objects. 372 373 Semaphores manage a counter representing the number of release() calls minus 374 the number of acquire() calls, plus an initial value. The acquire() method 375 blocks if necessary until it can return without making the counter 376 negative. If not given, value defaults to 1. 377 378 """ 379 380 # After Tim Peters' semaphore class, but not quite the same (no maximum) 381 382 def __init__(self, value=1): 383 if value < 0: 384 raise ValueError("semaphore initial value must be >= 0") 385 self._cond = Condition(Lock()) 386 self._value = value 387 388 def acquire(self, blocking=True, timeout=None): 389 """Acquire a semaphore, decrementing the internal counter by one. 390 391 When invoked without arguments: if the internal counter is larger than 392 zero on entry, decrement it by one and return immediately. If it is zero 393 on entry, block, waiting until some other thread has called release() to 394 make it larger than zero. This is done with proper interlocking so that 395 if multiple acquire() calls are blocked, release() will wake exactly one 396 of them up. The implementation may pick one at random, so the order in 397 which blocked threads are awakened should not be relied on. There is no 398 return value in this case. 399 400 When invoked with blocking set to true, do the same thing as when called 401 without arguments, and return true. 402 403 When invoked with blocking set to false, do not block. If a call without 404 an argument would block, return false immediately; otherwise, do the 405 same thing as when called without arguments, and return true. 406 407 When invoked with a timeout other than None, it will block for at 408 most timeout seconds. If acquire does not complete successfully in 409 that interval, return false. Return true otherwise. 410 411 """ 412 if not blocking and timeout is not None: 413 raise ValueError("can't specify timeout for non-blocking acquire") 414 rc = False 415 endtime = None 416 with self._cond: 417 while self._value == 0: 418 if not blocking: 419 break 420 if timeout is not None: 421 if endtime is None: 422 endtime = _time() + timeout 423 else: 424 timeout = endtime - _time() 425 if timeout <= 0: 426 break 427 self._cond.wait(timeout) 428 else: 429 self._value -= 1 430 rc = True 431 return rc 432 433 __enter__ = acquire 434 435 def release(self): 436 """Release a semaphore, incrementing the internal counter by one. 437 438 When the counter is zero on entry and another thread is waiting for it 439 to become larger than zero again, wake up that thread. 440 441 """ 442 with self._cond: 443 self._value += 1 444 self._cond.notify() 445 446 def __exit__(self, t, v, tb): 447 self.release() 448 449 450class BoundedSemaphore(Semaphore): 451 """Implements a bounded semaphore. 452 453 A bounded semaphore checks to make sure its current value doesn't exceed its 454 initial value. If it does, ValueError is raised. In most situations 455 semaphores are used to guard resources with limited capacity. 456 457 If the semaphore is released too many times it's a sign of a bug. If not 458 given, value defaults to 1. 459 460 Like regular semaphores, bounded semaphores manage a counter representing 461 the number of release() calls minus the number of acquire() calls, plus an 462 initial value. The acquire() method blocks if necessary until it can return 463 without making the counter negative. If not given, value defaults to 1. 464 465 """ 466 467 def __init__(self, value=1): 468 Semaphore.__init__(self, value) 469 self._initial_value = value 470 471 def release(self): 472 """Release a semaphore, incrementing the internal counter by one. 473 474 When the counter is zero on entry and another thread is waiting for it 475 to become larger than zero again, wake up that thread. 476 477 If the number of releases exceeds the number of acquires, 478 raise a ValueError. 479 480 """ 481 with self._cond: 482 if self._value >= self._initial_value: 483 raise ValueError("Semaphore released too many times") 484 self._value += 1 485 self._cond.notify() 486 487 488class Event: 489 """Class implementing event objects. 490 491 Events manage a flag that can be set to true with the set() method and reset 492 to false with the clear() method. The wait() method blocks until the flag is 493 true. The flag is initially false. 494 495 """ 496 497 # After Tim Peters' event class (without is_posted()) 498 499 def __init__(self): 500 self._cond = Condition(Lock()) 501 self._flag = False 502 503 def _reset_internal_locks(self): 504 # private! called by Thread._reset_internal_locks by _after_fork() 505 self._cond.__init__(Lock()) 506 507 def is_set(self): 508 """Return true if and only if the internal flag is true.""" 509 return self._flag 510 511 isSet = is_set 512 513 def set(self): 514 """Set the internal flag to true. 515 516 All threads waiting for it to become true are awakened. Threads 517 that call wait() once the flag is true will not block at all. 518 519 """ 520 with self._cond: 521 self._flag = True 522 self._cond.notify_all() 523 524 def clear(self): 525 """Reset the internal flag to false. 526 527 Subsequently, threads calling wait() will block until set() is called to 528 set the internal flag to true again. 529 530 """ 531 with self._cond: 532 self._flag = False 533 534 def wait(self, timeout=None): 535 """Block until the internal flag is true. 536 537 If the internal flag is true on entry, return immediately. Otherwise, 538 block until another thread calls set() to set the flag to true, or until 539 the optional timeout occurs. 540 541 When the timeout argument is present and not None, it should be a 542 floating point number specifying a timeout for the operation in seconds 543 (or fractions thereof). 544 545 This method returns the internal flag on exit, so it will always return 546 True except if a timeout is given and the operation times out. 547 548 """ 549 with self._cond: 550 signaled = self._flag 551 if not signaled: 552 signaled = self._cond.wait(timeout) 553 return signaled 554 555 556# A barrier class. Inspired in part by the pthread_barrier_* api and 557# the CyclicBarrier class from Java. See 558# http://sourceware.org/pthreads-win32/manual/pthread_barrier_init.html and 559# http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/ 560# CyclicBarrier.html 561# for information. 562# We maintain two main states, 'filling' and 'draining' enabling the barrier 563# to be cyclic. Threads are not allowed into it until it has fully drained 564# since the previous cycle. In addition, a 'resetting' state exists which is 565# similar to 'draining' except that threads leave with a BrokenBarrierError, 566# and a 'broken' state in which all threads get the exception. 567class Barrier: 568 """Implements a Barrier. 569 570 Useful for synchronizing a fixed number of threads at known synchronization 571 points. Threads block on 'wait()' and are simultaneously awoken once they 572 have all made that call. 573 574 """ 575 576 def __init__(self, parties, action=None, timeout=None): 577 """Create a barrier, initialised to 'parties' threads. 578 579 'action' is a callable which, when supplied, will be called by one of 580 the threads after they have all entered the barrier and just prior to 581 releasing them all. If a 'timeout' is provided, it is used as the 582 default for all subsequent 'wait()' calls. 583 584 """ 585 self._cond = Condition(Lock()) 586 self._action = action 587 self._timeout = timeout 588 self._parties = parties 589 self._state = 0 #0 filling, 1, draining, -1 resetting, -2 broken 590 self._count = 0 591 592 def wait(self, timeout=None): 593 """Wait for the barrier. 594 595 When the specified number of threads have started waiting, they are all 596 simultaneously awoken. If an 'action' was provided for the barrier, one 597 of the threads will have executed that callback prior to returning. 598 Returns an individual index number from 0 to 'parties-1'. 599 600 """ 601 if timeout is None: 602 timeout = self._timeout 603 with self._cond: 604 self._enter() # Block while the barrier drains. 605 index = self._count 606 self._count += 1 607 try: 608 if index + 1 == self._parties: 609 # We release the barrier 610 self._release() 611 else: 612 # We wait until someone releases us 613 self._wait(timeout) 614 return index 615 finally: 616 self._count -= 1 617 # Wake up any threads waiting for barrier to drain. 618 self._exit() 619 620 # Block until the barrier is ready for us, or raise an exception 621 # if it is broken. 622 def _enter(self): 623 while self._state in (-1, 1): 624 # It is draining or resetting, wait until done 625 self._cond.wait() 626 #see if the barrier is in a broken state 627 if self._state < 0: 628 raise BrokenBarrierError 629 assert self._state == 0 630 631 # Optionally run the 'action' and release the threads waiting 632 # in the barrier. 633 def _release(self): 634 try: 635 if self._action: 636 self._action() 637 # enter draining state 638 self._state = 1 639 self._cond.notify_all() 640 except: 641 #an exception during the _action handler. Break and reraise 642 self._break() 643 raise 644 645 # Wait in the barrier until we are released. Raise an exception 646 # if the barrier is reset or broken. 647 def _wait(self, timeout): 648 if not self._cond.wait_for(lambda : self._state != 0, timeout): 649 #timed out. Break the barrier 650 self._break() 651 raise BrokenBarrierError 652 if self._state < 0: 653 raise BrokenBarrierError 654 assert self._state == 1 655 656 # If we are the last thread to exit the barrier, signal any threads 657 # waiting for the barrier to drain. 658 def _exit(self): 659 if self._count == 0: 660 if self._state in (-1, 1): 661 #resetting or draining 662 self._state = 0 663 self._cond.notify_all() 664 665 def reset(self): 666 """Reset the barrier to the initial state. 667 668 Any threads currently waiting will get the BrokenBarrier exception 669 raised. 670 671 """ 672 with self._cond: 673 if self._count > 0: 674 if self._state == 0: 675 #reset the barrier, waking up threads 676 self._state = -1 677 elif self._state == -2: 678 #was broken, set it to reset state 679 #which clears when the last thread exits 680 self._state = -1 681 else: 682 self._state = 0 683 self._cond.notify_all() 684 685 def abort(self): 686 """Place the barrier into a 'broken' state. 687 688 Useful in case of error. Any currently waiting threads and threads 689 attempting to 'wait()' will have BrokenBarrierError raised. 690 691 """ 692 with self._cond: 693 self._break() 694 695 def _break(self): 696 # An internal error was detected. The barrier is set to 697 # a broken state all parties awakened. 698 self._state = -2 699 self._cond.notify_all() 700 701 @property 702 def parties(self): 703 """Return the number of threads required to trip the barrier.""" 704 return self._parties 705 706 @property 707 def n_waiting(self): 708 """Return the number of threads currently waiting at the barrier.""" 709 # We don't need synchronization here since this is an ephemeral result 710 # anyway. It returns the correct value in the steady state. 711 if self._state == 0: 712 return self._count 713 return 0 714 715 @property 716 def broken(self): 717 """Return True if the barrier is in a broken state.""" 718 return self._state == -2 719 720# exception raised by the Barrier class 721class BrokenBarrierError(RuntimeError): 722 pass 723 724 725# Helper to generate new thread names 726_counter = _count().__next__ 727_counter() # Consume 0 so first non-main thread has id 1. 728def _newname(template="Thread-%d"): 729 return template % _counter() 730 731# Active thread administration 732_active_limbo_lock = _allocate_lock() 733_active = {} # maps thread id to Thread object 734_limbo = {} 735_dangling = WeakSet() 736# Set of Thread._tstate_lock locks of non-daemon threads used by _shutdown() 737# to wait until all Python thread states get deleted: 738# see Thread._set_tstate_lock(). 739_shutdown_locks_lock = _allocate_lock() 740_shutdown_locks = set() 741 742# Main class for threads 743 744class Thread: 745 """A class that represents a thread of control. 746 747 This class can be safely subclassed in a limited fashion. There are two ways 748 to specify the activity: by passing a callable object to the constructor, or 749 by overriding the run() method in a subclass. 750 751 """ 752 753 _initialized = False 754 # Need to store a reference to sys.exc_info for printing 755 # out exceptions when a thread tries to use a global var. during interp. 756 # shutdown and thus raises an exception about trying to perform some 757 # operation on/with a NoneType 758 _exc_info = _sys.exc_info 759 # Keep sys.exc_clear too to clear the exception just before 760 # allowing .join() to return. 761 #XXX __exc_clear = _sys.exc_clear 762 763 def __init__(self, group=None, target=None, name=None, 764 args=(), kwargs=None, *, daemon=None): 765 """This constructor should always be called with keyword arguments. Arguments are: 766 767 *group* should be None; reserved for future extension when a ThreadGroup 768 class is implemented. 769 770 *target* is the callable object to be invoked by the run() 771 method. Defaults to None, meaning nothing is called. 772 773 *name* is the thread name. By default, a unique name is constructed of 774 the form "Thread-N" where N is a small decimal number. 775 776 *args* is the argument tuple for the target invocation. Defaults to (). 777 778 *kwargs* is a dictionary of keyword arguments for the target 779 invocation. Defaults to {}. 780 781 If a subclass overrides the constructor, it must make sure to invoke 782 the base class constructor (Thread.__init__()) before doing anything 783 else to the thread. 784 785 """ 786 assert group is None, "group argument must be None for now" 787 if kwargs is None: 788 kwargs = {} 789 self._target = target 790 self._name = str(name or _newname()) 791 self._args = args 792 self._kwargs = kwargs 793 if daemon is not None: 794 self._daemonic = daemon 795 else: 796 self._daemonic = current_thread().daemon 797 self._ident = None 798 self._tstate_lock = None 799 self._started = Event() 800 self._is_stopped = False 801 self._initialized = True 802 # sys.stderr is not stored in the class like 803 # sys.exc_info since it can be changed between instances 804 self._stderr = _sys.stderr 805 # For debugging and _after_fork() 806 _dangling.add(self) 807 808 def _reset_internal_locks(self, is_alive): 809 # private! Called by _after_fork() to reset our internal locks as 810 # they may be in an invalid state leading to a deadlock or crash. 811 self._started._reset_internal_locks() 812 if is_alive: 813 self._set_tstate_lock() 814 else: 815 # The thread isn't alive after fork: it doesn't have a tstate 816 # anymore. 817 self._is_stopped = True 818 self._tstate_lock = None 819 820 def __repr__(self): 821 assert self._initialized, "Thread.__init__() was not called" 822 status = "initial" 823 if self._started.is_set(): 824 status = "started" 825 self.is_alive() # easy way to get ._is_stopped set when appropriate 826 if self._is_stopped: 827 status = "stopped" 828 if self._daemonic: 829 status += " daemon" 830 if self._ident is not None: 831 status += " %s" % self._ident 832 return "<%s(%s, %s)>" % (self.__class__.__name__, self._name, status) 833 834 def start(self): 835 """Start the thread's activity. 836 837 It must be called at most once per thread object. It arranges for the 838 object's run() method to be invoked in a separate thread of control. 839 840 This method will raise a RuntimeError if called more than once on the 841 same thread object. 842 843 """ 844 if not self._initialized: 845 raise RuntimeError("thread.__init__() not called") 846 847 if self._started.is_set(): 848 raise RuntimeError("threads can only be started once") 849 with _active_limbo_lock: 850 _limbo[self] = self 851 try: 852 _start_new_thread(self._bootstrap, ()) 853 except Exception: 854 with _active_limbo_lock: 855 del _limbo[self] 856 raise 857 self._started.wait() 858 859 def run(self): 860 """Method representing the thread's activity. 861 862 You may override this method in a subclass. The standard run() method 863 invokes the callable object passed to the object's constructor as the 864 target argument, if any, with sequential and keyword arguments taken 865 from the args and kwargs arguments, respectively. 866 867 """ 868 try: 869 if self._target: 870 self._target(*self._args, **self._kwargs) 871 finally: 872 # Avoid a refcycle if the thread is running a function with 873 # an argument that has a member that points to the thread. 874 del self._target, self._args, self._kwargs 875 876 def _bootstrap(self): 877 # Wrapper around the real bootstrap code that ignores 878 # exceptions during interpreter cleanup. Those typically 879 # happen when a daemon thread wakes up at an unfortunate 880 # moment, finds the world around it destroyed, and raises some 881 # random exception *** while trying to report the exception in 882 # _bootstrap_inner() below ***. Those random exceptions 883 # don't help anybody, and they confuse users, so we suppress 884 # them. We suppress them only when it appears that the world 885 # indeed has already been destroyed, so that exceptions in 886 # _bootstrap_inner() during normal business hours are properly 887 # reported. Also, we only suppress them for daemonic threads; 888 # if a non-daemonic encounters this, something else is wrong. 889 try: 890 self._bootstrap_inner() 891 except: 892 if self._daemonic and _sys is None: 893 return 894 raise 895 896 def _set_ident(self): 897 self._ident = get_ident() 898 899 def _set_tstate_lock(self): 900 """ 901 Set a lock object which will be released by the interpreter when 902 the underlying thread state (see pystate.h) gets deleted. 903 """ 904 self._tstate_lock = _set_sentinel() 905 self._tstate_lock.acquire() 906 907 if not self.daemon: 908 with _shutdown_locks_lock: 909 _shutdown_locks.add(self._tstate_lock) 910 911 def _bootstrap_inner(self): 912 try: 913 self._set_ident() 914 self._set_tstate_lock() 915 self._started.set() 916 with _active_limbo_lock: 917 _active[self._ident] = self 918 del _limbo[self] 919 920 if _trace_hook: 921 _sys.settrace(_trace_hook) 922 if _profile_hook: 923 _sys.setprofile(_profile_hook) 924 925 try: 926 self.run() 927 except SystemExit: 928 pass 929 except: 930 # If sys.stderr is no more (most likely from interpreter 931 # shutdown) use self._stderr. Otherwise still use sys (as in 932 # _sys) in case sys.stderr was redefined since the creation of 933 # self. 934 if _sys and _sys.stderr is not None: 935 print("Exception in thread %s:\n%s" % 936 (self.name, _format_exc()), file=_sys.stderr) 937 elif self._stderr is not None: 938 # Do the best job possible w/o a huge amt. of code to 939 # approximate a traceback (code ideas from 940 # Lib/traceback.py) 941 exc_type, exc_value, exc_tb = self._exc_info() 942 try: 943 print(( 944 "Exception in thread " + self.name + 945 " (most likely raised during interpreter shutdown):"), file=self._stderr) 946 print(( 947 "Traceback (most recent call last):"), file=self._stderr) 948 while exc_tb: 949 print(( 950 ' File "%s", line %s, in %s' % 951 (exc_tb.tb_frame.f_code.co_filename, 952 exc_tb.tb_lineno, 953 exc_tb.tb_frame.f_code.co_name)), file=self._stderr) 954 exc_tb = exc_tb.tb_next 955 print(("%s: %s" % (exc_type, exc_value)), file=self._stderr) 956 self._stderr.flush() 957 # Make sure that exc_tb gets deleted since it is a memory 958 # hog; deleting everything else is just for thoroughness 959 finally: 960 del exc_type, exc_value, exc_tb 961 finally: 962 # Prevent a race in 963 # test_threading.test_no_refcycle_through_target when 964 # the exception keeps the target alive past when we 965 # assert that it's dead. 966 #XXX self._exc_clear() 967 pass 968 finally: 969 with _active_limbo_lock: 970 try: 971 # We don't call self._delete() because it also 972 # grabs _active_limbo_lock. 973 del _active[get_ident()] 974 except: 975 pass 976 977 def _stop(self): 978 # After calling ._stop(), .is_alive() returns False and .join() returns 979 # immediately. ._tstate_lock must be released before calling ._stop(). 980 # 981 # Normal case: C code at the end of the thread's life 982 # (release_sentinel in _threadmodule.c) releases ._tstate_lock, and 983 # that's detected by our ._wait_for_tstate_lock(), called by .join() 984 # and .is_alive(). Any number of threads _may_ call ._stop() 985 # simultaneously (for example, if multiple threads are blocked in 986 # .join() calls), and they're not serialized. That's harmless - 987 # they'll just make redundant rebindings of ._is_stopped and 988 # ._tstate_lock. Obscure: we rebind ._tstate_lock last so that the 989 # "assert self._is_stopped" in ._wait_for_tstate_lock() always works 990 # (the assert is executed only if ._tstate_lock is None). 991 # 992 # Special case: _main_thread releases ._tstate_lock via this 993 # module's _shutdown() function. 994 lock = self._tstate_lock 995 if lock is not None: 996 assert not lock.locked() 997 self._is_stopped = True 998 self._tstate_lock = None 999 if not self.daemon: 1000 with _shutdown_locks_lock: 1001 _shutdown_locks.discard(lock) 1002 1003 def _delete(self): 1004 "Remove current thread from the dict of currently running threads." 1005 with _active_limbo_lock: 1006 del _active[get_ident()] 1007 # There must not be any python code between the previous line 1008 # and after the lock is released. Otherwise a tracing function 1009 # could try to acquire the lock again in the same thread, (in 1010 # current_thread()), and would block. 1011 1012 def join(self, timeout=None): 1013 """Wait until the thread terminates. 1014 1015 This blocks the calling thread until the thread whose join() method is 1016 called terminates -- either normally or through an unhandled exception 1017 or until the optional timeout occurs. 1018 1019 When the timeout argument is present and not None, it should be a 1020 floating point number specifying a timeout for the operation in seconds 1021 (or fractions thereof). As join() always returns None, you must call 1022 is_alive() after join() to decide whether a timeout happened -- if the 1023 thread is still alive, the join() call timed out. 1024 1025 When the timeout argument is not present or None, the operation will 1026 block until the thread terminates. 1027 1028 A thread can be join()ed many times. 1029 1030 join() raises a RuntimeError if an attempt is made to join the current 1031 thread as that would cause a deadlock. It is also an error to join() a 1032 thread before it has been started and attempts to do so raises the same 1033 exception. 1034 1035 """ 1036 if not self._initialized: 1037 raise RuntimeError("Thread.__init__() not called") 1038 if not self._started.is_set(): 1039 raise RuntimeError("cannot join thread before it is started") 1040 if self is current_thread(): 1041 raise RuntimeError("cannot join current thread") 1042 1043 if timeout is None: 1044 self._wait_for_tstate_lock() 1045 else: 1046 # the behavior of a negative timeout isn't documented, but 1047 # historically .join(timeout=x) for x<0 has acted as if timeout=0 1048 self._wait_for_tstate_lock(timeout=max(timeout, 0)) 1049 1050 def _wait_for_tstate_lock(self, block=True, timeout=-1): 1051 # Issue #18808: wait for the thread state to be gone. 1052 # At the end of the thread's life, after all knowledge of the thread 1053 # is removed from C data structures, C code releases our _tstate_lock. 1054 # This method passes its arguments to _tstate_lock.acquire(). 1055 # If the lock is acquired, the C code is done, and self._stop() is 1056 # called. That sets ._is_stopped to True, and ._tstate_lock to None. 1057 lock = self._tstate_lock 1058 if lock is None: # already determined that the C code is done 1059 assert self._is_stopped 1060 elif lock.acquire(block, timeout): 1061 lock.release() 1062 self._stop() 1063 1064 @property 1065 def name(self): 1066 """A string used for identification purposes only. 1067 1068 It has no semantics. Multiple threads may be given the same name. The 1069 initial name is set by the constructor. 1070 1071 """ 1072 assert self._initialized, "Thread.__init__() not called" 1073 return self._name 1074 1075 @name.setter 1076 def name(self, name): 1077 assert self._initialized, "Thread.__init__() not called" 1078 self._name = str(name) 1079 1080 @property 1081 def ident(self): 1082 """Thread identifier of this thread or None if it has not been started. 1083 1084 This is a nonzero integer. See the get_ident() function. Thread 1085 identifiers may be recycled when a thread exits and another thread is 1086 created. The identifier is available even after the thread has exited. 1087 1088 """ 1089 assert self._initialized, "Thread.__init__() not called" 1090 return self._ident 1091 1092 def is_alive(self): 1093 """Return whether the thread is alive. 1094 1095 This method returns True just before the run() method starts until just 1096 after the run() method terminates. The module function enumerate() 1097 returns a list of all alive threads. 1098 1099 """ 1100 assert self._initialized, "Thread.__init__() not called" 1101 if self._is_stopped or not self._started.is_set(): 1102 return False 1103 self._wait_for_tstate_lock(False) 1104 return not self._is_stopped 1105 1106 def isAlive(self): 1107 """Return whether the thread is alive. 1108 1109 This method is deprecated, use is_alive() instead. 1110 """ 1111 import warnings 1112 warnings.warn('isAlive() is deprecated, use is_alive() instead', 1113 PendingDeprecationWarning, stacklevel=2) 1114 return self.is_alive() 1115 1116 @property 1117 def daemon(self): 1118 """A boolean value indicating whether this thread is a daemon thread. 1119 1120 This must be set before start() is called, otherwise RuntimeError is 1121 raised. Its initial value is inherited from the creating thread; the 1122 main thread is not a daemon thread and therefore all threads created in 1123 the main thread default to daemon = False. 1124 1125 The entire Python program exits when only daemon threads are left. 1126 1127 """ 1128 assert self._initialized, "Thread.__init__() not called" 1129 return self._daemonic 1130 1131 @daemon.setter 1132 def daemon(self, daemonic): 1133 if not self._initialized: 1134 raise RuntimeError("Thread.__init__() not called") 1135 if self._started.is_set(): 1136 raise RuntimeError("cannot set daemon status of active thread") 1137 self._daemonic = daemonic 1138 1139 def isDaemon(self): 1140 return self.daemon 1141 1142 def setDaemon(self, daemonic): 1143 self.daemon = daemonic 1144 1145 def getName(self): 1146 return self.name 1147 1148 def setName(self, name): 1149 self.name = name 1150 1151# The timer class was contributed by Itamar Shtull-Trauring 1152 1153class Timer(Thread): 1154 """Call a function after a specified number of seconds: 1155 1156 t = Timer(30.0, f, args=None, kwargs=None) 1157 t.start() 1158 t.cancel() # stop the timer's action if it's still waiting 1159 1160 """ 1161 1162 def __init__(self, interval, function, args=None, kwargs=None): 1163 Thread.__init__(self) 1164 self.interval = interval 1165 self.function = function 1166 self.args = args if args is not None else [] 1167 self.kwargs = kwargs if kwargs is not None else {} 1168 self.finished = Event() 1169 1170 def cancel(self): 1171 """Stop the timer if it hasn't finished yet.""" 1172 self.finished.set() 1173 1174 def run(self): 1175 self.finished.wait(self.interval) 1176 if not self.finished.is_set(): 1177 self.function(*self.args, **self.kwargs) 1178 self.finished.set() 1179 1180 1181# Special thread class to represent the main thread 1182 1183class _MainThread(Thread): 1184 1185 def __init__(self): 1186 Thread.__init__(self, name="MainThread", daemon=False) 1187 self._set_tstate_lock() 1188 self._started.set() 1189 self._set_ident() 1190 with _active_limbo_lock: 1191 _active[self._ident] = self 1192 1193 1194# Dummy thread class to represent threads not started here. 1195# These aren't garbage collected when they die, nor can they be waited for. 1196# If they invoke anything in threading.py that calls current_thread(), they 1197# leave an entry in the _active dict forever after. 1198# Their purpose is to return *something* from current_thread(). 1199# They are marked as daemon threads so we won't wait for them 1200# when we exit (conform previous semantics). 1201 1202class _DummyThread(Thread): 1203 1204 def __init__(self): 1205 Thread.__init__(self, name=_newname("Dummy-%d"), daemon=True) 1206 1207 self._started.set() 1208 self._set_ident() 1209 with _active_limbo_lock: 1210 _active[self._ident] = self 1211 1212 def _stop(self): 1213 pass 1214 1215 def is_alive(self): 1216 assert not self._is_stopped and self._started.is_set() 1217 return True 1218 1219 def join(self, timeout=None): 1220 assert False, "cannot join a dummy thread" 1221 1222 1223# Global API functions 1224 1225def current_thread(): 1226 """Return the current Thread object, corresponding to the caller's thread of control. 1227 1228 If the caller's thread of control was not created through the threading 1229 module, a dummy thread object with limited functionality is returned. 1230 1231 """ 1232 try: 1233 return _active[get_ident()] 1234 except KeyError: 1235 return _DummyThread() 1236 1237currentThread = current_thread 1238 1239def active_count(): 1240 """Return the number of Thread objects currently alive. 1241 1242 The returned count is equal to the length of the list returned by 1243 enumerate(). 1244 1245 """ 1246 with _active_limbo_lock: 1247 return len(_active) + len(_limbo) 1248 1249activeCount = active_count 1250 1251def _enumerate(): 1252 # Same as enumerate(), but without the lock. Internal use only. 1253 return list(_active.values()) + list(_limbo.values()) 1254 1255def enumerate(): 1256 """Return a list of all Thread objects currently alive. 1257 1258 The list includes daemonic threads, dummy thread objects created by 1259 current_thread(), and the main thread. It excludes terminated threads and 1260 threads that have not yet been started. 1261 1262 """ 1263 with _active_limbo_lock: 1264 return list(_active.values()) + list(_limbo.values()) 1265 1266from _thread import stack_size 1267 1268# Create the main thread object, 1269# and make it available for the interpreter 1270# (Py_Main) as threading._shutdown. 1271 1272_main_thread = _MainThread() 1273 1274def _shutdown(): 1275 """ 1276 Wait until the Python thread state of all non-daemon threads get deleted. 1277 """ 1278 # Obscure: other threads may be waiting to join _main_thread. That's 1279 # dubious, but some code does it. We can't wait for C code to release 1280 # the main thread's tstate_lock - that won't happen until the interpreter 1281 # is nearly dead. So we release it here. Note that just calling _stop() 1282 # isn't enough: other threads may already be waiting on _tstate_lock. 1283 if _main_thread._is_stopped: 1284 # _shutdown() was already called 1285 return 1286 1287 # Main thread 1288 tlock = _main_thread._tstate_lock 1289 # The main thread isn't finished yet, so its thread state lock can't have 1290 # been released. 1291 assert tlock is not None 1292 assert tlock.locked() 1293 tlock.release() 1294 _main_thread._stop() 1295 1296 # Join all non-deamon threads 1297 while True: 1298 with _shutdown_locks_lock: 1299 locks = list(_shutdown_locks) 1300 _shutdown_locks.clear() 1301 1302 if not locks: 1303 break 1304 1305 for lock in locks: 1306 # mimick Thread.join() 1307 lock.acquire() 1308 lock.release() 1309 1310 # new threads can be spawned while we were waiting for the other 1311 # threads to complete 1312 1313 1314def main_thread(): 1315 """Return the main thread object. 1316 1317 In normal conditions, the main thread is the thread from which the 1318 Python interpreter was started. 1319 """ 1320 return _main_thread 1321 1322# get thread-local implementation, either from the thread 1323# module, or from the python fallback 1324 1325try: 1326 from _thread import _local as local 1327except ImportError: 1328 from _threading_local import local 1329 1330 1331def _after_fork(): 1332 """ 1333 Cleanup threading module state that should not exist after a fork. 1334 """ 1335 # Reset _active_limbo_lock, in case we forked while the lock was held 1336 # by another (non-forked) thread. http://bugs.python.org/issue874900 1337 global _active_limbo_lock, _main_thread 1338 global _shutdown_locks_lock, _shutdown_locks 1339 _active_limbo_lock = _allocate_lock() 1340 1341 # fork() only copied the current thread; clear references to others. 1342 new_active = {} 1343 1344 try: 1345 current = _active[get_ident()] 1346 except KeyError: 1347 # fork() was called in a thread which was not spawned 1348 # by threading.Thread. For example, a thread spawned 1349 # by thread.start_new_thread(). 1350 current = _MainThread() 1351 1352 _main_thread = current 1353 1354 # reset _shutdown() locks: threads re-register their _tstate_lock below 1355 _shutdown_locks_lock = _allocate_lock() 1356 _shutdown_locks = set() 1357 1358 with _active_limbo_lock: 1359 # Dangling thread instances must still have their locks reset, 1360 # because someone may join() them. 1361 threads = set(_enumerate()) 1362 threads.update(_dangling) 1363 for thread in threads: 1364 # Any lock/condition variable may be currently locked or in an 1365 # invalid state, so we reinitialize them. 1366 if thread is current: 1367 # There is only one active thread. We reset the ident to 1368 # its new value since it can have changed. 1369 thread._reset_internal_locks(True) 1370 ident = get_ident() 1371 thread._ident = ident 1372 new_active[ident] = thread 1373 else: 1374 # All the others are already stopped. 1375 thread._reset_internal_locks(False) 1376 thread._stop() 1377 1378 _limbo.clear() 1379 _active.clear() 1380 _active.update(new_active) 1381 assert len(_active) == 1 1382 1383 1384if hasattr(_os, "register_at_fork"): 1385 _os.register_at_fork(after_in_child=_after_fork) 1386