# Copyright 2011 Justin Santa Barbara
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

"""Unit tests for oslo_concurrency.lockutils."""

import collections
import errno
import multiprocessing
import os
import shutil
import signal
import subprocess
import sys
import tempfile
import threading
import time
from unittest import mock

from oslo_config import cfg
from oslotest import base as test_base

from oslo_concurrency.fixture import lockutils as fixtures
from oslo_concurrency import lockutils
from oslo_config import fixture as config

if sys.platform == 'win32':
    import msvcrt
else:
    import fcntl


def _make_temp_dir(test_case):
    """Create a temporary directory that is removed when the test ends.

    :param test_case: the running test case; its ``addCleanup`` is used to
                      schedule removal of the directory.
    :returns: path of the newly created directory.
    """
    path = tempfile.mkdtemp()
    # ignore_errors: some tests delete the directory themselves.
    test_case.addCleanup(shutil.rmtree, path, ignore_errors=True)
    return path


def lock_file(handle):
    """Take an exclusive, non-blocking lock on an open file.

    :param handle: open file object to lock.
    :raises IOError: if the lock cannot be acquired immediately.
    """
    if sys.platform == 'win32':
        msvcrt.locking(handle.fileno(), msvcrt.LK_NBLCK, 1)
    else:
        fcntl.flock(handle, fcntl.LOCK_EX | fcntl.LOCK_NB)


def unlock_file(handle):
    """Release a lock previously taken with :func:`lock_file`.

    :param handle: open file object to unlock.
    """
    if sys.platform == 'win32':
        msvcrt.locking(handle.fileno(), msvcrt.LK_UNLCK, 1)
    else:
        fcntl.flock(handle, fcntl.LOCK_UN)


def lock_files(handles_dir, out_queue):
    """Child-process body used by the external locking tests.

    While holding the external lock named ``external``, create 50 files in
    ``handles_dir`` and lock/unlock each of them without blocking.

    :param handles_dir: directory in which the scratch files are created.
    :param out_queue: queue on which the number of successfully locked
                      files is put.
    """
    with lockutils.lock('external', 'test-', external=True):
        # Open some files we can use for locking
        handles = []
        for n in range(50):
            path = os.path.join(handles_dir, ('file-%s' % n))
            handles.append(open(path, 'w'))

        # Loop over all the handles and try locking the file
        # without blocking, keep a count of how many files we
        # were able to lock and then unlock. If the lock fails
        # we get an IOError and bail out with bad exit code
        count = 0
        for handle in handles:
            try:
                lock_file(handle)
                count += 1
                unlock_file(handle)
            except IOError:
                os._exit(2)
            finally:
                handle.close()
    return out_queue.put(count)


class LockTestCase(test_base.BaseTestCase):
    """Tests for the lock()/synchronized() helpers using global config."""

    def setUp(self):
        super().setUp()
        self.config = self.useFixture(config.Config(lockutils.CONF)).config

    def test_synchronized_wrapped_function_metadata(self):
        @lockutils.synchronized('whatever', 'test-')
        def foo():
            """Bar."""
            pass

        self.assertEqual('Bar.', foo.__doc__, "Wrapped function's docstring "
                                              "got lost")
        self.assertEqual('foo', foo.__name__, "Wrapped function's name "
                                              "got mangled")

    def test_lock_internally_different_collections(self):
        """Same lock name in different Semaphores objects does not block."""
        s1 = lockutils.Semaphores()
        s2 = lockutils.Semaphores()
        trigger = threading.Event()
        who_ran = collections.deque()

        def f(name, semaphores, pull_trigger):
            with lockutils.internal_lock('testing', semaphores=semaphores):
                if pull_trigger:
                    trigger.set()
                else:
                    trigger.wait()
                who_ran.append(name)

        threads = [
            threading.Thread(target=f, args=(1, s1, True)),
            threading.Thread(target=f, args=(2, s2, False)),
        ]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()
        self.assertEqual([1, 2], sorted(who_ran))

    def test_lock_internally(self):
        """We can lock across multiple threads."""
        saved_sem_num = len(lockutils._semaphores)
        seen_threads = []

        def f(_id):
            with lockutils.lock('testlock2', 'test-', external=False):
                for x in range(10):
                    seen_threads.append(_id)

        threads = []
        for i in range(10):
            thread = threading.Thread(target=f, args=(i,))
            threads.append(thread)
            thread.start()

        for thread in threads:
            thread.join()

        self.assertEqual(100, len(seen_threads))
        # Looking at the seen threads, split it into chunks of 10, and verify
        # that the last 9 match the first in each chunk.
        for i in range(10):
            for j in range(9):
                self.assertEqual(seen_threads[i * 10],
                                 seen_threads[i * 10 + 1 + j])

        self.assertEqual(saved_sem_num, len(lockutils._semaphores),
                         "Semaphore leak detected")

    def test_lock_internal_fair(self):
        """Check that we're actually fair."""

        def f(_id):
            with lockutils.lock('testlock', 'test-',
                                external=False, fair=True):
                lock_holder.append(_id)

        lock_holder = []
        threads = []
        # While holding the fair lock, spawn a bunch of threads that all try
        # to acquire the lock.  They will all block.  Then release the lock
        # and see what happens.
        with lockutils.lock('testlock', 'test-', external=False, fair=True):
            for i in range(10):
                thread = threading.Thread(target=f, args=(i,))
                threads.append(thread)
                thread.start()
                # Allow some time for the new thread to get queued onto the
                # list of pending writers before continuing.  This is gross
                # but there's no way around it without using knowledge of
                # fasteners internals.
                time.sleep(0.5)
        # Wait for all threads.
        for thread in threads:
            thread.join()

        self.assertEqual(10, len(lock_holder))
        # Check that the threads each got the lock in fair order.
        self.assertEqual(list(range(10)), lock_holder)

    def test_fair_lock_with_semaphore(self):
        def do_test():
            s = lockutils.Semaphores()
            with lockutils.lock('testlock', 'test-', semaphores=s, fair=True):
                pass
        self.assertRaises(NotImplementedError, do_test)

    def test_fair_lock_with_nonblocking(self):
        def do_test():
            with lockutils.lock('testlock', 'test-', fair=True,
                                blocking=False):
                pass
        self.assertRaises(NotImplementedError, do_test)

    def test_nested_synchronized_external_works(self):
        """We can nest external syncs."""
        self.config(lock_path=_make_temp_dir(self), group='oslo_concurrency')
        sentinel = object()

        @lockutils.synchronized('testlock1', 'test-', external=True)
        def outer_lock():

            @lockutils.synchronized('testlock2', 'test-', external=True)
            def inner_lock():
                return sentinel
            return inner_lock()

        self.assertEqual(sentinel, outer_lock())

    def _do_test_lock_externally(self):
        """We can lock across multiple processes."""
        children = []
        for n in range(50):
            queue = multiprocessing.Queue()
            proc = multiprocessing.Process(
                target=lock_files,
                args=(_make_temp_dir(self), queue))
            proc.start()
            children.append((proc, queue))
        for child, queue in children:
            child.join()
            count = queue.get(block=False)
            self.assertEqual(50, count)

    def test_lock_externally(self):
        self.config(lock_path=_make_temp_dir(self), group='oslo_concurrency')

        self._do_test_lock_externally()

    def test_lock_externally_lock_dir_not_exist(self):
        lock_dir = _make_temp_dir(self)
        # Remove the directory so the lock path does not exist up front.
        os.rmdir(lock_dir)
        self.config(lock_path=lock_dir, group='oslo_concurrency')

        self._do_test_lock_externally()

    def test_lock_with_prefix(self):
        # TODO(efried): Embetter this test
        self.config(lock_path=_make_temp_dir(self), group='oslo_concurrency')
        foo = lockutils.lock_with_prefix('mypfix-')

        with foo('mylock', external=True):
            # We can't check much
            pass

    def test_synchronized_with_prefix(self):
        lock_name = 'mylock'
        lock_pfix = 'mypfix-'

        foo = lockutils.synchronized_with_prefix(lock_pfix)

        @foo(lock_name, external=True)
        def bar(dirpath, pfix, name):
            return True

        lock_dir = _make_temp_dir(self)
        self.config(lock_path=lock_dir, group='oslo_concurrency')

        self.assertTrue(bar(lock_dir, lock_pfix, lock_name))

    def test_synchronized_without_prefix(self):
        self.config(lock_path=_make_temp_dir(self), group='oslo_concurrency')

        @lockutils.synchronized('lock', external=True)
        def test_without_prefix():
            # We can't check much
            pass

        test_without_prefix()

    def test_synchronized_prefix_without_hypen(self):
        self.config(lock_path=_make_temp_dir(self), group='oslo_concurrency')

        @lockutils.synchronized('lock', 'hypen', True)
        def test_without_hypen():
            # We can't check much
            pass

        test_without_hypen()

    def test_contextlock(self):
        self.config(lock_path=_make_temp_dir(self), group='oslo_concurrency')

        # Note(flaper87): Lock is not external, which means
        # a semaphore will be yielded
        with lockutils.lock("test") as sem:
            self.assertIsInstance(sem, threading.Semaphore)

            # NOTE(flaper87): Lock is external so an InterProcessLock
            # will be yielded.
            with lockutils.lock("test2", external=True) as lock:
                self.assertTrue(lock.exists())

            with lockutils.lock("test1", external=True) as lock1:
                self.assertIsInstance(lock1, lockutils.InterProcessLock)

    def test_contextlock_unlocks(self):
        self.config(lock_path=_make_temp_dir(self), group='oslo_concurrency')

        with lockutils.lock("test") as sem:
            self.assertIsInstance(sem, threading.Semaphore)

            with lockutils.lock("test2", external=True) as lock:
                self.assertTrue(lock.exists())

            # NOTE(flaper87): Lock should be free
            with lockutils.lock("test2", external=True) as lock:
                self.assertTrue(lock.exists())

        # NOTE(flaper87): Lock should be free
        # but semaphore should already exist.
        with lockutils.lock("test") as sem2:
            self.assertEqual(sem, sem2)

    @mock.patch('logging.Logger.info')
    @mock.patch('os.remove')
    @mock.patch('oslo_concurrency.lockutils._get_lock_path')
    def test_remove_lock_external_file_exists(self, path_mock, remove_mock,
                                              log_mock):
        lockutils.remove_external_lock_file(mock.sentinel.name,
                                            mock.sentinel.prefix,
                                            mock.sentinel.lock_path)

        path_mock.assert_called_once_with(mock.sentinel.name,
                                          mock.sentinel.prefix,
                                          mock.sentinel.lock_path)
        remove_mock.assert_called_once_with(path_mock.return_value)
        log_mock.assert_not_called()

    @mock.patch('logging.Logger.warning')
    @mock.patch('os.remove', side_effect=OSError(errno.ENOENT, None))
    @mock.patch('oslo_concurrency.lockutils._get_lock_path')
    def test_remove_lock_external_file_doesnt_exists(self, path_mock,
                                                     remove_mock, log_mock):
        """A missing lock file (ENOENT) is not reported as a warning."""
        lockutils.remove_external_lock_file(mock.sentinel.name,
                                            mock.sentinel.prefix,
                                            mock.sentinel.lock_path)
        path_mock.assert_called_once_with(mock.sentinel.name,
                                          mock.sentinel.prefix,
                                          mock.sentinel.lock_path)
        remove_mock.assert_called_once_with(path_mock.return_value)
        log_mock.assert_not_called()

    @mock.patch('logging.Logger.warning')
    @mock.patch('os.remove', side_effect=OSError(errno.EPERM, None))
    @mock.patch('oslo_concurrency.lockutils._get_lock_path')
    def test_remove_lock_external_file_permission_error(
            self, path_mock, remove_mock, log_mock):
        """A permission failure (EPERM) on removal is logged as a warning."""
        lockutils.remove_external_lock_file(mock.sentinel.name,
                                            mock.sentinel.prefix,
                                            mock.sentinel.lock_path)
        path_mock.assert_called_once_with(mock.sentinel.name,
                                          mock.sentinel.prefix,
                                          mock.sentinel.lock_path)
        remove_mock.assert_called_once_with(path_mock.return_value)
        log_mock.assert_called()

    def test_no_slash_in_b64(self):
        # base64(sha1(foobar)) has a slash in it
        with lockutils.lock("foobar"):
            pass

    def test_deprecated_names(self):
        """Options set under [DEFAULT] are readable via oslo_concurrency."""
        paths = self.create_tempfiles([['fake.conf', '\n'.join([
            '[DEFAULT]',
            'lock_path=foo',
            'disable_process_locking=True'])
        ]])
        conf = cfg.ConfigOpts()
        conf(['--config-file', paths[0]])
        conf.register_opts(lockutils._opts, 'oslo_concurrency')
        self.assertEqual('foo', conf.oslo_concurrency.lock_path)
        self.assertTrue(conf.oslo_concurrency.disable_process_locking)


class FileBasedLockingTestCase(test_base.BaseTestCase):
    """Tests that pass an explicit ``lock_path`` instead of using config."""

    def setUp(self):
        super().setUp()
        self.lock_dir = _make_temp_dir(self)

    def test_lock_file_exists(self):
        lock_file = os.path.join(self.lock_dir, 'lock-file')

        @lockutils.synchronized('lock-file', external=True,
                                lock_path=self.lock_dir)
        def foo():
            self.assertTrue(os.path.exists(lock_file))

        foo()

    def test_interprocess_lock(self):
        """A lock held by a forked child cannot be taken by the parent."""
        lock_file = os.path.join(self.lock_dir, 'processlock')

        pid = os.fork()
        if pid:
            handle = None
            try:
                # Make sure the child grabs the lock first
                start = time.time()
                while not os.path.exists(lock_file):
                    if time.time() - start > 5:
                        self.fail('Timed out waiting for child to grab lock')
                    time.sleep(0)
                lock1 = lockutils.InterProcessLock('foo')
                handle = lock1.lockfile = open(lock_file, 'w')
                # NOTE(bnemec): There is a brief window between when the lock
                # file is created and when it actually becomes locked.  If we
                # happen to context switch in that window we may succeed in
                # locking the file. Keep retrying until we either get the
                # expected exception or timeout waiting.
                while time.time() - start < 5:
                    try:
                        lock1.trylock()
                        lock1.unlock()
                        time.sleep(0)
                    except IOError:
                        # This is what we expect to happen
                        break
                else:
                    self.fail('Never caught expected lock exception')
            finally:
                # We don't need to wait for the full sleep in the child, so
                # kill it; then reap it so that no zombie process is left
                # behind, even when one of the checks above failed.
                os.kill(pid, signal.SIGKILL)
                os.waitpid(pid, 0)
                if handle is not None:
                    handle.close()
        else:
            try:
                lock2 = lockutils.InterProcessLock('foo')
                lock2.lockfile = open(lock_file, 'w')
                have_lock = False
                while not have_lock:
                    try:
                        lock2.trylock()
                        have_lock = True
                    except IOError:
                        pass
            finally:
                # NOTE(bnemec): This is racy, but I don't want to add any
                # synchronization primitives that might mask a problem
                # with the one we're trying to test here.
                time.sleep(.5)
                # Leave the child without running the parent's test machinery.
                os._exit(0)

    def test_interprocess_nonblocking_external_lock(self):
        """Check that we're not actually blocking between processes."""

        nb_calls = multiprocessing.Value('i', 0)

        @lockutils.synchronized('foo', blocking=False, external=True,
                                lock_path=self.lock_dir)
        def foo(param):
            """Simulate a long-running operation in a process."""
            param.value += 1
            time.sleep(.5)

        def other(param):
            foo(param)

        process = multiprocessing.Process(target=other, args=(nb_calls, ))
        process.start()
        # Make sure the other process grabs the lock
        start = time.time()
        while not os.path.exists(os.path.join(self.lock_dir, 'foo')):
            if time.time() - start > 5:
                self.fail('Timed out waiting for process to grab lock')
            time.sleep(0)
        process1 = multiprocessing.Process(target=other, args=(nb_calls, ))
        process1.start()
        process1.join()
        process.join()
        self.assertEqual(1, nb_calls.value)

    def test_interthread_external_lock(self):
        call_list = []

        @lockutils.synchronized('foo', external=True, lock_path=self.lock_dir)
        def foo(param):
            """Simulate a long-running threaded operation."""
            call_list.append(param)
            # NOTE(bnemec): This is racy, but I don't want to add any
            # synchronization primitives that might mask a problem
            # with the one we're trying to test here.
            time.sleep(.5)
            call_list.append(param)

        def other(param):
            foo(param)

        thread = threading.Thread(target=other, args=('other',))
        thread.start()
        # Make sure the other thread grabs the lock
        # NOTE(bnemec): File locks do not actually work between threads, so
        # this test is verifying that the local semaphore is still enforcing
        # external locks in that case.  This means this test does not have
        # the same race problem as the process test above because when the
        # file is created the semaphore has already been grabbed.
        start = time.time()
        while not os.path.exists(os.path.join(self.lock_dir, 'foo')):
            if time.time() - start > 5:
                self.fail('Timed out waiting for thread to grab lock')
            time.sleep(0)
        thread1 = threading.Thread(target=other, args=('main',))
        thread1.start()
        thread1.join()
        thread.join()
        self.assertEqual(['other', 'other', 'main', 'main'], call_list)

    def test_interthread_nonblocking_external_lock(self):
        call_list = []

        @lockutils.synchronized('foo', external=True, blocking=False,
                                lock_path=self.lock_dir)
        def foo(param):
            """Simulate a long-running threaded operation."""
            call_list.append(param)
            time.sleep(.5)
            call_list.append(param)

        def other(param):
            foo(param)

        thread = threading.Thread(target=other, args=('other',))
        thread.start()
        # Make sure the other thread grabs the lock
        start = time.time()
        while not os.path.exists(os.path.join(self.lock_dir, 'foo')):
            if time.time() - start > 5:
                self.fail('Timed out waiting for thread to grab lock')
            time.sleep(0)
        thread1 = threading.Thread(target=other, args=('main',))
        thread1.start()
        thread1.join()
        thread.join()
        self.assertEqual(['other', 'other'], call_list)

    def test_interthread_nonblocking_internal_lock(self):
        call_list = []

        @lockutils.synchronized('foo', blocking=False,
                                lock_path=self.lock_dir)
        def foo(param):
            # Simulate a long-running threaded operation.
            call_list.append(param)
            time.sleep(.5)
            call_list.append(param)

        def other(param):
            foo(param)

        thread = threading.Thread(target=other, args=('other',))
        thread.start()
        # Make sure the other thread grabs the lock
        start = time.time()
        while not call_list:
            if time.time() - start > 5:
                self.fail('Timed out waiting for thread to grab lock')
            time.sleep(0)
        thread1 = threading.Thread(target=other, args=('main',))
        thread1.start()
        thread1.join()
        thread.join()
        self.assertEqual(['other', 'other'], call_list)

    def test_non_destructive(self):
        """Taking an external lock leaves existing file content intact."""
        lock_file = os.path.join(self.lock_dir, 'not-destroyed')
        with open(lock_file, 'w') as f:
            f.write('test')
        with lockutils.lock('not-destroyed', external=True,
                            lock_path=self.lock_dir):
            with open(lock_file) as f:
                self.assertEqual('test', f.read())


class LockutilsModuleTestCase(test_base.BaseTestCase):
    """Tests for running lockutils as a wrapper via ``_lock_wrapper``."""

    def setUp(self):
        super().setUp()
        # Run each test without OSLO_LOCK_PATH set; tearDown restores it.
        self.old_env = os.environ.get('OSLO_LOCK_PATH')
        if self.old_env is not None:
            del os.environ['OSLO_LOCK_PATH']

    def tearDown(self):
        if self.old_env is not None:
            os.environ['OSLO_LOCK_PATH'] = self.old_env
        super().tearDown()

    def test_main(self):
        script = '\n'.join([
            'import os',
            'lock_path = os.environ.get("OSLO_LOCK_PATH")',
            'assert lock_path is not None',
            'assert os.path.isdir(lock_path)',
        ])
        argv = ['', sys.executable, '-c', script]
        retval = lockutils._lock_wrapper(argv)
        self.assertEqual(0, retval, "Bad OSLO_LOCK_PATH has been set")

    def test_return_value_maintained(self):
        script = '\n'.join([
            'import sys',
            'sys.exit(1)',
        ])
        argv = ['', sys.executable, '-c', script]
        retval = lockutils._lock_wrapper(argv)
        self.assertEqual(1, retval)

    def test_direct_call_explodes(self):
        cmd = [sys.executable, '-m', 'oslo_concurrency.lockutils']
        with open(os.devnull, 'w') as devnull:
            retval = subprocess.call(cmd, stderr=devnull)
            self.assertEqual(1, retval)


class TestLockFixture(test_base.BaseTestCase):
    """Check that LockFixture still holds its lock during tearDown."""

    def setUp(self):
        super().setUp()
        self.config = self.useFixture(config.Config(lockutils.CONF)).config
        self.tempdir = _make_temp_dir(self)

    def _check_in_lock(self):
        self.assertTrue(self.lock.exists())

    def tearDown(self):
        self._check_in_lock()
        super().tearDown()

    def test_lock_fixture(self):
        # Setup lock fixture to test that teardown is inside the lock
        self.config(lock_path=self.tempdir, group='oslo_concurrency')
        fixture = fixtures.LockFixture('test-lock')
        self.useFixture(fixture)
        self.lock = fixture.lock


class TestGetLockPath(test_base.BaseTestCase):
    """Tests for lockutils.get_lock_path()."""

    def setUp(self):
        super().setUp()
        self.conf = self.useFixture(config.Config(lockutils.CONF)).conf

    def test_get_default(self):
        lockutils.set_defaults(lock_path='/the/path')
        self.assertEqual('/the/path', lockutils.get_lock_path(self.conf))

    def test_get_override(self):
        lockutils._register_opts(self.conf)
        self.conf.set_override('lock_path', '/alternate/path',
                               group='oslo_concurrency')
        self.assertEqual('/alternate/path', lockutils.get_lock_path(self.conf))