1#!/usr/local/bin/python3.8
2# vim:fileencoding=utf-8
3
4
5__license__ = 'GPL v3'
6__copyright__ = '2013, Kovid Goyal <kovid at kovidgoyal.net>'
7
8import time, random
9from threading import Thread
10from calibre.db.tests.base import BaseTest
11from calibre.db.locking import SHLock, RWLockWrapper, LockingError
12
13
14def wait_for(period):
15    # time.sleep() is not useful for very small values on windows because the
16    # default timer has a resolutions of about 16ms see
17    # https://stackoverflow.com/questions/40594587/why-time-sleep-is-so-slow-in-windows
18    deadline = time.perf_counter() + period
19    while time.perf_counter() < deadline:
20        pass
21
22
23class TestLock(BaseTest):
24    """Tests for db locking """
25
26    def test_owns_locks(self):
27        lock = SHLock()
28        self.assertFalse(lock.owns_lock())
29        lock.acquire(shared=True)
30        self.assertTrue(lock.owns_lock())
31        lock.release()
32        self.assertFalse(lock.owns_lock())
33        lock.acquire(shared=False)
34        self.assertTrue(lock.owns_lock())
35        lock.release()
36        self.assertFalse(lock.owns_lock())
37
38        done = []
39
40        def test():
41            if not lock.owns_lock():
42                done.append(True)
43        lock.acquire()
44        t = Thread(target=test)
45        t.daemon = True
46        t.start()
47        t.join(1)
48        self.assertEqual(len(done), 1)
49        lock.release()
50
51    def test_multithread_deadlock(self):
52        lock = SHLock()
53
54        def two_shared():
55            r = RWLockWrapper(lock)
56            with r:
57                time.sleep(0.2)
58                with r:
59                    pass
60
61        def one_exclusive():
62            time.sleep(0.1)
63            w = RWLockWrapper(lock, is_shared=False)
64            with w:
65                pass
66        threads = [Thread(target=two_shared), Thread(target=one_exclusive)]
67        for t in threads:
68            t.daemon = True
69            t.start()
70        for t in threads:
71            t.join(5)
72        live = [t for t in threads if t.is_alive()]
73        self.assertListEqual(live, [], 'ShLock hung')
74
75    def test_upgrade(self):
76        lock = SHLock()
77        lock.acquire(shared=True)
78        self.assertRaises(LockingError, lock.acquire, shared=False)
79        lock.release()
80
81    def test_downgrade(self):
82        lock = SHLock()
83        lock.acquire(shared=False)
84        self.assertRaises(LockingError, lock.acquire, shared=True)
85        lock.release()
86
87    def test_recursive(self):
88        lock = SHLock()
89        lock.acquire(shared=True)
90        lock.acquire(shared=True)
91        self.assertEqual(lock.is_shared, 2)
92        lock.release()
93        lock.release()
94        self.assertFalse(lock.is_shared)
95        lock.acquire(shared=False)
96        lock.acquire(shared=False)
97        self.assertEqual(lock.is_exclusive, 2)
98        lock.release()
99        lock.release()
100        self.assertFalse(lock.is_exclusive)
101
102    def test_release(self):
103        lock = SHLock()
104        self.assertRaises(LockingError, lock.release)
105
106        def get_lock(shared):
107            lock.acquire(shared=shared)
108            time.sleep(1)
109            lock.release()
110
111        threads = [Thread(target=get_lock, args=(x,)) for x in (True,
112            False)]
113        for t in threads:
114            t.daemon = True
115            t.start()
116            self.assertRaises(LockingError, lock.release)
117            t.join(15)
118            self.assertFalse(t.is_alive())
119        self.assertFalse(lock.is_shared)
120        self.assertFalse(lock.is_exclusive)
121
122    def test_acquire(self):
123        lock = SHLock()
124
125        def get_lock(shared):
126            lock.acquire(shared=shared)
127            time.sleep(1)
128            lock.release()
129
130        shared = Thread(target=get_lock, args=(True,))
131        shared.daemon = True
132        shared.start()
133        time.sleep(0.1)
134        self.assertTrue(lock.acquire(shared=True, blocking=False))
135        lock.release()
136        self.assertFalse(lock.acquire(shared=False, blocking=False))
137        lock.acquire(shared=False)
138        shared.join(1)
139        self.assertFalse(shared.is_alive())
140        lock.release()
141        self.assertTrue(lock.acquire(shared=False, blocking=False))
142        lock.release()
143
144        exclusive = Thread(target=get_lock, args=(False,))
145        exclusive.daemon = True
146        exclusive.start()
147        time.sleep(0.1)
148        self.assertFalse(lock.acquire(shared=False, blocking=False))
149        self.assertFalse(lock.acquire(shared=True, blocking=False))
150        lock.acquire(shared=True)
151        exclusive.join(1)
152        self.assertFalse(exclusive.is_alive())
153        lock.release()
154        lock.acquire(shared=False)
155        lock.release()
156        lock.acquire(shared=True)
157        lock.release()
158        self.assertFalse(lock.is_shared)
159        self.assertFalse(lock.is_exclusive)
160
161    def test_contention(self):
162        lock = SHLock()
163        done = []
164
165        def lots_of_acquires():
166            for _ in range(1000):
167                shared = random.choice([True,False])
168                lock.acquire(shared=shared)
169                lock.acquire(shared=shared)
170                wait_for(random.random() * 0.0001)
171                lock.release()
172                wait_for(random.random() * 0.0001)
173                lock.acquire(shared=shared)
174                wait_for(random.random() * 0.0001)
175                lock.release()
176                lock.release()
177            done.append(True)
178        threads = [Thread(target=lots_of_acquires) for _ in range(2)]
179        for t in threads:
180            t.daemon = True
181            t.start()
182        end_at = time.monotonic() + 20
183        for t in threads:
184            left = end_at - time.monotonic()
185            if left <= 0:
186                break
187            t.join(left)
188        live = [t for t in threads if t.is_alive()]
189        self.assertEqual(len(live), 0, 'ShLock hung or very slow, {} threads alive'.format(len(live)))
190        self.assertEqual(len(done), len(threads), 'SHLock locking failed')
191        self.assertFalse(lock.is_shared)
192        self.assertFalse(lock.is_exclusive)
193
194
195def find_tests():
196    import unittest
197    return unittest.defaultTestLoader.loadTestsFromTestCase(TestLock)
198
199
200def run_tests():
201    from calibre.utils.run_tests import run_tests
202    run_tests(find_tests)
203