1"""Thread module emulating a subset of Java's threading model.""" 2 3import os as _os 4import sys as _sys 5import _thread 6 7from time import monotonic as _time 8from _weakrefset import WeakSet 9from itertools import islice as _islice, count as _count 10try: 11 from _collections import deque as _deque 12except ImportError: 13 from collections import deque as _deque 14 15# Note regarding PEP 8 compliant names 16# This threading model was originally inspired by Java, and inherited 17# the convention of camelCase function and method names from that 18# language. Those original names are not in any imminent danger of 19# being deprecated (even for Py3k),so this module provides them as an 20# alias for the PEP 8 compliant names 21# Note that using the new PEP 8 compliant names facilitates substitution 22# with the multiprocessing module, which doesn't provide the old 23# Java inspired names. 24 25__all__ = ['get_ident', 'active_count', 'Condition', 'current_thread', 26 'enumerate', 'main_thread', 'TIMEOUT_MAX', 27 'Event', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread', 28 'Barrier', 'BrokenBarrierError', 'Timer', 'ThreadError', 29 'setprofile', 'settrace', 'local', 'stack_size', 30 'excepthook', 'ExceptHookArgs'] 31 32# Rename some stuff so "from threading import *" is safe 33_start_new_thread = _thread.start_new_thread 34_allocate_lock = _thread.allocate_lock 35_set_sentinel = _thread._set_sentinel 36get_ident = _thread.get_ident 37try: 38 get_native_id = _thread.get_native_id 39 _HAVE_THREAD_NATIVE_ID = True 40 __all__.append('get_native_id') 41except AttributeError: 42 _HAVE_THREAD_NATIVE_ID = False 43ThreadError = _thread.error 44try: 45 _CRLock = _thread.RLock 46except AttributeError: 47 _CRLock = None 48TIMEOUT_MAX = _thread.TIMEOUT_MAX 49del _thread 50 51 52# Support for profile and trace hooks 53 54_profile_hook = None 55_trace_hook = None 56 57def setprofile(func): 58 """Set a profile function for all threads started from the threading module. 59 60 The func will be passed to sys.setprofile() for each thread, before its 61 run() method is called. 62 63 """ 64 global _profile_hook 65 _profile_hook = func 66 67def settrace(func): 68 """Set a trace function for all threads started from the threading module. 69 70 The func will be passed to sys.settrace() for each thread, before its run() 71 method is called. 72 73 """ 74 global _trace_hook 75 _trace_hook = func 76 77# Synchronization classes 78 79Lock = _allocate_lock 80 81def RLock(*args, **kwargs): 82 """Factory function that returns a new reentrant lock. 83 84 A reentrant lock must be released by the thread that acquired it. Once a 85 thread has acquired a reentrant lock, the same thread may acquire it again 86 without blocking; the thread must release it once for each time it has 87 acquired it. 88 89 """ 90 if _CRLock is None: 91 return _PyRLock(*args, **kwargs) 92 return _CRLock(*args, **kwargs) 93 94class _RLock: 95 """This class implements reentrant lock objects. 96 97 A reentrant lock must be released by the thread that acquired it. Once a 98 thread has acquired a reentrant lock, the same thread may acquire it 99 again without blocking; the thread must release it once for each time it 100 has acquired it. 101 102 """ 103 104 def __init__(self): 105 self._block = _allocate_lock() 106 self._owner = None 107 self._count = 0 108 109 def __repr__(self): 110 owner = self._owner 111 try: 112 owner = _active[owner].name 113 except KeyError: 114 pass 115 return "<%s %s.%s object owner=%r count=%d at %s>" % ( 116 "locked" if self._block.locked() else "unlocked", 117 self.__class__.__module__, 118 self.__class__.__qualname__, 119 owner, 120 self._count, 121 hex(id(self)) 122 ) 123 124 def acquire(self, blocking=True, timeout=-1): 125 """Acquire a lock, blocking or non-blocking. 126 127 When invoked without arguments: if this thread already owns the lock, 128 increment the recursion level by one, and return immediately. Otherwise, 129 if another thread owns the lock, block until the lock is unlocked. Once 130 the lock is unlocked (not owned by any thread), then grab ownership, set 131 the recursion level to one, and return. If more than one thread is 132 blocked waiting until the lock is unlocked, only one at a time will be 133 able to grab ownership of the lock. There is no return value in this 134 case. 135 136 When invoked with the blocking argument set to true, do the same thing 137 as when called without arguments, and return true. 138 139 When invoked with the blocking argument set to false, do not block. If a 140 call without an argument would block, return false immediately; 141 otherwise, do the same thing as when called without arguments, and 142 return true. 143 144 When invoked with the floating-point timeout argument set to a positive 145 value, block for at most the number of seconds specified by timeout 146 and as long as the lock cannot be acquired. Return true if the lock has 147 been acquired, false if the timeout has elapsed. 148 149 """ 150 me = get_ident() 151 if self._owner == me: 152 self._count += 1 153 return 1 154 rc = self._block.acquire(blocking, timeout) 155 if rc: 156 self._owner = me 157 self._count = 1 158 return rc 159 160 __enter__ = acquire 161 162 def release(self): 163 """Release a lock, decrementing the recursion level. 164 165 If after the decrement it is zero, reset the lock to unlocked (not owned 166 by any thread), and if any other threads are blocked waiting for the 167 lock to become unlocked, allow exactly one of them to proceed. If after 168 the decrement the recursion level is still nonzero, the lock remains 169 locked and owned by the calling thread. 170 171 Only call this method when the calling thread owns the lock. A 172 RuntimeError is raised if this method is called when the lock is 173 unlocked. 174 175 There is no return value. 176 177 """ 178 if self._owner != get_ident(): 179 raise RuntimeError("cannot release un-acquired lock") 180 self._count = count = self._count - 1 181 if not count: 182 self._owner = None 183 self._block.release() 184 185 def __exit__(self, t, v, tb): 186 self.release() 187 188 # Internal methods used by condition variables 189 190 def _acquire_restore(self, state): 191 self._block.acquire() 192 self._count, self._owner = state 193 194 def _release_save(self): 195 if self._count == 0: 196 raise RuntimeError("cannot release un-acquired lock") 197 count = self._count 198 self._count = 0 199 owner = self._owner 200 self._owner = None 201 self._block.release() 202 return (count, owner) 203 204 def _is_owned(self): 205 return self._owner == get_ident() 206 207_PyRLock = _RLock 208 209 210class Condition: 211 """Class that implements a condition variable. 212 213 A condition variable allows one or more threads to wait until they are 214 notified by another thread. 215 216 If the lock argument is given and not None, it must be a Lock or RLock 217 object, and it is used as the underlying lock. Otherwise, a new RLock object 218 is created and used as the underlying lock. 219 220 """ 221 222 def __init__(self, lock=None): 223 if lock is None: 224 lock = RLock() 225 self._lock = lock 226 # Export the lock's acquire() and release() methods 227 self.acquire = lock.acquire 228 self.release = lock.release 229 # If the lock defines _release_save() and/or _acquire_restore(), 230 # these override the default implementations (which just call 231 # release() and acquire() on the lock). Ditto for _is_owned(). 232 try: 233 self._release_save = lock._release_save 234 except AttributeError: 235 pass 236 try: 237 self._acquire_restore = lock._acquire_restore 238 except AttributeError: 239 pass 240 try: 241 self._is_owned = lock._is_owned 242 except AttributeError: 243 pass 244 self._waiters = _deque() 245 246 def __enter__(self): 247 return self._lock.__enter__() 248 249 def __exit__(self, *args): 250 return self._lock.__exit__(*args) 251 252 def __repr__(self): 253 return "<Condition(%s, %d)>" % (self._lock, len(self._waiters)) 254 255 def _release_save(self): 256 self._lock.release() # No state to save 257 258 def _acquire_restore(self, x): 259 self._lock.acquire() # Ignore saved state 260 261 def _is_owned(self): 262 # Return True if lock is owned by current_thread. 263 # This method is called only if _lock doesn't have _is_owned(). 264 if self._lock.acquire(0): 265 self._lock.release() 266 return False 267 else: 268 return True 269 270 def wait(self, timeout=None): 271 """Wait until notified or until a timeout occurs. 272 273 If the calling thread has not acquired the lock when this method is 274 called, a RuntimeError is raised. 275 276 This method releases the underlying lock, and then blocks until it is 277 awakened by a notify() or notify_all() call for the same condition 278 variable in another thread, or until the optional timeout occurs. Once 279 awakened or timed out, it re-acquires the lock and returns. 280 281 When the timeout argument is present and not None, it should be a 282 floating point number specifying a timeout for the operation in seconds 283 (or fractions thereof). 284 285 When the underlying lock is an RLock, it is not released using its 286 release() method, since this may not actually unlock the lock when it 287 was acquired multiple times recursively. Instead, an internal interface 288 of the RLock class is used, which really unlocks it even when it has 289 been recursively acquired several times. Another internal interface is 290 then used to restore the recursion level when the lock is reacquired. 291 292 """ 293 if not self._is_owned(): 294 raise RuntimeError("cannot wait on un-acquired lock") 295 waiter = _allocate_lock() 296 waiter.acquire() 297 self._waiters.append(waiter) 298 saved_state = self._release_save() 299 gotit = False 300 try: # restore state no matter what (e.g., KeyboardInterrupt) 301 if timeout is None: 302 waiter.acquire() 303 gotit = True 304 else: 305 if timeout > 0: 306 gotit = waiter.acquire(True, timeout) 307 else: 308 gotit = waiter.acquire(False) 309 return gotit 310 finally: 311 self._acquire_restore(saved_state) 312 if not gotit: 313 try: 314 self._waiters.remove(waiter) 315 except ValueError: 316 pass 317 318 def wait_for(self, predicate, timeout=None): 319 """Wait until a condition evaluates to True. 320 321 predicate should be a callable which result will be interpreted as a 322 boolean value. A timeout may be provided giving the maximum time to 323 wait. 324 325 """ 326 endtime = None 327 waittime = timeout 328 result = predicate() 329 while not result: 330 if waittime is not None: 331 if endtime is None: 332 endtime = _time() + waittime 333 else: 334 waittime = endtime - _time() 335 if waittime <= 0: 336 break 337 self.wait(waittime) 338 result = predicate() 339 return result 340 341 def notify(self, n=1): 342 """Wake up one or more threads waiting on this condition, if any. 343 344 If the calling thread has not acquired the lock when this method is 345 called, a RuntimeError is raised. 346 347 This method wakes up at most n of the threads waiting for the condition 348 variable; it is a no-op if no threads are waiting. 349 350 """ 351 if not self._is_owned(): 352 raise RuntimeError("cannot notify on un-acquired lock") 353 all_waiters = self._waiters 354 waiters_to_notify = _deque(_islice(all_waiters, n)) 355 if not waiters_to_notify: 356 return 357 for waiter in waiters_to_notify: 358 waiter.release() 359 try: 360 all_waiters.remove(waiter) 361 except ValueError: 362 pass 363 364 def notify_all(self): 365 """Wake up all threads waiting on this condition. 366 367 If the calling thread has not acquired the lock when this method 368 is called, a RuntimeError is raised. 369 370 """ 371 self.notify(len(self._waiters)) 372 373 notifyAll = notify_all 374 375 376class Semaphore: 377 """This class implements semaphore objects. 378 379 Semaphores manage a counter representing the number of release() calls minus 380 the number of acquire() calls, plus an initial value. The acquire() method 381 blocks if necessary until it can return without making the counter 382 negative. If not given, value defaults to 1. 383 384 """ 385 386 # After Tim Peters' semaphore class, but not quite the same (no maximum) 387 388 def __init__(self, value=1): 389 if value < 0: 390 raise ValueError("semaphore initial value must be >= 0") 391 self._cond = Condition(Lock()) 392 self._value = value 393 394 def acquire(self, blocking=True, timeout=None): 395 """Acquire a semaphore, decrementing the internal counter by one. 396 397 When invoked without arguments: if the internal counter is larger than 398 zero on entry, decrement it by one and return immediately. If it is zero 399 on entry, block, waiting until some other thread has called release() to 400 make it larger than zero. This is done with proper interlocking so that 401 if multiple acquire() calls are blocked, release() will wake exactly one 402 of them up. The implementation may pick one at random, so the order in 403 which blocked threads are awakened should not be relied on. There is no 404 return value in this case. 405 406 When invoked with blocking set to true, do the same thing as when called 407 without arguments, and return true. 408 409 When invoked with blocking set to false, do not block. If a call without 410 an argument would block, return false immediately; otherwise, do the 411 same thing as when called without arguments, and return true. 412 413 When invoked with a timeout other than None, it will block for at 414 most timeout seconds. If acquire does not complete successfully in 415 that interval, return false. Return true otherwise. 416 417 """ 418 if not blocking and timeout is not None: 419 raise ValueError("can't specify timeout for non-blocking acquire") 420 rc = False 421 endtime = None 422 with self._cond: 423 while self._value == 0: 424 if not blocking: 425 break 426 if timeout is not None: 427 if endtime is None: 428 endtime = _time() + timeout 429 else: 430 timeout = endtime - _time() 431 if timeout <= 0: 432 break 433 self._cond.wait(timeout) 434 else: 435 self._value -= 1 436 rc = True 437 return rc 438 439 __enter__ = acquire 440 441 def release(self): 442 """Release a semaphore, incrementing the internal counter by one. 443 444 When the counter is zero on entry and another thread is waiting for it 445 to become larger than zero again, wake up that thread. 446 447 """ 448 with self._cond: 449 self._value += 1 450 self._cond.notify() 451 452 def __exit__(self, t, v, tb): 453 self.release() 454 455 456class BoundedSemaphore(Semaphore): 457 """Implements a bounded semaphore. 458 459 A bounded semaphore checks to make sure its current value doesn't exceed its 460 initial value. If it does, ValueError is raised. In most situations 461 semaphores are used to guard resources with limited capacity. 462 463 If the semaphore is released too many times it's a sign of a bug. If not 464 given, value defaults to 1. 465 466 Like regular semaphores, bounded semaphores manage a counter representing 467 the number of release() calls minus the number of acquire() calls, plus an 468 initial value. The acquire() method blocks if necessary until it can return 469 without making the counter negative. If not given, value defaults to 1. 470 471 """ 472 473 def __init__(self, value=1): 474 Semaphore.__init__(self, value) 475 self._initial_value = value 476 477 def release(self): 478 """Release a semaphore, incrementing the internal counter by one. 479 480 When the counter is zero on entry and another thread is waiting for it 481 to become larger than zero again, wake up that thread. 482 483 If the number of releases exceeds the number of acquires, 484 raise a ValueError. 485 486 """ 487 with self._cond: 488 if self._value >= self._initial_value: 489 raise ValueError("Semaphore released too many times") 490 self._value += 1 491 self._cond.notify() 492 493 494class Event: 495 """Class implementing event objects. 496 497 Events manage a flag that can be set to true with the set() method and reset 498 to false with the clear() method. The wait() method blocks until the flag is 499 true. The flag is initially false. 500 501 """ 502 503 # After Tim Peters' event class (without is_posted()) 504 505 def __init__(self): 506 self._cond = Condition(Lock()) 507 self._flag = False 508 509 def _reset_internal_locks(self): 510 # private! called by Thread._reset_internal_locks by _after_fork() 511 self._cond.__init__(Lock()) 512 513 def is_set(self): 514 """Return true if and only if the internal flag is true.""" 515 return self._flag 516 517 isSet = is_set 518 519 def set(self): 520 """Set the internal flag to true. 521 522 All threads waiting for it to become true are awakened. Threads 523 that call wait() once the flag is true will not block at all. 524 525 """ 526 with self._cond: 527 self._flag = True 528 self._cond.notify_all() 529 530 def clear(self): 531 """Reset the internal flag to false. 532 533 Subsequently, threads calling wait() will block until set() is called to 534 set the internal flag to true again. 535 536 """ 537 with self._cond: 538 self._flag = False 539 540 def wait(self, timeout=None): 541 """Block until the internal flag is true. 542 543 If the internal flag is true on entry, return immediately. Otherwise, 544 block until another thread calls set() to set the flag to true, or until 545 the optional timeout occurs. 546 547 When the timeout argument is present and not None, it should be a 548 floating point number specifying a timeout for the operation in seconds 549 (or fractions thereof). 550 551 This method returns the internal flag on exit, so it will always return 552 True except if a timeout is given and the operation times out. 553 554 """ 555 with self._cond: 556 signaled = self._flag 557 if not signaled: 558 signaled = self._cond.wait(timeout) 559 return signaled 560 561 562# A barrier class. Inspired in part by the pthread_barrier_* api and 563# the CyclicBarrier class from Java. See 564# http://sourceware.org/pthreads-win32/manual/pthread_barrier_init.html and 565# http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/ 566# CyclicBarrier.html 567# for information. 568# We maintain two main states, 'filling' and 'draining' enabling the barrier 569# to be cyclic. Threads are not allowed into it until it has fully drained 570# since the previous cycle. In addition, a 'resetting' state exists which is 571# similar to 'draining' except that threads leave with a BrokenBarrierError, 572# and a 'broken' state in which all threads get the exception. 573class Barrier: 574 """Implements a Barrier. 575 576 Useful for synchronizing a fixed number of threads at known synchronization 577 points. Threads block on 'wait()' and are simultaneously awoken once they 578 have all made that call. 579 580 """ 581 582 def __init__(self, parties, action=None, timeout=None): 583 """Create a barrier, initialised to 'parties' threads. 584 585 'action' is a callable which, when supplied, will be called by one of 586 the threads after they have all entered the barrier and just prior to 587 releasing them all. If a 'timeout' is provided, it is used as the 588 default for all subsequent 'wait()' calls. 589 590 """ 591 self._cond = Condition(Lock()) 592 self._action = action 593 self._timeout = timeout 594 self._parties = parties 595 self._state = 0 #0 filling, 1, draining, -1 resetting, -2 broken 596 self._count = 0 597 598 def wait(self, timeout=None): 599 """Wait for the barrier. 600 601 When the specified number of threads have started waiting, they are all 602 simultaneously awoken. If an 'action' was provided for the barrier, one 603 of the threads will have executed that callback prior to returning. 604 Returns an individual index number from 0 to 'parties-1'. 605 606 """ 607 if timeout is None: 608 timeout = self._timeout 609 with self._cond: 610 self._enter() # Block while the barrier drains. 611 index = self._count 612 self._count += 1 613 try: 614 if index + 1 == self._parties: 615 # We release the barrier 616 self._release() 617 else: 618 # We wait until someone releases us 619 self._wait(timeout) 620 return index 621 finally: 622 self._count -= 1 623 # Wake up any threads waiting for barrier to drain. 624 self._exit() 625 626 # Block until the barrier is ready for us, or raise an exception 627 # if it is broken. 628 def _enter(self): 629 while self._state in (-1, 1): 630 # It is draining or resetting, wait until done 631 self._cond.wait() 632 #see if the barrier is in a broken state 633 if self._state < 0: 634 raise BrokenBarrierError 635 assert self._state == 0 636 637 # Optionally run the 'action' and release the threads waiting 638 # in the barrier. 639 def _release(self): 640 try: 641 if self._action: 642 self._action() 643 # enter draining state 644 self._state = 1 645 self._cond.notify_all() 646 except: 647 #an exception during the _action handler. Break and reraise 648 self._break() 649 raise 650 651 # Wait in the barrier until we are released. Raise an exception 652 # if the barrier is reset or broken. 653 def _wait(self, timeout): 654 if not self._cond.wait_for(lambda : self._state != 0, timeout): 655 #timed out. Break the barrier 656 self._break() 657 raise BrokenBarrierError 658 if self._state < 0: 659 raise BrokenBarrierError 660 assert self._state == 1 661 662 # If we are the last thread to exit the barrier, signal any threads 663 # waiting for the barrier to drain. 664 def _exit(self): 665 if self._count == 0: 666 if self._state in (-1, 1): 667 #resetting or draining 668 self._state = 0 669 self._cond.notify_all() 670 671 def reset(self): 672 """Reset the barrier to the initial state. 673 674 Any threads currently waiting will get the BrokenBarrier exception 675 raised. 676 677 """ 678 with self._cond: 679 if self._count > 0: 680 if self._state == 0: 681 #reset the barrier, waking up threads 682 self._state = -1 683 elif self._state == -2: 684 #was broken, set it to reset state 685 #which clears when the last thread exits 686 self._state = -1 687 else: 688 self._state = 0 689 self._cond.notify_all() 690 691 def abort(self): 692 """Place the barrier into a 'broken' state. 693 694 Useful in case of error. Any currently waiting threads and threads 695 attempting to 'wait()' will have BrokenBarrierError raised. 696 697 """ 698 with self._cond: 699 self._break() 700 701 def _break(self): 702 # An internal error was detected. The barrier is set to 703 # a broken state all parties awakened. 704 self._state = -2 705 self._cond.notify_all() 706 707 @property 708 def parties(self): 709 """Return the number of threads required to trip the barrier.""" 710 return self._parties 711 712 @property 713 def n_waiting(self): 714 """Return the number of threads currently waiting at the barrier.""" 715 # We don't need synchronization here since this is an ephemeral result 716 # anyway. It returns the correct value in the steady state. 717 if self._state == 0: 718 return self._count 719 return 0 720 721 @property 722 def broken(self): 723 """Return True if the barrier is in a broken state.""" 724 return self._state == -2 725 726# exception raised by the Barrier class 727class BrokenBarrierError(RuntimeError): 728 pass 729 730 731# Helper to generate new thread names 732_counter = _count().__next__ 733_counter() # Consume 0 so first non-main thread has id 1. 734def _newname(template="Thread-%d"): 735 return template % _counter() 736 737# Active thread administration 738_active_limbo_lock = _allocate_lock() 739_active = {} # maps thread id to Thread object 740_limbo = {} 741_dangling = WeakSet() 742# Set of Thread._tstate_lock locks of non-daemon threads used by _shutdown() 743# to wait until all Python thread states get deleted: 744# see Thread._set_tstate_lock(). 745_shutdown_locks_lock = _allocate_lock() 746_shutdown_locks = set() 747 748# Main class for threads 749 750class Thread: 751 """A class that represents a thread of control. 752 753 This class can be safely subclassed in a limited fashion. There are two ways 754 to specify the activity: by passing a callable object to the constructor, or 755 by overriding the run() method in a subclass. 756 757 """ 758 759 _initialized = False 760 761 def __init__(self, group=None, target=None, name=None, 762 args=(), kwargs=None, *, daemon=None): 763 """This constructor should always be called with keyword arguments. Arguments are: 764 765 *group* should be None; reserved for future extension when a ThreadGroup 766 class is implemented. 767 768 *target* is the callable object to be invoked by the run() 769 method. Defaults to None, meaning nothing is called. 770 771 *name* is the thread name. By default, a unique name is constructed of 772 the form "Thread-N" where N is a small decimal number. 773 774 *args* is the argument tuple for the target invocation. Defaults to (). 775 776 *kwargs* is a dictionary of keyword arguments for the target 777 invocation. Defaults to {}. 778 779 If a subclass overrides the constructor, it must make sure to invoke 780 the base class constructor (Thread.__init__()) before doing anything 781 else to the thread. 782 783 """ 784 assert group is None, "group argument must be None for now" 785 if kwargs is None: 786 kwargs = {} 787 self._target = target 788 self._name = str(name or _newname()) 789 self._args = args 790 self._kwargs = kwargs 791 if daemon is not None: 792 self._daemonic = daemon 793 else: 794 self._daemonic = current_thread().daemon 795 self._ident = None 796 if _HAVE_THREAD_NATIVE_ID: 797 self._native_id = None 798 self._tstate_lock = None 799 self._started = Event() 800 self._is_stopped = False 801 self._initialized = True 802 # Copy of sys.stderr used by self._invoke_excepthook() 803 self._stderr = _sys.stderr 804 self._invoke_excepthook = _make_invoke_excepthook() 805 # For debugging and _after_fork() 806 _dangling.add(self) 807 808 def _reset_internal_locks(self, is_alive): 809 # private! Called by _after_fork() to reset our internal locks as 810 # they may be in an invalid state leading to a deadlock or crash. 811 self._started._reset_internal_locks() 812 if is_alive: 813 self._set_tstate_lock() 814 else: 815 # The thread isn't alive after fork: it doesn't have a tstate 816 # anymore. 817 self._is_stopped = True 818 self._tstate_lock = None 819 820 def __repr__(self): 821 assert self._initialized, "Thread.__init__() was not called" 822 status = "initial" 823 if self._started.is_set(): 824 status = "started" 825 self.is_alive() # easy way to get ._is_stopped set when appropriate 826 if self._is_stopped: 827 status = "stopped" 828 if self._daemonic: 829 status += " daemon" 830 if self._ident is not None: 831 status += " %s" % self._ident 832 return "<%s(%s, %s)>" % (self.__class__.__name__, self._name, status) 833 834 def start(self): 835 """Start the thread's activity. 836 837 It must be called at most once per thread object. It arranges for the 838 object's run() method to be invoked in a separate thread of control. 839 840 This method will raise a RuntimeError if called more than once on the 841 same thread object. 842 843 """ 844 if not self._initialized: 845 raise RuntimeError("thread.__init__() not called") 846 847 if self._started.is_set(): 848 raise RuntimeError("threads can only be started once") 849 with _active_limbo_lock: 850 _limbo[self] = self 851 try: 852 _start_new_thread(self._bootstrap, ()) 853 except Exception: 854 with _active_limbo_lock: 855 del _limbo[self] 856 raise 857 self._started.wait() 858 859 def run(self): 860 """Method representing the thread's activity. 861 862 You may override this method in a subclass. The standard run() method 863 invokes the callable object passed to the object's constructor as the 864 target argument, if any, with sequential and keyword arguments taken 865 from the args and kwargs arguments, respectively. 866 867 """ 868 try: 869 if self._target: 870 self._target(*self._args, **self._kwargs) 871 finally: 872 # Avoid a refcycle if the thread is running a function with 873 # an argument that has a member that points to the thread. 874 del self._target, self._args, self._kwargs 875 876 def _bootstrap(self): 877 # Wrapper around the real bootstrap code that ignores 878 # exceptions during interpreter cleanup. Those typically 879 # happen when a daemon thread wakes up at an unfortunate 880 # moment, finds the world around it destroyed, and raises some 881 # random exception *** while trying to report the exception in 882 # _bootstrap_inner() below ***. Those random exceptions 883 # don't help anybody, and they confuse users, so we suppress 884 # them. We suppress them only when it appears that the world 885 # indeed has already been destroyed, so that exceptions in 886 # _bootstrap_inner() during normal business hours are properly 887 # reported. Also, we only suppress them for daemonic threads; 888 # if a non-daemonic encounters this, something else is wrong. 889 try: 890 self._bootstrap_inner() 891 except: 892 if self._daemonic and _sys is None: 893 return 894 raise 895 896 def _set_ident(self): 897 self._ident = get_ident() 898 899 if _HAVE_THREAD_NATIVE_ID: 900 def _set_native_id(self): 901 self._native_id = get_native_id() 902 903 def _set_tstate_lock(self): 904 """ 905 Set a lock object which will be released by the interpreter when 906 the underlying thread state (see pystate.h) gets deleted. 907 """ 908 self._tstate_lock = _set_sentinel() 909 self._tstate_lock.acquire() 910 911 if not self.daemon: 912 with _shutdown_locks_lock: 913 _shutdown_locks.add(self._tstate_lock) 914 915 def _bootstrap_inner(self): 916 try: 917 self._set_ident() 918 self._set_tstate_lock() 919 if _HAVE_THREAD_NATIVE_ID: 920 self._set_native_id() 921 self._started.set() 922 with _active_limbo_lock: 923 _active[self._ident] = self 924 del _limbo[self] 925 926 if _trace_hook: 927 _sys.settrace(_trace_hook) 928 if _profile_hook: 929 _sys.setprofile(_profile_hook) 930 931 try: 932 self.run() 933 except: 934 self._invoke_excepthook(self) 935 finally: 936 with _active_limbo_lock: 937 try: 938 # We don't call self._delete() because it also 939 # grabs _active_limbo_lock. 940 del _active[get_ident()] 941 except: 942 pass 943 944 def _stop(self): 945 # After calling ._stop(), .is_alive() returns False and .join() returns 946 # immediately. ._tstate_lock must be released before calling ._stop(). 947 # 948 # Normal case: C code at the end of the thread's life 949 # (release_sentinel in _threadmodule.c) releases ._tstate_lock, and 950 # that's detected by our ._wait_for_tstate_lock(), called by .join() 951 # and .is_alive(). Any number of threads _may_ call ._stop() 952 # simultaneously (for example, if multiple threads are blocked in 953 # .join() calls), and they're not serialized. That's harmless - 954 # they'll just make redundant rebindings of ._is_stopped and 955 # ._tstate_lock. Obscure: we rebind ._tstate_lock last so that the 956 # "assert self._is_stopped" in ._wait_for_tstate_lock() always works 957 # (the assert is executed only if ._tstate_lock is None). 958 # 959 # Special case: _main_thread releases ._tstate_lock via this 960 # module's _shutdown() function. 961 lock = self._tstate_lock 962 if lock is not None: 963 assert not lock.locked() 964 self._is_stopped = True 965 self._tstate_lock = None 966 if not self.daemon: 967 with _shutdown_locks_lock: 968 _shutdown_locks.discard(lock) 969 970 def _delete(self): 971 "Remove current thread from the dict of currently running threads." 972 with _active_limbo_lock: 973 del _active[get_ident()] 974 # There must not be any python code between the previous line 975 # and after the lock is released. Otherwise a tracing function 976 # could try to acquire the lock again in the same thread, (in 977 # current_thread()), and would block. 978 979 def join(self, timeout=None): 980 """Wait until the thread terminates. 981 982 This blocks the calling thread until the thread whose join() method is 983 called terminates -- either normally or through an unhandled exception 984 or until the optional timeout occurs. 985 986 When the timeout argument is present and not None, it should be a 987 floating point number specifying a timeout for the operation in seconds 988 (or fractions thereof). As join() always returns None, you must call 989 is_alive() after join() to decide whether a timeout happened -- if the 990 thread is still alive, the join() call timed out. 991 992 When the timeout argument is not present or None, the operation will 993 block until the thread terminates. 994 995 A thread can be join()ed many times. 996 997 join() raises a RuntimeError if an attempt is made to join the current 998 thread as that would cause a deadlock. It is also an error to join() a 999 thread before it has been started and attempts to do so raises the same 1000 exception. 1001 1002 """ 1003 if not self._initialized: 1004 raise RuntimeError("Thread.__init__() not called") 1005 if not self._started.is_set(): 1006 raise RuntimeError("cannot join thread before it is started") 1007 if self is current_thread(): 1008 raise RuntimeError("cannot join current thread") 1009 1010 if timeout is None: 1011 self._wait_for_tstate_lock() 1012 else: 1013 # the behavior of a negative timeout isn't documented, but 1014 # historically .join(timeout=x) for x<0 has acted as if timeout=0 1015 self._wait_for_tstate_lock(timeout=max(timeout, 0)) 1016 1017 def _wait_for_tstate_lock(self, block=True, timeout=-1): 1018 # Issue #18808: wait for the thread state to be gone. 1019 # At the end of the thread's life, after all knowledge of the thread 1020 # is removed from C data structures, C code releases our _tstate_lock. 1021 # This method passes its arguments to _tstate_lock.acquire(). 1022 # If the lock is acquired, the C code is done, and self._stop() is 1023 # called. That sets ._is_stopped to True, and ._tstate_lock to None. 1024 lock = self._tstate_lock 1025 if lock is None: # already determined that the C code is done 1026 assert self._is_stopped 1027 elif lock.acquire(block, timeout): 1028 lock.release() 1029 self._stop() 1030 1031 @property 1032 def name(self): 1033 """A string used for identification purposes only. 1034 1035 It has no semantics. Multiple threads may be given the same name. The 1036 initial name is set by the constructor. 1037 1038 """ 1039 assert self._initialized, "Thread.__init__() not called" 1040 return self._name 1041 1042 @name.setter 1043 def name(self, name): 1044 assert self._initialized, "Thread.__init__() not called" 1045 self._name = str(name) 1046 1047 @property 1048 def ident(self): 1049 """Thread identifier of this thread or None if it has not been started. 1050 1051 This is a nonzero integer. See the get_ident() function. Thread 1052 identifiers may be recycled when a thread exits and another thread is 1053 created. The identifier is available even after the thread has exited. 1054 1055 """ 1056 assert self._initialized, "Thread.__init__() not called" 1057 return self._ident 1058 1059 if _HAVE_THREAD_NATIVE_ID: 1060 @property 1061 def native_id(self): 1062 """Native integral thread ID of this thread, or None if it has not been started. 1063 1064 This is a non-negative integer. See the get_native_id() function. 1065 This represents the Thread ID as reported by the kernel. 1066 1067 """ 1068 assert self._initialized, "Thread.__init__() not called" 1069 return self._native_id 1070 1071 def is_alive(self): 1072 """Return whether the thread is alive. 1073 1074 This method returns True just before the run() method starts until just 1075 after the run() method terminates. The module function enumerate() 1076 returns a list of all alive threads. 1077 1078 """ 1079 assert self._initialized, "Thread.__init__() not called" 1080 if self._is_stopped or not self._started.is_set(): 1081 return False 1082 self._wait_for_tstate_lock(False) 1083 return not self._is_stopped 1084 1085 def isAlive(self): 1086 """Return whether the thread is alive. 1087 1088 This method is deprecated, use is_alive() instead. 1089 """ 1090 import warnings 1091 warnings.warn('isAlive() is deprecated, use is_alive() instead', 1092 DeprecationWarning, stacklevel=2) 1093 return self.is_alive() 1094 1095 @property 1096 def daemon(self): 1097 """A boolean value indicating whether this thread is a daemon thread. 1098 1099 This must be set before start() is called, otherwise RuntimeError is 1100 raised. Its initial value is inherited from the creating thread; the 1101 main thread is not a daemon thread and therefore all threads created in 1102 the main thread default to daemon = False. 1103 1104 The entire Python program exits when only daemon threads are left. 1105 1106 """ 1107 assert self._initialized, "Thread.__init__() not called" 1108 return self._daemonic 1109 1110 @daemon.setter 1111 def daemon(self, daemonic): 1112 if not self._initialized: 1113 raise RuntimeError("Thread.__init__() not called") 1114 if self._started.is_set(): 1115 raise RuntimeError("cannot set daemon status of active thread") 1116 self._daemonic = daemonic 1117 1118 def isDaemon(self): 1119 return self.daemon 1120 1121 def setDaemon(self, daemonic): 1122 self.daemon = daemonic 1123 1124 def getName(self): 1125 return self.name 1126 1127 def setName(self, name): 1128 self.name = name 1129 1130 1131try: 1132 from _thread import (_excepthook as excepthook, 1133 _ExceptHookArgs as ExceptHookArgs) 1134except ImportError: 1135 # Simple Python implementation if _thread._excepthook() is not available 1136 from traceback import print_exception as _print_exception 1137 from collections import namedtuple 1138 1139 _ExceptHookArgs = namedtuple( 1140 'ExceptHookArgs', 1141 'exc_type exc_value exc_traceback thread') 1142 1143 def ExceptHookArgs(args): 1144 return _ExceptHookArgs(*args) 1145 1146 def excepthook(args, /): 1147 """ 1148 Handle uncaught Thread.run() exception. 1149 """ 1150 if args.exc_type == SystemExit: 1151 # silently ignore SystemExit 1152 return 1153 1154 if _sys is not None and _sys.stderr is not None: 1155 stderr = _sys.stderr 1156 elif args.thread is not None: 1157 stderr = args.thread._stderr 1158 if stderr is None: 1159 # do nothing if sys.stderr is None and sys.stderr was None 1160 # when the thread was created 1161 return 1162 else: 1163 # do nothing if sys.stderr is None and args.thread is None 1164 return 1165 1166 if args.thread is not None: 1167 name = args.thread.name 1168 else: 1169 name = get_ident() 1170 print(f"Exception in thread {name}:", 1171 file=stderr, flush=True) 1172 _print_exception(args.exc_type, args.exc_value, args.exc_traceback, 1173 file=stderr) 1174 stderr.flush() 1175 1176 1177def _make_invoke_excepthook(): 1178 # Create a local namespace to ensure that variables remain alive 1179 # when _invoke_excepthook() is called, even if it is called late during 1180 # Python shutdown. It is mostly needed for daemon threads. 1181 1182 old_excepthook = excepthook 1183 old_sys_excepthook = _sys.excepthook 1184 if old_excepthook is None: 1185 raise RuntimeError("threading.excepthook is None") 1186 if old_sys_excepthook is None: 1187 raise RuntimeError("sys.excepthook is None") 1188 1189 sys_exc_info = _sys.exc_info 1190 local_print = print 1191 local_sys = _sys 1192 1193 def invoke_excepthook(thread): 1194 global excepthook 1195 try: 1196 hook = excepthook 1197 if hook is None: 1198 hook = old_excepthook 1199 1200 args = ExceptHookArgs([*sys_exc_info(), thread]) 1201 1202 hook(args) 1203 except Exception as exc: 1204 exc.__suppress_context__ = True 1205 del exc 1206 1207 if local_sys is not None and local_sys.stderr is not None: 1208 stderr = local_sys.stderr 1209 else: 1210 stderr = thread._stderr 1211 1212 local_print("Exception in threading.excepthook:", 1213 file=stderr, flush=True) 1214 1215 if local_sys is not None and local_sys.excepthook is not None: 1216 sys_excepthook = local_sys.excepthook 1217 else: 1218 sys_excepthook = old_sys_excepthook 1219 1220 sys_excepthook(*sys_exc_info()) 1221 finally: 1222 # Break reference cycle (exception stored in a variable) 1223 args = None 1224 1225 return invoke_excepthook 1226 1227 1228# The timer class was contributed by Itamar Shtull-Trauring 1229 1230class Timer(Thread): 1231 """Call a function after a specified number of seconds: 1232 1233 t = Timer(30.0, f, args=None, kwargs=None) 1234 t.start() 1235 t.cancel() # stop the timer's action if it's still waiting 1236 1237 """ 1238 1239 def __init__(self, interval, function, args=None, kwargs=None): 1240 Thread.__init__(self) 1241 self.interval = interval 1242 self.function = function 1243 self.args = args if args is not None else [] 1244 self.kwargs = kwargs if kwargs is not None else {} 1245 self.finished = Event() 1246 1247 def cancel(self): 1248 """Stop the timer if it hasn't finished yet.""" 1249 self.finished.set() 1250 1251 def run(self): 1252 self.finished.wait(self.interval) 1253 if not self.finished.is_set(): 1254 self.function(*self.args, **self.kwargs) 1255 self.finished.set() 1256 1257 1258# Special thread class to represent the main thread 1259 1260class _MainThread(Thread): 1261 1262 def __init__(self): 1263 Thread.__init__(self, name="MainThread", daemon=False) 1264 self._set_tstate_lock() 1265 self._started.set() 1266 self._set_ident() 1267 if _HAVE_THREAD_NATIVE_ID: 1268 self._set_native_id() 1269 with _active_limbo_lock: 1270 _active[self._ident] = self 1271 1272 1273# Dummy thread class to represent threads not started here. 1274# These aren't garbage collected when they die, nor can they be waited for. 1275# If they invoke anything in threading.py that calls current_thread(), they 1276# leave an entry in the _active dict forever after. 1277# Their purpose is to return *something* from current_thread(). 1278# They are marked as daemon threads so we won't wait for them 1279# when we exit (conform previous semantics). 1280 1281class _DummyThread(Thread): 1282 1283 def __init__(self): 1284 Thread.__init__(self, name=_newname("Dummy-%d"), daemon=True) 1285 1286 self._started.set() 1287 self._set_ident() 1288 if _HAVE_THREAD_NATIVE_ID: 1289 self._set_native_id() 1290 with _active_limbo_lock: 1291 _active[self._ident] = self 1292 1293 def _stop(self): 1294 pass 1295 1296 def is_alive(self): 1297 assert not self._is_stopped and self._started.is_set() 1298 return True 1299 1300 def join(self, timeout=None): 1301 assert False, "cannot join a dummy thread" 1302 1303 1304# Global API functions 1305 1306def current_thread(): 1307 """Return the current Thread object, corresponding to the caller's thread of control. 1308 1309 If the caller's thread of control was not created through the threading 1310 module, a dummy thread object with limited functionality is returned. 1311 1312 """ 1313 try: 1314 return _active[get_ident()] 1315 except KeyError: 1316 return _DummyThread() 1317 1318currentThread = current_thread 1319 1320def active_count(): 1321 """Return the number of Thread objects currently alive. 1322 1323 The returned count is equal to the length of the list returned by 1324 enumerate(). 1325 1326 """ 1327 with _active_limbo_lock: 1328 return len(_active) + len(_limbo) 1329 1330activeCount = active_count 1331 1332def _enumerate(): 1333 # Same as enumerate(), but without the lock. Internal use only. 1334 return list(_active.values()) + list(_limbo.values()) 1335 1336def enumerate(): 1337 """Return a list of all Thread objects currently alive. 1338 1339 The list includes daemonic threads, dummy thread objects created by 1340 current_thread(), and the main thread. It excludes terminated threads and 1341 threads that have not yet been started. 1342 1343 """ 1344 with _active_limbo_lock: 1345 return list(_active.values()) + list(_limbo.values()) 1346 1347from _thread import stack_size 1348 1349# Create the main thread object, 1350# and make it available for the interpreter 1351# (Py_Main) as threading._shutdown. 1352 1353_main_thread = _MainThread() 1354 1355def _shutdown(): 1356 """ 1357 Wait until the Python thread state of all non-daemon threads get deleted. 1358 """ 1359 # Obscure: other threads may be waiting to join _main_thread. That's 1360 # dubious, but some code does it. We can't wait for C code to release 1361 # the main thread's tstate_lock - that won't happen until the interpreter 1362 # is nearly dead. So we release it here. Note that just calling _stop() 1363 # isn't enough: other threads may already be waiting on _tstate_lock. 1364 if _main_thread._is_stopped: 1365 # _shutdown() was already called 1366 return 1367 1368 # Main thread 1369 tlock = _main_thread._tstate_lock 1370 # The main thread isn't finished yet, so its thread state lock can't have 1371 # been released. 1372 assert tlock is not None 1373 assert tlock.locked() 1374 tlock.release() 1375 _main_thread._stop() 1376 1377 # Join all non-deamon threads 1378 while True: 1379 with _shutdown_locks_lock: 1380 locks = list(_shutdown_locks) 1381 _shutdown_locks.clear() 1382 1383 if not locks: 1384 break 1385 1386 for lock in locks: 1387 # mimick Thread.join() 1388 lock.acquire() 1389 lock.release() 1390 1391 # new threads can be spawned while we were waiting for the other 1392 # threads to complete 1393 1394 1395def main_thread(): 1396 """Return the main thread object. 1397 1398 In normal conditions, the main thread is the thread from which the 1399 Python interpreter was started. 1400 """ 1401 return _main_thread 1402 1403# get thread-local implementation, either from the thread 1404# module, or from the python fallback 1405 1406try: 1407 from _thread import _local as local 1408except ImportError: 1409 from _threading_local import local 1410 1411 1412def _after_fork(): 1413 """ 1414 Cleanup threading module state that should not exist after a fork. 1415 """ 1416 # Reset _active_limbo_lock, in case we forked while the lock was held 1417 # by another (non-forked) thread. http://bugs.python.org/issue874900 1418 global _active_limbo_lock, _main_thread 1419 global _shutdown_locks_lock, _shutdown_locks 1420 _active_limbo_lock = _allocate_lock() 1421 1422 # fork() only copied the current thread; clear references to others. 1423 new_active = {} 1424 1425 try: 1426 current = _active[get_ident()] 1427 except KeyError: 1428 # fork() was called in a thread which was not spawned 1429 # by threading.Thread. For example, a thread spawned 1430 # by thread.start_new_thread(). 1431 current = _MainThread() 1432 1433 _main_thread = current 1434 1435 # reset _shutdown() locks: threads re-register their _tstate_lock below 1436 _shutdown_locks_lock = _allocate_lock() 1437 _shutdown_locks = set() 1438 1439 with _active_limbo_lock: 1440 # Dangling thread instances must still have their locks reset, 1441 # because someone may join() them. 1442 threads = set(_enumerate()) 1443 threads.update(_dangling) 1444 for thread in threads: 1445 # Any lock/condition variable may be currently locked or in an 1446 # invalid state, so we reinitialize them. 1447 if thread is current: 1448 # There is only one active thread. We reset the ident to 1449 # its new value since it can have changed. 1450 thread._reset_internal_locks(True) 1451 ident = get_ident() 1452 thread._ident = ident 1453 new_active[ident] = thread 1454 else: 1455 # All the others are already stopped. 1456 thread._reset_internal_locks(False) 1457 thread._stop() 1458 1459 _limbo.clear() 1460 _active.clear() 1461 _active.update(new_active) 1462 assert len(_active) == 1 1463 1464 1465if hasattr(_os, "register_at_fork"): 1466 _os.register_at_fork(after_in_child=_after_fork) 1467