1# -*- coding: utf-8 -*-
2
3# Copyright 2011 OpenStack Foundation.
4# Copyright 2011 Justin Santa Barbara
5#
6#    Licensed under the Apache License, Version 2.0 (the "License"); you may
7#    not use this file except in compliance with the License. You may obtain
8#    a copy of the License at
9#
10#         http://www.apache.org/licenses/LICENSE-2.0
11#
12#    Unless required by applicable law or agreed to in writing, software
13#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15#    License for the specific language governing permissions and limitations
16#    under the License.
17
18import contextlib
19import errno
20import multiprocessing
21import os
22import shutil
23import sys
24import tempfile
25import threading
26import time
27
28from fasteners import process_lock as pl
29from fasteners import test
30
31WIN32 = os.name == 'nt'
32
33
34class BrokenLock(pl.InterProcessLock):
35    def __init__(self, name, errno_code):
36        super(BrokenLock, self).__init__(name)
37        self.errno_code = errno_code
38
39    def unlock(self):
40        pass
41
42    def trylock(self):
43        err = IOError()
44        err.errno = self.errno_code
45        raise err
46
47
48@contextlib.contextmanager
49def scoped_child_processes(children, timeout=0.1, exitcode=0):
50    for child in children:
51        child.daemon = True
52        child.start()
53    yield
54    start = time.time()
55    timed_out = 0
56
57    for child in children:
58        child.join(max(timeout - (time.time() - start), 0))
59        if child.is_alive():
60            timed_out += 1
61        child.terminate()
62
63    if timed_out:
64        msg = "{} child processes killed due to timeout\n".format(timed_out)
65        sys.stderr.write(msg)
66
67    if exitcode is not None:
68        for child in children:
69            c_code = child.exitcode
70            msg = "Child exitcode {} != {}"
71            assert c_code == exitcode, msg.format(c_code, exitcode)
72
73
74def try_lock(lock_file):
75    try:
76        my_lock = pl.InterProcessLock(lock_file)
77        my_lock.lockfile = open(lock_file, 'w')
78        my_lock.trylock()
79        my_lock.unlock()
80        os._exit(1)
81    except IOError:
82        os._exit(0)
83
84
85def lock_files(lock_path, handles_dir, num_handles=50):
86    with pl.InterProcessLock(lock_path):
87
88        # Open some files we can use for locking
89        handles = []
90        for n in range(num_handles):
91            path = os.path.join(handles_dir, ('file-%s' % n))
92            handles.append(open(path, 'w'))
93
94        # Loop over all the handles and try locking the file
95        # without blocking, keep a count of how many files we
96        # were able to lock and then unlock. If the lock fails
97        # we get an IOError and bail out with bad exit code
98        count = 0
99        for handle in handles:
100            try:
101                pl.InterProcessLock._trylock(handle)
102                count += 1
103                pl.InterProcessLock._unlock(handle)
104            except IOError:
105                os._exit(2)
106            finally:
107                handle.close()
108
109        # Check if we were able to open all files
110        if count != num_handles:
111            raise AssertionError("Unable to open all handles")
112
113
114def inter_processlock_helper(lockname, lock_filename, pipe):
115    lock2 = pl.InterProcessLock(lockname)
116    lock2.lockfile = open(lock_filename, 'w')
117    have_lock = False
118    while not have_lock:
119        try:
120            lock2.trylock()
121            have_lock = True
122        except IOError:
123            pass
124    # Hold the lock and wait for the parent
125    pipe.send(None)
126    pipe.recv()
127
128
129class ProcessLockTest(test.TestCase):
130    def setUp(self):
131        super(ProcessLockTest, self).setUp()
132        self.lock_dir = tempfile.mkdtemp()
133        self.tmp_dirs = [self.lock_dir]
134
135    def tearDown(self):
136        super(ProcessLockTest, self).tearDown()
137        for a_dir in reversed(self.tmp_dirs):
138            if os.path.exists(a_dir):
139                shutil.rmtree(a_dir, ignore_errors=True)
140
141    def test_lock_acquire_release_file_lock(self):
142        lock_file = os.path.join(self.lock_dir, 'lock')
143        lock = pl.InterProcessLock(lock_file)
144
145        def attempt_acquire(count):
146            children = [
147                multiprocessing.Process(target=try_lock, args=(lock_file,))
148                for i in range(count)]
149            with scoped_child_processes(children, timeout=10, exitcode=None):
150                pass
151            return sum(c.exitcode for c in children)
152
153        self.assertTrue(lock.acquire())
154        try:
155            acquired_children = attempt_acquire(10)
156            self.assertEqual(0, acquired_children)
157        finally:
158            lock.release()
159
160        acquired_children = attempt_acquire(5)
161        self.assertNotEqual(0, acquired_children)
162
163    def test_nested_synchronized_external_works(self):
164        sentinel = object()
165
166        @pl.interprocess_locked(os.path.join(self.lock_dir, 'test-lock-1'))
167        def outer_lock():
168
169            @pl.interprocess_locked(os.path.join(self.lock_dir, 'test-lock-2'))
170            def inner_lock():
171                return sentinel
172
173            return inner_lock()
174
175        self.assertEqual(sentinel, outer_lock())
176
177    def _do_test_lock_externally(self, lock_dir):
178        lock_path = os.path.join(lock_dir, "lock")
179
180        handles_dir = tempfile.mkdtemp()
181        self.tmp_dirs.append(handles_dir)
182
183        num_handles = 50
184        num_processes = 50
185        args = [lock_path, handles_dir, num_handles]
186        children = [multiprocessing.Process(target=lock_files, args=args)
187                    for _ in range(num_processes)]
188
189        with scoped_child_processes(children, timeout=30, exitcode=0):
190            pass
191
192    def test_lock_externally(self):
193        self._do_test_lock_externally(self.lock_dir)
194
195    def test_lock_externally_lock_dir_not_exist(self):
196        os.rmdir(self.lock_dir)
197        self._do_test_lock_externally(self.lock_dir)
198
199    def test_lock_file_exists(self):
200        lock_file = os.path.join(self.lock_dir, 'lock')
201
202        @pl.interprocess_locked(lock_file)
203        def foo():
204            self.assertTrue(os.path.exists(lock_file))
205
206        foo()
207
208    def test_bad_acquire(self):
209        lock_file = os.path.join(self.lock_dir, 'lock')
210        lock = BrokenLock(lock_file, errno.EBUSY)
211        self.assertRaises(threading.ThreadError, lock.acquire)
212
213    def test_bad_release(self):
214        lock_file = os.path.join(self.lock_dir, 'lock')
215        lock = pl.InterProcessLock(lock_file)
216        self.assertRaises(threading.ThreadError, lock.release)
217
218    def test_interprocess_lock(self):
219        lock_file = os.path.join(self.lock_dir, 'lock')
220        lock_name = 'foo'
221
222        child_pipe, them = multiprocessing.Pipe()
223        child = multiprocessing.Process(
224            target=inter_processlock_helper, args=(lock_name, lock_file, them))
225
226        with scoped_child_processes((child,)):
227
228            # Make sure the child grabs the lock first
229            if not child_pipe.poll(5):
230                self.fail('Timed out waiting for child to grab lock')
231
232            start = time.time()
233            lock1 = pl.InterProcessLock(lock_name)
234            lock1.lockfile = open(lock_file, 'w')
235            # NOTE(bnemec): There is a brief window between when the lock file
236            # is created and when it actually becomes locked.  If we happen to
237            # context switch in that window we may succeed in locking the
238            # file. Keep retrying until we either get the expected exception
239            # or timeout waiting.
240            while time.time() - start < 5:
241                try:
242                    lock1.trylock()
243                    lock1.unlock()
244                    time.sleep(0)
245                except IOError:
246                    # This is what we expect to happen
247                    break
248            else:
249                self.fail('Never caught expected lock exception')
250
251            child_pipe.send(None)
252
253    @test.testtools.skipIf(WIN32, "Windows cannot open file handles twice")
254    def test_non_destructive(self):
255        lock_file = os.path.join(self.lock_dir, 'not-destroyed')
256        with open(lock_file, 'w') as f:
257            f.write('test')
258        with pl.InterProcessLock(lock_file):
259            with open(lock_file) as f:
260                self.assertEqual(f.read(), 'test')
261