1# -*- coding: utf-8 -*-
2# (c) 2009-2020 Martin Wendt and contributors; see WsgiDAV https://github.com/mar10/wsgidav
3# Licensed under the MIT license:
4# http://www.opensource.org/licenses/mit-license.php
5"""Unit test for lock_manager.py"""
6from tempfile import gettempdir
7from time import sleep
8from wsgidav import lock_manager, lock_storage
9from wsgidav.dav_error import DAVError
10from wsgidav.lock_storage_redis import LockStorageRedis
11
12import os
13import sys
14import unittest
15
16
17# ========================================================================
18# BasicTest
19# ========================================================================
20
21
22class BasicTest(unittest.TestCase):
23    """Test lock_manager.LockManager()."""
24
25    principal = "Joe Tester"
26    owner = b"joe.tester@example.com"
27    root = "/dav/res"
28    timeout = 10 * 60  # Default lock timeout 10 minutes
29
30    #     @classmethod
31    #     def suite(cls):
32    #         """Return test case suite (so we can control the order)."""
33    #         suite = TestSuite()
34    #         suite.addTest(cls("testPreconditions"))
35    # #        suite.addTest(cls("testOpen"))
36    #         suite.addTest(cls("testValidation"))
37    #         suite.addTest(cls("testLock"))
38    #         suite.addTest(cls("testTimeout"))
39    #         suite.addTest(cls("testConflict"))
40    #         return suite
41
42    def setUp(self):
43        storage = lock_storage.LockStorageDict()
44        self.lm = lock_manager.LockManager(storage)
45        self.lm._verbose = 1
46
47    def tearDown(self):
48        del self.lm
49
50    def _acquire(
51        self,
52        url,
53        lock_type,
54        lock_scope,
55        lock_depth,
56        lock_owner,
57        timeout,
58        principal,
59        token_list,
60    ):
61        """Wrapper for lm.acquire, that returns None instead of raising DAVError."""
62        try:
63            return self.lm.acquire(
64                url,
65                lock_type,
66                lock_scope,
67                lock_depth,
68                lock_owner,
69                timeout,
70                principal,
71                token_list,
72            )
73        except DAVError:
74            return None
75
76    def _isLockDict(self, o):
77        try:
78            _ = o["root"]  # noqa F841
79        except Exception:
80            return False
81        return True
82
83    def _isLockResultOK(self, resultTupleList):
84        """Return True, if result is [ (lock_dict, None) ]."""
85        try:
86            return (
87                len(resultTupleList) == 1
88                and len(resultTupleList) == 2
89                and self._isLockDict(resultTupleList[0][0])
90                and resultTupleList[0][1] is None
91            )
92        except Exception:
93            return False
94
95    def _isLockResultFault(self, lock, conflictList, status=None):
96        """Return True, if it is a valid result tuple containing a DAVError."""
97        try:
98            if lock is not None:
99                return False
100            if len(conflictList) < 1:
101                return False
102            resultTuple = conflictList[0]
103            if (
104                len(resultTuple) != 2
105                or not self._isLockDict(resultTuple[0])
106                or not isinstance(resultTuple[1], DAVError)
107            ):
108                return False
109            elif status and status != DAVError.value:
110                return False
111            return True
112        except Exception:
113            return False
114
115    def testPreconditions(self):
116        """Environment must be set."""
117        self.assertTrue(
118            __debug__, "__debug__ must be True, otherwise asserts are ignored"
119        )
120
121    #    def testOpen(self):
122    #        """Lock manager should be lazy opening on first access."""
123    #        lm = self.lm
124    # #        assert not lm._loaded, "LM must only be opened after first access"
125    #        lm._generate_lock(self.principal, "write", "exclusive", "infinity",
126    #                        self.owner,
127    #                        "/dav",
128    #                        10)
129    #        assert lm._loaded, "LM must be opened after first access"
130
131    def testValidation(self):
132        """Lock manager should raise errors on bad args."""
133        lm = self.lm
134        self.assertRaises(
135            AssertionError,
136            lm._generate_lock,
137            lm,
138            "writeX",
139            "exclusive",
140            "infinity",
141            self.owner,
142            self.root,
143            self.timeout,
144        )
145        self.assertRaises(
146            AssertionError,
147            lm._generate_lock,
148            lm,
149            "write",
150            "exclusiveX",
151            "infinity",
152            self.owner,
153            self.root,
154            self.timeout,
155        )
156        self.assertRaises(
157            AssertionError,
158            lm._generate_lock,
159            lm,
160            "write",
161            "exclusive",
162            "infinityX",
163            self.owner,
164            self.root,
165            self.timeout,
166        )
167        self.assertRaises(
168            AssertionError,
169            lm._generate_lock,
170            lm,
171            "write",
172            "exclusive",
173            "infinity",
174            None,
175            self.root,
176            self.timeout,
177        )
178        self.assertRaises(
179            AssertionError,
180            lm._generate_lock,
181            lm,
182            "write",
183            "exclusive",
184            "infinity",
185            self.owner,
186            None,
187            self.timeout,
188        )
189
190    #        assert lm._dict is None, "No locks should have been created by this test"
191
192    def testLock(self):
193        """Lock manager should create and find locks."""
194        lm = self.lm
195        url = "/dav/res"
196        # Create a new lock
197        lock_dict = lm._generate_lock(
198            self.principal,
199            "write",
200            "exclusive",
201            "infinity",
202            self.owner,
203            url,
204            self.timeout,
205        )
206        # Check returned dictionary
207        assert lock_dict is not None
208        assert lock_dict["root"] == url
209        assert lock_dict["type"] == "write"
210        assert lock_dict["scope"] == "exclusive"
211        assert lock_dict["depth"] == "infinity"
212        assert lock_dict["owner"] == self.owner
213        assert lock_dict["principal"] == self.principal
214
215        # Test lookup
216        tok = lock_dict.get("token")
217        assert lm.get_lock(tok, "root") == url
218
219        lock_dict = lm.get_lock(tok)
220
221        assert lock_dict is not None
222        assert lock_dict["root"] == url
223        assert lock_dict["type"] == "write"
224        assert lock_dict["scope"] == "exclusive"
225        assert lock_dict["depth"] == "infinity"
226        assert lock_dict["owner"] == self.owner
227        assert lock_dict["principal"] == self.principal
228
229        # We locked "/dav/res", did we?
230        assert lm.is_token_locked_by_user(tok, self.principal)
231
232        #        res = lm.get_url_lock_list(url, self.principal)
233        res = lm.get_url_lock_list(url)
234        self.assertEqual(len(res), 1)
235
236        #        res = lm.get_url_lock_list(url, "another user")
237        #        assert len(res) == 0
238
239        assert lm.is_url_locked_by_token(
240            "/dav/res", tok
241        ), "url not directly locked by lock_token."
242        assert lm.is_url_locked_by_token(
243            "/dav/res/", tok
244        ), "url not directly locked by lock_token."
245        assert lm.is_url_locked_by_token(
246            "/dav/res/sub", tok
247        ), "child url not indirectly locked"
248
249        assert not lm.is_url_locked_by_token(
250            "/dav/ressub", tok
251        ), "non-child url reported as locked"
252        assert not lm.is_url_locked_by_token(
253            "/dav", tok
254        ), "parent url reported as locked"
255        assert not lm.is_url_locked_by_token(
256            "/dav/", tok
257        ), "parent url reported as locked"
258
259    def testTimeout(self):
260        """Locks should be purged after expiration date."""
261        lm = self.lm
262        timeout = 1
263        lock_dict = lm._generate_lock(
264            self.principal,
265            "write",
266            "exclusive",
267            "infinity",
268            self.owner,
269            self.root,
270            timeout,
271        )
272
273        assert lock_dict is not None
274        tok = lock_dict.get("token")
275        assert lm.get_lock(tok, "root") == self.root
276
277        sleep(timeout - 0.5)
278        lock_dict = lm.get_lock(tok)
279        assert lock_dict is not None, "Lock expired too early"
280
281        sleep(1)
282        lock_dict = lm.get_lock(tok)
283        assert lock_dict is None, "Lock has not expired"
284
285    def testConflict(self):
286        """Locks should prevent conflicts."""
287        token_list = []
288
289        # Create a lock for '/dav/res/'
290        lock = self._acquire(
291            "/dav/res/",
292            "write",
293            "exclusive",
294            "infinity",
295            self.owner,
296            self.timeout,
297            self.principal,
298            token_list,
299        )
300        assert lock, "Could not acquire lock"
301
302        # Try to lock with a slightly different URL (without trailing '/')
303        lock = self._acquire(
304            "/dav/res",
305            "write",
306            "exclusive",
307            "infinity",
308            self.owner,
309            self.timeout,
310            "another principal",
311            token_list,
312        )
313        assert lock is None, "Could acquire a conflicting lock"
314
315        # Try to lock with another principal
316        lock = self._acquire(
317            "/dav/res/",
318            "write",
319            "exclusive",
320            "infinity",
321            self.owner,
322            self.timeout,
323            "another principal",
324            token_list,
325        )
326        assert lock is None, "Could acquire a conflicting lock"
327
328        # Try to lock child with another principal
329        lock = self._acquire(
330            "/dav/res/sub",
331            "write",
332            "exclusive",
333            "infinity",
334            self.owner,
335            self.timeout,
336            "another principal",
337            token_list,
338        )
339        assert lock is None, "Could acquire a conflicting child lock"
340
341        # Try to lock parent with same principal
342        lock = self._acquire(
343            "/dav/",
344            "write",
345            "exclusive",
346            "infinity",
347            self.owner,
348            self.timeout,
349            self.principal,
350            token_list,
351        )
352        assert lock is None, "Could acquire a conflicting parent lock"
353
354        # Try to lock child with same principal
355        lock = self._acquire(
356            "/dav/res/sub",
357            "write",
358            "exclusive",
359            "infinity",
360            self.owner,
361            self.timeout,
362            self.principal,
363            token_list,
364        )
365        assert lock is None, "Could acquire a conflicting child lock (same principal)"
366
367
368# ========================================================================
369# ShelveTest
370# ========================================================================
371class ShelveTest(BasicTest):
372    """Test lock_manager.ShelveLockManager()."""
373
374    def setUp(self):
375        if sys.version_info < (3, 0):
376            modifier = "-py2"  # shelve formats are incompatible
377        else:
378            modifier = "-py3"
379        self.path = os.path.join(
380            gettempdir(), "wsgidav-locks{}.shelve".format(modifier)
381        )
382        storage = lock_storage.LockStorageShelve(self.path)
383        self.lm = lock_manager.LockManager(storage)
384        self.lm._verbose = 2
385
386    def tearDown(self):
387        self.lm.storage.clear()
388        self.lm = None
389        # Note: os.remove(self.path) does not work, because Shelve may append
390        # a file extension.
391
392
393#         if os.path.exists(self.path):
394#             os.remove(self.path)
395
396
397class RedisTest(BasicTest):
398    def setUp(self):
399        try:
400            import redis
401
402            r = redis.Redis()
403            r.ping()
404        except redis.exceptions.ConnectionError:
405            raise unittest.SkipTest("Test requires a running redis instance")
406        storage = LockStorageRedis()
407        self.lm = lock_manager.LockManager(storage)
408        self.lm._verbose = 2
409
410    def tearDown(self):
411        self.lm.storage.clear()
412        self.lm = None
413
414
415# ========================================================================
416# suite
417# ========================================================================
418# def suite():
419#     """Return suites of all test cases."""
420#     return TestSuite([BasicTest.suite(),
421#                       ShelveTest.suite(),
422#                       ])
423
424
425if __name__ == "__main__":
426    unittest.main()
427#     suite = suite()
428#     TextTestRunner(descriptions=0, verbosity=2).run(suite)
429