import logging

from .compat import threading

log = logging.getLogger(__name__)


class LockError(Exception):
    """Raised when a lock is released in a way that does not match its state."""


class ReadWriteMutex(object):
    """A mutex which allows multiple readers, single writer.

    :class:`.ReadWriteMutex` uses a Python ``threading.Condition``
    to provide this functionality across threads within a process.

    The Beaker package also contained a file-lock based version
    of this concept, so that readers/writers could be synchronized
    across processes with a common filesystem.  A future Dogpile
    release may include this additional class at some point.

    """

    def __init__(self):
        # Number of readers ("asynchronous" operations) currently holding
        # the read lock.  The attribute is spelled ``async_`` because
        # ``async`` is a reserved keyword from Python 3.7 onwards and
        # cannot be used as an attribute name.
        self.async_ = 0

        # Thread that currently holds, or is waiting to obtain, the write
        # lock ("synchronous" operation); None when there is no writer.
        self.current_sync_operation = None

        # Condition guarding both fields above; readers and writers block
        # on it while waiting for the state to change.
        self.condition = threading.Condition(threading.Lock())

    def acquire_read_lock(self, wait=True):
        """Acquire the 'read' lock.

        :param wait: if True (the default), block until no writer holds
         or is waiting for the lock.  If False, give up immediately when
         a writer is present.
        :return: ``None`` when ``wait`` is True.  When ``wait`` is False,
         ``True`` if the lock was acquired, ``False`` otherwise.

        """
        with self.condition:
            # A writer is waiting to start or is already running: either
            # wait for it to finish or give up right away.
            if wait:
                while self.current_sync_operation is not None:
                    self.condition.wait()
            elif self.current_sync_operation is not None:
                return False

            self.async_ += 1
            log.debug("%s acquired read lock", self)

        if not wait:
            return True

    def release_read_lock(self):
        """Release the 'read' lock.

        :raises LockError: if there is no outstanding read lock to
         release.  The reader count is left untouched in that case so
         that later, balanced acquire/release pairs keep working.

        """
        with self.condition:
            # Validate before decrementing so an unbalanced release
            # cannot leave the counter negative.
            if self.async_ <= 0:
                raise LockError("Synchronizer error - too many "
                                "release_read_locks called")

            self.async_ -= 1

            # Last reader out the door: if a writer is waiting for the
            # readers to drain, wake it up.
            if self.async_ == 0 and self.current_sync_operation is not None:
                self.condition.notify_all()
            log.debug("%s released read lock", self)

    def acquire_write_lock(self, wait=True):
        """Acquire the 'write' lock.

        :param wait: if True (the default), block until any other writer
         has released the lock and all active readers have finished.  If
         False, give up immediately when either is present.
        :return: ``None`` when ``wait`` is True.  When ``wait`` is False,
         ``True`` if the lock was acquired, ``False`` otherwise.

        """
        with self.condition:
            # Step 1: only one writer at a time.
            if wait:
                while self.current_sync_operation is not None:
                    self.condition.wait()
            elif self.current_sync_operation is not None:
                # Another writer is working and we don't want to wait.
                return False

            # Establish ourselves as the current writer.  From here on,
            # new readers and writers wait until this is None again.
            self.current_sync_operation = threading.current_thread()

            # Step 2: let the readers that are already in drain out.
            if self.async_ > 0:
                if not wait:
                    # We don't want to wait, so back out of the claim.
                    self.current_sync_operation = None
                    return False
                # Re-check the predicate after every wake-up rather than
                # trusting a single notification.
                while self.async_ > 0:
                    self.condition.wait()
            log.debug("%s acquired write lock", self)

        if not wait:
            return True

    def release_write_lock(self):
        """Release the 'write' lock.

        :raises LockError: if the calling thread is not the one that
         holds the write lock.

        """
        with self.condition:
            if self.current_sync_operation is not threading.current_thread():
                raise LockError("Synchronizer error - current thread doesn't "
                                "have the write lock")

            # Clear the writer slot so another writer (or the waiting
            # readers) can proceed.
            self.current_sync_operation = None

            # Wake everyone waiting; they start running once the
            # condition's lock is released on leaving this block.
            self.condition.notify_all()

            log.debug("%s released write lock", self)