# Thread-related classes implemented on top of java.lang.Thread and
# java.util.concurrent (the Java/Jython imports below show that this module
# targets Jython).

from java.lang import IllegalThreadStateException, InterruptedException
from java.util import Collections, WeakHashMap
from java.util.concurrent import Semaphore, CyclicBarrier
from java.util.concurrent.locks import ReentrantLock
from org.python.util import jython
from org.python.core import Py
from thread import _newFunctionThread
from thread import _local as local
from _threading import Lock, RLock, Condition, _Lock, _RLock, _threads, _active, _jthread_to_pythread, _register_thread, _unregister_thread
import java.lang.Thread
import sys as _sys
from traceback import print_exc as _print_exc

# Rename some stuff so "from threading import *" is safe
__all__ = ['activeCount', 'active_count', 'Condition', 'currentThread',
           'current_thread', 'enumerate', 'Event',
           'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',
           'Timer', 'setprofile', 'settrace', 'local', 'stack_size']

_VERBOSE = False

if __debug__:

    class _Verbose(object):
        """Mixin that writes debug notes to stderr when verbosity is on."""

        def __init__(self, verbose=None):
            # None means "use the module-wide _VERBOSE setting".
            if verbose is None:
                verbose = _VERBOSE
            self.__verbose = verbose

        def _note(self, format, *args):
            """Write ``format % args`` to stderr, prefixed by the thread name."""
            if self.__verbose:
                format = format % args
                format = "%s: %s\n" % (
                    currentThread().getName(), format)
                _sys.stderr.write(format)

else:
    # Disable this when using "python -O"
    class _Verbose(object):
        """No-op stand-in used when __debug__ is false."""

        def __init__(self, verbose=None):
            pass

        def _note(self, *args):
            pass

# Support for profile and trace hooks

_profile_hook = None
_trace_hook = None


def setprofile(func):
    """Record *func* as the profile hook installed by Thread.__bootstrap."""
    global _profile_hook
    _profile_hook = func


def settrace(func):
    """Record *func* as the trace hook installed by Thread.__bootstrap."""
    global _trace_hook
    _trace_hook = func


# NOTE: this class intentionally rebinds the name ``Semaphore`` that was
# imported from java.util.concurrent above; the Java class is still reached
# through its fully qualified name.
class Semaphore(object):
    """Counting semaphore delegating to java.util.concurrent.Semaphore."""

    def __init__(self, value=1):
        if value < 0:
            raise ValueError("Semaphore initial value must be >= 0")
        self._semaphore = java.util.concurrent.Semaphore(value)

    def acquire(self, blocking=True):
        """Acquire a permit.

        When *blocking* is true, wait for a permit and return True.
        Otherwise return the result of a single ``tryAcquire()`` call.
        """
        if blocking:
            self._semaphore.acquire()
            return True
        else:
            return self._semaphore.tryAcquire()

    def __enter__(self):
        self.acquire()
        return self

    def release(self):
        """Return one permit to the semaphore."""
        self._semaphore.release()

    def __exit__(self, t, v, tb):
        self.release()


# Maps each java.lang.Thread.State onto the status word shown by
# JavaThread.__repr__.
ThreadStates = {
    java.lang.Thread.State.NEW: 'initial',
    java.lang.Thread.State.RUNNABLE: 'started',
    java.lang.Thread.State.BLOCKED: 'started',
    java.lang.Thread.State.WAITING: 'started',
    java.lang.Thread.State.TIMED_WAITING: 'started',
    java.lang.Thread.State.TERMINATED: 'stopped',
}


class JavaThread(object):
    """Python-level wrapper around an existing java.lang.Thread."""

    def __init__(self, thread):
        self._thread = thread
        _register_thread(thread, self)

    def __repr__(self):
        _thread = self._thread
        status = ThreadStates[_thread.getState()]
        if _thread.isDaemon():
            # Previously a bare ``status + " daemon"`` expression whose
            # result was thrown away, so the marker never showed up.
            status += " daemon"
        return "<%s(%s, %s %s)>" % (self.__class__.__name__, self.getName(), status, self.ident)

    def __eq__(self, other):
        # Two wrappers are equal when they wrap the same Java thread.
        if isinstance(other, JavaThread):
            return self._thread == other._thread
        else:
            return False

    def __ne__(self, other):
        return not self.__eq__(other)

    def start(self):
        """Start the underlying Java thread.

        Raises RuntimeError if Java reports IllegalThreadStateException.
        """
        try:
            self._thread.start()
        except IllegalThreadStateException:
            raise RuntimeError("threads can only be started once")

    def run(self):
        self._thread.run()

    def join(self, timeout=None):
        """Wait for the thread to terminate.

        *timeout* is in seconds (fractions allowed).  ``None`` waits
        indefinitely.  A zero, negative, or sub-nanosecond timeout returns
        immediately without waiting.

        Raises RuntimeError when joining the current thread or a thread
        that has not been started.
        """
        if self._thread == java.lang.Thread.currentThread():
            raise RuntimeError("cannot join current thread")
        elif self._thread.getState() == java.lang.Thread.State.NEW:
            raise RuntimeError("cannot join thread before it is started")
        if timeout is None:
            self._thread.join()
            return
        millis = timeout * 1000.
        millis_int = int(millis)
        nanos = int((millis - millis_int) * 1e6)
        if millis_int <= 0 and nanos <= 0:
            # java.lang.Thread.join(0, 0) means "wait forever" and negative
            # values are rejected by Java, so a non-positive timeout must
            # not be forwarded: treat it as a non-blocking poll instead.
            return
        self._thread.join(millis_int, nanos)

    def ident(self):
        return self._thread.getId()

    ident = property(ident)

    def getName(self):
        return self._thread.getName()

    def setName(self, name):
        self._thread.setName(str(name))

    name = property(getName, setName)

    def isAlive(self):
        return self._thread.isAlive()

    is_alive = isAlive

    def isDaemon(self):
        return self._thread.isDaemon()

    def setDaemon(self, daemonic):
        """Set the daemon flag; only allowed before the thread is started."""
        if self._thread.getState() != java.lang.Thread.State.NEW:
            # thread could in fact be dead... Python uses the same error
            raise RuntimeError("cannot set daemon status of active thread")
        try:
            self._thread.setDaemon(bool(daemonic))
        except IllegalThreadStateException:
            # The getState() check above can race with the thread being
            # started, so the Java exception is translated as well.
            raise RuntimeError("cannot set daemon status of active thread")

    daemon = property(isDaemon, setDaemon)

    def __tojava__(self, c):
        # NOTE(review): presumably the hook Jython calls when coercing this
        # object to Java class *c* -- confirm against the Jython docs.
        if isinstance(self._thread, c):
            return self._thread
        if isinstance(self, c):
            return self
        return Py.NoConversion


class Thread(JavaThread):
    """Thread whose activity is ``target(*args, **kwargs)`` or an overridden run()."""

    def __init__(self, group=None, target=None, name=None, args=None, kwargs=None):
        assert group is None, "group argument must be None for now"
        _thread = self._create_thread()
        JavaThread.__init__(self, _thread)
        if args is None:
            args = ()
        if kwargs is None:
            kwargs = {}
        self._target = target
        self._args = args
        self._kwargs = kwargs
        # Captured up front for the fallback path of __bootstrap, which
        # runs when the module-level _sys reference is no longer usable.
        # These attributes were read there but never assigned before.
        self.__stderr = _sys.stderr
        self.__exc_info = _sys.exc_info
        if name:
            self._thread.setName(str(name))

    def _create_thread(self):
        # Subclasses may override this to supply a different Java thread.
        return _newFunctionThread(self.__bootstrap, ())

    def run(self):
        if self._target:
            self._target(*self._args, **self._kwargs)

    def __bootstrap(self):
        """Entry point executed in the new thread.

        Installs the trace/profile hooks, calls run(), reports uncaught
        exceptions on stderr, and finally unregisters the thread.
        """
        try:
            if _trace_hook:
                _sys.settrace(_trace_hook)
            if _profile_hook:
                _sys.setprofile(_profile_hook)
            try:
                self.run()
            except SystemExit:
                pass
            except InterruptedException:
                # Quiet InterruptedExceptions if they're caused by
                # _systemrestart
                if not jython.shouldRestart:
                    raise
            except:
                # If sys.stderr is no more (most likely from interpreter
                # shutdown) use self.__stderr.  Otherwise still use sys (as in
                # _sys) in case sys.stderr was redefined.
                if _sys:
                    # Trailing newline keeps the traceback from starting on
                    # the same line as this header.
                    _sys.stderr.write("Exception in thread %s:\n" %
                                      self.getName())
                    _print_exc(file=_sys.stderr)
                else:
                    # Do the best job possible w/o a huge amt. of code to
                    # approx. a traceback stack trace
                    exc_type, exc_value, exc_tb = self.__exc_info()
                    try:
                        print>>self.__stderr, (
                            "Exception in thread " + self.getName() +
                            " (most likely raised during interpreter shutdown):")
                        print>>self.__stderr, (
                            "Traceback (most recent call last):")
                        while exc_tb:
                            print>>self.__stderr, (
                                '  File "%s", line %s, in %s' %
                                (exc_tb.tb_frame.f_code.co_filename,
                                 exc_tb.tb_lineno,
                                 exc_tb.tb_frame.f_code.co_name))
                            exc_tb = exc_tb.tb_next
                        print>>self.__stderr, ("%s: %s" % (exc_type, exc_value))
                    # Make sure that exc_tb gets deleted since it is a memory
                    # hog; deleting everything else is just for thoroughness
                    finally:
                        del exc_type, exc_value, exc_tb

        finally:
            self.__stop()
            try:
                self.__delete()
            except:
                # Best effort: unregistering must never mask the thread's
                # own outcome.
                pass

    def __stop(self):
        pass

    def __delete(self):
        _unregister_thread(self._thread)


class _MainThread(Thread):
    """Wrapper for the thread that imports this module."""

    def __init__(self):
        Thread.__init__(self, name="MainThread")
        import atexit
        atexit.register(self.__exitfunc)

    def _create_thread(self):
        # Wrap the already-running current thread instead of creating one.
        return java.lang.Thread.currentThread()

    def _set_daemon(self):
        return False

    def __exitfunc(self):
        # At exit, wait for every live non-daemon thread to finish.
        _unregister_thread(self._thread)
        t = _pickSomeNonDaemonThread()
        while t:
            t.join()
            t = _pickSomeNonDaemonThread()


def _pickSomeNonDaemonThread():
    """Return some live non-daemon thread from enumerate(), or None."""
    for t in enumerate():
        if not t.isDaemon() and t.isAlive():
            return t
    return None


def currentThread():
    """Return the Python-level thread object for the calling Java thread."""
    jthread = java.lang.Thread.currentThread()
    # NOTE(review): this relies on the lookup yielding None for an unknown
    # thread -- confirm the mapping type defined in _threading.
    pythread = _jthread_to_pythread[jthread]
    if pythread is None:
        # JavaThread.__init__ registers the new wrapper.
        pythread = JavaThread(jthread)
    return pythread

current_thread = currentThread


def activeCount():
    """Return the number of entries in the _threads registry."""
    return len(_threads)

active_count = activeCount


def enumerate():
    """Return the values of the _threads registry."""
    return _threads.values()

from thread import stack_size


_MainThread()


######################################################################
# pure Python code from CPythonLib/threading.py

# The timer class was contributed by Itamar Shtull-Trauring

def Timer(*args, **kwargs):
    return _Timer(*args, **kwargs)


class _Timer(Thread):
    """Call a function after a specified number of seconds:

    t = Timer(30.0, f, args=[], kwargs={})
    t.start()
    t.cancel() # stop the timer's action if it's still waiting
    """

    def __init__(self, interval, function, args=None, kwargs=None):
        Thread.__init__(self)
        # Fresh containers per instance: the former ``[]`` / ``{}`` default
        # arguments were shared by every Timer created without them.
        if args is None:
            args = []
        if kwargs is None:
            kwargs = {}
        self.interval = interval
        self.function = function
        self.args = args
        self.kwargs = kwargs
        self.finished = Event()

    def cancel(self):
        """Stop the timer if it hasn't finished yet"""
        self.finished.set()

    def run(self):
        self.finished.wait(self.interval)
        if not self.finished.isSet():
            self.function(*self.args, **self.kwargs)
        self.finished.set()


# NOT USED except by BoundedSemaphore
class _Semaphore(_Verbose):

    # After Tim Peters' semaphore class, but not quite the same (no maximum)

    def __init__(self, value=1, verbose=None):
        if value < 0:
            raise ValueError("Semaphore initial value must be >= 0")
        _Verbose.__init__(self, verbose)
        self.__cond = Condition(Lock())
        self.__value = value

    def acquire(self, blocking=1):
        """Decrement the counter and return True, waiting while it is zero.

        When *blocking* is false and the counter is zero, return False
        without waiting.
        """
        rc = False
        self.__cond.acquire()
        # try/finally so the condition's lock is released even if the body
        # raises (same pattern as _Event below).
        try:
            while self.__value == 0:
                if not blocking:
                    break
                if __debug__:
                    self._note("%s.acquire(%s): blocked waiting, value=%s",
                               self, blocking, self.__value)
                self.__cond.wait()
            else:
                # Reached only when the loop ended without ``break``, i.e.
                # the counter is non-zero.
                self.__value = self.__value - 1
                if __debug__:
                    self._note("%s.acquire: success, value=%s",
                               self, self.__value)
                rc = True
        finally:
            self.__cond.release()
        return rc

    def release(self):
        """Increment the counter and notify one waiter."""
        self.__cond.acquire()
        try:
            self.__value = self.__value + 1
            if __debug__:
                self._note("%s.release: success, value=%s",
                           self, self.__value)
            self.__cond.notify()
        finally:
            self.__cond.release()


def BoundedSemaphore(*args, **kwargs):
    return _BoundedSemaphore(*args, **kwargs)


class _BoundedSemaphore(_Semaphore):
    """Semaphore that checks that # releases is <= # acquires"""

    def __init__(self, value=1, verbose=None):
        _Semaphore.__init__(self, value, verbose)
        self._initial_value = value

    def __enter__(self):
        self.acquire()
        return self

    def release(self):
        # _Semaphore__value is the name-mangled counter of the base class.
        if self._Semaphore__value >= self._initial_value:
            raise ValueError("Semaphore released too many times")
        return _Semaphore.release(self)

    def __exit__(self, t, v, tb):
        self.release()


def Event(*args, **kwargs):
    return _Event(*args, **kwargs)


class _Event(_Verbose):

    # After Tim Peters' event class (without is_posted())

    def __init__(self, verbose=None):
        _Verbose.__init__(self, verbose)
        self.__cond = Condition(Lock())
        self.__flag = False

    def isSet(self):
        return self.__flag

    is_set = isSet

    def set(self):
        """Set the flag and wake every waiter."""
        self.__cond.acquire()
        try:
            self.__flag = True
            self.__cond.notifyAll()
        finally:
            self.__cond.release()

    def clear(self):
        """Reset the flag to false."""
        self.__cond.acquire()
        try:
            self.__flag = False
        finally:
            self.__cond.release()

    def wait(self, timeout=None):
        """Wait on the condition if the flag is not set; return the flag."""
        self.__cond.acquire()
        try:
            if not self.__flag:
                self.__cond.wait(timeout)
            # Issue 2005: Since CPython 2.7, threading.Event.wait(timeout) returns boolean.
            # The function should return False if timeout is reached before the event is set.
            return self.__flag
        finally:
            self.__cond.release()