#
# Module implementing synchronization primitives
#
# processing/synchronize.py
#
# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
#

__all__ = [ 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition',
            'Event']

import threading
import os
import sys

from struct import pack as _pack, unpack as _unpack, calcsize as _calcsize
from time import time as _time, sleep as _sleep

from processing import _processing
from processing.process import currentProcess, _registerAfterFork
from processing.logger import debug
from processing.finalize import Finalize
from processing.forking import assertSpawning

#
# Constants to describe the kind of blocker (normal lock is a bounded sem)
#

RECURSIVE_MUTEX, SEMAPHORE, BOUNDED_SEMAPHORE = range(3)

#
# Base class for semaphores and mutexes; wraps `_processing.SemLock`
#

class SemLock(object):
    '''
    Common base for `Semaphore`, `BoundedSemaphore`, `Lock` and `RLock`.

    Holds a `_processing.SemLock` in `self._semlock` and exposes its
    `acquire`/`release` directly on the instance.  The picklable state is
    the tuple `(handle, kind, maxvalue)`.
    '''

    def __init__(self, kind, value):
        '''
        Create the underlying `_processing.SemLock`.

        `kind` is one of RECURSIVE_MUTEX, SEMAPHORE or BOUNDED_SEMAPHORE and
        `value` is passed straight through to `_processing.SemLock`.
        '''
        sl = self._semlock = _processing.SemLock(kind, value)
        debug('created semlock with handle %s' % sl.handle)
        self.__setstate__((sl.handle, sl.kind, sl.maxvalue))

        if sys.platform != 'win32':
            # give the wrapped semlock a chance to fix itself up in a
            # forked child
            def _afterFork(obj):
                obj._semlock._afterFork()
            _registerAfterFork(self, _afterFork)

    def __getstate__(self):
        # assertSpawning() is called first so that it can veto pickling
        assertSpawning(self)
        return self._state

    def __setstate__(self, state):
        '''
        Install `state`; rebuild the wrapped semlock if we do not have one
        (i.e. when unpickling rather than when called from `__init__`).
        '''
        self._state = state
        if not hasattr(self, '_semlock'):
            self._semlock = _processing.SemLock._rebuild(*state)
            debug('recreated blocker with handle %r' % state[0])

        # bind the hot-path methods directly on the instance
        self.acquire = self._semlock.acquire
        self.release = self._semlock.release

    # `__enter__`/`__exit__` must be defined on the class: for new-style
    # classes the `with` statement (Python 2.7 onwards) looks them up on
    # the type, so instance attributes of the same name are ignored.

    def __enter__(self):
        return self._semlock.__enter__()

    def __exit__(self, *args):
        return self._semlock.__exit__(*args)

#
# Semaphore
#

class Semaphore(SemLock):
    '''
    Semaphore built on a `SemLock` of kind SEMAPHORE.
    '''

    def __init__(self, value=1):
        SemLock.__init__(self, SEMAPHORE, value)

    def getValue(self):
        '''
        Return whatever the wrapped semlock reports as its current value.
        '''
        return self._semlock._getValue()

    def __repr__(self):
        # best effort: fall back to 'unknown' if the value cannot be read
        try:
            value = self._semlock._getValue()
        except (KeyboardInterrupt, SystemExit):
            raise
        except Exception:
            value = 'unknown'
        return '<Semaphore(value=%s)>' % value

#
# Bounded semaphore
#

class BoundedSemaphore(Semaphore):
    '''
    Semaphore built on a `SemLock` of kind BOUNDED_SEMAPHORE.
    '''

    def __init__(self, value=1):
        SemLock.__init__(self, BOUNDED_SEMAPHORE, value)

    def __repr__(self):
        # best effort: fall back to 'unknown' if the value cannot be read
        try:
            value = self._semlock._getValue()
        except (KeyboardInterrupt, SystemExit):
            raise
        except Exception:
            value = 'unknown'
        return '<BoundedSemaphore(value=%s, maxvalue=%s)>' % \
               (value, self._semlock.maxvalue)

#
# Non-recursive lock
#

class Lock(SemLock):
    '''
    Non-recursive lock, implemented as a bounded semaphore of value 1.
    '''

    def __init__(self):
        SemLock.__init__(self, BOUNDED_SEMAPHORE, 1)

    def __repr__(self):
        # describe the owner as well as can be determined from this
        # process; any failure while querying yields 'unknown'
        try:
            if self._semlock._isMine():
                name = currentProcess().getName()
                if threading.currentThread().getName() != 'MainThread':
                    name += '|' + threading.currentThread().getName()
            elif self._semlock._getValue() == 1:
                name = 'None'
            elif self._semlock._count() > 0:
                name = 'SomeOtherThread'
            else:
                name = 'SomeOtherProcess'
        except (KeyboardInterrupt, SystemExit):
            raise
        except Exception:
            name = 'unknown'
        return '<Lock(owner=%s)>' % name

#
# Recursive lock
#

class RLock(SemLock):
    '''
    Recursive lock, implemented as a `SemLock` of kind RECURSIVE_MUTEX.
    '''

    def __init__(self):
        SemLock.__init__(self, RECURSIVE_MUTEX, 1)

    def __repr__(self):
        # describe the owner and recursion count as well as can be
        # determined from this process; any failure yields 'unknown'
        try:
            if self._semlock._isMine():
                name = currentProcess().getName()
                if threading.currentThread().getName() != 'MainThread':
                    name += '|' + threading.currentThread().getName()
                count = self._semlock._count()
            elif self._semlock._getValue() == 1:
                name, count = 'None', 0
            elif self._semlock._count() > 0:
                name, count = 'SomeOtherThread', 'nonzero'
            else:
                name, count = 'SomeOtherProcess', 'nonzero'
        except (KeyboardInterrupt, SystemExit):
            raise
        except Exception:
            name, count = 'unknown', 'unknown'
        return '<RLock(%s, %s)>' % (name, count)

#
# Condition variable
#

class Condition(object):
    '''
    Condition variable built from a lock and three semaphores.

    State is the tuple `(lock, sleeping_count, woken_count,
    wait_semaphore)`: `sleeping_count` is released by each thread entering
    `wait()`, `woken_count` by each thread leaving it, and
    `wait_semaphore` is what waiters block on.
    '''

    def __init__(self, lock=None):
        '''
        Use `lock` as the underlying lock, or a new `RLock` if none given.
        '''
        state = (lock or RLock(), Semaphore(0), Semaphore(0), Semaphore(0))
        self.__setstate__(state)

    def __getstate__(self):
        # assertSpawning() is called first so that it can veto pickling
        assertSpawning(self)
        return self._state

    def __setstate__(self, state):
        (self._lock, self._sleeping_count,
         self._woken_count, self._wait_semaphore) = self._state = state
        self.acquire = self._lock.acquire
        self.release = self._lock.release

    # defined on the class because the `with` statement looks special
    # methods up on the type (see the same note in `SemLock`)

    def __enter__(self):
        return self._lock.__enter__()

    def __exit__(self, *args):
        return self._lock.__exit__(*args)

    def __repr__(self):
        try:
            num_waiters = (self._sleeping_count._semlock._getValue() -
                           self._woken_count._semlock._getValue())
        except (KeyboardInterrupt, SystemExit):
            raise
        except Exception:
            num_waiters = 'unknown'
        return '<Condition(%s, %s)>' % (self._lock, num_waiters)

    def wait(self, timeout=None):
        '''
        Release the lock, block on the wait semaphore until notified or
        until `timeout` expires, then reacquire the lock.

        The lock must be owned by the caller.  It is released as many times
        as the wrapped semlock's `_count()` reports and reacquired the same
        number of times afterwards.
        '''
        assert self._lock._semlock._isMine(), \
               'must acquire() condition before using wait()'

        # indicate that this thread is going to sleep
        self._sleeping_count.release()

        # release lock
        count = self._lock._semlock._count()
        for i in xrange(count):
            self._lock.release()

        try:
            # wait for notification or timeout
            self._wait_semaphore.acquire(True, timeout)
        finally:
            # indicate that this thread has woken
            self._woken_count.release()

            # reacquire lock
            for i in xrange(count):
                self._lock.acquire()

    def notify(self):
        '''
        Wake at most one thread blocked in `wait()`.

        The lock must be owned by the caller.
        '''
        assert self._lock._semlock._isMine(), 'lock is not owned'
        assert not self._wait_semaphore.acquire(False)

        # to take account of timeouts since last notify() we subtract
        # woken_count from sleeping_count and rezero woken_count
        while self._woken_count.acquire(False):
            res = self._sleeping_count.acquire(False)
            assert res

        if self._sleeping_count.acquire(False): # try grabbing a sleeper
            self._wait_semaphore.release()      # wake up one sleeper
            self._woken_count.acquire()         # wait for the sleeper to wake

            # rezero _wait_semaphore in case a timeout just happened
            self._wait_semaphore.acquire(False)

    def notifyAll(self):
        '''
        Wake every thread currently blocked in `wait()`.

        The lock must be owned by the caller.
        '''
        assert self._lock._semlock._isMine(), 'lock is not owned'
        assert not self._wait_semaphore.acquire(False)

        # to take account of timeouts since last notify*() we subtract
        # woken_count from sleeping_count and rezero woken_count
        while self._woken_count.acquire(False):
            res = self._sleeping_count.acquire(False)
            assert res

        sleepers = 0
        while self._sleeping_count.acquire(False):
            self._wait_semaphore.release()        # wake up one sleeper
            sleepers += 1

        if sleepers:
            for i in xrange(sleepers):
                self._woken_count.acquire()       # wait for a sleeper to wake

            # rezero wait_semaphore in case some timeouts just happened
            while self._wait_semaphore.acquire(False):
                pass

#
# Event
#

class Event(object):
    '''
    Event built from a `Condition` (over a plain `Lock`) and a semaphore
    used as the flag: the flag counts as set when the semaphore can be
    acquired without blocking.
    '''

    def __init__(self):
        self._cond = Condition(Lock())
        self._flag = Semaphore(0)

    def isSet(self):
        '''
        Return True if the flag is set, else False.
        '''
        self._cond.acquire()
        try:
            # probe the flag and put it straight back if we got it
            if self._flag.acquire(False):
                self._flag.release()
                return True
            return False
        finally:
            self._cond.release()

    def set(self):
        '''
        Set the flag and wake all threads blocked in `wait()`.
        '''
        self._cond.acquire()
        try:
            # take the flag first (if already set) so that the release
            # below does not push the semaphore past one
            self._flag.acquire(False)
            self._flag.release()
            self._cond.notifyAll()
        finally:
            self._cond.release()

    def clear(self):
        '''
        Reset the flag.
        '''
        self._cond.acquire()
        try:
            self._flag.acquire(False)
        finally:
            self._cond.release()

    def wait(self, timeout=None):
        '''
        Return at once if the flag is set; otherwise wait on the condition
        until notified or until `timeout` expires.
        '''
        self._cond.acquire()
        try:
            if self._flag.acquire(False):
                self._flag.release()
            else:
                self._cond.wait(timeout)
        finally:
            self._cond.release()