# Copyright Mateusz Kobos, (c) 2011
# https://code.activestate.com/recipes/577803-reader-writer-lock-with-priority-for-writers/
# released under the MIT licence

import unittest
import threading
import time
import copy
from ._rwlock import RWLock


class Writer(threading.Thread):
    """
    Thread that appends a single value to a shared buffer while holding
    the writer side of a lock.

    @ivar entry_time: C{time.time()} value recorded right after the writer
        lock was acquired; C{None} until then
    @ivar exit_time: C{time.time()} value recorded right before the writer
        lock is released; C{None} until then
    """

    def __init__(
        self, buffer_, rw_lock, init_sleep_time, sleep_time, to_write
    ):
        """
        @param buffer_: common buffer shared by the readers and writers
        @type buffer_: list
        @param rw_lock: lock guarding the access to the buffer
        @type rw_lock: L{RWLock}
        @param init_sleep_time: sleep time before doing any action
        @type init_sleep_time: C{float}
        @param sleep_time: sleep time while in critical section
        @type sleep_time: C{float}
        @param to_write: data that will be appended to the buffer
        """
        threading.Thread.__init__(self)
        self.__buffer = buffer_
        self.__rw_lock = rw_lock
        self.__init_sleep_time = init_sleep_time
        self.__sleep_time = sleep_time
        self.__to_write = to_write
        self.entry_time = None
        self.exit_time = None

    def run(self):
        """
        Sleep, acquire the writer lock, sleep inside the critical section,
        append the value to the buffer and release the lock.
        """
        time.sleep(self.__init_sleep_time)
        self.__rw_lock.writer_acquire()
        # Release in ``finally`` so that a failure inside the critical
        # section cannot leave the lock held and block the other threads.
        try:
            self.entry_time = time.time()
            time.sleep(self.__sleep_time)
            self.__buffer.append(self.__to_write)
            self.exit_time = time.time()
        finally:
            self.__rw_lock.writer_release()


class Reader(threading.Thread):
    """
    Thread that takes a deep copy of a shared buffer while holding the
    reader side of a lock.

    @ivar buffer_read: a copy of the buffer read while in critical section;
        C{None} until the copy has been made
    @ivar entry_time: C{time.time()} value recorded right after the reader
        lock was acquired; C{None} until then
    @ivar exit_time: C{time.time()} value recorded right before the reader
        lock is released; C{None} until then
    """

    def __init__(self, buffer_, rw_lock, init_sleep_time, sleep_time):
        """
        @param buffer_: common buffer shared by the readers and writers
        @type buffer_: list
        @param rw_lock: lock guarding the access to the buffer
        @type rw_lock: L{RWLock}
        @param init_sleep_time: sleep time before doing any action
        @type init_sleep_time: C{float}
        @param sleep_time: sleep time while in critical section
        @type sleep_time: C{float}
        """
        threading.Thread.__init__(self)
        self.__buffer = buffer_
        self.__rw_lock = rw_lock
        self.__init_sleep_time = init_sleep_time
        self.__sleep_time = sleep_time
        self.buffer_read = None
        self.entry_time = None
        self.exit_time = None

    def run(self):
        """
        Sleep, acquire the reader lock, sleep inside the critical section,
        copy the buffer and release the lock.
        """
        time.sleep(self.__init_sleep_time)
        self.__rw_lock.reader_acquire()
        # Release in ``finally`` so that a failure inside the critical
        # section cannot leave the lock held and block the other threads.
        try:
            self.entry_time = time.time()
            time.sleep(self.__sleep_time)
            self.buffer_read = copy.deepcopy(self.__buffer)
            self.exit_time = time.time()
        finally:
            self.__rw_lock.reader_release()


class RWLockTestCase(unittest.TestCase):
    # Every scenario below is driven purely by the sleep times handed to the
    # Reader/Writer threads; the assertions then compare the recorded
    # entry/exit timestamps and the buffer snapshots taken by the readers.

    def test_readers_nonexclusive_access(self):
        (buffer_, rw_lock, threads) = self.__init_variables()

        threads.append(Reader(buffer_, rw_lock, 0, 0))
        threads.append(Writer(buffer_, rw_lock, 0.2, 0.4, 1))
        threads.append(Reader(buffer_, rw_lock, 0.3, 0.3))
        threads.append(Reader(buffer_, rw_lock, 0.5, 0))

        self.__start_and_join_threads(threads)

        ## The third reader should enter after the second one but it should
        ## exit before the second one exits
        ## (i.e. the readers should be in the critical section
        ## at the same time)

        self.assertEqual([], threads[0].buffer_read)
        self.assertEqual([1], threads[2].buffer_read)
        self.assertEqual([1], threads[3].buffer_read)
        self.assertLessEqual(threads[1].exit_time, threads[2].entry_time)
        self.assertLessEqual(threads[2].entry_time, threads[3].entry_time)
        self.assertLess(threads[3].exit_time, threads[2].exit_time)

    def test_writers_exclusive_access(self):
        (buffer_, rw_lock, threads) = self.__init_variables()

        threads.append(Writer(buffer_, rw_lock, 0, 0.4, 1))
        threads.append(Writer(buffer_, rw_lock, 0.1, 0, 2))
        threads.append(Reader(buffer_, rw_lock, 0.2, 0))

        self.__start_and_join_threads(threads)

        ## The second writer should wait for the first one to exit

        self.assertEqual([1, 2], threads[2].buffer_read)
        self.assertLessEqual(threads[0].exit_time, threads[1].entry_time)
        self.assertLessEqual(threads[1].exit_time, threads[2].exit_time)

    def test_writer_priority(self):
        (buffer_, rw_lock, threads) = self.__init_variables()

        threads.append(Writer(buffer_, rw_lock, 0, 0, 1))
        threads.append(Reader(buffer_, rw_lock, 0.1, 0.4))
        threads.append(Writer(buffer_, rw_lock, 0.2, 0, 2))
        threads.append(Reader(buffer_, rw_lock, 0.3, 0))
        threads.append(Reader(buffer_, rw_lock, 0.3, 0))

        self.__start_and_join_threads(threads)

        ## The second writer should go before the second and the third reader

        self.assertEqual([1], threads[1].buffer_read)
        self.assertEqual([1, 2], threads[3].buffer_read)
        self.assertEqual([1, 2], threads[4].buffer_read)
        self.assertLess(threads[0].exit_time, threads[1].entry_time)
        self.assertLessEqual(threads[1].exit_time, threads[2].entry_time)
        self.assertLessEqual(threads[2].exit_time, threads[3].entry_time)
        self.assertLessEqual(threads[2].exit_time, threads[4].entry_time)

    def test_many_writers_priority(self):
        (buffer_, rw_lock, threads) = self.__init_variables()

        threads.append(Writer(buffer_, rw_lock, 0, 0, 1))
        threads.append(Reader(buffer_, rw_lock, 0.1, 0.6))
        threads.append(Writer(buffer_, rw_lock, 0.2, 0.1, 2))
        threads.append(Reader(buffer_, rw_lock, 0.3, 0))
        threads.append(Reader(buffer_, rw_lock, 0.4, 0))
        threads.append(Writer(buffer_, rw_lock, 0.5, 0.1, 3))

        self.__start_and_join_threads(threads)

        ## The two last writers should go first -- after the first reader and
        ## before the second and the third reader

        self.assertEqual([1], threads[1].buffer_read)
        self.assertEqual([1, 2, 3], threads[3].buffer_read)
        self.assertEqual([1, 2, 3], threads[4].buffer_read)
        self.assertLess(threads[0].exit_time, threads[1].entry_time)
        self.assertLessEqual(threads[1].exit_time, threads[2].entry_time)
        self.assertLessEqual(threads[1].exit_time, threads[5].entry_time)
        self.assertLessEqual(threads[2].exit_time, threads[3].entry_time)
        self.assertLessEqual(threads[2].exit_time, threads[4].entry_time)
        self.assertLessEqual(threads[5].exit_time, threads[3].entry_time)
        self.assertLessEqual(threads[5].exit_time, threads[4].entry_time)

    @staticmethod
    def __init_variables():
        """
        Create a fresh fixture for one scenario.

        @return: tuple C{(buffer_, rw_lock, threads)} made of an empty
            list, a new L{RWLock} and an empty list for the threads
        """
        buffer_ = []
        rw_lock = RWLock()
        threads = []
        return (buffer_, rw_lock, threads)

    @staticmethod
    def __start_and_join_threads(threads):
        """
        Start every thread in list order, then wait for all of them to
        finish.

        @param threads: threads to run
        @type threads: list
        """
        for t in threads:
            t.start()
        for t in threads:
            t.join()