1# Copyright Mateusz Kobos, (c) 2011
2# https://code.activestate.com/recipes/577803-reader-writer-lock-with-priority-for-writers/
3# released under the MIT licence
4
5import unittest
6import threading
7import time
8import copy
9from ._rwlock import RWLock
10
11
class Writer(threading.Thread):
    """
    Test helper thread that appends one item to a shared buffer while
    holding the writer side of the lock, recording when it entered and
    left the critical section.
    """

    def __init__(
        self, buffer_, rw_lock, init_sleep_time, sleep_time, to_write
    ):
        """
        @param buffer_: common buffer_ shared by the readers and writers
        @type buffer_: list
        @param rw_lock: lock guarding access to the buffer
        @type rw_lock: L{RWLock}
        @param init_sleep_time: sleep time before doing any action
        @type init_sleep_time: C{float}
        @param sleep_time: sleep time while in critical section
        @type sleep_time: C{float}
        @param to_write: data that will be appended to the buffer
        """
        threading.Thread.__init__(self)
        self.__buffer = buffer_
        self.__rw_lock = rw_lock
        self.__init_sleep_time = init_sleep_time
        self.__sleep_time = sleep_time
        self.__to_write = to_write
        self.entry_time = None
        """Time of entry to the critical section"""
        self.exit_time = None
        """Time of exit from the critical section"""

    def run(self):
        """
        Sleep for the initial delay, acquire the lock as a writer, sleep
        inside the critical section, append the item to the buffer and
        release the lock.

        C{entry_time} is set right after the lock is acquired and
        C{exit_time} right after the append, just before the release.
        """
        time.sleep(self.__init_sleep_time)
        self.__rw_lock.writer_acquire()
        try:
            self.entry_time = time.time()
            time.sleep(self.__sleep_time)
            self.__buffer.append(self.__to_write)
            self.exit_time = time.time()
        finally:
            # Always release, so a failure inside the critical section
            # cannot leave the other threads blocked on the lock forever.
            self.__rw_lock.writer_release()
45
46
class Reader(threading.Thread):
    """
    Test helper thread that takes a deep copy of a shared buffer while
    holding the reader side of the lock, recording when it entered and
    left the critical section.
    """

    def __init__(self, buffer_, rw_lock, init_sleep_time, sleep_time):
        """
        @param buffer_: common buffer shared by the readers and writers
        @type buffer_: list
        @param rw_lock: lock guarding access to the buffer
        @type rw_lock: L{RWLock}
        @param init_sleep_time: sleep time before doing any action
        @type init_sleep_time: C{float}
        @param sleep_time: sleep time while in critical section
        @type sleep_time: C{float}
        """
        threading.Thread.__init__(self)
        self.__buffer = buffer_
        self.__rw_lock = rw_lock
        self.__init_sleep_time = init_sleep_time
        self.__sleep_time = sleep_time
        self.buffer_read = None
        """a copy of the buffer read while in critical section"""
        self.entry_time = None
        """Time of entry to the critical section"""
        self.exit_time = None
        """Time of exit from the critical section"""

    def run(self):
        """
        Sleep for the initial delay, acquire the lock as a reader, sleep
        inside the critical section, snapshot the buffer and release the
        lock.

        C{entry_time} is set right after the lock is acquired and
        C{exit_time} right after the snapshot, just before the release.
        """
        time.sleep(self.__init_sleep_time)
        self.__rw_lock.reader_acquire()
        try:
            self.entry_time = time.time()
            time.sleep(self.__sleep_time)
            # Deep copy so later writes to the shared buffer do not alter
            # what this reader observed.
            self.buffer_read = copy.deepcopy(self.__buffer)
            self.exit_time = time.time()
        finally:
            # Always release, so a failure inside the critical section
            # cannot leave the other threads blocked on the lock forever.
            self.__rw_lock.reader_release()
78
79
class RWLockTestCase(unittest.TestCase):
    """
    Timing-based scenarios for L{RWLock}.

    Each test starts a set of L{Reader} and L{Writer} threads with staggered
    sleep times, waits for all of them to finish, and then checks the buffer
    snapshots seen by the readers and the relative order of the recorded
    entry/exit times.
    """

    def test_readers_nonexclusive_access(self):
        """
        The third reader should enter after the second one but it should
        exit before the second one exits (i.e. the readers should be in
        the critical section at the same time).
        """
        (buffer_, rw_lock, threads) = self.__init_variables()

        threads.append(Reader(buffer_, rw_lock, 0, 0))
        threads.append(Writer(buffer_, rw_lock, 0.2, 0.4, 1))
        threads.append(Reader(buffer_, rw_lock, 0.3, 0.3))
        threads.append(Reader(buffer_, rw_lock, 0.5, 0))

        self.__start_and_join_threads(threads)

        self.assertEqual([], threads[0].buffer_read)
        self.assertEqual([1], threads[2].buffer_read)
        self.assertEqual([1], threads[3].buffer_read)
        # assertLess/assertLessEqual report both timestamps on failure,
        # unlike assertTrue on a comparison.
        self.assertLessEqual(threads[1].exit_time, threads[2].entry_time)
        self.assertLessEqual(threads[2].entry_time, threads[3].entry_time)
        self.assertLess(threads[3].exit_time, threads[2].exit_time)

    def test_writers_exclusive_access(self):
        """The second writer should wait for the first one to exit."""
        (buffer_, rw_lock, threads) = self.__init_variables()

        threads.append(Writer(buffer_, rw_lock, 0, 0.4, 1))
        threads.append(Writer(buffer_, rw_lock, 0.1, 0, 2))
        threads.append(Reader(buffer_, rw_lock, 0.2, 0))

        self.__start_and_join_threads(threads)

        self.assertEqual([1, 2], threads[2].buffer_read)
        self.assertLessEqual(threads[0].exit_time, threads[1].entry_time)
        self.assertLessEqual(threads[1].exit_time, threads[2].exit_time)

    def test_writer_priority(self):
        """
        The second writer should go before the second and the third reader.
        """
        (buffer_, rw_lock, threads) = self.__init_variables()

        threads.append(Writer(buffer_, rw_lock, 0, 0, 1))
        threads.append(Reader(buffer_, rw_lock, 0.1, 0.4))
        threads.append(Writer(buffer_, rw_lock, 0.2, 0, 2))
        threads.append(Reader(buffer_, rw_lock, 0.3, 0))
        threads.append(Reader(buffer_, rw_lock, 0.3, 0))

        self.__start_and_join_threads(threads)

        self.assertEqual([1], threads[1].buffer_read)
        self.assertEqual([1, 2], threads[3].buffer_read)
        self.assertEqual([1, 2], threads[4].buffer_read)
        self.assertLess(threads[0].exit_time, threads[1].entry_time)
        self.assertLessEqual(threads[1].exit_time, threads[2].entry_time)
        self.assertLessEqual(threads[2].exit_time, threads[3].entry_time)
        self.assertLessEqual(threads[2].exit_time, threads[4].entry_time)

    def test_many_writers_priority(self):
        """
        The two last writers should go first -- after the first reader and
        before the second and the third reader.
        """
        (buffer_, rw_lock, threads) = self.__init_variables()

        threads.append(Writer(buffer_, rw_lock, 0, 0, 1))
        threads.append(Reader(buffer_, rw_lock, 0.1, 0.6))
        threads.append(Writer(buffer_, rw_lock, 0.2, 0.1, 2))
        threads.append(Reader(buffer_, rw_lock, 0.3, 0))
        threads.append(Reader(buffer_, rw_lock, 0.4, 0))
        threads.append(Writer(buffer_, rw_lock, 0.5, 0.1, 3))

        self.__start_and_join_threads(threads)

        self.assertEqual([1], threads[1].buffer_read)
        self.assertEqual([1, 2, 3], threads[3].buffer_read)
        self.assertEqual([1, 2, 3], threads[4].buffer_read)
        self.assertLess(threads[0].exit_time, threads[1].entry_time)
        self.assertLessEqual(threads[1].exit_time, threads[2].entry_time)
        self.assertLessEqual(threads[1].exit_time, threads[5].entry_time)
        self.assertLessEqual(threads[2].exit_time, threads[3].entry_time)
        self.assertLessEqual(threads[2].exit_time, threads[4].entry_time)
        self.assertLessEqual(threads[5].exit_time, threads[3].entry_time)
        self.assertLessEqual(threads[5].exit_time, threads[4].entry_time)

    @staticmethod
    def __init_variables():
        """
        Build the fresh fixtures used by every scenario.

        @return: tuple of (empty shared buffer, new L{RWLock}, empty list
            to collect the threads in)
        """
        buffer_ = []
        rw_lock = RWLock()
        threads = []
        return (buffer_, rw_lock, threads)

    @staticmethod
    def __start_and_join_threads(threads):
        """
        Start every thread, then block until all of them have finished.

        @param threads: threads to run, started in list order
        """
        for t in threads:
            t.start()
        for t in threads:
            t.join()
178