import logging

from .compat import threading

# Module-level logger named after this module; the lock classes below use
# it to emit debug messages when locks are acquired and released.
log = logging.getLogger(__name__)
5
6
class LockError(Exception):
    """Raised when a lock is released in a way that violates its protocol.

    In this module that means releasing a read lock that is not held, or
    releasing the write lock from a thread that does not own it.
    """
9
10
class ReadWriteMutex(object):
    """A mutex which allows multiple readers, single writer.

    :class:`.ReadWriteMutex` uses a Python ``threading.Condition``
    to provide this functionality across threads within a process.

    Any number of threads may hold the read lock at the same time.  The
    write lock is exclusive: a thread requesting it first blocks new
    readers and writers from entering, then waits for the readers that
    are already inside to leave.  Neither lock is reentrant.

    The Beaker package also contained a file-lock based version
    of this concept, so that readers/writers could be synchronized
    across processes with a common filesystem.  A future Dogpile
    release may include this additional class at some point.

    """

    def __init__(self):
        # Number of threads currently holding the read lock.  Spelled
        # ``async_`` because ``async`` is a reserved keyword from
        # Python 3.7 onwards and cannot be used as an attribute name.
        self.async_ = 0

        # Thread that holds (or is in the middle of acquiring) the write
        # lock; None when no writer is active or pending.
        self.current_sync_operation = None

        # Condition guarding the two fields above; waiters are woken
        # when the last reader leaves or the writer releases.
        self.condition = threading.Condition(threading.Lock())

    def acquire_read_lock(self, wait=True):
        """Acquire the 'read' lock.

        :param wait: when true (the default), block until no writer is
         active or pending.  When false, never block.
        :return: ``None`` when ``wait`` is true.  When ``wait`` is false,
         ``True`` if the lock was acquired and ``False`` if a writer is
         active or pending.
        """
        with self.condition:
            # A writer that is running, or has announced itself and is
            # waiting for readers to drain, keeps new readers out.
            if wait:
                while self.current_sync_operation is not None:
                    self.condition.wait()
            elif self.current_sync_operation is not None:
                return False

            self.async_ += 1
            log.debug("%s acquired read lock", self)

        if not wait:
            return True

    def release_read_lock(self):
        """Release the 'read' lock.

        :raises LockError: if no read lock is currently held.  The reader
         count is left untouched in that case, so the mutex stays usable.
        """
        with self.condition:
            # Validate before decrementing so an unmatched release cannot
            # drive the counter negative and corrupt later accounting.
            if self.async_ <= 0:
                raise LockError("Synchronizer error - too many "
                                "release_read_locks called")

            self.async_ -= 1

            # Last reader out the door: wake a writer that is waiting
            # for the readers to drain.
            if self.async_ == 0 and self.current_sync_operation is not None:
                self.condition.notify_all()
            log.debug("%s released read lock", self)

    def acquire_write_lock(self, wait=True):
        """Acquire the 'write' lock.

        :param wait: when true (the default), block until no other writer
         is active and all readers have released.  When false, never
         block.
        :return: ``None`` when ``wait`` is true.  When ``wait`` is false,
         ``True`` if the lock was acquired and ``False`` if another writer
         or any reader currently holds the mutex.
        """
        with self.condition:
            if wait:
                # Another writer is active or pending: wait for it.
                while self.current_sync_operation is not None:
                    self.condition.wait()
            elif self.current_sync_operation is not None:
                # Another writer is active and we must not block.
                return False

            # Establish ourselves as the current writer.  From here on
            # other read/write acquisitions wait until this is None again.
            self.current_sync_operation = threading.current_thread()

            if self.async_ > 0:
                if not wait:
                    # Readers are still inside and we must not block;
                    # withdraw the claim.  The condition's lock was held
                    # throughout, so no other thread observed it.
                    self.current_sync_operation = None
                    return False

                # Wait for the readers to drain, re-checking the count
                # after every wake-up rather than trusting a single one.
                while self.async_ > 0:
                    self.condition.wait()
            log.debug("%s acquired write lock", self)

        if not wait:
            return True

    def release_write_lock(self):
        """Release the 'write' lock.

        :raises LockError: if the calling thread is not the thread that
         currently holds the write lock.
        """
        with self.condition:
            if self.current_sync_operation is not threading.current_thread():
                raise LockError("Synchronizer error - current thread doesn't "
                                "have the write lock")

            # Clear the writer marker so another thread can take over,
            # then wake every waiting reader and writer.
            self.current_sync_operation = None
            self.condition.notify_all()

            log.debug("%s released write lock", self)
133