# -*- coding: utf-8 -*-

# Copyright 2011 OpenStack Foundation.
# Copyright 2011 Justin Santa Barbara
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""Tests for :class:`fasteners.process_lock.InterProcessLock`.

These tests verify cross-process mutual exclusion by spawning real child
processes (via ``multiprocessing``) that compete for file-based locks and
report outcomes through their exit codes or a pipe.
"""

import contextlib
import errno
import multiprocessing
import os
import shutil
import sys
import tempfile
import threading
import time

from fasteners import process_lock as pl
from fasteners import test

# True when running on Windows; used to skip behavior that Windows file
# locking does not support (opening the same file handle twice).
WIN32 = os.name == 'nt'


class BrokenLock(pl.InterProcessLock):
    """A lock test double whose ``trylock`` always fails.

    ``trylock`` raises an ``IOError`` carrying the configured errno so tests
    can exercise the acquire-failure path of ``InterProcessLock.acquire``.
    """

    def __init__(self, name, errno_code):
        super(BrokenLock, self).__init__(name)
        # errno to attach to the IOError raised by trylock().
        self.errno_code = errno_code

    def unlock(self):
        # Intentionally a no-op: there is never anything to release.
        pass

    def trylock(self):
        # Always fail with the configured errno, simulating a lock that can
        # never be acquired (e.g. EBUSY).
        err = IOError()
        err.errno = self.errno_code
        raise err


@contextlib.contextmanager
def scoped_child_processes(children, timeout=0.1, exitcode=0):
    """Start ``children`` processes, yield, then join and check them.

    On exit from the ``with`` block, each child is joined with whatever
    remains of the shared ``timeout`` budget; children still alive after
    that are terminated and counted. If ``exitcode`` is not ``None``,
    every child's exit code is asserted to equal it (a terminated child
    will have a non-zero/negative exit code and so fails the assertion).

    :param children: iterable of ``multiprocessing.Process`` objects
        (not yet started).
    :param timeout: total seconds allowed for all joins combined.
    :param exitcode: expected exit code for every child, or ``None`` to
        skip the exit-code assertion entirely.
    """
    for child in children:
        # Daemonize so stray children cannot outlive the test runner.
        child.daemon = True
        child.start()
    yield
    start = time.time()
    timed_out = 0

    for child in children:
        # Each join consumes part of the shared timeout budget; once the
        # budget is spent, remaining joins use a timeout of 0.
        child.join(max(timeout - (time.time() - start), 0))
        if child.is_alive():
            timed_out += 1
            child.terminate()

    if timed_out:
        msg = "{} child processes killed due to timeout\n".format(timed_out)
        sys.stderr.write(msg)

    if exitcode is not None:
        for child in children:
            c_code = child.exitcode
            msg = "Child exitcode {} != {}"
            assert c_code == exitcode, msg.format(c_code, exitcode)


def try_lock(lock_file):
    """Child-process body: attempt a non-blocking lock on ``lock_file``.

    Exits the process with code 1 if the lock was acquired (and released),
    or 0 if ``trylock`` failed with ``IOError`` (lock held elsewhere).
    The parent sums child exit codes to count successful acquisitions, so
    the "success exits 1" convention here is deliberate.
    """
    try:
        my_lock = pl.InterProcessLock(lock_file)
        # Inject the file handle directly instead of going through
        # acquire(), so only the raw trylock/unlock path is exercised.
        my_lock.lockfile = open(lock_file, 'w')
        my_lock.trylock()
        my_lock.unlock()
        os._exit(1)
    except IOError:
        os._exit(0)


def lock_files(lock_path, handles_dir, num_handles=50):
    """Child-process body: under ``lock_path``, lock/unlock many files.

    While holding the inter-process lock at ``lock_path``, opens
    ``num_handles`` files in ``handles_dir`` and try-locks then unlocks
    each one. Exits with code 2 on any lock failure; raises
    ``AssertionError`` if not every handle was locked. A clean run exits 0
    (checked by the parent via ``scoped_child_processes``).
    """
    with pl.InterProcessLock(lock_path):

        # Open some files we can use for locking
        handles = []
        for n in range(num_handles):
            path = os.path.join(handles_dir, ('file-%s' % n))
            handles.append(open(path, 'w'))

        # Loop over all the handles and try locking the file
        # without blocking, keep a count of how many files we
        # were able to lock and then unlock. If the lock fails
        # we get an IOError and bail out with bad exit code
        count = 0
        for handle in handles:
            try:
                pl.InterProcessLock._trylock(handle)
                count += 1
                pl.InterProcessLock._unlock(handle)
            except IOError:
                os._exit(2)
            finally:
                handle.close()

        # Check if we were able to open all files
        if count != num_handles:
            raise AssertionError("Unable to open all handles")


def inter_processlock_helper(lockname, lock_filename, pipe):
    """Child-process body: grab the lock, then handshake with the parent.

    Spins (no sleep — acceptable for a short-lived test child) until
    ``trylock`` succeeds, then signals the parent over ``pipe`` and blocks
    on ``pipe.recv()`` so the lock stays held until the parent replies.
    """
    lock2 = pl.InterProcessLock(lockname)
    lock2.lockfile = open(lock_filename, 'w')
    have_lock = False
    while not have_lock:
        try:
            lock2.trylock()
            have_lock = True
        except IOError:
            pass
    # Hold the lock and wait for the parent
    pipe.send(None)
    pipe.recv()


class ProcessLockTest(test.TestCase):
    """End-to-end tests for ``InterProcessLock`` using real subprocesses."""

    def setUp(self):
        super(ProcessLockTest, self).setUp()
        # Fresh temp dir per test; tracked in tmp_dirs for teardown.
        self.lock_dir = tempfile.mkdtemp()
        self.tmp_dirs = [self.lock_dir]

    def tearDown(self):
        super(ProcessLockTest, self).tearDown()
        # Remove dirs newest-first; ignore_errors tolerates files still
        # held open by straggler children.
        for a_dir in reversed(self.tmp_dirs):
            if os.path.exists(a_dir):
                shutil.rmtree(a_dir, ignore_errors=True)

    def test_lock_acquire_release_file_lock(self):
        # While the parent holds the lock, no child may acquire it
        # (sum of exit codes == 0); after release, at least one child must
        # succeed (sum != 0). See try_lock for the exit-code convention.
        lock_file = os.path.join(self.lock_dir, 'lock')
        lock = pl.InterProcessLock(lock_file)

        def attempt_acquire(count):
            # Returns the number of children that acquired the lock.
            children = [
                multiprocessing.Process(target=try_lock, args=(lock_file,))
                for i in range(count)]
            # exitcode=None: exit codes are inspected manually below.
            with scoped_child_processes(children, timeout=10, exitcode=None):
                pass
            return sum(c.exitcode for c in children)

        self.assertTrue(lock.acquire())
        try:
            acquired_children = attempt_acquire(10)
            self.assertEqual(0, acquired_children)
        finally:
            lock.release()

        acquired_children = attempt_acquire(5)
        self.assertNotEqual(0, acquired_children)

    def test_nested_synchronized_external_works(self):
        # Two *different* lock files may be held nested without deadlock.
        sentinel = object()

        @pl.interprocess_locked(os.path.join(self.lock_dir, 'test-lock-1'))
        def outer_lock():

            @pl.interprocess_locked(os.path.join(self.lock_dir, 'test-lock-2'))
            def inner_lock():
                return sentinel

            return inner_lock()

        self.assertEqual(sentinel, outer_lock())

    def _do_test_lock_externally(self, lock_dir):
        # Stress test: many processes serialize on one lock while each
        # lock/unlocks many per-process files (see lock_files). Any lock
        # failure surfaces as a non-zero child exit code.
        lock_path = os.path.join(lock_dir, "lock")

        handles_dir = tempfile.mkdtemp()
        self.tmp_dirs.append(handles_dir)

        num_handles = 50
        num_processes = 50
        args = [lock_path, handles_dir, num_handles]
        children = [multiprocessing.Process(target=lock_files, args=args)
                    for _ in range(num_processes)]

        # exitcode=0: every child must finish cleanly within the timeout.
        with scoped_child_processes(children, timeout=30, exitcode=0):
            pass

    def test_lock_externally(self):
        self._do_test_lock_externally(self.lock_dir)

    def test_lock_externally_lock_dir_not_exist(self):
        # The lock implementation is expected to (re)create a missing
        # directory for the lock file.
        os.rmdir(self.lock_dir)
        self._do_test_lock_externally(self.lock_dir)

    def test_lock_file_exists(self):
        # The decorator must have created the lock file by the time the
        # wrapped function body runs.
        lock_file = os.path.join(self.lock_dir, 'lock')

        @pl.interprocess_locked(lock_file)
        def foo():
            self.assertTrue(os.path.exists(lock_file))

        foo()

    def test_bad_acquire(self):
        # A trylock that persistently fails with EBUSY must surface as a
        # ThreadError from acquire() rather than hanging forever.
        lock_file = os.path.join(self.lock_dir, 'lock')
        lock = BrokenLock(lock_file, errno.EBUSY)
        self.assertRaises(threading.ThreadError, lock.acquire)

    def test_bad_release(self):
        # Releasing a lock that was never acquired is a programming error.
        lock_file = os.path.join(self.lock_dir, 'lock')
        lock = pl.InterProcessLock(lock_file)
        self.assertRaises(threading.ThreadError, lock.release)

    def test_interprocess_lock(self):
        # A lock held by a child process must not be acquirable by the
        # parent; the pipe handshake guarantees the child grabs it first.
        lock_file = os.path.join(self.lock_dir, 'lock')
        lock_name = 'foo'

        child_pipe, them = multiprocessing.Pipe()
        child = multiprocessing.Process(
            target=inter_processlock_helper, args=(lock_name, lock_file, them))

        with scoped_child_processes((child,)):

            # Make sure the child grabs the lock first
            if not child_pipe.poll(5):
                self.fail('Timed out waiting for child to grab lock')

            start = time.time()
            lock1 = pl.InterProcessLock(lock_name)
            lock1.lockfile = open(lock_file, 'w')
            # NOTE(bnemec): There is a brief window between when the lock file
            # is created and when it actually becomes locked. If we happen to
            # context switch in that window we may succeed in locking the
            # file. Keep retrying until we either get the expected exception
            # or timeout waiting.
            while time.time() - start < 5:
                try:
                    lock1.trylock()
                    lock1.unlock()
                    # sleep(0) yields the CPU to give the child a chance to
                    # finish establishing the lock before the next retry.
                    time.sleep(0)
                except IOError:
                    # This is what we expect to happen
                    break
            else:
                # while/else: loop exhausted the 5s window without ever
                # hitting IOError, i.e. the child's lock never took effect.
                self.fail('Never caught expected lock exception')

            # Let the child release the lock and exit.
            child_pipe.send(None)

    @test.testtools.skipIf(WIN32, "Windows cannot open file handles twice")
    def test_non_destructive(self):
        # Acquiring the lock must not truncate or rewrite an existing
        # lock file's contents.
        lock_file = os.path.join(self.lock_dir, 'not-destroyed')
        with open(lock_file, 'w') as f:
            f.write('test')
        with pl.InterProcessLock(lock_file):
            with open(lock_file) as f:
                self.assertEqual(f.read(), 'test')