1#    Copyright 2011 Justin Santa Barbara
2#
3#    Licensed under the Apache License, Version 2.0 (the "License"); you may
4#    not use this file except in compliance with the License. You may obtain
5#    a copy of the License at
6#
7#         http://www.apache.org/licenses/LICENSE-2.0
8#
9#    Unless required by applicable law or agreed to in writing, software
10#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12#    License for the specific language governing permissions and limitations
13#    under the License.
14
15import collections
16import errno
17import multiprocessing
18import os
19import signal
20import subprocess
21import sys
22import tempfile
23import threading
24import time
25from unittest import mock
26
27from oslo_config import cfg
28from oslotest import base as test_base
29
30from oslo_concurrency.fixture import lockutils as fixtures
31from oslo_concurrency import lockutils
32from oslo_config import fixture as config
33
34if sys.platform == 'win32':
35    import msvcrt
36else:
37    import fcntl
38
39
40def lock_file(handle):
41    if sys.platform == 'win32':
42        msvcrt.locking(handle.fileno(), msvcrt.LK_NBLCK, 1)
43    else:
44        fcntl.flock(handle, fcntl.LOCK_EX | fcntl.LOCK_NB)
45
46
47def unlock_file(handle):
48    if sys.platform == 'win32':
49        msvcrt.locking(handle.fileno(), msvcrt.LK_UNLCK, 1)
50    else:
51        fcntl.flock(handle, fcntl.LOCK_UN)
52
53
54def lock_files(handles_dir, out_queue):
55    with lockutils.lock('external', 'test-', external=True):
56        # Open some files we can use for locking
57        handles = []
58        for n in range(50):
59            path = os.path.join(handles_dir, ('file-%s' % n))
60            handles.append(open(path, 'w'))
61
62        # Loop over all the handles and try locking the file
63        # without blocking, keep a count of how many files we
64        # were able to lock and then unlock. If the lock fails
65        # we get an IOError and bail out with bad exit code
66        count = 0
67        for handle in handles:
68            try:
69                lock_file(handle)
70                count += 1
71                unlock_file(handle)
72            except IOError:
73                os._exit(2)
74            finally:
75                handle.close()
76        return out_queue.put(count)
77
78
79class LockTestCase(test_base.BaseTestCase):
80
81    def setUp(self):
82        super(LockTestCase, self).setUp()
83        self.config = self.useFixture(config.Config(lockutils.CONF)).config
84
85    def test_synchronized_wrapped_function_metadata(self):
86        @lockutils.synchronized('whatever', 'test-')
87        def foo():
88            """Bar."""
89            pass
90
91        self.assertEqual('Bar.', foo.__doc__, "Wrapped function's docstring "
92                                              "got lost")
93        self.assertEqual('foo', foo.__name__, "Wrapped function's name "
94                                              "got mangled")
95
96    def test_lock_internally_different_collections(self):
97        s1 = lockutils.Semaphores()
98        s2 = lockutils.Semaphores()
99        trigger = threading.Event()
100        who_ran = collections.deque()
101
102        def f(name, semaphores, pull_trigger):
103            with lockutils.internal_lock('testing', semaphores=semaphores):
104                if pull_trigger:
105                    trigger.set()
106                else:
107                    trigger.wait()
108                who_ran.append(name)
109
110        threads = [
111            threading.Thread(target=f, args=(1, s1, True)),
112            threading.Thread(target=f, args=(2, s2, False)),
113        ]
114        for thread in threads:
115            thread.start()
116        for thread in threads:
117            thread.join()
118        self.assertEqual([1, 2], sorted(who_ran))
119
120    def test_lock_internally(self):
121        """We can lock across multiple threads."""
122        saved_sem_num = len(lockutils._semaphores)
123        seen_threads = list()
124
125        def f(_id):
126            with lockutils.lock('testlock2', 'test-', external=False):
127                for x in range(10):
128                    seen_threads.append(_id)
129
130        threads = []
131        for i in range(10):
132            thread = threading.Thread(target=f, args=(i,))
133            threads.append(thread)
134            thread.start()
135
136        for thread in threads:
137            thread.join()
138
139        self.assertEqual(100, len(seen_threads))
140        # Looking at the seen threads, split it into chunks of 10, and verify
141        # that the last 9 match the first in each chunk.
142        for i in range(10):
143            for j in range(9):
144                self.assertEqual(seen_threads[i * 10],
145                                 seen_threads[i * 10 + 1 + j])
146
147        self.assertEqual(saved_sem_num, len(lockutils._semaphores),
148                         "Semaphore leak detected")
149
150    def test_lock_internal_fair(self):
151        """Check that we're actually fair."""
152
153        def f(_id):
154            with lockutils.lock('testlock', 'test-',
155                                external=False, fair=True):
156                lock_holder.append(_id)
157
158        lock_holder = []
159        threads = []
160        # While holding the fair lock, spawn a bunch of threads that all try
161        # to acquire the lock.  They will all block.  Then release the lock
162        # and see what happens.
163        with lockutils.lock('testlock', 'test-', external=False, fair=True):
164            for i in range(10):
165                thread = threading.Thread(target=f, args=(i,))
166                threads.append(thread)
167                thread.start()
168                # Allow some time for the new thread to get queued onto the
169                # list of pending writers before continuing.  This is gross
170                # but there's no way around it without using knowledge of
171                # fasteners internals.
172                time.sleep(0.5)
173        # Wait for all threads.
174        for thread in threads:
175            thread.join()
176
177        self.assertEqual(10, len(lock_holder))
178        # Check that the threads each got the lock in fair order.
179        for i in range(10):
180            self.assertEqual(i, lock_holder[i])
181
182    def test_fair_lock_with_semaphore(self):
183        def do_test():
184            s = lockutils.Semaphores()
185            with lockutils.lock('testlock', 'test-', semaphores=s, fair=True):
186                pass
187        self.assertRaises(NotImplementedError, do_test)
188
189    def test_fair_lock_with_nonblocking(self):
190        def do_test():
191            with lockutils.lock('testlock', 'test-', fair=True,
192                                blocking=False):
193                pass
194        self.assertRaises(NotImplementedError, do_test)
195
196    def test_nested_synchronized_external_works(self):
197        """We can nest external syncs."""
198        self.config(lock_path=tempfile.mkdtemp(), group='oslo_concurrency')
199        sentinel = object()
200
201        @lockutils.synchronized('testlock1', 'test-', external=True)
202        def outer_lock():
203
204            @lockutils.synchronized('testlock2', 'test-', external=True)
205            def inner_lock():
206                return sentinel
207            return inner_lock()
208
209        self.assertEqual(sentinel, outer_lock())
210
211    def _do_test_lock_externally(self):
212        """We can lock across multiple processes."""
213        children = []
214        for n in range(50):
215            queue = multiprocessing.Queue()
216            proc = multiprocessing.Process(
217                target=lock_files,
218                args=(tempfile.mkdtemp(), queue))
219            proc.start()
220            children.append((proc, queue))
221        for child, queue in children:
222            child.join()
223            count = queue.get(block=False)
224            self.assertEqual(50, count)
225
226    def test_lock_externally(self):
227        self.config(lock_path=tempfile.mkdtemp(), group='oslo_concurrency')
228
229        self._do_test_lock_externally()
230
231    def test_lock_externally_lock_dir_not_exist(self):
232        lock_dir = tempfile.mkdtemp()
233        os.rmdir(lock_dir)
234        self.config(lock_path=lock_dir, group='oslo_concurrency')
235
236        self._do_test_lock_externally()
237
238    def test_lock_with_prefix(self):
239        # TODO(efried): Embetter this test
240        self.config(lock_path=tempfile.mkdtemp(), group='oslo_concurrency')
241        foo = lockutils.lock_with_prefix('mypfix-')
242
243        with foo('mylock', external=True):
244            # We can't check much
245            pass
246
247    def test_synchronized_with_prefix(self):
248        lock_name = 'mylock'
249        lock_pfix = 'mypfix-'
250
251        foo = lockutils.synchronized_with_prefix(lock_pfix)
252
253        @foo(lock_name, external=True)
254        def bar(dirpath, pfix, name):
255            return True
256
257        lock_dir = tempfile.mkdtemp()
258        self.config(lock_path=lock_dir, group='oslo_concurrency')
259
260        self.assertTrue(bar(lock_dir, lock_pfix, lock_name))
261
262    def test_synchronized_without_prefix(self):
263        self.config(lock_path=tempfile.mkdtemp(), group='oslo_concurrency')
264
265        @lockutils.synchronized('lock', external=True)
266        def test_without_prefix():
267            # We can't check much
268            pass
269
270        test_without_prefix()
271
272    def test_synchronized_prefix_without_hypen(self):
273        self.config(lock_path=tempfile.mkdtemp(), group='oslo_concurrency')
274
275        @lockutils.synchronized('lock', 'hypen', True)
276        def test_without_hypen():
277            # We can't check much
278            pass
279
280        test_without_hypen()
281
282    def test_contextlock(self):
283        self.config(lock_path=tempfile.mkdtemp(), group='oslo_concurrency')
284
285        # Note(flaper87): Lock is not external, which means
286        # a semaphore will be yielded
287        with lockutils.lock("test") as sem:
288            self.assertIsInstance(sem, threading.Semaphore)
289
290            # NOTE(flaper87): Lock is external so an InterProcessLock
291            # will be yielded.
292            with lockutils.lock("test2", external=True) as lock:
293                self.assertTrue(lock.exists())
294
295            with lockutils.lock("test1", external=True) as lock1:
296                self.assertIsInstance(lock1, lockutils.InterProcessLock)
297
298    def test_contextlock_unlocks(self):
299        self.config(lock_path=tempfile.mkdtemp(), group='oslo_concurrency')
300
301        with lockutils.lock("test") as sem:
302            self.assertIsInstance(sem, threading.Semaphore)
303
304            with lockutils.lock("test2", external=True) as lock:
305                self.assertTrue(lock.exists())
306
307            # NOTE(flaper87): Lock should be free
308            with lockutils.lock("test2", external=True) as lock:
309                self.assertTrue(lock.exists())
310
311        # NOTE(flaper87): Lock should be free
312        # but semaphore should already exist.
313        with lockutils.lock("test") as sem2:
314            self.assertEqual(sem, sem2)
315
316    @mock.patch('logging.Logger.info')
317    @mock.patch('os.remove')
318    @mock.patch('oslo_concurrency.lockutils._get_lock_path')
319    def test_remove_lock_external_file_exists(self, path_mock, remove_mock,
320                                              log_mock):
321        lockutils.remove_external_lock_file(mock.sentinel.name,
322                                            mock.sentinel.prefix,
323                                            mock.sentinel.lock_path)
324
325        path_mock.assert_called_once_with(mock.sentinel.name,
326                                          mock.sentinel.prefix,
327                                          mock.sentinel.lock_path)
328        remove_mock.assert_called_once_with(path_mock.return_value)
329        log_mock.assert_not_called()
330
331    @mock.patch('logging.Logger.warning')
332    @mock.patch('os.remove', side_effect=OSError(errno.ENOENT, None))
333    @mock.patch('oslo_concurrency.lockutils._get_lock_path')
334    def test_remove_lock_external_file_doesnt_exists(self, path_mock,
335                                                     remove_mock, log_mock):
336        lockutils.remove_external_lock_file(mock.sentinel.name,
337                                            mock.sentinel.prefix,
338                                            mock.sentinel.lock_path)
339        path_mock.assert_called_once_with(mock.sentinel.name,
340                                          mock.sentinel.prefix,
341                                          mock.sentinel.lock_path)
342        remove_mock.assert_called_once_with(path_mock.return_value)
343        log_mock.assert_not_called()
344
345    @mock.patch('logging.Logger.warning')
346    @mock.patch('os.remove', side_effect=OSError(errno.EPERM, None))
347    @mock.patch('oslo_concurrency.lockutils._get_lock_path')
348    def test_remove_lock_external_file_permission_error(
349            self, path_mock, remove_mock, log_mock):
350        lockutils.remove_external_lock_file(mock.sentinel.name,
351                                            mock.sentinel.prefix,
352                                            mock.sentinel.lock_path)
353        path_mock.assert_called_once_with(mock.sentinel.name,
354                                          mock.sentinel.prefix,
355                                          mock.sentinel.lock_path)
356        remove_mock.assert_called_once_with(path_mock.return_value)
357        log_mock.assert_called()
358
359    def test_no_slash_in_b64(self):
360        # base64(sha1(foobar)) has a slash in it
361        with lockutils.lock("foobar"):
362            pass
363
364    def test_deprecated_names(self):
365        paths = self.create_tempfiles([['fake.conf', '\n'.join([
366            '[DEFAULT]',
367            'lock_path=foo',
368            'disable_process_locking=True'])
369        ]])
370        conf = cfg.ConfigOpts()
371        conf(['--config-file', paths[0]])
372        conf.register_opts(lockutils._opts, 'oslo_concurrency')
373        self.assertEqual('foo', conf.oslo_concurrency.lock_path)
374        self.assertTrue(conf.oslo_concurrency.disable_process_locking)
375
376
377class FileBasedLockingTestCase(test_base.BaseTestCase):
378    def setUp(self):
379        super(FileBasedLockingTestCase, self).setUp()
380        self.lock_dir = tempfile.mkdtemp()
381
382    def test_lock_file_exists(self):
383        lock_file = os.path.join(self.lock_dir, 'lock-file')
384
385        @lockutils.synchronized('lock-file', external=True,
386                                lock_path=self.lock_dir)
387        def foo():
388            self.assertTrue(os.path.exists(lock_file))
389
390        foo()
391
392    def test_interprocess_lock(self):
393        lock_file = os.path.join(self.lock_dir, 'processlock')
394
395        pid = os.fork()
396        if pid:
397            # Make sure the child grabs the lock first
398            start = time.time()
399            while not os.path.exists(lock_file):
400                if time.time() - start > 5:
401                    self.fail('Timed out waiting for child to grab lock')
402                time.sleep(0)
403            lock1 = lockutils.InterProcessLock('foo')
404            lock1.lockfile = open(lock_file, 'w')
405            # NOTE(bnemec): There is a brief window between when the lock file
406            # is created and when it actually becomes locked.  If we happen to
407            # context switch in that window we may succeed in locking the
408            # file.  Keep retrying until we either get the expected exception
409            # or timeout waiting.
410            while time.time() - start < 5:
411                try:
412                    lock1.trylock()
413                    lock1.unlock()
414                    time.sleep(0)
415                except IOError:
416                    # This is what we expect to happen
417                    break
418            else:
419                self.fail('Never caught expected lock exception')
420            # We don't need to wait for the full sleep in the child here
421            os.kill(pid, signal.SIGKILL)
422        else:
423            try:
424                lock2 = lockutils.InterProcessLock('foo')
425                lock2.lockfile = open(lock_file, 'w')
426                have_lock = False
427                while not have_lock:
428                    try:
429                        lock2.trylock()
430                        have_lock = True
431                    except IOError:
432                        pass
433            finally:
434                # NOTE(bnemec): This is racy, but I don't want to add any
435                # synchronization primitives that might mask a problem
436                # with the one we're trying to test here.
437                time.sleep(.5)
438                os._exit(0)
439
440    def test_interprocess_nonblocking_external_lock(self):
441        """Check that we're not actually blocking between processes."""
442
443        nb_calls = multiprocessing.Value('i', 0)
444
445        @lockutils.synchronized('foo', blocking=False, external=True,
446                                lock_path=self.lock_dir)
447        def foo(param):
448            """Simulate a long-running operation in a process."""
449            param.value += 1
450            time.sleep(.5)
451
452        def other(param):
453            foo(param)
454
455        process = multiprocessing.Process(target=other, args=(nb_calls, ))
456        process.start()
457        # Make sure the other process grabs the lock
458        start = time.time()
459        while not os.path.exists(os.path.join(self.lock_dir, 'foo')):
460            if time.time() - start > 5:
461                self.fail('Timed out waiting for process to grab lock')
462            time.sleep(0)
463        process1 = multiprocessing.Process(target=other, args=(nb_calls, ))
464        process1.start()
465        process1.join()
466        process.join()
467        self.assertEqual(1, nb_calls.value)
468
469    def test_interthread_external_lock(self):
470        call_list = []
471
472        @lockutils.synchronized('foo', external=True, lock_path=self.lock_dir)
473        def foo(param):
474            """Simulate a long-running threaded operation."""
475            call_list.append(param)
476            # NOTE(bnemec): This is racy, but I don't want to add any
477            # synchronization primitives that might mask a problem
478            # with the one we're trying to test here.
479            time.sleep(.5)
480            call_list.append(param)
481
482        def other(param):
483            foo(param)
484
485        thread = threading.Thread(target=other, args=('other',))
486        thread.start()
487        # Make sure the other thread grabs the lock
488        # NOTE(bnemec): File locks do not actually work between threads, so
489        # this test is verifying that the local semaphore is still enforcing
490        # external locks in that case.  This means this test does not have
491        # the same race problem as the process test above because when the
492        # file is created the semaphore has already been grabbed.
493        start = time.time()
494        while not os.path.exists(os.path.join(self.lock_dir, 'foo')):
495            if time.time() - start > 5:
496                self.fail('Timed out waiting for thread to grab lock')
497            time.sleep(0)
498        thread1 = threading.Thread(target=other, args=('main',))
499        thread1.start()
500        thread1.join()
501        thread.join()
502        self.assertEqual(['other', 'other', 'main', 'main'], call_list)
503
504    def test_interthread_nonblocking_external_lock(self):
505        call_list = []
506
507        @lockutils.synchronized('foo', external=True, blocking=False,
508                                lock_path=self.lock_dir)
509        def foo(param):
510            """Simulate a long-running threaded operation."""
511            call_list.append(param)
512            time.sleep(.5)
513            call_list.append(param)
514
515        def other(param):
516            foo(param)
517
518        thread = threading.Thread(target=other, args=('other',))
519        thread.start()
520        # Make sure the other thread grabs the lock
521        start = time.time()
522        while not os.path.exists(os.path.join(self.lock_dir, 'foo')):
523            if time.time() - start > 5:
524                self.fail('Timed out waiting for thread to grab lock')
525            time.sleep(0)
526        thread1 = threading.Thread(target=other, args=('main',))
527        thread1.start()
528        thread1.join()
529        thread.join()
530        self.assertEqual(['other', 'other'], call_list)
531
532    def test_interthread_nonblocking_internal_lock(self):
533        call_list = []
534
535        @lockutils.synchronized('foo', blocking=False,
536                                lock_path=self.lock_dir)
537        def foo(param):
538            # Simulate a long-running threaded operation.
539            call_list.append(param)
540            time.sleep(.5)
541            call_list.append(param)
542
543        def other(param):
544            foo(param)
545
546        thread = threading.Thread(target=other, args=('other',))
547        thread.start()
548        # Make sure the other thread grabs the lock
549        start = time.time()
550        while not call_list:
551            if time.time() - start > 5:
552                self.fail('Timed out waiting for thread to grab lock')
553            time.sleep(0)
554        thread1 = threading.Thread(target=other, args=('main',))
555        thread1.start()
556        thread1.join()
557        thread.join()
558        self.assertEqual(['other', 'other'], call_list)
559
560    def test_non_destructive(self):
561        lock_file = os.path.join(self.lock_dir, 'not-destroyed')
562        with open(lock_file, 'w') as f:
563            f.write('test')
564        with lockutils.lock('not-destroyed', external=True,
565                            lock_path=self.lock_dir):
566            with open(lock_file) as f:
567                self.assertEqual('test', f.read())
568
569
570class LockutilsModuleTestCase(test_base.BaseTestCase):
571
572    def setUp(self):
573        super(LockutilsModuleTestCase, self).setUp()
574        self.old_env = os.environ.get('OSLO_LOCK_PATH')
575        if self.old_env is not None:
576            del os.environ['OSLO_LOCK_PATH']
577
578    def tearDown(self):
579        if self.old_env is not None:
580            os.environ['OSLO_LOCK_PATH'] = self.old_env
581        super(LockutilsModuleTestCase, self).tearDown()
582
583    def test_main(self):
584        script = '\n'.join([
585            'import os',
586            'lock_path = os.environ.get("OSLO_LOCK_PATH")',
587            'assert lock_path is not None',
588            'assert os.path.isdir(lock_path)',
589        ])
590        argv = ['', sys.executable, '-c', script]
591        retval = lockutils._lock_wrapper(argv)
592        self.assertEqual(0, retval, "Bad OSLO_LOCK_PATH has been set")
593
594    def test_return_value_maintained(self):
595        script = '\n'.join([
596            'import sys',
597            'sys.exit(1)',
598        ])
599        argv = ['', sys.executable, '-c', script]
600        retval = lockutils._lock_wrapper(argv)
601        self.assertEqual(1, retval)
602
603    def test_direct_call_explodes(self):
604        cmd = [sys.executable, '-m', 'oslo_concurrency.lockutils']
605        with open(os.devnull, 'w') as devnull:
606            retval = subprocess.call(cmd, stderr=devnull)
607            self.assertEqual(1, retval)
608
609
610class TestLockFixture(test_base.BaseTestCase):
611
612    def setUp(self):
613        super(TestLockFixture, self).setUp()
614        self.config = self.useFixture(config.Config(lockutils.CONF)).config
615        self.tempdir = tempfile.mkdtemp()
616
617    def _check_in_lock(self):
618        self.assertTrue(self.lock.exists())
619
620    def tearDown(self):
621        self._check_in_lock()
622        super(TestLockFixture, self).tearDown()
623
624    def test_lock_fixture(self):
625        # Setup lock fixture to test that teardown is inside the lock
626        self.config(lock_path=self.tempdir, group='oslo_concurrency')
627        fixture = fixtures.LockFixture('test-lock')
628        self.useFixture(fixture)
629        self.lock = fixture.lock
630
631
632class TestGetLockPath(test_base.BaseTestCase):
633
634    def setUp(self):
635        super(TestGetLockPath, self).setUp()
636        self.conf = self.useFixture(config.Config(lockutils.CONF)).conf
637
638    def test_get_default(self):
639        lockutils.set_defaults(lock_path='/the/path')
640        self.assertEqual('/the/path', lockutils.get_lock_path(self.conf))
641
642    def test_get_override(self):
643        lockutils._register_opts(self.conf)
644        self.conf.set_override('lock_path', '/alternate/path',
645                               group='oslo_concurrency')
646        self.assertEqual('/alternate/path', lockutils.get_lock_path(self.conf))
647