1"""Thread module emulating a subset of Java's threading model."""
2
3import sys as _sys
4
5try:
6    import thread
7except ImportError:
8    del _sys.modules[__name__]
9    raise
10
11import warnings
12
13from collections import deque as _deque
14from itertools import count as _count
15from time import time as _time, sleep as _sleep
16from traceback import format_exc as _format_exc
17
18# Note regarding PEP 8 compliant aliases
19#  This threading model was originally inspired by Java, and inherited
20# the convention of camelCase function and method names from that
21# language. While those names are not in any imminent danger of being
22# deprecated, starting with Python 2.6, the module now provides a
23# PEP 8 compliant alias for any such method name.
24# Using the new PEP 8 compliant names also facilitates substitution
25# with the multiprocessing module, which doesn't provide the old
26# Java inspired names.
27
28
29# Rename some stuff so "from threading import *" is safe
30__all__ = ['activeCount', 'active_count', 'Condition', 'currentThread',
31           'current_thread', 'enumerate', 'Event',
32           'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',
33           'Timer', 'setprofile', 'settrace', 'local', 'stack_size']
34
35_start_new_thread = thread.start_new_thread
36_allocate_lock = thread.allocate_lock
37_get_ident = thread.get_ident
38ThreadError = thread.error
39del thread
40
41
42# sys.exc_clear is used to work around the fact that except blocks
43# don't fully clear the exception until 3.0.
44warnings.filterwarnings('ignore', category=DeprecationWarning,
45                        module='threading', message='sys.exc_clear')
46
47# Debug support (adapted from ihooks.py).
48# All the major classes here derive from _Verbose.  We force that to
49# be a new-style class so that all the major classes here are new-style.
50# This helps debugging (type(instance) is more revealing for instances
51# of new-style classes).
52
53_VERBOSE = False
54
55if __debug__:
56
57    class _Verbose(object):
58
59        def __init__(self, verbose=None):
60            if verbose is None:
61                verbose = _VERBOSE
62            self.__verbose = verbose
63
64        def _note(self, format, *args):
65            if self.__verbose:
66                format = format % args
67                # Issue #4188: calling current_thread() can incur an infinite
68                # recursion if it has to create a DummyThread on the fly.
69                ident = _get_ident()
70                try:
71                    name = _active[ident].name
72                except KeyError:
73                    name = "<OS thread %d>" % ident
74                format = "%s: %s\n" % (name, format)
75                _sys.stderr.write(format)
76
77else:
78    # Disable this when using "python -O"
79    class _Verbose(object):
80        def __init__(self, verbose=None):
81            pass
82        def _note(self, *args):
83            pass
84
85# Support for profile and trace hooks
86
87_profile_hook = None
88_trace_hook = None
89
90def setprofile(func):
91    """Set a profile function for all threads started from the threading module.
92
93    The func will be passed to sys.setprofile() for each thread, before its
94    run() method is called.
95
96    """
97    global _profile_hook
98    _profile_hook = func
99
100def settrace(func):
101    """Set a trace function for all threads started from the threading module.
102
103    The func will be passed to sys.settrace() for each thread, before its run()
104    method is called.
105
106    """
107    global _trace_hook
108    _trace_hook = func
109
110# Synchronization classes
111
112Lock = _allocate_lock
113
114def RLock(*args, **kwargs):
115    """Factory function that returns a new reentrant lock.
116
117    A reentrant lock must be released by the thread that acquired it. Once a
118    thread has acquired a reentrant lock, the same thread may acquire it again
119    without blocking; the thread must release it once for each time it has
120    acquired it.
121
122    """
123    return _RLock(*args, **kwargs)
124
125class _RLock(_Verbose):
126    """A reentrant lock must be released by the thread that acquired it. Once a
127       thread has acquired a reentrant lock, the same thread may acquire it
128       again without blocking; the thread must release it once for each time it
129       has acquired it.
130    """
131
132    def __init__(self, verbose=None):
133        _Verbose.__init__(self, verbose)
134        self.__block = _allocate_lock()
135        self.__owner = None
136        self.__count = 0
137
138    def __repr__(self):
139        owner = self.__owner
140        try:
141            owner = _active[owner].name
142        except KeyError:
143            pass
144        return "<%s owner=%r count=%d>" % (
145                self.__class__.__name__, owner, self.__count)
146
147    def acquire(self, blocking=1):
148        """Acquire a lock, blocking or non-blocking.
149
150        When invoked without arguments: if this thread already owns the lock,
151        increment the recursion level by one, and return immediately. Otherwise,
152        if another thread owns the lock, block until the lock is unlocked. Once
153        the lock is unlocked (not owned by any thread), then grab ownership, set
154        the recursion level to one, and return. If more than one thread is
155        blocked waiting until the lock is unlocked, only one at a time will be
156        able to grab ownership of the lock. There is no return value in this
157        case.
158
159        When invoked with the blocking argument set to true, do the same thing
160        as when called without arguments, and return true.
161
162        When invoked with the blocking argument set to false, do not block. If a
163        call without an argument would block, return false immediately;
164        otherwise, do the same thing as when called without arguments, and
165        return true.
166
167        """
168        me = _get_ident()
169        if self.__owner == me:
170            self.__count = self.__count + 1
171            if __debug__:
172                self._note("%s.acquire(%s): recursive success", self, blocking)
173            return 1
174        rc = self.__block.acquire(blocking)
175        if rc:
176            self.__owner = me
177            self.__count = 1
178            if __debug__:
179                self._note("%s.acquire(%s): initial success", self, blocking)
180        else:
181            if __debug__:
182                self._note("%s.acquire(%s): failure", self, blocking)
183        return rc
184
185    __enter__ = acquire
186
187    def release(self):
188        """Release a lock, decrementing the recursion level.
189
190        If after the decrement it is zero, reset the lock to unlocked (not owned
191        by any thread), and if any other threads are blocked waiting for the
192        lock to become unlocked, allow exactly one of them to proceed. If after
193        the decrement the recursion level is still nonzero, the lock remains
194        locked and owned by the calling thread.
195
196        Only call this method when the calling thread owns the lock. A
197        RuntimeError is raised if this method is called when the lock is
198        unlocked.
199
200        There is no return value.
201
202        """
203        if self.__owner != _get_ident():
204            raise RuntimeError("cannot release un-acquired lock")
205        self.__count = count = self.__count - 1
206        if not count:
207            self.__owner = None
208            self.__block.release()
209            if __debug__:
210                self._note("%s.release(): final release", self)
211        else:
212            if __debug__:
213                self._note("%s.release(): non-final release", self)
214
215    def __exit__(self, t, v, tb):
216        self.release()
217
218    # Internal methods used by condition variables
219
220    def _acquire_restore(self, count_owner):
221        count, owner = count_owner
222        self.__block.acquire()
223        self.__count = count
224        self.__owner = owner
225        if __debug__:
226            self._note("%s._acquire_restore()", self)
227
228    def _release_save(self):
229        if __debug__:
230            self._note("%s._release_save()", self)
231        count = self.__count
232        self.__count = 0
233        owner = self.__owner
234        self.__owner = None
235        self.__block.release()
236        return (count, owner)
237
238    def _is_owned(self):
239        return self.__owner == _get_ident()
240
241
242def Condition(*args, **kwargs):
243    """Factory function that returns a new condition variable object.
244
245    A condition variable allows one or more threads to wait until they are
246    notified by another thread.
247
248    If the lock argument is given and not None, it must be a Lock or RLock
249    object, and it is used as the underlying lock. Otherwise, a new RLock object
250    is created and used as the underlying lock.
251
252    """
253    return _Condition(*args, **kwargs)
254
255class _Condition(_Verbose):
256    """Condition variables allow one or more threads to wait until they are
257       notified by another thread.
258    """
259
260    def __init__(self, lock=None, verbose=None):
261        _Verbose.__init__(self, verbose)
262        if lock is None:
263            lock = RLock()
264        self.__lock = lock
265        # Export the lock's acquire() and release() methods
266        self.acquire = lock.acquire
267        self.release = lock.release
268        # If the lock defines _release_save() and/or _acquire_restore(),
269        # these override the default implementations (which just call
270        # release() and acquire() on the lock).  Ditto for _is_owned().
271        try:
272            self._release_save = lock._release_save
273        except AttributeError:
274            pass
275        try:
276            self._acquire_restore = lock._acquire_restore
277        except AttributeError:
278            pass
279        try:
280            self._is_owned = lock._is_owned
281        except AttributeError:
282            pass
283        self.__waiters = []
284
285    def __enter__(self):
286        return self.__lock.__enter__()
287
288    def __exit__(self, *args):
289        return self.__lock.__exit__(*args)
290
291    def __repr__(self):
292        return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters))
293
294    def _release_save(self):
295        self.__lock.release()           # No state to save
296
297    def _acquire_restore(self, x):
298        self.__lock.acquire()           # Ignore saved state
299
300    def _is_owned(self):
301        # Return True if lock is owned by current_thread.
302        # This method is called only if __lock doesn't have _is_owned().
303        if self.__lock.acquire(0):
304            self.__lock.release()
305            return False
306        else:
307            return True
308
309    def wait(self, timeout=None):
310        """Wait until notified or until a timeout occurs.
311
312        If the calling thread has not acquired the lock when this method is
313        called, a RuntimeError is raised.
314
315        This method releases the underlying lock, and then blocks until it is
316        awakened by a notify() or notifyAll() call for the same condition
317        variable in another thread, or until the optional timeout occurs. Once
318        awakened or timed out, it re-acquires the lock and returns.
319
320        When the timeout argument is present and not None, it should be a
321        floating point number specifying a timeout for the operation in seconds
322        (or fractions thereof).
323
324        When the underlying lock is an RLock, it is not released using its
325        release() method, since this may not actually unlock the lock when it
326        was acquired multiple times recursively. Instead, an internal interface
327        of the RLock class is used, which really unlocks it even when it has
328        been recursively acquired several times. Another internal interface is
329        then used to restore the recursion level when the lock is reacquired.
330
331        """
332        if not self._is_owned():
333            raise RuntimeError("cannot wait on un-acquired lock")
334        waiter = _allocate_lock()
335        waiter.acquire()
336        self.__waiters.append(waiter)
337        saved_state = self._release_save()
338        try:    # restore state no matter what (e.g., KeyboardInterrupt)
339            if timeout is None:
340                waiter.acquire()
341                if __debug__:
342                    self._note("%s.wait(): got it", self)
343            else:
344                # Balancing act:  We can't afford a pure busy loop, so we
345                # have to sleep; but if we sleep the whole timeout time,
346                # we'll be unresponsive.  The scheme here sleeps very
347                # little at first, longer as time goes on, but never longer
348                # than 20 times per second (or the timeout time remaining).
349                endtime = _time() + timeout
350                delay = 0.0005 # 500 us -> initial delay of 1 ms
351                while True:
352                    gotit = waiter.acquire(0)
353                    if gotit:
354                        break
355                    remaining = endtime - _time()
356                    if remaining <= 0:
357                        break
358                    delay = min(delay * 2, remaining, .05)
359                    _sleep(delay)
360                if not gotit:
361                    if __debug__:
362                        self._note("%s.wait(%s): timed out", self, timeout)
363                    try:
364                        self.__waiters.remove(waiter)
365                    except ValueError:
366                        pass
367                else:
368                    if __debug__:
369                        self._note("%s.wait(%s): got it", self, timeout)
370        finally:
371            self._acquire_restore(saved_state)
372
373    def notify(self, n=1):
374        """Wake up one or more threads waiting on this condition, if any.
375
376        If the calling thread has not acquired the lock when this method is
377        called, a RuntimeError is raised.
378
379        This method wakes up at most n of the threads waiting for the condition
380        variable; it is a no-op if no threads are waiting.
381
382        """
383        if not self._is_owned():
384            raise RuntimeError("cannot notify on un-acquired lock")
385        __waiters = self.__waiters
386        waiters = __waiters[:n]
387        if not waiters:
388            if __debug__:
389                self._note("%s.notify(): no waiters", self)
390            return
391        self._note("%s.notify(): notifying %d waiter%s", self, n,
392                   n!=1 and "s" or "")
393        for waiter in waiters:
394            waiter.release()
395            try:
396                __waiters.remove(waiter)
397            except ValueError:
398                pass
399
400    def notifyAll(self):
401        """Wake up all threads waiting on this condition.
402
403        If the calling thread has not acquired the lock when this method
404        is called, a RuntimeError is raised.
405
406        """
407        self.notify(len(self.__waiters))
408
409    notify_all = notifyAll
410
411
412def Semaphore(*args, **kwargs):
413    """A factory function that returns a new semaphore.
414
415    Semaphores manage a counter representing the number of release() calls minus
416    the number of acquire() calls, plus an initial value. The acquire() method
417    blocks if necessary until it can return without making the counter
418    negative. If not given, value defaults to 1.
419
420    """
421    return _Semaphore(*args, **kwargs)
422
423class _Semaphore(_Verbose):
424    """Semaphores manage a counter representing the number of release() calls
425       minus the number of acquire() calls, plus an initial value. The acquire()
426       method blocks if necessary until it can return without making the counter
427       negative. If not given, value defaults to 1.
428
429    """
430
431    # After Tim Peters' semaphore class, but not quite the same (no maximum)
432
433    def __init__(self, value=1, verbose=None):
434        if value < 0:
435            raise ValueError("semaphore initial value must be >= 0")
436        _Verbose.__init__(self, verbose)
437        self.__cond = Condition(Lock())
438        self.__value = value
439
440    def acquire(self, blocking=1):
441        """Acquire a semaphore, decrementing the internal counter by one.
442
443        When invoked without arguments: if the internal counter is larger than
444        zero on entry, decrement it by one and return immediately. If it is zero
445        on entry, block, waiting until some other thread has called release() to
446        make it larger than zero. This is done with proper interlocking so that
447        if multiple acquire() calls are blocked, release() will wake exactly one
448        of them up. The implementation may pick one at random, so the order in
449        which blocked threads are awakened should not be relied on. There is no
450        return value in this case.
451
452        When invoked with blocking set to true, do the same thing as when called
453        without arguments, and return true.
454
455        When invoked with blocking set to false, do not block. If a call without
456        an argument would block, return false immediately; otherwise, do the
457        same thing as when called without arguments, and return true.
458
459        """
460        rc = False
461        with self.__cond:
462            while self.__value == 0:
463                if not blocking:
464                    break
465                if __debug__:
466                    self._note("%s.acquire(%s): blocked waiting, value=%s",
467                            self, blocking, self.__value)
468                self.__cond.wait()
469            else:
470                self.__value = self.__value - 1
471                if __debug__:
472                    self._note("%s.acquire: success, value=%s",
473                            self, self.__value)
474                rc = True
475        return rc
476
477    __enter__ = acquire
478
479    def release(self):
480        """Release a semaphore, incrementing the internal counter by one.
481
482        When the counter is zero on entry and another thread is waiting for it
483        to become larger than zero again, wake up that thread.
484
485        """
486        with self.__cond:
487            self.__value = self.__value + 1
488            if __debug__:
489                self._note("%s.release: success, value=%s",
490                        self, self.__value)
491            self.__cond.notify()
492
493    def __exit__(self, t, v, tb):
494        self.release()
495
496
497def BoundedSemaphore(*args, **kwargs):
498    """A factory function that returns a new bounded semaphore.
499
500    A bounded semaphore checks to make sure its current value doesn't exceed its
501    initial value. If it does, ValueError is raised. In most situations
502    semaphores are used to guard resources with limited capacity.
503
504    If the semaphore is released too many times it's a sign of a bug. If not
505    given, value defaults to 1.
506
507    Like regular semaphores, bounded semaphores manage a counter representing
508    the number of release() calls minus the number of acquire() calls, plus an
509    initial value. The acquire() method blocks if necessary until it can return
510    without making the counter negative. If not given, value defaults to 1.
511
512    """
513    return _BoundedSemaphore(*args, **kwargs)
514
515class _BoundedSemaphore(_Semaphore):
516    """A bounded semaphore checks to make sure its current value doesn't exceed
517       its initial value. If it does, ValueError is raised. In most situations
518       semaphores are used to guard resources with limited capacity.
519    """
520
521    def __init__(self, value=1, verbose=None):
522        _Semaphore.__init__(self, value, verbose)
523        self._initial_value = value
524
525    def release(self):
526        """Release a semaphore, incrementing the internal counter by one.
527
528        When the counter is zero on entry and another thread is waiting for it
529        to become larger than zero again, wake up that thread.
530
531        If the number of releases exceeds the number of acquires,
532        raise a ValueError.
533
534        """
535        with self._Semaphore__cond:
536            if self._Semaphore__value >= self._initial_value:
537                raise ValueError("Semaphore released too many times")
538            self._Semaphore__value += 1
539            self._Semaphore__cond.notify()
540
541
542def Event(*args, **kwargs):
543    """A factory function that returns a new event.
544
545    Events manage a flag that can be set to true with the set() method and reset
546    to false with the clear() method. The wait() method blocks until the flag is
547    true.
548
549    """
550    return _Event(*args, **kwargs)
551
552class _Event(_Verbose):
553    """A factory function that returns a new event object. An event manages a
554       flag that can be set to true with the set() method and reset to false
555       with the clear() method. The wait() method blocks until the flag is true.
556
557    """
558
559    # After Tim Peters' event class (without is_posted())
560
561    def __init__(self, verbose=None):
562        _Verbose.__init__(self, verbose)
563        self.__cond = Condition(Lock())
564        self.__flag = False
565
566    def _reset_internal_locks(self):
567        # private!  called by Thread._reset_internal_locks by _after_fork()
568        self.__cond.__init__(Lock())
569
570    def isSet(self):
571        'Return true if and only if the internal flag is true.'
572        return self.__flag
573
574    is_set = isSet
575
576    def set(self):
577        """Set the internal flag to true.
578
579        All threads waiting for the flag to become true are awakened. Threads
580        that call wait() once the flag is true will not block at all.
581
582        """
583        with self.__cond:
584            self.__flag = True
585            self.__cond.notify_all()
586
587    def clear(self):
588        """Reset the internal flag to false.
589
590        Subsequently, threads calling wait() will block until set() is called to
591        set the internal flag to true again.
592
593        """
594        with self.__cond:
595            self.__flag = False
596
597    def wait(self, timeout=None):
598        """Block until the internal flag is true.
599
600        If the internal flag is true on entry, return immediately. Otherwise,
601        block until another thread calls set() to set the flag to true, or until
602        the optional timeout occurs.
603
604        When the timeout argument is present and not None, it should be a
605        floating point number specifying a timeout for the operation in seconds
606        (or fractions thereof).
607
608        This method returns the internal flag on exit, so it will always return
609        True except if a timeout is given and the operation times out.
610
611        """
612        with self.__cond:
613            if not self.__flag:
614                self.__cond.wait(timeout)
615            return self.__flag
616
617# Helper to generate new thread names
618_counter = _count().next
619_counter() # Consume 0 so first non-main thread has id 1.
620def _newname(template="Thread-%d"):
621    return template % _counter()
622
623# Active thread administration
624_active_limbo_lock = _allocate_lock()
625_active = {}    # maps thread id to Thread object
626_limbo = {}
627
628
629# Main class for threads
630
631class Thread(_Verbose):
632    """A class that represents a thread of control.
633
634    This class can be safely subclassed in a limited fashion.
635
636    """
637    __initialized = False
638    # Need to store a reference to sys.exc_info for printing
639    # out exceptions when a thread tries to use a global var. during interp.
640    # shutdown and thus raises an exception about trying to perform some
641    # operation on/with a NoneType
642    __exc_info = _sys.exc_info
643    # Keep sys.exc_clear too to clear the exception just before
644    # allowing .join() to return.
645    __exc_clear = _sys.exc_clear
646
647    def __init__(self, group=None, target=None, name=None,
648                 args=(), kwargs=None, verbose=None):
649        """This constructor should always be called with keyword arguments. Arguments are:
650
651        *group* should be None; reserved for future extension when a ThreadGroup
652        class is implemented.
653
654        *target* is the callable object to be invoked by the run()
655        method. Defaults to None, meaning nothing is called.
656
657        *name* is the thread name. By default, a unique name is constructed of
658        the form "Thread-N" where N is a small decimal number.
659
660        *args* is the argument tuple for the target invocation. Defaults to ().
661
662        *kwargs* is a dictionary of keyword arguments for the target
663        invocation. Defaults to {}.
664
665        If a subclass overrides the constructor, it must make sure to invoke
666        the base class constructor (Thread.__init__()) before doing anything
667        else to the thread.
668
669"""
670        assert group is None, "group argument must be None for now"
671        _Verbose.__init__(self, verbose)
672        if kwargs is None:
673            kwargs = {}
674        self.__target = target
675        self.__name = str(name or _newname())
676        self.__args = args
677        self.__kwargs = kwargs
678        self.__daemonic = self._set_daemon()
679        self.__ident = None
680        self.__started = Event()
681        self.__stopped = False
682        self.__block = Condition(Lock())
683        self.__initialized = True
684        # sys.stderr is not stored in the class like
685        # sys.exc_info since it can be changed between instances
686        self.__stderr = _sys.stderr
687
688    def _reset_internal_locks(self):
689        # private!  Called by _after_fork() to reset our internal locks as
690        # they may be in an invalid state leading to a deadlock or crash.
691        if hasattr(self, '_Thread__block'):  # DummyThread deletes self.__block
692            self.__block.__init__()
693        self.__started._reset_internal_locks()
694
695    @property
696    def _block(self):
697        # used by a unittest
698        return self.__block
699
700    def _set_daemon(self):
701        # Overridden in _MainThread and _DummyThread
702        return current_thread().daemon
703
704    def __repr__(self):
705        assert self.__initialized, "Thread.__init__() was not called"
706        status = "initial"
707        if self.__started.is_set():
708            status = "started"
709        if self.__stopped:
710            status = "stopped"
711        if self.__daemonic:
712            status += " daemon"
713        if self.__ident is not None:
714            status += " %s" % self.__ident
715        return "<%s(%s, %s)>" % (self.__class__.__name__, self.__name, status)
716
717    def start(self):
718        """Start the thread's activity.
719
720        It must be called at most once per thread object. It arranges for the
721        object's run() method to be invoked in a separate thread of control.
722
723        This method will raise a RuntimeError if called more than once on the
724        same thread object.
725
726        """
727        if not self.__initialized:
728            raise RuntimeError("thread.__init__() not called")
729        if self.__started.is_set():
730            raise RuntimeError("threads can only be started once")
731        if __debug__:
732            self._note("%s.start(): starting thread", self)
733        with _active_limbo_lock:
734            _limbo[self] = self
735        try:
736            _start_new_thread(self.__bootstrap, ())
737        except Exception:
738            with _active_limbo_lock:
739                del _limbo[self]
740            raise
741        self.__started.wait()
742
743    def run(self):
744        """Method representing the thread's activity.
745
746        You may override this method in a subclass. The standard run() method
747        invokes the callable object passed to the object's constructor as the
748        target argument, if any, with sequential and keyword arguments taken
749        from the args and kwargs arguments, respectively.
750
751        """
752        try:
753            if self.__target:
754                self.__target(*self.__args, **self.__kwargs)
755        finally:
756            # Avoid a refcycle if the thread is running a function with
757            # an argument that has a member that points to the thread.
758            del self.__target, self.__args, self.__kwargs
759
760    def __bootstrap(self):
761        # Wrapper around the real bootstrap code that ignores
762        # exceptions during interpreter cleanup.  Those typically
763        # happen when a daemon thread wakes up at an unfortunate
764        # moment, finds the world around it destroyed, and raises some
765        # random exception *** while trying to report the exception in
766        # __bootstrap_inner() below ***.  Those random exceptions
767        # don't help anybody, and they confuse users, so we suppress
768        # them.  We suppress them only when it appears that the world
769        # indeed has already been destroyed, so that exceptions in
770        # __bootstrap_inner() during normal business hours are properly
771        # reported.  Also, we only suppress them for daemonic threads;
772        # if a non-daemonic encounters this, something else is wrong.
773        try:
774            self.__bootstrap_inner()
775        except:
776            if self.__daemonic and _sys is None:
777                return
778            raise
779
780    def _set_ident(self):
781        self.__ident = _get_ident()
782
783    def __bootstrap_inner(self):
784        try:
785            self._set_ident()
786            self.__started.set()
787            with _active_limbo_lock:
788                _active[self.__ident] = self
789                del _limbo[self]
790            if __debug__:
791                self._note("%s.__bootstrap(): thread started", self)
792
793            if _trace_hook:
794                self._note("%s.__bootstrap(): registering trace hook", self)
795                _sys.settrace(_trace_hook)
796            if _profile_hook:
797                self._note("%s.__bootstrap(): registering profile hook", self)
798                _sys.setprofile(_profile_hook)
799
800            try:
801                self.run()
802            except SystemExit:
803                if __debug__:
804                    self._note("%s.__bootstrap(): raised SystemExit", self)
805            except:
806                if __debug__:
807                    self._note("%s.__bootstrap(): unhandled exception", self)
808                # If sys.stderr is no more (most likely from interpreter
809                # shutdown) use self.__stderr.  Otherwise still use sys (as in
810                # _sys) in case sys.stderr was redefined since the creation of
811                # self.
812                if _sys and _sys.stderr is not None:
813                    print>>_sys.stderr, ("Exception in thread %s:\n%s" %
814                                         (self.name, _format_exc()))
815                elif self.__stderr is not None:
816                    # Do the best job possible w/o a huge amt. of code to
817                    # approximate a traceback (code ideas from
818                    # Lib/traceback.py)
819                    exc_type, exc_value, exc_tb = self.__exc_info()
820                    try:
821                        print>>self.__stderr, (
822                            "Exception in thread " + self.name +
823                            " (most likely raised during interpreter shutdown):")
824                        print>>self.__stderr, (
825                            "Traceback (most recent call last):")
826                        while exc_tb:
827                            print>>self.__stderr, (
828                                '  File "%s", line %s, in %s' %
829                                (exc_tb.tb_frame.f_code.co_filename,
830                                    exc_tb.tb_lineno,
831                                    exc_tb.tb_frame.f_code.co_name))
832                            exc_tb = exc_tb.tb_next
833                        print>>self.__stderr, ("%s: %s" % (exc_type, exc_value))
834                    # Make sure that exc_tb gets deleted since it is a memory
835                    # hog; deleting everything else is just for thoroughness
836                    finally:
837                        del exc_type, exc_value, exc_tb
838            else:
839                if __debug__:
840                    self._note("%s.__bootstrap(): normal return", self)
841            finally:
842                # Prevent a race in
843                # test_threading.test_no_refcycle_through_target when
844                # the exception keeps the target alive past when we
845                # assert that it's dead.
846                self.__exc_clear()
847        finally:
848            with _active_limbo_lock:
849                self.__stop()
850                try:
851                    # We don't call self.__delete() because it also
852                    # grabs _active_limbo_lock.
853                    del _active[_get_ident()]
854                except:
855                    pass
856
857    def __stop(self):
858        # DummyThreads delete self.__block, but they have no waiters to
859        # notify anyway (join() is forbidden on them).
860        if not hasattr(self, '_Thread__block'):
861            return
862        self.__block.acquire()
863        self.__stopped = True
864        self.__block.notify_all()
865        self.__block.release()
866
867    def __delete(self):
868        "Remove current thread from the dict of currently running threads."
869
870        # Notes about running with dummy_thread:
871        #
872        # Must take care to not raise an exception if dummy_thread is being
873        # used (and thus this module is being used as an instance of
874        # dummy_threading).  dummy_thread.get_ident() always returns -1 since
875        # there is only one thread if dummy_thread is being used.  Thus
876        # len(_active) is always <= 1 here, and any Thread instance created
877        # overwrites the (if any) thread currently registered in _active.
878        #
879        # An instance of _MainThread is always created by 'threading'.  This
880        # gets overwritten the instant an instance of Thread is created; both
881        # threads return -1 from dummy_thread.get_ident() and thus have the
882        # same key in the dict.  So when the _MainThread instance created by
883        # 'threading' tries to clean itself up when atexit calls this method
884        # it gets a KeyError if another Thread instance was created.
885        #
886        # This all means that KeyError from trying to delete something from
887        # _active if dummy_threading is being used is a red herring.  But
888        # since it isn't if dummy_threading is *not* being used then don't
889        # hide the exception.
890
891        try:
892            with _active_limbo_lock:
893                del _active[_get_ident()]
894                # There must not be any python code between the previous line
895                # and after the lock is released.  Otherwise a tracing function
896                # could try to acquire the lock again in the same thread, (in
897                # current_thread()), and would block.
898        except KeyError:
899            if 'dummy_threading' not in _sys.modules:
900                raise
901
902    def join(self, timeout=None):
903        """Wait until the thread terminates.
904
905        This blocks the calling thread until the thread whose join() method is
906        called terminates -- either normally or through an unhandled exception
907        or until the optional timeout occurs.
908
909        When the timeout argument is present and not None, it should be a
910        floating point number specifying a timeout for the operation in seconds
911        (or fractions thereof). As join() always returns None, you must call
912        isAlive() after join() to decide whether a timeout happened -- if the
913        thread is still alive, the join() call timed out.
914
915        When the timeout argument is not present or None, the operation will
916        block until the thread terminates.
917
918        A thread can be join()ed many times.
919
920        join() raises a RuntimeError if an attempt is made to join the current
921        thread as that would cause a deadlock. It is also an error to join() a
922        thread before it has been started and attempts to do so raises the same
923        exception.
924
925        """
926        if not self.__initialized:
927            raise RuntimeError("Thread.__init__() not called")
928        if not self.__started.is_set():
929            raise RuntimeError("cannot join thread before it is started")
930        if self is current_thread():
931            raise RuntimeError("cannot join current thread")
932
933        if __debug__:
934            if not self.__stopped:
935                self._note("%s.join(): waiting until thread stops", self)
936        self.__block.acquire()
937        try:
938            if timeout is None:
939                while not self.__stopped:
940                    self.__block.wait()
941                if __debug__:
942                    self._note("%s.join(): thread stopped", self)
943            else:
944                deadline = _time() + timeout
945                while not self.__stopped:
946                    delay = deadline - _time()
947                    if delay <= 0:
948                        if __debug__:
949                            self._note("%s.join(): timed out", self)
950                        break
951                    self.__block.wait(delay)
952                else:
953                    if __debug__:
954                        self._note("%s.join(): thread stopped", self)
955        finally:
956            self.__block.release()
957
958    @property
959    def name(self):
960        """A string used for identification purposes only.
961
962        It has no semantics. Multiple threads may be given the same name. The
963        initial name is set by the constructor.
964
965        """
966        assert self.__initialized, "Thread.__init__() not called"
967        return self.__name
968
969    @name.setter
970    def name(self, name):
971        assert self.__initialized, "Thread.__init__() not called"
972        self.__name = str(name)
973
974    @property
975    def ident(self):
976        """Thread identifier of this thread or None if it has not been started.
977
978        This is a nonzero integer. See the thread.get_ident() function. Thread
979        identifiers may be recycled when a thread exits and another thread is
980        created. The identifier is available even after the thread has exited.
981
982        """
983        assert self.__initialized, "Thread.__init__() not called"
984        return self.__ident
985
986    def isAlive(self):
987        """Return whether the thread is alive.
988
989        This method returns True just before the run() method starts until just
990        after the run() method terminates. The module function enumerate()
991        returns a list of all alive threads.
992
993        """
994        assert self.__initialized, "Thread.__init__() not called"
995        return self.__started.is_set() and not self.__stopped
996
997    is_alive = isAlive
998
999    @property
1000    def daemon(self):
1001        """A boolean value indicating whether this thread is a daemon thread (True) or not (False).
1002
1003        This must be set before start() is called, otherwise RuntimeError is
1004        raised. Its initial value is inherited from the creating thread; the
1005        main thread is not a daemon thread and therefore all threads created in
1006        the main thread default to daemon = False.
1007
1008        The entire Python program exits when only daemon threads are left.
1009
1010        """
1011        assert self.__initialized, "Thread.__init__() not called"
1012        return self.__daemonic
1013
1014    @daemon.setter
1015    def daemon(self, daemonic):
1016        if not self.__initialized:
1017            raise RuntimeError("Thread.__init__() not called")
1018        if self.__started.is_set():
1019            raise RuntimeError("cannot set daemon status of active thread");
1020        self.__daemonic = daemonic
1021
1022    def isDaemon(self):
1023        return self.daemon
1024
1025    def setDaemon(self, daemonic):
1026        self.daemon = daemonic
1027
1028    def getName(self):
1029        return self.name
1030
1031    def setName(self, name):
1032        self.name = name
1033
1034# The timer class was contributed by Itamar Shtull-Trauring
1035
1036def Timer(*args, **kwargs):
1037    """Factory function to create a Timer object.
1038
1039    Timers call a function after a specified number of seconds:
1040
1041        t = Timer(30.0, f, args=[], kwargs={})
1042        t.start()
1043        t.cancel()     # stop the timer's action if it's still waiting
1044
1045    """
1046    return _Timer(*args, **kwargs)
1047
1048class _Timer(Thread):
1049    """Call a function after a specified number of seconds:
1050
1051            t = Timer(30.0, f, args=[], kwargs={})
1052            t.start()
1053            t.cancel()     # stop the timer's action if it's still waiting
1054
1055    """
1056
1057    def __init__(self, interval, function, args=[], kwargs={}):
1058        Thread.__init__(self)
1059        self.interval = interval
1060        self.function = function
1061        self.args = args
1062        self.kwargs = kwargs
1063        self.finished = Event()
1064
1065    def cancel(self):
1066        """Stop the timer if it hasn't finished yet"""
1067        self.finished.set()
1068
1069    def run(self):
1070        self.finished.wait(self.interval)
1071        if not self.finished.is_set():
1072            self.function(*self.args, **self.kwargs)
1073        self.finished.set()
1074
1075# Special thread class to represent the main thread
1076# This is garbage collected through an exit handler
1077
1078class _MainThread(Thread):
1079
1080    def __init__(self):
1081        Thread.__init__(self, name="MainThread")
1082        self._Thread__started.set()
1083        self._set_ident()
1084        with _active_limbo_lock:
1085            _active[_get_ident()] = self
1086
1087    def _set_daemon(self):
1088        return False
1089
1090    def _exitfunc(self):
1091        self._Thread__stop()
1092        t = _pickSomeNonDaemonThread()
1093        if t:
1094            if __debug__:
1095                self._note("%s: waiting for other threads", self)
1096        while t:
1097            t.join()
1098            t = _pickSomeNonDaemonThread()
1099        if __debug__:
1100            self._note("%s: exiting", self)
1101        self._Thread__delete()
1102
1103def _pickSomeNonDaemonThread():
1104    for t in enumerate():
1105        if not t.daemon and t.is_alive():
1106            return t
1107    return None
1108
1109
1110# Dummy thread class to represent threads not started here.
1111# These aren't garbage collected when they die, nor can they be waited for.
1112# If they invoke anything in threading.py that calls current_thread(), they
1113# leave an entry in the _active dict forever after.
1114# Their purpose is to return *something* from current_thread().
1115# They are marked as daemon threads so we won't wait for them
1116# when we exit (conform previous semantics).
1117
1118class _DummyThread(Thread):
1119
1120    def __init__(self):
1121        Thread.__init__(self, name=_newname("Dummy-%d"))
1122
1123        # Thread.__block consumes an OS-level locking primitive, which
1124        # can never be used by a _DummyThread.  Since a _DummyThread
1125        # instance is immortal, that's bad, so release this resource.
1126        del self._Thread__block
1127
1128        self._Thread__started.set()
1129        self._set_ident()
1130        with _active_limbo_lock:
1131            _active[_get_ident()] = self
1132
1133    def _set_daemon(self):
1134        return True
1135
1136    def join(self, timeout=None):
1137        assert False, "cannot join a dummy thread"
1138
1139
1140# Global API functions
1141
1142def currentThread():
1143    """Return the current Thread object, corresponding to the caller's thread of control.
1144
1145    If the caller's thread of control was not created through the threading
1146    module, a dummy thread object with limited functionality is returned.
1147
1148    """
1149    try:
1150        return _active[_get_ident()]
1151    except KeyError:
1152        ##print "current_thread(): no current thread for", _get_ident()
1153        return _DummyThread()
1154
1155current_thread = currentThread
1156
1157def activeCount():
1158    """Return the number of Thread objects currently alive.
1159
1160    The returned count is equal to the length of the list returned by
1161    enumerate().
1162
1163    """
1164    with _active_limbo_lock:
1165        return len(_active) + len(_limbo)
1166
1167active_count = activeCount
1168
1169def _enumerate():
1170    # Same as enumerate(), but without the lock. Internal use only.
1171    return _active.values() + _limbo.values()
1172
1173def enumerate():
1174    """Return a list of all Thread objects currently alive.
1175
1176    The list includes daemonic threads, dummy thread objects created by
1177    current_thread(), and the main thread. It excludes terminated threads and
1178    threads that have not yet been started.
1179
1180    """
1181    with _active_limbo_lock:
1182        return _active.values() + _limbo.values()
1183
1184from thread import stack_size
1185
1186# Create the main thread object,
1187# and make it available for the interpreter
1188# (Py_Main) as threading._shutdown.
1189
1190_shutdown = _MainThread()._exitfunc
1191
1192# get thread-local implementation, either from the thread
1193# module, or from the python fallback
1194
1195try:
1196    from thread import _local as local
1197except ImportError:
1198    from _threading_local import local
1199
1200
1201def _after_fork():
1202    # This function is called by Python/ceval.c:PyEval_ReInitThreads which
1203    # is called from PyOS_AfterFork.  Here we cleanup threading module state
1204    # that should not exist after a fork.
1205
1206    # Reset _active_limbo_lock, in case we forked while the lock was held
1207    # by another (non-forked) thread.  http://bugs.python.org/issue874900
1208    global _active_limbo_lock
1209    _active_limbo_lock = _allocate_lock()
1210
1211    # fork() only copied the current thread; clear references to others.
1212    new_active = {}
1213    current = current_thread()
1214    with _active_limbo_lock:
1215        for thread in _enumerate():
1216            # Any lock/condition variable may be currently locked or in an
1217            # invalid state, so we reinitialize them.
1218            if hasattr(thread, '_reset_internal_locks'):
1219                thread._reset_internal_locks()
1220            if thread is current:
1221                # There is only one active thread. We reset the ident to
1222                # its new value since it can have changed.
1223                ident = _get_ident()
1224                thread._Thread__ident = ident
1225                new_active[ident] = thread
1226            else:
1227                # All the others are already stopped.
1228                thread._Thread__stop()
1229
1230        _limbo.clear()
1231        _active.clear()
1232        _active.update(new_active)
1233        assert len(_active) == 1
1234
1235
1236# Self-test code
1237
1238def _test():
1239
1240    class BoundedQueue(_Verbose):
1241
1242        def __init__(self, limit):
1243            _Verbose.__init__(self)
1244            self.mon = RLock()
1245            self.rc = Condition(self.mon)
1246            self.wc = Condition(self.mon)
1247            self.limit = limit
1248            self.queue = _deque()
1249
1250        def put(self, item):
1251            self.mon.acquire()
1252            while len(self.queue) >= self.limit:
1253                self._note("put(%s): queue full", item)
1254                self.wc.wait()
1255            self.queue.append(item)
1256            self._note("put(%s): appended, length now %d",
1257                       item, len(self.queue))
1258            self.rc.notify()
1259            self.mon.release()
1260
1261        def get(self):
1262            self.mon.acquire()
1263            while not self.queue:
1264                self._note("get(): queue empty")
1265                self.rc.wait()
1266            item = self.queue.popleft()
1267            self._note("get(): got %s, %d left", item, len(self.queue))
1268            self.wc.notify()
1269            self.mon.release()
1270            return item
1271
1272    class ProducerThread(Thread):
1273
1274        def __init__(self, queue, quota):
1275            Thread.__init__(self, name="Producer")
1276            self.queue = queue
1277            self.quota = quota
1278
1279        def run(self):
1280            from random import random
1281            counter = 0
1282            while counter < self.quota:
1283                counter = counter + 1
1284                self.queue.put("%s.%d" % (self.name, counter))
1285                _sleep(random() * 0.00001)
1286
1287
1288    class ConsumerThread(Thread):
1289
1290        def __init__(self, queue, count):
1291            Thread.__init__(self, name="Consumer")
1292            self.queue = queue
1293            self.count = count
1294
1295        def run(self):
1296            while self.count > 0:
1297                item = self.queue.get()
1298                print item
1299                self.count = self.count - 1
1300
1301    NP = 3
1302    QL = 4
1303    NI = 5
1304
1305    Q = BoundedQueue(QL)
1306    P = []
1307    for i in range(NP):
1308        t = ProducerThread(Q, NI)
1309        t.name = ("Producer-%d" % (i+1))
1310        P.append(t)
1311    C = ConsumerThread(Q, NI*NP)
1312    for t in P:
1313        t.start()
1314        _sleep(0.000001)
1315    C.start()
1316    for t in P:
1317        t.join()
1318    C.join()
1319
1320if __name__ == '__main__':
1321    _test()
1322