1from java.lang import IllegalThreadStateException, InterruptedException
2from java.util import Collections, WeakHashMap
3from java.util.concurrent import Semaphore, CyclicBarrier
4from java.util.concurrent.locks import ReentrantLock
5from org.python.util import jython
6from org.python.core import Py
7from thread import _newFunctionThread
8from thread import _local as local
9from _threading import Lock, RLock, Condition, _Lock, _RLock, _threads, _active, _jthread_to_pythread, _register_thread, _unregister_thread
10import java.lang.Thread
11import sys as _sys
12from traceback import print_exc as _print_exc
13
14# Rename some stuff so "from threading import *" is safe
# Public names exported by "from threading import *".
__all__ = ['activeCount', 'active_count', 'Condition', 'currentThread',
           'current_thread', 'enumerate', 'Event',
           'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',
           'Timer', 'setprofile', 'settrace', 'local', 'stack_size']

# Default used by _Verbose when no explicit ``verbose`` argument is given.
_VERBOSE = False
21
if __debug__:

    class _Verbose(object):
        """Base class that can emit diagnostic notes on stderr."""

        def __init__(self, verbose=None):
            # No explicit choice: fall back to the module-wide default.
            self.__verbose = _VERBOSE if verbose is None else verbose

        def _note(self, format, *args):
            """Write ``format % args`` to stderr, prefixed by the thread name.

            Does nothing unless verbosity was enabled for this instance.
            """
            if not self.__verbose:
                return
            message = format % args
            line = "%s: %s\n" % (currentThread().getName(), message)
            _sys.stderr.write(line)

else:
    # Disable this when using "python -O"
    class _Verbose(object):
        """Stand-in with the same interface whose methods do nothing."""

        def __init__(self, verbose=None):
            pass

        def _note(self, *args):
            pass
45
46# Support for profile and trace hooks
47
# Hooks stored by setprofile()/settrace(); Thread.__bootstrap installs them
# through _sys.setprofile/_sys.settrace before calling run(), when set.
_profile_hook = None
_trace_hook = None
50
def setprofile(func):
    """Remember *func* as the profile hook for threads run by this module.

    The value is stored in the module global ``_profile_hook``.
    """
    global _profile_hook
    _profile_hook = func
54
def settrace(func):
    """Remember *func* as the trace hook for threads run by this module.

    The value is stored in the module global ``_trace_hook``.
    """
    global _trace_hook
    _trace_hook = func
58
59
class Semaphore(object):
    """Semaphore that delegates to a java.util.concurrent.Semaphore."""

    def __init__(self, value=1):
        """Create the semaphore with *value* initial permits.

        Raises ValueError when *value* is negative.
        """
        if value < 0:
            raise ValueError("Semaphore initial value must be >= 0")
        self._semaphore = java.util.concurrent.Semaphore(value)

    def acquire(self, blocking=True):
        """Take one permit.

        With *blocking* true, wait for a permit and return True.  Otherwise
        return the result of the underlying ``tryAcquire()`` call.
        """
        if not blocking:
            return self._semaphore.tryAcquire()
        self._semaphore.acquire()
        return True

    def release(self):
        """Give one permit back to the underlying semaphore."""
        self._semaphore.release()

    # Context-manager support: acquire on entry, release on exit.

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, t, v, tb):
        self.release()
82
83
# Maps each java.lang.Thread.State to the status word shown by
# JavaThread.__repr__.
ThreadStates = {
    java.lang.Thread.State.NEW : 'initial',
    java.lang.Thread.State.RUNNABLE: 'started',
    java.lang.Thread.State.BLOCKED: 'started',
    java.lang.Thread.State.WAITING: 'started',
    java.lang.Thread.State.TIMED_WAITING: 'started',
    java.lang.Thread.State.TERMINATED: 'stopped',
}
92
class JavaThread(object):
    """Wrapper exposing a java.lang.Thread through the threading.Thread API."""

    def __init__(self, thread):
        """Wrap *thread* and register the wrapper via ``_register_thread``."""
        self._thread = thread
        _register_thread(thread, self)

    def __repr__(self):
        _thread = self._thread
        status = ThreadStates[_thread.getState()]
        if _thread.isDaemon():
            # Must rebind: a bare ``status + " daemon"`` discards the result.
            status += " daemon"
        return "<%s(%s, %s %s)>" % (self.__class__.__name__, self.getName(), status, self.ident)

    def __eq__(self, other):
        """Two wrappers are equal when they wrap equal Java threads."""
        if isinstance(other, JavaThread):
            return self._thread == other._thread
        else:
            return False

    def __ne__(self, other):
        return not self.__eq__(other)

    def start(self):
        """Start the underlying thread.

        Raises RuntimeError if Java reports an illegal thread state
        (the thread was already started).
        """
        try:
            self._thread.start()
        except IllegalThreadStateException:
            raise RuntimeError("threads can only be started once")

    def run(self):
        self._thread.run()

    def join(self, timeout=None):
        """Wait for the thread to finish.

        *timeout* is a number of seconds, or None to wait without limit.
        A zero, negative, or vanishingly small timeout returns immediately.

        Raises RuntimeError when joining the current thread or a thread
        that has not been started.
        """
        if self._thread == java.lang.Thread.currentThread():
            raise RuntimeError("cannot join current thread")
        elif self._thread.getState() == java.lang.Thread.State.NEW:
            raise RuntimeError("cannot join thread before it is started")
        if timeout is None:
            self._thread.join()
            return
        millis = timeout * 1000.
        millis_int = int(millis)
        nanos = int((millis - millis_int) * 1e6)
        if millis_int <= 0 and nanos <= 0:
            # java.lang.Thread.join treats a zero timeout as "wait forever"
            # and rejects negative values, whereas a Python timeout of zero
            # (or less) means "do not block at all".
            return
        self._thread.join(millis_int, nanos)

    def ident(self):
        return self._thread.getId()

    ident = property(ident)

    def getName(self):
        return self._thread.getName()

    def setName(self, name):
        self._thread.setName(str(name))

    name = property(getName, setName)

    def isAlive(self):
        return self._thread.isAlive()

    is_alive = isAlive

    def isDaemon(self):
        return self._thread.isDaemon()

    def setDaemon(self, daemonic):
        """Set the daemon flag; only allowed before the thread is started.

        Raises RuntimeError otherwise.
        """
        if self._thread.getState() != java.lang.Thread.State.NEW:
            # thread could in fact be dead... Python uses the same error
            raise RuntimeError("cannot set daemon status of active thread")
        try:
            self._thread.setDaemon(bool(daemonic))
        except IllegalThreadStateException:
            # changing daemonization only makes sense in Java when the
            # thread is alive; need extra test on the exception
            # because of possible races on interrogating with getState
            raise RuntimeError("cannot set daemon status of active thread")

    daemon = property(isDaemon, setDaemon)

    def __tojava__(self, c):
        """Conversion hook used when this object is passed to Java code.

        Returns the wrapped thread if it is an instance of *c*, else this
        wrapper if it is an instance of *c*, else ``Py.NoConversion``.
        """
        if isinstance(self._thread, c):
            return self._thread
        if isinstance(self, c):
            return self
        return Py.NoConversion
176
177
class Thread(JavaThread):
    """Thread of control that runs ``target(*args, **kwargs)``.

    Subclasses may override run() instead of passing a target.
    """

    def __init__(self, group=None, target=None, name=None, args=None, kwargs=None):
        """Create, but do not start, the thread.

        group  -- must be None.
        target -- callable invoked by run(), or None.
        name   -- thread name; when false the underlying thread keeps the
                  name it was created with.
        args   -- positional arguments for target (default: empty tuple).
        kwargs -- keyword arguments for target (default: empty dict).
        """
        assert group is None, "group argument must be None for now"
        _thread = self._create_thread()
        JavaThread.__init__(self, _thread)
        if args is None:
            args = ()
        if kwargs is None:
            kwargs = {}
        self._target = target
        self._args = args
        self._kwargs = kwargs
        # Captured now for the fallback reporting path in __bootstrap, which
        # runs when the module global _sys is no longer usable.  Stored on
        # the instance so the callable is never re-bound as a method.
        self.__stderr = _sys.stderr
        self.__exc_info = _sys.exc_info
        if name:
            self._thread.setName(str(name))

    def _create_thread(self):
        """Return the thread object to wrap; it is handed __bootstrap to run."""
        return _newFunctionThread(self.__bootstrap, ())

    def run(self):
        """Call the target, if any, with the stored arguments."""
        if self._target:
            self._target(*self._args, **self._kwargs)

    def __bootstrap(self):
        """Entry point executed in the new thread.

        Installs the module-level trace/profile hooks, calls run(), reports
        uncaught exceptions on stderr, and always unregisters the thread.
        """
        try:
            if _trace_hook:
                _sys.settrace(_trace_hook)
            if _profile_hook:
                _sys.setprofile(_profile_hook)
            try:
                self.run()
            except SystemExit:
                pass
            except InterruptedException:
                # Quiet InterruptedExceptions if they're caused by
                # _systemrestart
                if not jython.shouldRestart:
                    raise
            except:
                # If sys.stderr is no more (most likely from interpreter
                # shutdown) use self.__stderr.  Otherwise still use sys (as in
                # _sys) in case sys.stderr was redefined.
                if _sys:
                    # Newline keeps the traceback off the header line.
                    _sys.stderr.write("Exception in thread %s:\n" %
                            self.getName())
                    _print_exc(file=_sys.stderr)
                else:
                    # Do the best job possible w/o a huge amt. of code to
                    # approx. a traceback stack trace
                    exc_type, exc_value, exc_tb = self.__exc_info()
                    try:
                        print>>self.__stderr, (
                            "Exception in thread " + self.getName() +
                            " (most likely raised during interpreter shutdown):")
                        print>>self.__stderr, (
                            "Traceback (most recent call last):")
                        while exc_tb:
                            print>>self.__stderr, (
                                '  File "%s", line %s, in %s' %
                                (exc_tb.tb_frame.f_code.co_filename,
                                    exc_tb.tb_lineno,
                                    exc_tb.tb_frame.f_code.co_name))
                            exc_tb = exc_tb.tb_next
                        print>>self.__stderr, ("%s: %s" % (exc_type, exc_value))
                    # Make sure that exc_tb gets deleted since it is a memory
                    # hog; deleting everything else is just for thoroughness
                    finally:
                        del exc_type, exc_value, exc_tb

        finally:
            self.__stop()
            try:
                self.__delete()
            except:
                # Deliberately best-effort: failing to unregister must not
                # mask whatever ended the thread.
                pass

    def __stop(self):
        pass

    def __delete(self):
        _unregister_thread(self._thread)
258
259
class _MainThread(Thread):
    """Thread object wrapping the Java thread that instantiates it."""

    def __init__(self):
        Thread.__init__(self, name="MainThread")
        import atexit
        atexit.register(self.__exitfunc)

    def _create_thread(self):
        # Wrap the already-running current thread rather than making one.
        return java.lang.Thread.currentThread()

    def _set_daemon(self):
        return False

    def __exitfunc(self):
        """atexit handler: join non-daemon threads that are still alive."""
        _unregister_thread(self._thread)
        while True:
            pending = _pickSomeNonDaemonThread()
            if not pending:
                break
            pending.join()
278
def _pickSomeNonDaemonThread():
    """Return some live, non-daemon thread from enumerate(), or None."""
    for candidate in enumerate():
        if candidate.isDaemon():
            continue
        if candidate.isAlive():
            return candidate
    return None
284
def currentThread():
    """Return the Python thread object for the calling Java thread.

    When the lookup in ``_jthread_to_pythread`` yields None, a new
    JavaThread wrapper is created for the calling thread and returned.
    """
    java_thread = java.lang.Thread.currentThread()
    wrapper = _jthread_to_pythread[java_thread]
    if wrapper is not None:
        return wrapper
    # JavaThread.__init__ registers the new wrapper via _register_thread.
    return JavaThread(java_thread)

current_thread = currentThread
293
def activeCount():
    """Return the number of entries in the ``_threads`` registry."""
    registered = _threads
    return len(registered)

active_count = activeCount
298
def enumerate():
    """Return the thread objects held in the ``_threads`` registry."""
    registered = _threads
    return registered.values()
301
302from thread import stack_size
303
304
# Wrap the thread that is importing this module as "MainThread".  The
# instance is not kept here; constructing it registers the wrapper (through
# JavaThread.__init__) and installs its atexit handler.
_MainThread()
306
307
308######################################################################
309# pure Python code from CPythonLib/threading.py
310
311# The timer class was contributed by Itamar Shtull-Trauring
312
def Timer(*args, **kwargs):
    """Factory returning a new _Timer built from the given arguments."""
    timer = _Timer(*args, **kwargs)
    return timer
315
class _Timer(Thread):
    """Call a function after a specified number of seconds:

    t = Timer(30.0, f, args=[], kwargs={})
    t.start()
    t.cancel() # stop the timer's action if it's still waiting
    """

    def __init__(self, interval, function, args=None, kwargs=None):
        """Prepare a timer that calls ``function(*args, **kwargs)``.

        interval -- seconds to wait before the call.
        function -- callable to invoke.
        args     -- positional arguments (default: a fresh empty list).
        kwargs   -- keyword arguments (default: a fresh empty dict).
        """
        Thread.__init__(self)
        self.interval = interval
        self.function = function
        # Fresh containers per instance: literal defaults in the signature
        # would be shared, and mutable, across every timer.
        if args is None:
            args = []
        if kwargs is None:
            kwargs = {}
        self.args = args
        self.kwargs = kwargs
        self.finished = Event()

    def cancel(self):
        """Stop the timer if it hasn't finished yet"""
        self.finished.set()

    def run(self):
        """Wait up to ``interval`` seconds, then call the function unless cancelled."""
        self.finished.wait(self.interval)
        if not self.finished.isSet():
            self.function(*self.args, **self.kwargs)
        self.finished.set()
341
342
343# NOT USED except by BoundedSemaphore
class _Semaphore(_Verbose):
    """Pure-Python counting semaphore built on a Condition."""

    # After Tim Peters' semaphore class, but not quite the same (no maximum)

    def __init__(self, value=1, verbose=None):
        """Create the semaphore with *value* as its initial counter.

        Raises ValueError when *value* is negative.
        """
        if value < 0:
            raise ValueError("Semaphore initial value must be >= 0")
        _Verbose.__init__(self, verbose)
        self.__cond = Condition(Lock())
        self.__value = value

    def acquire(self, blocking=1):
        """Decrement the counter, waiting while it is zero.

        Returns True on success.  With *blocking* false, returns False
        instead of waiting when the counter is zero.
        """
        rc = False
        self.__cond.acquire()
        # try/finally so an exception raised while waiting or logging cannot
        # leave the condition's lock held.
        try:
            while self.__value == 0:
                if not blocking:
                    break
                if __debug__:
                    self._note("%s.acquire(%s): blocked waiting, value=%s",
                               self, blocking, self.__value)
                self.__cond.wait()
            else:
                self.__value = self.__value - 1
                if __debug__:
                    self._note("%s.acquire: success, value=%s",
                               self, self.__value)
                rc = True
        finally:
            self.__cond.release()
        return rc

    def release(self):
        """Increment the counter and notify one waiter."""
        self.__cond.acquire()
        try:
            self.__value = self.__value + 1
            if __debug__:
                self._note("%s.release: success, value=%s",
                           self, self.__value)
            self.__cond.notify()
        finally:
            self.__cond.release()
382
383
def BoundedSemaphore(*args, **kwargs):
    """Factory returning a new _BoundedSemaphore built from the arguments."""
    semaphore = _BoundedSemaphore(*args, **kwargs)
    return semaphore
386
class _BoundedSemaphore(_Semaphore):
    """Semaphore that checks that # releases is <= # acquires"""

    def __init__(self, value=1, verbose=None):
        _Semaphore.__init__(self, value, verbose)
        # Upper bound that release() must never push the counter past.
        self._initial_value = value

    def __enter__(self):
        self.acquire()
        return self

    def release(self):
        """Increment the counter, refusing to exceed the initial value.

        Raises ValueError when the counter is already at its initial value.
        """
        cond = self._Semaphore__cond
        cond.acquire()
        # The bound check and the increment happen under the same lock;
        # checking first and locking afterwards would let two concurrent
        # releases both pass the check and overshoot the bound.
        try:
            if self._Semaphore__value >= self._initial_value:
                raise ValueError("Semaphore released too many times")
            self._Semaphore__value = self._Semaphore__value + 1
            if __debug__:
                self._note("%s.release: success, value=%s",
                           self, self._Semaphore__value)
            cond.notify()
        finally:
            cond.release()

    def __exit__(self, t, v, tb):
        self.release()
404
405
def Event(*args, **kwargs):
    """Factory returning a new _Event built from the given arguments."""
    event = _Event(*args, **kwargs)
    return event
408
class _Event(_Verbose):
    """Flag that threads can set, clear and wait on."""

    # After Tim Peters' event class (without is_posted())

    def __init__(self, verbose=None):
        _Verbose.__init__(self, verbose)
        self.__flag = False
        self.__cond = Condition(Lock())

    def isSet(self):
        """Return the current value of the flag."""
        return self.__flag

    is_set = isSet

    def set(self):
        """Set the flag to True and notify every waiter."""
        cond = self.__cond
        cond.acquire()
        try:
            self.__flag = True
            cond.notifyAll()
        finally:
            cond.release()

    def clear(self):
        """Reset the flag to False."""
        cond = self.__cond
        cond.acquire()
        try:
            self.__flag = False
        finally:
            cond.release()

    def wait(self, timeout=None):
        """Wait on the condition if the flag is not set; return the flag.

        *timeout* is passed straight to the condition's wait().
        """
        cond = self.__cond
        cond.acquire()
        try:
            already_set = self.__flag
            if not already_set:
                cond.wait(timeout)
            # Issue 2005: Since CPython 2.7, threading.Event.wait(timeout)
            # returns a boolean, which is False when the timeout expires
            # before the event is set.
            return self.__flag
        finally:
            cond.release()
448