# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import contextlib
import errno
import functools
import logging
import os
import shutil
import subprocess
import sys
import tempfile
import threading
import weakref

import fasteners
from oslo_config import cfg
from oslo_utils import reflection
from oslo_utils import timeutils

from oslo_concurrency._i18n import _


LOG = logging.getLogger(__name__)


_opts = [
    cfg.BoolOpt('disable_process_locking', default=False,
                help='Enables or disables inter-process locks.',
                deprecated_group='DEFAULT'),
    cfg.StrOpt('lock_path',
               default=os.environ.get("OSLO_LOCK_PATH"),
               help='Directory to use for lock files. For security, the '
                    'specified directory should only be writable by the user '
                    'running the processes that need locking. '
                    'Defaults to environment variable OSLO_LOCK_PATH. '
                    'If external locks are used, a lock path must be set.',
               deprecated_group='DEFAULT')
]


def _register_opts(conf):
    """Register this module's options on conf under 'oslo_concurrency'."""
    conf.register_opts(_opts, group='oslo_concurrency')


CONF = cfg.CONF
_register_opts(CONF)


def set_defaults(lock_path):
    """Set value for lock_path.

    This can be used by tests to set lock_path to a temporary directory.
    """
    cfg.set_defaults(_opts, lock_path=lock_path)


def get_lock_path(conf):
    """Return the path used for external file-based locks.

    :param conf: Configuration object
    :type conf: oslo_config.cfg.ConfigOpts

    .. versionadded:: 1.8
    """
    _register_opts(conf)
    return conf.oslo_concurrency.lock_path


InterProcessLock = fasteners.InterProcessLock
ReaderWriterLock = fasteners.ReaderWriterLock
"""A reader/writer lock.

.. versionadded:: 0.4
"""


class FairLocks(object):
    """A garbage collected container of fair locks.

    With a fair lock, contending lockers will get the lock in the order in
    which they tried to acquire it.

    This collection internally uses a weak value dictionary so that when a
    lock is no longer in use (by any threads) it will automatically be
    removed from this container by the garbage collector.
    """

    def __init__(self):
        self._locks = weakref.WeakValueDictionary()
        # Guards lookups/insertions so two threads asking for the same name
        # at the same time cannot each create their own lock.
        self._lock = threading.Lock()

    def get(self, name):
        """Gets (or creates) a lock with a given name.

        :param name: The lock name to get/create (used to associate
                     previously created names with the same lock).

        Returns a newly constructed lock (or an existing one if it was
        already created for the given name).
        """
        with self._lock:
            try:
                return self._locks[name]
            except KeyError:
                # The fasteners module specifies that
                # ReaderWriterLock.write_lock() will give FIFO behaviour,
                # so we don't need to do anything special ourselves.
                rwlock = ReaderWriterLock()
                self._locks[name] = rwlock
                return rwlock


_fair_locks = FairLocks()


def internal_fair_lock(name):
    """Return the module-wide fair (reader/writer) lock for name.

    :param name: The lock name; the same name maps to the same lock for as
                 long as that lock is still referenced somewhere.
    """
    return _fair_locks.get(name)


class Semaphores(object):
    """A garbage collected container of semaphores.

    This collection internally uses a weak value dictionary so that when a
    semaphore is no longer in use (by any threads) it will automatically be
    removed from this container by the garbage collector.

    .. versionadded:: 0.3
    """

    def __init__(self):
        self._semaphores = weakref.WeakValueDictionary()
        # Guards lookups/insertions so two threads asking for the same name
        # at the same time cannot each create their own semaphore.
        self._lock = threading.Lock()

    def get(self, name):
        """Gets (or creates) a semaphore with a given name.

        :param name: The semaphore name to get/create (used to associate
                     previously created names with the same semaphore).

        Returns a newly constructed semaphore (or an existing one if it was
        already created for the given name).
        """
        with self._lock:
            try:
                return self._semaphores[name]
            except KeyError:
                sem = threading.Semaphore()
                self._semaphores[name] = sem
                return sem

    def __len__(self):
        """Returns how many semaphores exist at the current time."""
        return len(self._semaphores)


_semaphores = Semaphores()


def _get_lock_path(name, lock_file_prefix, lock_path=None):
    """Build the full path of the lock file used for an external lock.

    :param name: The lock name; directory separators in it are replaced
                 with underscores.
    :param lock_file_prefix: Optional prefix, joined to the name with a
                             hyphen unless the prefix already ends with one.
    :param lock_path: Directory for the lock file; falls back to the
                      configured ``lock_path`` option when not given.
    :raises cfg.RequiredOptError: if no lock directory is given or
                                  configured.
    """
    # NOTE(mikal): the lock name cannot contain directory
    # separators
    name = name.replace(os.sep, '_')
    if os.altsep:
        # Some platforms accept a second separator (os.altsep); left in
        # place it would make os.path.join() below treat part of the lock
        # name as a directory.
        name = name.replace(os.altsep, '_')
    if lock_file_prefix:
        sep = '' if lock_file_prefix.endswith('-') else '-'
        name = '%s%s%s' % (lock_file_prefix, sep, name)

    local_lock_path = lock_path or CONF.oslo_concurrency.lock_path

    if not local_lock_path:
        raise cfg.RequiredOptError('lock_path')

    return os.path.join(local_lock_path, name)


def external_lock(name, lock_file_prefix=None, lock_path=None):
    """Create (without acquiring) an inter-process lock for name.

    :param name: The lock name.
    :param lock_file_prefix: Optional prefix for the lock file name.
    :param lock_path: Directory for the lock file; falls back to the
                      configured ``lock_path`` option when not given.
    :returns: An ``InterProcessLock`` bound to the computed lock file path.
    :raises cfg.RequiredOptError: if no lock directory is given or
                                  configured.
    """
    lock_file_path = _get_lock_path(name, lock_file_prefix, lock_path)

    return InterProcessLock(lock_file_path)


def remove_external_lock_file(name, lock_file_prefix=None, lock_path=None,
                              semaphores=None):
    """Remove an external lock file when it's not used anymore.

    This will be helpful when we have a lot of lock files.

    The removal is done while holding the internal (in-process) lock of the
    same name. A missing file is silently ignored; any other ``OSError`` is
    logged as a warning and not re-raised.

    :param name: The lock name.
    :param lock_file_prefix: Optional prefix for the lock file name.
    :param lock_path: Directory holding the lock file; falls back to the
                      configured ``lock_path`` option when not given.
    :param semaphores: Container that provides the internal semaphore;
                       defaults to the module-wide container.
    """
    with internal_lock(name, semaphores=semaphores):
        lock_file_path = _get_lock_path(name, lock_file_prefix, lock_path)
        try:
            os.remove(lock_file_path)
        except OSError as exc:
            if exc.errno != errno.ENOENT:
                LOG.warning('Failed to remove file %(file)s',
                            {'file': lock_file_path})


class AcquireLockFailedException(Exception):
    """Raised when a lock could not be acquired without blocking."""

    def __init__(self, lock_name):
        # Hand the constructor argument to the base class so that ``args``
        # is populated; copying/unpickling re-invokes the class with
        # ``args`` and would fail with a TypeError if it were left empty.
        super().__init__(lock_name)
        self.message = "Failed to acquire the lock %s" % lock_name

    def __str__(self):
        return self.message


def internal_lock(name, semaphores=None, blocking=True):
    """Return the in-process lock for name, usable as a context manager.

    :param name: The lock name.
    :param semaphores: Container that provides the semaphore; defaults to
                       the module-wide container.
    :param blocking: When true, the named semaphore itself is returned.
                     When false, a context manager is returned that tries a
                     non-blocking acquire on entry and raises
                     ``AcquireLockFailedException`` if that fails.
    """
    @contextlib.contextmanager
    def nonblocking(lock):
        """Try to acquire the internal lock without blocking."""
        if not lock.acquire(blocking=False):
            raise AcquireLockFailedException(name)
        try:
            yield lock
        finally:
            lock.release()

    if semaphores is None:
        semaphores = _semaphores
    lock = semaphores.get(name)

    return nonblocking(lock) if not blocking else lock


@contextlib.contextmanager
def lock(name, lock_file_prefix=None, external=False, lock_path=None,
         do_log=True, semaphores=None, delay=0.01, fair=False, blocking=True):
    """Context based lock

    This function yields a `threading.Semaphore` instance (if we don't use
    eventlet.monkey_patch(), else `semaphore.Semaphore`) unless external is
    True, in which case, it'll yield an InterProcessLock instance.

    :param lock_file_prefix: The lock_file_prefix argument is used to provide
      lock files on disk with a meaningful prefix.

    :param external: The external keyword argument denotes whether this lock
      should work across multiple processes. This means that if two different
      workers both run a method decorated with @synchronized('mylock',
      external=True), only one of them will execute at a time.

    :param lock_path: The path in which to store external lock files.  For
      external locking to work properly, this must be the same for all
      references to the lock.

    :param do_log: Whether to log acquire/release messages.  This is primarily
      intended to reduce log message duplication when `lock` is used from the
      `synchronized` decorator.

    :param semaphores: Container that provides semaphores to use when locking.
        This ensures that threads inside the same application can not collide,
        due to the fact that external process locks are unaware of a process's
        active threads.

    :param delay: Delay between acquisition attempts (in seconds).

    :param fair: Whether or not we want a "fair" lock where contending lockers
        will get the lock in the order in which they tried to acquire it.

    :param blocking: Whether to wait forever to try to acquire the lock.
        Incompatible with fair locks because those provided by the fasteners
        module do not implement a non-blocking behavior.

    :raises NotImplementedError: if *fair* is combined with *semaphores* or
        with ``blocking`` other than ``True``.

    :raises AcquireLockFailedException: if the internal or the external lock
        could not be acquired without blocking.

    .. versionchanged:: 0.2
       Added *do_log* optional parameter.

    .. versionchanged:: 0.3
       Added *delay* and *semaphores* optional parameters.
    """
    if fair:
        if semaphores is not None:
            raise NotImplementedError(_('Specifying semaphores is not '
                                        'supported when using fair locks.'))
        if blocking is not True:
            raise NotImplementedError(_('Disabling blocking is not supported '
                                        'when using fair locks.'))
        # The fasteners module specifies that write_lock() provides fairness.
        int_lock = internal_fair_lock(name).write_lock()
    else:
        int_lock = internal_lock(name, semaphores=semaphores,
                                 blocking=blocking)
    # The in-process lock is always taken first; the inter-process file lock
    # (if requested) is only attempted while it is held.
    with int_lock:
        if do_log:
            LOG.debug('Acquired lock "%(lock)s"', {'lock': name})
        try:
            if external and not CONF.oslo_concurrency.disable_process_locking:
                ext_lock = external_lock(name, lock_file_prefix, lock_path)
                gotten = ext_lock.acquire(delay=delay, blocking=blocking)
                if not gotten:
                    raise AcquireLockFailedException(name)
                if do_log:
                    LOG.debug('Acquired external semaphore "%(lock)s"',
                              {'lock': name})
                try:
                    yield ext_lock
                finally:
                    ext_lock.release()
            else:
                yield int_lock
        finally:
            if do_log:
                LOG.debug('Releasing lock "%(lock)s"', {'lock': name})


def lock_with_prefix(lock_file_prefix):
    """Partial object generator for the lock context manager.

    Redefine lock in each project like so::

        (in nova/utils.py)
        from oslo_concurrency import lockutils

        _prefix = 'nova'
        lock = lockutils.lock_with_prefix(_prefix)
        lock_cleanup = lockutils.remove_external_lock_file_with_prefix(_prefix)


        (in nova/foo.py)
        from nova import utils

        with utils.lock('mylock'):
           ...

    Eventually clean up with::

        lock_cleanup('mylock')

    :param lock_file_prefix: A string used to provide lock files on disk with a
        meaningful prefix. Will be separated from the lock name with a hyphen,
        which may optionally be included in the lock_file_prefix (e.g.
        ``'nova'`` and ``'nova-'`` are equivalent).
    """
    return functools.partial(lock, lock_file_prefix=lock_file_prefix)


def synchronized(name, lock_file_prefix=None, external=False, lock_path=None,
                 semaphores=None, delay=0.01, fair=False, blocking=True):
    """Synchronization decorator.

    Decorating a method like so::

        @synchronized('mylock')
        def foo(self, *args):
           ...

    ensures that only one thread will execute the foo method at a time.

    Different methods can share the same lock::

        @synchronized('mylock')
        def foo(self, *args):
           ...

        @synchronized('mylock')
        def bar(self, *args):
           ...

    This way only one of either foo or bar can be executing at a time.

    The remaining arguments are passed through to :func:`lock`. If the lock
    cannot be acquired (``AcquireLockFailedException``), the decorated
    function is not called and the wrapper returns ``None``.

    .. versionchanged:: 0.3
       Added *delay* and *semaphores* optional parameter.
    """

    def wrap(f):

        @functools.wraps(f)
        def inner(*args, **kwargs):
            t1 = timeutils.now()
            # t2 stays None when the lock was never obtained, which the
            # final log message reports as "N/A" hold time.
            t2 = None
            gotten = True
            try:
                with lock(name, lock_file_prefix, external, lock_path,
                          do_log=False, semaphores=semaphores, delay=delay,
                          fair=fair, blocking=blocking):
                    t2 = timeutils.now()
                    LOG.debug('Lock "%(name)s" acquired by "%(function)s" :: '
                              'waited %(wait_secs)0.3fs',
                              {'name': name,
                               'function': reflection.get_callable_name(f),
                               'wait_secs': (t2 - t1)})
                    return f(*args, **kwargs)
            except AcquireLockFailedException:
                # NOTE: this handler also covers the call to f() above, so
                # an AcquireLockFailedException raised by f itself is
                # swallowed here as well.
                gotten = False
            finally:
                t3 = timeutils.now()
                if t2 is None:
                    held_secs = "N/A"
                else:
                    held_secs = "%0.3fs" % (t3 - t2)
                LOG.debug('Lock "%(name)s" "%(gotten)s" by "%(function)s" ::'
                          ' held %(held_secs)s',
                          {'name': name,
                           'gotten': 'released' if gotten else 'unacquired',
                           'function': reflection.get_callable_name(f),
                           'held_secs': held_secs})
        return inner

    return wrap


def synchronized_with_prefix(lock_file_prefix):
    """Partial object generator for the synchronization decorator.

    Redefine @synchronized in each project like so::

        (in nova/utils.py)
        from oslo_concurrency import lockutils

        _prefix = 'nova'
        synchronized = lockutils.synchronized_with_prefix(_prefix)
        lock_cleanup = lockutils.remove_external_lock_file_with_prefix(_prefix)


        (in nova/foo.py)
        from nova import utils

        @utils.synchronized('mylock')
        def bar(self, *args):
           ...

    Eventually clean up with::

        lock_cleanup('mylock')

    :param lock_file_prefix: A string used to provide lock files on disk with a
        meaningful prefix. Will be separated from the lock name with a hyphen,
        which may optionally be included in the lock_file_prefix (e.g.
        ``'nova'`` and ``'nova-'`` are equivalent).
    """

    return functools.partial(synchronized, lock_file_prefix=lock_file_prefix)


def remove_external_lock_file_with_prefix(lock_file_prefix):
    """Partial object generator for the remove lock file function.

    Redefine remove_external_lock_file_with_prefix in each project like so::

        (in nova/utils.py)
        from oslo_concurrency import lockutils

        _prefix = 'nova'
        synchronized = lockutils.synchronized_with_prefix(_prefix)
        lock = lockutils.lock_with_prefix(_prefix)
        lock_cleanup = lockutils.remove_external_lock_file_with_prefix(_prefix)

        (in nova/foo.py)
        from nova import utils

        @utils.synchronized('mylock')
        def bar(self, *args):
            ...

        def baz(self, *args):
            ...
            with utils.lock('mylock'):
                ...
            ...

        <eventually call lock_cleanup('mylock') to clean up>

    The lock_file_prefix argument is used to provide lock files on disk with a
    meaningful prefix.
    """
    return functools.partial(remove_external_lock_file,
                             lock_file_prefix=lock_file_prefix)


def _lock_wrapper(argv):
    """Create a dir for locks and pass it to command from arguments

    This is exposed as a console script entry point named
    lockutils-wrapper

    If you run this:
        lockutils-wrapper stestr run <etc>

    a temporary directory will be created for all your locks and passed to all
    your tests in an environment variable. The temporary dir will be deleted
    afterwards and the return value will be preserved.

    :param argv: Full argument vector; ``argv[0]`` is skipped and the rest is
                 run as the command.
    :returns: The return code of the command.
    """

    lock_dir = tempfile.mkdtemp()
    # The child process inherits this environment, and OSLO_LOCK_PATH is the
    # default for the lock_path option.
    os.environ["OSLO_LOCK_PATH"] = lock_dir
    try:
        ret_val = subprocess.call(argv[1:])
    finally:
        shutil.rmtree(lock_dir, ignore_errors=True)
    return ret_val


def main():
    """Console script entry point: run the wrapper and exit with its code."""
    sys.exit(_lock_wrapper(sys.argv))


if __name__ == '__main__':
    raise NotImplementedError(_('Calling lockutils directly is no longer '
                                'supported.  Please use the '
                                'lockutils-wrapper console script instead.'))