# -*- coding: utf-8 -*-
# (c) 2009-2020 Martin Wendt and contributors; see WsgiDAV https://github.com/mar10/wsgidav
# Licensed under the MIT license:
# http://www.opensource.org/licenses/mit-license.php
"""Unit test for lock_manager.py"""
from tempfile import gettempdir
from time import sleep
from wsgidav import lock_manager, lock_storage
from wsgidav.dav_error import DAVError
from wsgidav.lock_storage_redis import LockStorageRedis

import os
import sys
import unittest


# ========================================================================
# BasicTest
# ========================================================================


class BasicTest(unittest.TestCase):
    """Test lock_manager.LockManager().

    Subclasses re-run the same test methods against other storage back ends
    by overriding `setUp()` / `tearDown()`.
    """

    principal = "Joe Tester"
    owner = b"joe.tester@example.com"
    root = "/dav/res"
    timeout = 10 * 60  # Default lock timeout 10 minutes

    # @classmethod
    # def suite(cls):
    #     """Return test case suite (so we can control the order)."""
    #     suite = TestSuite()
    #     suite.addTest(cls("testPreconditions"))
    #     # suite.addTest(cls("testOpen"))
    #     suite.addTest(cls("testValidation"))
    #     suite.addTest(cls("testLock"))
    #     suite.addTest(cls("testTimeout"))
    #     suite.addTest(cls("testConflict"))
    #     return suite

    def setUp(self):
        """Create a fresh LockManager on top of a LockStorageDict."""
        storage = lock_storage.LockStorageDict()
        self.lm = lock_manager.LockManager(storage)
        self.lm._verbose = 1

    def tearDown(self):
        """Drop the lock manager created by `setUp()`."""
        del self.lm

    def _acquire(
        self,
        url,
        lock_type,
        lock_scope,
        lock_depth,
        lock_owner,
        timeout,
        principal,
        token_list,
    ):
        """Wrapper for lm.acquire, that returns None instead of raising DAVError.

        All arguments are passed through to `self.lm.acquire()` unchanged and
        in the same order. Exceptions other than DAVError still propagate.
        """
        try:
            return self.lm.acquire(
                url,
                lock_type,
                lock_scope,
                lock_depth,
                lock_owner,
                timeout,
                principal,
                token_list,
            )
        except DAVError:
            return None

    def _isLockDict(self, o):
        """Return True, if `o` can be subscripted with the key "root"."""
        try:
            _ = o["root"]  # noqa F841
        except Exception:
            return False
        return True

    def _isLockResultOK(self, resultTupleList):
        """Return True, if result is [ (lock_dict, None) ].

        Any exception while inspecting the argument (e.g. it is not a
        sequence) is treated as 'not OK' and yields False.
        """
        try:
            return (
                len(resultTupleList) == 1
                # The single entry must itself be a 2-tuple. (The list length
                # was checked twice before, which could never be satisfied.)
                and len(resultTupleList[0]) == 2
                and self._isLockDict(resultTupleList[0][0])
                and resultTupleList[0][1] is None
            )
        except Exception:
            return False

    def _isLockResultFault(self, lock, conflictList, status=None):
        """Return True, if it is a valid result tuple containing a DAVError.

        Args:
            lock: must be None for a fault result.
            conflictList: non-empty sequence whose first entry is a
                `(lock_dict, DAVError)` tuple.
            status: optional status to compare with the `value` attribute of
                the DAVError instance; skipped when falsy.

        Any exception while inspecting the arguments yields False.
        """
        try:
            if lock is not None:
                return False
            if len(conflictList) < 1:
                return False
            resultTuple = conflictList[0]
            if (
                len(resultTuple) != 2
                or not self._isLockDict(resultTuple[0])
                or not isinstance(resultTuple[1], DAVError)
            ):
                return False
            # Compare against the status carried by the returned error
            # instance, not an attribute looked up on the DAVError class.
            elif status and status != resultTuple[1].value:
                return False
            return True
        except Exception:
            return False

    def testPreconditions(self):
        """Environment must be set."""
        # The tests below rely on plain `assert` statements.
        self.assertTrue(
            __debug__, "__debug__ must be True, otherwise asserts are ignored"
        )

    # def testOpen(self):
    #     """Lock manager should be lazy opening on first access."""
    #     lm = self.lm
    #     # assert not lm._loaded, "LM must only be opened after first access"
    #     lm._generate_lock(self.principal, "write", "exclusive", "infinity",
    #                       self.owner,
    #                       "/dav",
    #                       10)
    #     assert lm._loaded, "LM must be opened after first access"

    def testValidation(self):
        """Lock manager should raise errors on bad args."""
        lm = self.lm
        # NOTE(review): `lm` is passed in the first positional slot, where
        # testLock() passes `self.principal` -- presumably a principal was
        # intended here as well; confirm before changing.
        # Invalid lock type
        self.assertRaises(
            AssertionError,
            lm._generate_lock,
            lm,
            "writeX",
            "exclusive",
            "infinity",
            self.owner,
            self.root,
            self.timeout,
        )
        # Invalid lock scope
        self.assertRaises(
            AssertionError,
            lm._generate_lock,
            lm,
            "write",
            "exclusiveX",
            "infinity",
            self.owner,
            self.root,
            self.timeout,
        )
        # Invalid lock depth
        self.assertRaises(
            AssertionError,
            lm._generate_lock,
            lm,
            "write",
            "exclusive",
            "infinityX",
            self.owner,
            self.root,
            self.timeout,
        )
        # Missing owner
        self.assertRaises(
            AssertionError,
            lm._generate_lock,
            lm,
            "write",
            "exclusive",
            "infinity",
            None,
            self.root,
            self.timeout,
        )
        # Missing root
        self.assertRaises(
            AssertionError,
            lm._generate_lock,
            lm,
            "write",
            "exclusive",
            "infinity",
            self.owner,
            None,
            self.timeout,
        )

        # assert lm._dict is None, "No locks should have been created by this test"

    def testLock(self):
        """Lock manager should create and find locks."""
        lm = self.lm
        url = "/dav/res"
        # Create a new lock
        lock_dict = lm._generate_lock(
            self.principal,
            "write",
            "exclusive",
            "infinity",
            self.owner,
            url,
            self.timeout,
        )
        # Check returned dictionary
        assert lock_dict is not None
        assert lock_dict["root"] == url
        assert lock_dict["type"] == "write"
        assert lock_dict["scope"] == "exclusive"
        assert lock_dict["depth"] == "infinity"
        assert lock_dict["owner"] == self.owner
        assert lock_dict["principal"] == self.principal

        # Test lookup
        tok = lock_dict.get("token")
        assert lm.get_lock(tok, "root") == url

        lock_dict = lm.get_lock(tok)

        assert lock_dict is not None
        assert lock_dict["root"] == url
        assert lock_dict["type"] == "write"
        assert lock_dict["scope"] == "exclusive"
        assert lock_dict["depth"] == "infinity"
        assert lock_dict["owner"] == self.owner
        assert lock_dict["principal"] == self.principal

        # We locked "/dav/res", did we?
        assert lm.is_token_locked_by_user(tok, self.principal)

        # res = lm.get_url_lock_list(url, self.principal)
        res = lm.get_url_lock_list(url)
        self.assertEqual(len(res), 1)

        # res = lm.get_url_lock_list(url, "another user")
        # assert len(res) == 0

        assert lm.is_url_locked_by_token(
            "/dav/res", tok
        ), "url not directly locked by lock_token."
        assert lm.is_url_locked_by_token(
            "/dav/res/", tok
        ), "url not directly locked by lock_token."
        assert lm.is_url_locked_by_token(
            "/dav/res/sub", tok
        ), "child url not indirectly locked"

        assert not lm.is_url_locked_by_token(
            "/dav/ressub", tok
        ), "non-child url reported as locked"
        assert not lm.is_url_locked_by_token(
            "/dav", tok
        ), "parent url reported as locked"
        assert not lm.is_url_locked_by_token(
            "/dav/", tok
        ), "parent url reported as locked"

    def testTimeout(self):
        """Locks should be purged after expiration date."""
        lm = self.lm
        timeout = 1
        lock_dict = lm._generate_lock(
            self.principal,
            "write",
            "exclusive",
            "infinity",
            self.owner,
            self.root,
            timeout,
        )

        assert lock_dict is not None
        tok = lock_dict.get("token")
        assert lm.get_lock(tok, "root") == self.root

        # Half way through the timeout the lock must still be there ...
        sleep(timeout - 0.5)
        lock_dict = lm.get_lock(tok)
        assert lock_dict is not None, "Lock expired too early"

        # ... and 1.5 seconds after creation it must be gone.
        sleep(1)
        lock_dict = lm.get_lock(tok)
        assert lock_dict is None, "Lock has not expired"

    def testConflict(self):
        """Locks should prevent conflicts."""
        token_list = []

        # Create a lock for '/dav/res/'
        lock = self._acquire(
            "/dav/res/",
            "write",
            "exclusive",
            "infinity",
            self.owner,
            self.timeout,
            self.principal,
            token_list,
        )
        assert lock, "Could not acquire lock"

        # Try to lock with a slightly different URL (without trailing '/')
        lock = self._acquire(
            "/dav/res",
            "write",
            "exclusive",
            "infinity",
            self.owner,
            self.timeout,
            "another principal",
            token_list,
        )
        assert lock is None, "Could acquire a conflicting lock"

        # Try to lock with another principal
        lock = self._acquire(
            "/dav/res/",
            "write",
            "exclusive",
            "infinity",
            self.owner,
            self.timeout,
            "another principal",
            token_list,
        )
        assert lock is None, "Could acquire a conflicting lock"

        # Try to lock child with another principal
        lock = self._acquire(
            "/dav/res/sub",
            "write",
            "exclusive",
            "infinity",
            self.owner,
            self.timeout,
            "another principal",
            token_list,
        )
        assert lock is None, "Could acquire a conflicting child lock"

        # Try to lock parent with same principal
        lock = self._acquire(
            "/dav/",
            "write",
            "exclusive",
            "infinity",
            self.owner,
            self.timeout,
            self.principal,
            token_list,
        )
        assert lock is None, "Could acquire a conflicting parent lock"

        # Try to lock child with same principal
        lock = self._acquire(
            "/dav/res/sub",
            "write",
            "exclusive",
            "infinity",
            self.owner,
            self.timeout,
            self.principal,
            token_list,
        )
        assert lock is None, "Could acquire a conflicting child lock (same principal)"


# ========================================================================
# ShelveTest
# ========================================================================
class ShelveTest(BasicTest):
    """Run the BasicTest cases against lock_storage.LockStorageShelve."""

    def setUp(self):
        """Create a LockManager on a shelve file in the temp directory."""
        if sys.version_info < (3, 0):
            modifier = "-py2"  # shelve formats are incompatible
        else:
            modifier = "-py3"
        self.path = os.path.join(
            gettempdir(), "wsgidav-locks{}.shelve".format(modifier)
        )
        storage = lock_storage.LockStorageShelve(self.path)
        self.lm = lock_manager.LockManager(storage)
        self.lm._verbose = 2

    def tearDown(self):
        """Clear the storage and release the lock manager."""
        self.lm.storage.clear()
        self.lm = None
        # Note: os.remove(self.path) does not work, because Shelve may append
        #       a file extension.
# if os.path.exists(self.path):
#     os.remove(self.path)


class RedisTest(BasicTest):
    """Run the BasicTest cases against LockStorageRedis.

    The tests are skipped when the `redis` package cannot be imported or no
    redis server answers a ping with the client's default connection settings.
    """

    def setUp(self):
        """Create a LockManager on redis storage, or skip the test."""
        # Import separately from the ping: if the import itself failed inside
        # the same `try`, evaluating `redis.exceptions.ConnectionError` in the
        # except clause would fail on the unbound name instead of skipping.
        try:
            import redis
        except ImportError:
            raise unittest.SkipTest("Test requires the redis package")

        try:
            r = redis.Redis()
            r.ping()
        except redis.exceptions.ConnectionError:
            raise unittest.SkipTest("Test requires a running redis instance")

        storage = LockStorageRedis()
        self.lm = lock_manager.LockManager(storage)
        self.lm._verbose = 2

    def tearDown(self):
        """Clear the storage and release the lock manager."""
        self.lm.storage.clear()
        self.lm = None


# ========================================================================
# suite
# ========================================================================
# def suite():
#     """Return suites of all test cases."""
#     return TestSuite([BasicTest.suite(),
#                       ShelveTest.suite(),
#                       ])


if __name__ == "__main__":
    unittest.main()
#     suite = suite()
#     TextTestRunner(descriptions=0, verbosity=2).run(suite)