#!/usr/local/bin/python3.8
# vim:fileencoding=utf-8


__license__ = 'GPL v3'
__copyright__ = '2013, Kovid Goyal <kovid at kovidgoyal.net>'

import random
import time
from threading import Thread
from calibre.db.tests.base import BaseTest
from calibre.db.locking import SHLock, RWLockWrapper, LockingError


def wait_for(period):
    """Spin until ``period`` seconds have elapsed on ``time.perf_counter()``.

    A busy loop is used because time.sleep() is not useful for very small
    values on windows, where the default timer has a resolution of about
    16ms, see
    https://stackoverflow.com/questions/40594587/why-time-sleep-is-so-slow-in-windows
    """
    stop_at = time.perf_counter() + period
    while time.perf_counter() < stop_at:
        continue


class TestLock(BaseTest):
    """Tests for the SHLock and RWLockWrapper classes of calibre.db.locking."""

    # The test methods below are described with comments rather than
    # docstrings: unittest's shortDescription() reports the first docstring
    # line of a test, so adding docstrings would change how these tests are
    # labelled in test reports.

    # owns_lock() must be true only while the calling thread holds the lock,
    # in either mode, and must be false for a thread that does not hold it.
    def test_owns_locks(self):
        shlock = SHLock()
        self.assertFalse(shlock.owns_lock())
        # Same acquire/check/release/check cycle, first shared, then exclusive.
        for shared in (True, False):
            shlock.acquire(shared=shared)
            self.assertTrue(shlock.owns_lock())
            shlock.release()
            self.assertFalse(shlock.owns_lock())

        seen_unowned = []

        def check_from_other_thread():
            if not shlock.owns_lock():
                seen_unowned.append(True)

        # Held by this thread, so the helper thread must report non-ownership.
        shlock.acquire()
        checker = Thread(target=check_from_other_thread, daemon=True)
        checker.start()
        checker.join(1)
        self.assertEqual(len(seen_unowned), 1)
        shlock.release()

    # One thread re-enters the shared lock while a second thread, started at
    # the same time, asks for the exclusive lock 0.1s in. Both threads must
    # finish within their join timeouts.
    def test_multithread_deadlock(self):
        shlock = SHLock()

        def reenter_shared():
            reader = RWLockWrapper(shlock)
            with reader:
                # Sleep past the 0.1s delay of the exclusive requester before
                # taking the shared lock a second time.
                time.sleep(0.2)
                with reader:
                    pass

        def late_exclusive():
            time.sleep(0.1)
            writer = RWLockWrapper(shlock, is_shared=False)
            with writer:
                pass

        workers = [
            Thread(target=body, daemon=True)
            for body in (reenter_shared, late_exclusive)
        ]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join(5)
        stuck = [worker for worker in workers if worker.is_alive()]
        self.assertListEqual(stuck, [], 'ShLock hung')

    # Asking for the exclusive lock while holding the shared lock must raise.
    def test_upgrade(self):
        shlock = SHLock()
        shlock.acquire(shared=True)
        with self.assertRaises(LockingError):
            shlock.acquire(shared=False)
        shlock.release()

    # Asking for the shared lock while holding the exclusive lock must raise.
    def test_downgrade(self):
        shlock = SHLock()
        shlock.acquire(shared=False)
        with self.assertRaises(LockingError):
            shlock.acquire(shared=True)
        shlock.release()

    # Acquiring twice in the same mode must report a count of 2, and the
    # count must be falsy again once both acquisitions are released.
    def test_recursive(self):
        shlock = SHLock()

        shlock.acquire(shared=True)
        shlock.acquire(shared=True)
        self.assertEqual(shlock.is_shared, 2)
        shlock.release()
        shlock.release()
        self.assertFalse(shlock.is_shared)

        shlock.acquire(shared=False)
        shlock.acquire(shared=False)
        self.assertEqual(shlock.is_exclusive, 2)
        shlock.release()
        shlock.release()
        self.assertFalse(shlock.is_exclusive)

    # release() from a thread that holds nothing must raise LockingError,
    # both on a fresh lock and right after another thread has been started
    # to take the lock.
    def test_release(self):
        shlock = SHLock()
        with self.assertRaises(LockingError):
            shlock.release()

        def hold_briefly(shared):
            shlock.acquire(shared=shared)
            time.sleep(1)
            shlock.release()

        # First a shared holder, then an exclusive one.
        for mode in (True, False):
            holder = Thread(target=hold_briefly, args=(mode,), daemon=True)
            holder.start()
            with self.assertRaises(LockingError):
                shlock.release()
            holder.join(15)
            self.assertFalse(holder.is_alive())
        self.assertFalse(shlock.is_shared)
        self.assertFalse(shlock.is_exclusive)

    # Blocking and non-blocking acquisition from this thread while another
    # thread holds the lock for one second, first shared, then exclusive.
    def test_acquire(self):
        shlock = SHLock()

        def hold_briefly(shared):
            shlock.acquire(shared=shared)
            time.sleep(1)
            shlock.release()

        reader = Thread(target=hold_briefly, args=(True,), daemon=True)
        reader.start()
        # Give the helper thread time to take the shared lock.
        time.sleep(0.1)
        self.assertTrue(shlock.acquire(shared=True, blocking=False))
        shlock.release()
        self.assertFalse(shlock.acquire(shared=False, blocking=False))
        # Blocking exclusive request; the helper is expected to be finished
        # (or to finish within the join timeout) once this returns.
        shlock.acquire(shared=False)
        reader.join(1)
        self.assertFalse(reader.is_alive())
        shlock.release()
        self.assertTrue(shlock.acquire(shared=False, blocking=False))
        shlock.release()

        writer = Thread(target=hold_briefly, args=(False,), daemon=True)
        writer.start()
        # Give the helper thread time to take the exclusive lock.
        time.sleep(0.1)
        self.assertFalse(shlock.acquire(shared=False, blocking=False))
        self.assertFalse(shlock.acquire(shared=True, blocking=False))
        shlock.acquire(shared=True)
        writer.join(1)
        self.assertFalse(writer.is_alive())
        shlock.release()
        shlock.acquire(shared=False)
        shlock.release()
        shlock.acquire(shared=True)
        shlock.release()
        self.assertFalse(shlock.is_shared)
        self.assertFalse(shlock.is_exclusive)

    # Two threads each run 1000 rounds of nested acquire/release in a
    # randomly chosen mode, separated by very short busy-waits. Both must
    # finish within an overall budget of 20 seconds.
    def test_contention(self):
        shlock = SHLock()
        finished = []

        def hammer():
            for _ in range(1000):
                mode = random.choice([True, False])
                shlock.acquire(shared=mode)
                shlock.acquire(shared=mode)
                wait_for(random.random() * 0.0001)
                shlock.release()
                wait_for(random.random() * 0.0001)
                shlock.acquire(shared=mode)
                wait_for(random.random() * 0.0001)
                shlock.release()
                shlock.release()
            finished.append(True)

        workers = [Thread(target=hammer, daemon=True) for _ in range(2)]
        for worker in workers:
            worker.start()
        # The 20 second budget is shared by all joins, not granted per thread.
        budget_ends = time.monotonic() + 20
        for worker in workers:
            remaining = budget_ends - time.monotonic()
            if remaining <= 0:
                break
            worker.join(remaining)
        stuck = [worker for worker in workers if worker.is_alive()]
        self.assertEqual(len(stuck), 0, 'ShLock hung or very slow, {} threads alive'.format(len(stuck)))
        self.assertEqual(len(finished), len(workers), 'SHLock locking failed')
        self.assertFalse(shlock.is_shared)
        self.assertFalse(shlock.is_exclusive)


def find_tests():
    """Return a test suite loaded from the TestLock test case."""
    from unittest import defaultTestLoader
    return defaultTestLoader.loadTestsFromTestCase(TestLock)


def run_tests():
    """Pass find_tests to calibre.utils.run_tests.run_tests."""
    from calibre.utils.run_tests import run_tests as run
    run(find_tests)