1#
2# Module implementing synchronization primitives
3#
4# processing/synchronize.py
5#
6# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
7#
8
9__all__ = [ 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition',
10            'Event']
11
12import threading
13import os
14import sys
15
16from struct import pack as _pack, unpack as _unpack, calcsize as _calcsize
17from time import time as _time, sleep as _sleep
18
19from processing import _processing
20from processing.process import currentProcess, _registerAfterFork
21from processing.logger import debug
22from processing.finalize import Finalize
23from processing.forking import assertSpawning
24
25#
26# Constants to describe the kind of blocker (normal lock is a bounded sem)
27#
28
29RECURSIVE_MUTEX, SEMAPHORE, BOUNDED_SEMAPHORE = range(3)
30
31#
32# Base class for semaphores and mutexes; wraps `_processing.SemLock`
33#
34
35class SemLock(object):
36
37    def __init__(self, kind, value):
38        sl = self._semlock = _processing.SemLock(kind, value)
39        debug('created semlock with handle %s' % sl.handle)
40        self.__setstate__((sl.handle, sl.kind, sl.maxvalue))
41
42        if sys.platform != 'win32':
43            def _afterFork(obj):
44                obj._semlock._afterFork()
45            _registerAfterFork(self, _afterFork)
46
47    def __getstate__(self):
48        assertSpawning(self)
49        return self._state
50
51    def __setstate__(self, state):
52        self._state = state
53        if not hasattr(self, '_semlock'):
54            self._semlock = _processing.SemLock._rebuild(*state)
55            debug('recreated blocker with handle %r' % state[0])
56
57        self.acquire = self._semlock.acquire
58        self.release = self._semlock.release
59        self.__enter__ = self._semlock.__enter__
60        self.__exit__ = self._semlock.__exit__
61
62#
63# Semaphore
64#
65
66class Semaphore(SemLock):
67
68    def __init__(self, value=1):
69        SemLock.__init__(self, SEMAPHORE, value)
70
71    def getValue(self):
72        return self._semlock._getValue()
73
74    def __repr__(self):
75        try:
76            value = self._semlock._getValue()
77        except (KeyboardInterrupt, SystemExit):
78            raise
79        except:
80            value = 'unknown'
81        return '<Semaphore(value=%s)>' % value
82
83#
84# Bounded semaphore
85#
86
87class BoundedSemaphore(Semaphore):
88
89    def __init__(self, value=1):
90        SemLock.__init__(self, BOUNDED_SEMAPHORE, value)
91
92    def __repr__(self):
93        try:
94            value = self._semlock._getValue()
95        except (KeyboardInterrupt, SystemExit):
96            raise
97        except:
98            value = 'unknown'
99        return '<BoundedSemaphore(value=%s, maxvalue=%s)>' % \
100               (value, self._semlock.maxvalue)
101
102#
103# Non-recursive lock
104#
105
106class Lock(SemLock):
107
108    def __init__(self):
109        SemLock.__init__(self, BOUNDED_SEMAPHORE, 1)
110
111    def __repr__(self):
112        try:
113            if self._semlock._isMine():
114                name = currentProcess().getName()
115                if threading.currentThread().getName() != 'MainThread':
116                    name += '|' + threading.currentThread().getName()
117            elif self._semlock._getValue() == 1:
118                name = 'None'
119            elif self._semlock._count() > 0:
120                name = 'SomeOtherThread'
121            else:
122                name = 'SomeOtherProcess'
123        except (KeyboardInterrupt, SystemExit):
124            raise
125        except Exception:
126            name = 'unknown'
127        return '<Lock(owner=%s)>' % name
128
129#
130# Recursive lock
131#
132
133class RLock(SemLock):
134
135    def __init__(self):
136        SemLock.__init__(self, RECURSIVE_MUTEX, 1)
137
138    def __repr__(self):
139        try:
140            if self._semlock._isMine():
141                name = currentProcess().getName()
142                if threading.currentThread().getName() != 'MainThread':
143                    name += '|' + threading.currentThread().getName()
144                count = self._semlock._count()
145            elif self._semlock._getValue() == 1:
146                name, count = 'None', 0
147            elif self._semlock._count() > 0:
148                name, count = 'SomeOtherThread', 'nonzero'
149            else:
150                name, count = 'SomeOtherProcess', 'nonzero'
151        except (KeyboardInterrupt, SystemExit):
152            raise
153        except Exception:
154            name, count = 'unknown', 'unknown'
155        return '<RLock(%s, %s)>' % (name, count)
156
157#
158# Condition variable
159#
160
161class Condition(object):
162
163    def __init__(self, lock=None):
164        state = (lock or RLock(), Semaphore(0), Semaphore(0), Semaphore(0))
165        self.__setstate__(state)
166
167    def __getstate__(self):
168        assertSpawning(self)
169        return self._state
170
171    def __setstate__(self, state):
172        (self._lock, self._sleeping_count,
173         self._woken_count, self._wait_semaphore) = self._state = state
174        self.acquire = self._lock.acquire
175        self.release = self._lock.release
176        self.__enter__ = self._lock.__enter__
177        self.__exit__ = self._lock.__exit__
178
179    def __repr__(self):
180        try:
181            num_waiters = (self._sleeping_count._semlock._getValue() -
182                           self._woken_count._semlock._getValue())
183        except (KeyboardInterrupt, SystemExit):
184            raise
185        except Exception:
186            num_waiters = 'unkown'
187        return '<Condition(%s, %s)>' % (self._lock, num_waiters)
188
189    def wait(self, timeout=None):
190        assert self._lock._semlock._isMine(), \
191               'must acquire() condition before using wait()'
192
193        # indicate that this thread is going to sleep
194        self._sleeping_count.release()
195
196        # release lock
197        count = self._lock._semlock._count()
198        for i in xrange(count):
199            self._lock.release()
200
201        try:
202            # wait for notification or timeout
203            self._wait_semaphore.acquire(True, timeout)
204        finally:
205            # indicate that this thread has woken
206            self._woken_count.release()
207
208            # reacquire lock
209            for i in xrange(count):
210                self._lock.acquire()
211
212    def notify(self):
213        assert self._lock._semlock._isMine(), 'lock is not owned'
214        assert not self._wait_semaphore.acquire(False)
215
216        # to take account of timeouts since last notify() we subtract
217        # woken_count from sleeping_count and rezero woken_count
218        while self._woken_count.acquire(False):
219            res = self._sleeping_count.acquire(False)
220            assert res
221
222        if self._sleeping_count.acquire(False): # try grabbing a sleeper
223            self._wait_semaphore.release()      # wake up one sleeper
224            self._woken_count.acquire()         # wait for the sleeper to wake
225
226            # rezero _wait_semaphore in case a timeout just happened
227            self._wait_semaphore.acquire(False)
228
229    def notifyAll(self):
230        assert self._lock._semlock._isMine(), 'lock is not owned'
231        assert not self._wait_semaphore.acquire(False)
232
233        # to take account of timeouts since last notify*() we subtract
234        # woken_count from sleeping_count and rezero woken_count
235        while self._woken_count.acquire(False):
236            res = self._sleeping_count.acquire(False)
237            assert res
238
239        sleepers = 0
240        while self._sleeping_count.acquire(False):
241            self._wait_semaphore.release()        # wake up one sleeper
242            sleepers += 1
243
244        if sleepers:
245            for i in xrange(sleepers):
246                self._woken_count.acquire()       # wait for a sleeper to wake
247
248            # rezero wait_semaphore in case some timeouts just happened
249            while self._wait_semaphore.acquire(False):
250                pass
251
252#
253# Event
254#
255
256class Event(object):
257
258    def __init__(self):
259        self._cond = Condition(Lock())
260        self._flag = Semaphore(0)
261
262    def isSet(self):
263        self._cond.acquire()
264        try:
265            if self._flag.acquire(False):
266                self._flag.release()
267                return True
268            return False
269        finally:
270            self._cond.release()
271
272    def set(self):
273        self._cond.acquire()
274        try:
275            self._flag.acquire(False)
276            self._flag.release()
277            self._cond.notifyAll()
278        finally:
279            self._cond.release()
280
281    def clear(self):
282        self._cond.acquire()
283        try:
284            self._flag.acquire(False)
285        finally:
286            self._cond.release()
287
288    def wait(self, timeout=None):
289        self._cond.acquire()
290        try:
291            if self._flag.acquire(False):
292                self._flag.release()
293            else:
294                self._cond.wait(timeout)
295        finally:
296            self._cond.release()
297