# -*- coding: utf-8 -*-

# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.
from contextlib import contextmanager
import errno
import logging
import os
import threading
import time

import six

from fasteners import _utils

LOG = logging.getLogger(__name__)


def _ensure_tree(path):
    """Create a directory (and any ancestor directories required).

    :param path: Directory to create
    :returns: ``True`` when this call created the directory, ``False`` when
              it was already present
    :rtype: bool
    :raises OSError: when ``path`` exists but is not a directory, or when
                     creation fails for any reason other than the directory
                     already existing
    """
    try:
        os.makedirs(path)
    except OSError as e:
        if e.errno == errno.EEXIST:
            # Something already lives at that path; that is only acceptable
            # when it is a directory.
            if not os.path.isdir(path):
                raise
            return False
        if e.errno == errno.EISDIR:
            return False
        raise
    return True


class _InterProcessLock(object):
    """An interprocess lock.

    Platform independent base class; subclasses provide the actual locking
    primitives by overriding :meth:`_trylock` and :meth:`_unlock`.
    """

    MAX_DELAY = 0.1
    """
    Default maximum delay we will wait to try to acquire the lock (when
    it's busy/being held by another process).
    """

    DELAY_INCREMENT = 0.01
    """
    Default increment we will use (up to max delay) after each attempt before
    next attempt to acquire the lock. For example if 3 attempts have been made
    the calling thread will sleep (0.01 * 3) before the next attempt to
    acquire the lock (and repeat).
    """

    def __init__(self, path, sleep_func=time.sleep, logger=None):
        """Prepare (but do not open or acquire) a lock on ``path``.

        :param path: path of the file that backs the lock
        :param sleep_func: callable handed to the retry helper to wait
                           between acquisition attempts
        :param logger: logger to use; the module level logger is used when
                       this is ``None``
        """
        self.lockfile = None
        self.path = _utils.canonicalize_path(path)
        self.acquired = False
        self.sleep_func = sleep_func
        self.logger = _utils.pick_first_not_none(logger, LOG)

    def _try_acquire(self, blocking, watch):
        """Make a single acquisition attempt.

        :returns: ``True`` when the lock was obtained, ``False`` when it is
                  busy and we should give up (non-blocking or timed out)
        :raises _utils.RetryAgain: when the lock is busy and another attempt
                                   should be made
        :raises threading.ThreadError: on any other I/O failure
        """
        try:
            self.trylock()
        except IOError as e:
            if e.errno in (errno.EACCES, errno.EAGAIN):
                # The lock is held by somebody else.
                if not blocking or watch.expired():
                    return False
                else:
                    raise _utils.RetryAgain()
            else:
                raise threading.ThreadError("Unable to acquire lock on"
                                            " `%(path)s` due to"
                                            " %(exception)s" %
                                            {
                                                'path': self.path,
                                                'exception': e,
                                            })
        else:
            return True

    def _do_open(self):
        """Create the lock's parent directory and open the lock file."""
        basedir = os.path.dirname(self.path)
        if basedir:
            made_basedir = _ensure_tree(basedir)
            if made_basedir:
                self.logger.log(_utils.BLATHER,
                                'Created lock base path `%s`', basedir)
        # Open in append mode so we don't overwrite any potential contents of
        # the target file. This eliminates the possibility of an attacker
        # creating a symlink to an important file in our lock path.
        if self.lockfile is None or self.lockfile.closed:
            self.lockfile = open(self.path, 'a')

    def acquire(self, blocking=True,
                delay=DELAY_INCREMENT, max_delay=MAX_DELAY,
                timeout=None):
        """Attempt to acquire the given lock.

        :param blocking: whether to wait forever to try to acquire the lock
        :type blocking: bool
        :param delay: when blocking this is the delay time in seconds that
                      will be added after each failed acquisition
        :type delay: int/float
        :param max_delay: the maximum delay to have (this limits the
                          accumulated delay(s) added after each failed
                          acquisition)
        :type max_delay: int/float
        :param timeout: an optional timeout (limits how long blocking
                        will occur for)
        :type timeout: int/float
        :returns: whether or not the acquisition succeeded
        :rtype: bool
        :raises ValueError: when ``delay`` or ``timeout`` is negative
        """
        if delay < 0:
            raise ValueError("Delay must be greater than or equal to zero")
        if timeout is not None and timeout < 0:
            raise ValueError("Timeout must be greater than or equal to zero")
        if delay >= max_delay:
            max_delay = delay
        self._do_open()
        watch = _utils.StopWatch(duration=timeout)
        r = _utils.Retry(delay, max_delay,
                         sleep_func=self.sleep_func, watch=watch)
        with watch:
            gotten = r(self._try_acquire, blocking, watch)
        if not gotten:
            self.acquired = False
            return False
        else:
            self.acquired = True
            self.logger.log(_utils.BLATHER,
                            "Acquired file lock `%s` after waiting %0.3fs [%s"
                            " attempts were required]", self.path,
                            watch.elapsed(), r.attempts)
            return True

    def _do_close(self):
        """Close the lock file (if open) and forget the handle."""
        if self.lockfile is not None:
            self.lockfile.close()
            self.lockfile = None

    def __enter__(self):
        gotten = self.acquire()
        if not gotten:
            # This shouldn't happen, but just in case...
            raise threading.ThreadError("Unable to acquire a file lock"
                                        " on `%s` (when used as a"
                                        " context manager)" % self.path)
        return self

    def release(self):
        """Release the previously acquired lock.

        Failures to unlock or to close the file are logged rather than
        raised.

        :raises threading.ThreadError: when the lock is not currently
                                       acquired
        """
        if not self.acquired:
            raise threading.ThreadError("Unable to release an unacquired"
                                        " lock")
        try:
            self.unlock()
        except IOError:
            self.logger.exception("Could not unlock the acquired lock opened"
                                  " on `%s`", self.path)
        else:
            self.acquired = False
            try:
                self._do_close()
            except IOError:
                self.logger.exception("Could not close the file handle"
                                      " opened on `%s`", self.path)
            else:
                self.logger.log(_utils.BLATHER,
                                "Unlocked and closed file lock open on"
                                " `%s`", self.path)

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()

    def exists(self):
        """Checks if the path that this lock exists at actually exists."""
        return os.path.exists(self.path)

    def trylock(self):
        """Attempt (without waiting) to lock the opened lock file."""
        self._trylock(self.lockfile)

    def unlock(self):
        """Unlock the opened lock file."""
        self._unlock(self.lockfile)

    @staticmethod
    def _trylock(lockfile):
        raise NotImplementedError()

    @staticmethod
    def _unlock(lockfile):
        raise NotImplementedError()


class _InterProcessReaderWriterLock(object):
    """An interprocess readers writer lock.

    Platform independent base class; subclasses provide :meth:`_trylock`,
    :meth:`_unlock`, :meth:`_get_handle` and :meth:`_close_handle`.
    """

    MAX_DELAY = 0.1
    """
    Default maximum delay we will wait to try to acquire the lock (when
    it's busy/being held by another process).
    """

    DELAY_INCREMENT = 0.01
    """
    Default increment we will use (up to max delay) after each attempt before
    next attempt to acquire the lock. For example if 3 attempts have been made
    the calling thread will sleep (0.01 * 3) before the next attempt to
    acquire the lock (and repeat).
    """

    def __init__(self, path, sleep_func=time.sleep, logger=None):
        """Prepare (but do not open or acquire) a lock on ``path``.

        :param path: path of the file that backs the lock
        :param sleep_func: callable handed to the retry helper to wait
                           between acquisition attempts
        :param logger: logger to use; the module level logger is used when
                       this is ``None``
        """
        self.lockfile = None
        self.path = _utils.canonicalize_path(path)
        self.sleep_func = sleep_func
        self.logger = _utils.pick_first_not_none(logger, LOG)

    def _try_acquire(self, blocking, watch, exclusive):
        """Make a single acquisition attempt.

        :returns: ``True`` when the lock was obtained, ``False`` when it is
                  busy and we should give up (non-blocking or timed out)
        :raises _utils.RetryAgain: when the lock is busy and another attempt
                                   should be made
        :raises threading.ThreadError: when the platform primitive fails
        """
        try:
            gotten = self._trylock(self.lockfile, exclusive)
        except Exception as e:
            raise threading.ThreadError(
                "Unable to acquire lock on {} due to {}!".format(self.path, e))

        if gotten:
            return True

        if not blocking or watch.expired():
            return False

        raise _utils.RetryAgain()

    def _do_open(self):
        """Create the lock's parent directory and obtain a file handle."""
        basedir = os.path.dirname(self.path)
        if basedir:
            made_basedir = _ensure_tree(basedir)
            if made_basedir:
                self.logger.log(_utils.BLATHER,
                                'Created lock base path `%s`', basedir)
        if self.lockfile is None:
            self.lockfile = self._get_handle(self.path)

    def acquire_read_lock(self, blocking=True,
                          delay=DELAY_INCREMENT, max_delay=MAX_DELAY,
                          timeout=None):
        """Attempt to acquire a reader's lock.

        :param blocking: whether to wait forever to try to acquire the lock
        :type blocking: bool
        :param delay: when blocking this is the delay time in seconds that
                      will be added after each failed acquisition
        :type delay: int/float
        :param max_delay: the maximum delay to have (this limits the
                          accumulated delay(s) added after each failed
                          acquisition)
        :type max_delay: int/float
        :param timeout: an optional timeout (limits how long blocking
                        will occur for)
        :type timeout: int/float
        :returns: whether or not the acquisition succeeded
        :rtype: bool
        """
        return self._acquire(blocking, delay, max_delay, timeout,
                             exclusive=False)

    def acquire_write_lock(self, blocking=True,
                           delay=DELAY_INCREMENT, max_delay=MAX_DELAY,
                           timeout=None):
        """Attempt to acquire a writer's lock.

        :param blocking: whether to wait forever to try to acquire the lock
        :type blocking: bool
        :param delay: when blocking this is the delay time in seconds that
                      will be added after each failed acquisition
        :type delay: int/float
        :param max_delay: the maximum delay to have (this limits the
                          accumulated delay(s) added after each failed
                          acquisition)
        :type max_delay: int/float
        :param timeout: an optional timeout (limits how long blocking
                        will occur for)
        :type timeout: int/float
        :returns: whether or not the acquisition succeeded
        :rtype: bool
        """
        return self._acquire(blocking, delay, max_delay, timeout,
                             exclusive=True)

    def _acquire(self, blocking=True,
                 delay=DELAY_INCREMENT, max_delay=MAX_DELAY,
                 timeout=None, exclusive=True):
        """Shared implementation behind the read/write acquire methods.

        :param exclusive: ``True`` for a writer's lock, ``False`` for a
                          reader's lock
        :returns: whether or not the acquisition succeeded
        :rtype: bool
        :raises ValueError: when ``delay`` or ``timeout`` is negative
        """
        if delay < 0:
            raise ValueError("Delay must be greater than or equal to zero")
        if timeout is not None and timeout < 0:
            raise ValueError("Timeout must be greater than or equal to zero")
        if delay >= max_delay:
            max_delay = delay
        self._do_open()
        watch = _utils.StopWatch(duration=timeout)
        r = _utils.Retry(delay, max_delay,
                         sleep_func=self.sleep_func, watch=watch)
        with watch:
            gotten = r(self._try_acquire, blocking, watch, exclusive)
        if not gotten:
            return False
        else:
            self.logger.log(_utils.BLATHER,
                            "Acquired file lock `%s` after waiting %0.3fs [%s"
                            " attempts were required]", self.path,
                            watch.elapsed(), r.attempts)
            return True

    def _do_close(self):
        """Close the lock handle (if any) and forget it."""
        if self.lockfile is not None:
            self._close_handle(self.lockfile)
            self.lockfile = None

    def _release(self):
        """Unlock and close the lock file.

        Shared by :meth:`release_read_lock` and :meth:`release_write_lock`,
        which perform exactly the same steps. ``IOError`` raised while
        unlocking or closing is logged rather than propagated.
        """
        try:
            self._unlock(self.lockfile)
        except IOError:
            self.logger.exception("Could not unlock the acquired lock opened"
                                  " on `%s`", self.path)
        else:
            try:
                self._do_close()
            except IOError:
                self.logger.exception("Could not close the file handle"
                                      " opened on `%s`", self.path)
            else:
                self.logger.log(_utils.BLATHER,
                                "Unlocked and closed file lock open on"
                                " `%s`", self.path)

    def release_write_lock(self):
        """Release the writer's lock."""
        self._release()

    def release_read_lock(self):
        """Release the reader's lock."""
        self._release()

    @contextmanager
    def write_lock(self, delay=DELAY_INCREMENT, max_delay=MAX_DELAY):
        """Context manager that holds the writer's lock for its body.

        :param delay: see :meth:`acquire_write_lock`
        :param max_delay: see :meth:`acquire_write_lock`
        :raises threading.ThreadError: when the lock could not be acquired
        """
        gotten = self.acquire_write_lock(blocking=True, delay=delay,
                                         max_delay=max_delay, timeout=None)

        if not gotten:
            # This shouldn't happen, but just in case...
            raise threading.ThreadError("Unable to acquire a file lock"
                                        " on `%s` (when used as a"
                                        " context manager)" % self.path)
        try:
            yield
        finally:
            self.release_write_lock()

    @contextmanager
    def read_lock(self, delay=DELAY_INCREMENT, max_delay=MAX_DELAY):
        """Context manager that holds the reader's lock for its body.

        :param delay: see :meth:`acquire_read_lock`
        :param max_delay: see :meth:`acquire_read_lock`
        :raises threading.ThreadError: when the lock could not be acquired
        """
        gotten = self.acquire_read_lock(blocking=True, delay=delay,
                                        max_delay=max_delay, timeout=None)

        if not gotten:
            # This shouldn't happen, but just in case... (never run the
            # protected body, nor release, a lock we do not hold).
            raise threading.ThreadError("Unable to acquire a file lock"
                                        " on `%s` (when used as a"
                                        " context manager)" % self.path)
        try:
            yield
        finally:
            self.release_read_lock()

    @staticmethod
    def _trylock(lockfile, exclusive):
        raise NotImplementedError()

    @staticmethod
    def _unlock(lockfile):
        raise NotImplementedError()

    @staticmethod
    def _get_handle(path):
        raise NotImplementedError()

    @staticmethod
    def _close_handle(lockfile):
        raise NotImplementedError()


class _WindowsLock(_InterProcessLock):
    """Interprocess lock implementation that works on windows systems."""

    @staticmethod
    def _trylock(lockfile):
        fileno = lockfile.fileno()
        msvcrt.locking(fileno, msvcrt.LK_NBLCK, 1)

    @staticmethod
    def _unlock(lockfile):
        fileno = lockfile.fileno()
        msvcrt.locking(fileno, msvcrt.LK_UNLCK, 1)


class _FcntlLock(_InterProcessLock):
    """Interprocess lock implementation that works on posix systems."""

    @staticmethod
    def _trylock(lockfile):
        fcntl.lockf(lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)

    @staticmethod
    def _unlock(lockfile):
        fcntl.lockf(lockfile, fcntl.LOCK_UN)


class _WindowsInterProcessReaderWriterLock(_InterProcessReaderWriterLock):
    """Interprocess readers writer lock implementation that works on windows
    systems."""

    @staticmethod
    def _trylock(lockfile, exclusive):
        """Try to lock ``lockfile`` without waiting.

        :returns: ``True`` when locked, ``False`` on a lock violation
        :raises OSError: carrying the last error code on any other failure
        """
        if exclusive:
            flags = (win32con.LOCKFILE_FAIL_IMMEDIATELY |
                     win32con.LOCKFILE_EXCLUSIVE_LOCK)
        else:
            flags = win32con.LOCKFILE_FAIL_IMMEDIATELY

        handle = msvcrt.get_osfhandle(lockfile.fileno())
        ok = win32file.LockFileEx(handle, flags, 0, 1, 0,
                                  win32file.pointer(pywintypes.OVERLAPPED()))
        if ok:
            return True

        last_error = win32file.GetLastError()
        if last_error == win32file.ERROR_LOCK_VIOLATION:
            return False
        raise OSError(last_error)

    @staticmethod
    def _unlock(lockfile):
        handle = msvcrt.get_osfhandle(lockfile.fileno())
        ok = win32file.UnlockFileEx(handle, 0, 1, 0,
                                    win32file.pointer(pywintypes.OVERLAPPED()))
        if not ok:
            raise OSError(win32file.GetLastError())

    @staticmethod
    def _get_handle(path):
        return open(path, 'a+')

    @staticmethod
    def _close_handle(lockfile):
        lockfile.close()


class _FcntlInterProcessReaderWriterLock(_InterProcessReaderWriterLock):
    """Interprocess readers writer lock implementation that works on posix
    systems."""

    @staticmethod
    def _trylock(lockfile, exclusive):
        """Try to lock ``lockfile`` without waiting.

        :returns: ``True`` when locked, ``False`` when the lock is busy
                  (``EACCES``/``EAGAIN``)
        :raises IOError: (or ``OSError``) on any other failure
        """
        if exclusive:
            flags = fcntl.LOCK_EX | fcntl.LOCK_NB
        else:
            flags = fcntl.LOCK_SH | fcntl.LOCK_NB

        try:
            fcntl.lockf(lockfile, flags)
            return True
        except (IOError, OSError) as e:
            if e.errno in (errno.EACCES, errno.EAGAIN):
                return False
            else:
                # Bare raise keeps the original traceback intact.
                raise

    @staticmethod
    def _unlock(lockfile):
        fcntl.lockf(lockfile, fcntl.LOCK_UN)

    @staticmethod
    def _get_handle(path):
        return open(path, 'a+')

    @staticmethod
    def _close_handle(lockfile):
        lockfile.close()


# The platform specific modules are only importable on their own platform,
# so they are imported here (after the classes that reference them lazily,
# at call time, have been defined).
if os.name == 'nt':
    import msvcrt
    import fasteners.pywin32.pywintypes as pywintypes
    import fasteners.pywin32.win32con as win32con
    import fasteners.pywin32.win32file as win32file

    InterProcessLock = _WindowsLock
    InterProcessReaderWriterLock = _WindowsInterProcessReaderWriterLock

else:
    import fcntl

    InterProcessLock = _FcntlLock
    InterProcessReaderWriterLock = _FcntlInterProcessReaderWriterLock


def interprocess_write_locked(path):
    """Acquires & releases an interprocess write lock around the call into
    the decorated function"""

    lock = InterProcessReaderWriterLock(path)

    def decorator(f):
        @six.wraps(f)
        def wrapper(*args, **kwargs):
            with lock.write_lock():
                return f(*args, **kwargs)

        return wrapper

    return decorator


def interprocess_read_locked(path):
    """Acquires & releases an interprocess read lock around the call into
    the decorated function"""

    lock = InterProcessReaderWriterLock(path)

    def decorator(f):
        @six.wraps(f)
        def wrapper(*args, **kwargs):
            with lock.read_lock():
                return f(*args, **kwargs)

        return wrapper

    return decorator


def interprocess_locked(path):
    """Acquires & releases a interprocess lock around call into
    decorated function."""

    lock = InterProcessLock(path)

    def decorator(f):
        @six.wraps(f)
        def wrapper(*args, **kwargs):
            with lock:
                return f(*args, **kwargs)

        return wrapper

    return decorator