1# -*- coding: utf-8 -*- 2""" 3ReadWriteLock 4 5Taken from http://code.activestate.com/recipes/502283/ 6 7locks.py - Read-Write lock thread lock implementation 8 9See the class documentation for more info. 10 11Copyright (C) 2007, Heiko Wundram. 12Released under the BSD-license. 13""" 14 15# Imports 16# ------- 17 18from threading import Condition, Lock, currentThread 19from time import time 20 21# Read write lock 22# --------------- 23 24 25class ReadWriteLock(object): 26 """Read-Write lock class. A read-write lock differs from a standard 27 threading.RLock() by allowing multiple threads to simultaneously hold a 28 read lock, while allowing only a single thread to hold a write lock at the 29 same point of time. 30 31 When a read lock is requested while a write lock is held, the reader 32 is blocked; when a write lock is requested while another write lock is 33 held or there are read locks, the writer is blocked. 34 35 Writers are always preferred by this implementation: if there are blocked 36 threads waiting for a write lock, current readers may request more read 37 locks (which they eventually should free, as they starve the waiting 38 writers otherwise), but a new thread requesting a read lock will not 39 be granted one, and block. This might mean starvation for readers if 40 two writer threads interweave their calls to acquire_write() without 41 leaving a window only for readers. 42 43 In case a current reader requests a write lock, this can and will be 44 satisfied without giving up the read locks first, but, only one thread 45 may perform this kind of lock upgrade, as a deadlock would otherwise 46 occur. After the write lock has been granted, the thread will hold a 47 full write lock, and not be downgraded after the upgrading call to 48 acquire_write() has been match by a corresponding release(). 49 """ 50 51 def __init__(self): 52 """Initialize this read-write lock.""" 53 54 # Condition variable, used to signal waiters of a change in object 55 # state. 56 self.__condition = Condition(Lock()) 57 58 # Initialize with no writers. 59 self.__writer = None 60 self.__upgradewritercount = 0 61 self.__pendingwriters = [] 62 63 # Initialize with no readers. 64 self.__readers = {} 65 66 def acquire_read(self, timeout=None): 67 """Acquire a read lock for the current thread, waiting at most 68 timeout seconds or doing a non-blocking check in case timeout is <= 0. 69 70 In case timeout is None, the call to acquire_read blocks until the 71 lock request can be serviced. 72 73 In case the timeout expires before the lock could be serviced, a 74 RuntimeError is thrown.""" 75 76 if timeout is not None: 77 endtime = time() + timeout 78 me = currentThread() 79 self.__condition.acquire() 80 try: 81 if self.__writer is me: 82 # If we are the writer, grant a new read lock, always. 83 self.__writercount += 1 84 return 85 while True: 86 if self.__writer is None: 87 # Only test anything if there is no current writer. 88 if self.__upgradewritercount or self.__pendingwriters: 89 if me in self.__readers: 90 # Only grant a read lock if we already have one 91 # in case writers are waiting for their turn. 92 # This means that writers can't easily get starved 93 # (but see below, readers can). 94 self.__readers[me] += 1 95 return 96 # No, we aren't a reader (yet), wait for our turn. 97 else: 98 # Grant a new read lock, always, in case there are 99 # no pending writers (and no writer). 100 self.__readers[me] = self.__readers.get(me, 0) + 1 101 return 102 if timeout is not None: 103 remaining = endtime - time() 104 if remaining <= 0: 105 # Timeout has expired, signal caller of this. 106 raise RuntimeError("Acquiring read lock timed out") 107 self.__condition.wait(remaining) 108 else: 109 self.__condition.wait() 110 finally: 111 self.__condition.release() 112 113 def acquire_write(self, timeout=None): 114 """Acquire a write lock for the current thread, waiting at most 115 timeout seconds or doing a non-blocking check in case timeout is <= 0. 116 117 In case the write lock cannot be serviced due to the deadlock 118 condition mentioned above, a ValueError is raised. 119 120 In case timeout is None, the call to acquire_write blocks until the 121 lock request can be serviced. 122 123 In case the timeout expires before the lock could be serviced, a 124 RuntimeError is thrown.""" 125 126 if timeout is not None: 127 endtime = time() + timeout 128 me, upgradewriter = currentThread(), False 129 self.__condition.acquire() 130 try: 131 if self.__writer is me: 132 # If we are the writer, grant a new write lock, always. 133 self.__writercount += 1 134 return 135 elif me in self.__readers: 136 # If we are a reader, no need to add us to pendingwriters, 137 # we get the upgradewriter slot. 138 if self.__upgradewritercount: 139 # If we are a reader and want to upgrade, and someone 140 # else also wants to upgrade, there is no way we can do 141 # this except if one of us releases all his read locks. 142 # Signal this to user. 143 raise ValueError("Inevitable dead lock, denying write lock") 144 upgradewriter = True 145 self.__upgradewritercount = self.__readers.pop(me) 146 else: 147 # We aren't a reader, so add us to the pending writers queue 148 # for synchronization with the readers. 149 self.__pendingwriters.append(me) 150 while True: 151 if not self.__readers and self.__writer is None: 152 # Only test anything if there are no readers and writers. 153 if self.__upgradewritercount: 154 if upgradewriter: 155 # There is a writer to upgrade, and it's us. Take 156 # the write lock. 157 self.__writer = me 158 self.__writercount = self.__upgradewritercount + 1 159 self.__upgradewritercount = 0 160 return 161 # There is a writer to upgrade, but it's not us. 162 # Always leave the upgrade writer the advance slot, 163 # because he presumes he'll get a write lock directly 164 # from a previously held read lock. 165 elif self.__pendingwriters[0] is me: 166 # If there are no readers and writers, it's always 167 # fine for us to take the writer slot, removing us 168 # from the pending writers queue. 169 # This might mean starvation for readers, though. 170 self.__writer = me 171 self.__writercount = 1 172 self.__pendingwriters = self.__pendingwriters[1:] 173 return 174 if timeout is not None: 175 remaining = endtime - time() 176 if remaining <= 0: 177 # Timeout has expired, signal caller of this. 178 if upgradewriter: 179 # Put us back on the reader queue. No need to 180 # signal anyone of this change, because no other 181 # writer could've taken our spot before we got 182 # here (because of remaining readers), as the test 183 # for proper conditions is at the start of the 184 # loop, not at the end. 185 self.__readers[me] = self.__upgradewritercount 186 self.__upgradewritercount = 0 187 else: 188 # We were a simple pending writer, just remove us 189 # from the FIFO list. 190 self.__pendingwriters.remove(me) 191 raise RuntimeError("Acquiring write lock timed out") 192 self.__condition.wait(remaining) 193 else: 194 self.__condition.wait() 195 finally: 196 self.__condition.release() 197 198 def release(self): 199 """Release the currently held lock. 200 201 In case the current thread holds no lock, a ValueError is thrown.""" 202 203 me = currentThread() 204 self.__condition.acquire() 205 try: 206 if self.__writer is me: 207 # We are the writer, take one nesting depth away. 208 self.__writercount -= 1 209 if not self.__writercount: 210 # No more write locks; take our writer position away and 211 # notify waiters of the new circumstances. 212 self.__writer = None 213 self.__condition.notifyAll() 214 elif me in self.__readers: 215 # We are a reader currently, take one nesting depth away. 216 self.__readers[me] -= 1 217 if not self.__readers[me]: 218 # No more read locks, take our reader position away. 219 del self.__readers[me] 220 if not self.__readers: 221 # No more readers, notify waiters of the new 222 # circumstances. 223 self.__condition.notifyAll() 224 else: 225 raise ValueError("Trying to release unheld lock") 226 finally: 227 self.__condition.release() 228