1# -*- coding: utf-8 -*-
2
3# Copyright 2011 OpenStack Foundation.
4# All Rights Reserved.
5#
6#    Licensed under the Apache License, Version 2.0 (the "License"); you may
7#    not use this file except in compliance with the License. You may obtain
8#    a copy of the License at
9#
10#         http://www.apache.org/licenses/LICENSE-2.0
11#
12#    Unless required by applicable law or agreed to in writing, software
13#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15#    License for the specific language governing permissions and limitations
16#    under the License.
17from contextlib import contextmanager
18import errno
19import logging
20import os
21import threading
22import time
23
24import six
25
26from fasteners import _utils
27
28LOG = logging.getLogger(__name__)
29
30
def _ensure_tree(path):
    """Make sure the directory ``path`` exists, creating parents as needed.

    :param path: Directory to create
    :returns: ``True`` when the directory was created by this call,
              ``False`` when it was already present
    :raises OSError: when ``os.makedirs`` fails for any reason other than
                     the directory already existing (including the case
                     where ``path`` exists but is not a directory)
    """
    try:
        os.makedirs(path)
    except OSError as exc:
        # EISDIR is treated exactly like "directory already there".
        if exc.errno == errno.EISDIR:
            return False
        # EEXIST only counts as success when the existing entry really is
        # a directory; anything else at that path is an error.
        if exc.errno == errno.EEXIST and os.path.isdir(path):
            return False
        raise
    return True
50
51
class _InterProcessLock(object):
    """An interprocess lock.

    This base class holds the platform independent bookkeeping; the actual
    locking primitives come from subclasses that override the ``_trylock``
    and ``_unlock`` static methods.
    """

    MAX_DELAY = 0.1
    """
    Default maximum delay we will wait to try to acquire the lock (when
    it's busy/being held by another process).
    """

    DELAY_INCREMENT = 0.01
    """
    Default increment we will use (up to max delay) after each attempt before
    next attempt to acquire the lock. For example if 3 attempts have been made
    the calling thread will sleep (0.01 * 3) before the next attempt to
    acquire the lock (and repeat).
    """

    def __init__(self, path, sleep_func=time.sleep, logger=None):
        """Remember where the lock file lives; nothing is opened yet.

        :param path: path of the lock file (stored after being passed
                     through ``_utils.canonicalize_path``)
        :param sleep_func: callable handed to ``_utils.Retry`` as its
                           ``sleep_func`` when acquiring
        :param logger: logger to use; selected together with the module
                       level ``LOG`` via ``_utils.pick_first_not_none``
        """
        self.path = _utils.canonicalize_path(path)
        self.logger = _utils.pick_first_not_none(logger, LOG)
        self.sleep_func = sleep_func
        self.lockfile = None
        self.acquired = False

    def _try_acquire(self, blocking, watch):
        """Make one attempt at taking the lock.

        :returns: ``True`` when the lock was taken, ``False`` when it is
                  busy and we are non-blocking or ``watch`` has expired
        :raises _utils.RetryAgain: when the lock is busy and another attempt
                                   should be made
        :raises threading.ThreadError: when ``trylock`` fails with an
                                       ``IOError`` whose errno is neither
                                       ``EACCES`` nor ``EAGAIN``
        """
        try:
            self.trylock()
        except IOError as e:
            if e.errno not in (errno.EACCES, errno.EAGAIN):
                details = {'path': self.path, 'exception': e}
                raise threading.ThreadError(
                    "Unable to acquire lock on `%(path)s` due to"
                    " %(exception)s" % details)
            # The lock is merely busy: retry only while we are allowed to
            # keep waiting.
            if blocking and not watch.expired():
                raise _utils.RetryAgain()
            return False
        return True

    def _do_open(self):
        """Create the lock's directory if needed and open the lock file."""
        basedir = os.path.dirname(self.path)
        if basedir and _ensure_tree(basedir):
            self.logger.log(_utils.BLATHER,
                            'Created lock base path `%s`', basedir)
        # Open in append mode so we don't overwrite any potential contents of
        # the target file. This eliminates the possibility of an attacker
        # creating a symlink to an important file in our lock path.
        handle = self.lockfile
        if handle is None or handle.closed:
            self.lockfile = open(self.path, 'a')

    def acquire(self, blocking=True,
                delay=DELAY_INCREMENT, max_delay=MAX_DELAY,
                timeout=None):
        """Attempt to acquire the given lock.

        :param blocking: whether to wait forever to try to acquire the lock
        :type blocking: bool
        :param delay: when blocking this is the delay time in seconds that
                      will be added after each failed acquisition
        :type delay: int/float
        :param max_delay: the maximum delay to have (this limits the
                          accumulated delay(s) added after each failed
                          acquisition)
        :type max_delay: int/float
        :param timeout: an optional timeout (limits how long blocking
                        will occur for)
        :type timeout: int/float
        :returns: whether or not the acquisition succeeded
        :rtype: bool
        :raises ValueError: when ``delay`` or ``timeout`` is negative
        """
        if delay < 0:
            raise ValueError("Delay must be greater than or equal to zero")
        if timeout is not None and timeout < 0:
            raise ValueError("Timeout must be greater than or equal to zero")
        # The cap may never be smaller than a single delay step.
        if delay >= max_delay:
            max_delay = delay
        self._do_open()
        watch = _utils.StopWatch(duration=timeout)
        retrier = _utils.Retry(delay, max_delay,
                               sleep_func=self.sleep_func, watch=watch)
        with watch:
            gotten = retrier(self._try_acquire, blocking, watch)
        if gotten:
            self.acquired = True
            self.logger.log(_utils.BLATHER,
                            "Acquired file lock `%s` after waiting %0.3fs"
                            " [%s attempts were required]", self.path,
                            watch.elapsed(), retrier.attempts)
            return True
        self.acquired = False
        return False

    def _do_close(self):
        """Close the lock file handle (if any) and forget about it."""
        handle = self.lockfile
        if handle is None:
            return
        handle.close()
        self.lockfile = None

    def __enter__(self):
        """Acquire the lock (blocking) and return this lock object."""
        if self.acquire():
            return self
        # This shouldn't happen, but just in case...
        raise threading.ThreadError(
            "Unable to acquire a file lock on `%s` (when used as a"
            " context manager)" % self.path)

    def release(self):
        """Release the previously acquired lock.

        ``IOError`` raised while unlocking or while closing the handle is
        logged rather than propagated.

        :raises threading.ThreadError: when the lock is not currently
                                       marked as acquired
        """
        if not self.acquired:
            raise threading.ThreadError(
                "Unable to release an unacquired lock")
        try:
            self.unlock()
        except IOError:
            self.logger.exception(
                "Could not unlock the acquired lock opened on `%s`",
                self.path)
            return
        self.acquired = False
        try:
            self._do_close()
        except IOError:
            self.logger.exception(
                "Could not close the file handle opened on `%s`", self.path)
            return
        self.logger.log(_utils.BLATHER,
                        "Unlocked and closed file lock open on `%s`",
                        self.path)

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Release the lock when leaving the ``with`` block."""
        self.release()

    def exists(self):
        """Checks if the path that this lock exists at actually exists."""
        return os.path.exists(self.path)

    def trylock(self):
        """Apply the platform ``_trylock`` primitive to our lock file."""
        self._trylock(self.lockfile)

    def unlock(self):
        """Apply the platform ``_unlock`` primitive to our lock file."""
        self._unlock(self.lockfile)

    @staticmethod
    def _trylock(lockfile):
        """Platform hook: try to lock ``lockfile`` (subclasses override)."""
        raise NotImplementedError()

    @staticmethod
    def _unlock(lockfile):
        """Platform hook: unlock ``lockfile`` (subclasses override)."""
        raise NotImplementedError()
208
209
class _InterProcessReaderWriterLock(object):
    """An interprocess readers writer lock.

    This base class holds the platform independent bookkeeping; subclasses
    supply the primitives through the ``_trylock``, ``_unlock``,
    ``_get_handle`` and ``_close_handle`` static methods.
    """

    MAX_DELAY = 0.1
    """
    Default maximum delay we will wait to try to acquire the lock (when
    it's busy/being held by another process).
    """

    DELAY_INCREMENT = 0.01
    """
    Default increment we will use (up to max delay) after each attempt before
    next attempt to acquire the lock. For example if 3 attempts have been made
    the calling thread will sleep (0.01 * 3) before the next attempt to
    acquire the lock (and repeat).
    """

    def __init__(self, path, sleep_func=time.sleep, logger=None):
        """Remember where the lock file lives; nothing is opened yet.

        :param path: path of the lock file (stored after being passed
                     through ``_utils.canonicalize_path``)
        :param sleep_func: callable handed to ``_utils.Retry`` as its
                           ``sleep_func`` when acquiring
        :param logger: logger to use; selected together with the module
                       level ``LOG`` via ``_utils.pick_first_not_none``
        """
        self.lockfile = None
        self.path = _utils.canonicalize_path(path)
        self.sleep_func = sleep_func
        self.logger = _utils.pick_first_not_none(logger, LOG)

    def _try_acquire(self, blocking, watch, exclusive):
        """Make one attempt at taking the lock.

        :param blocking: whether the caller is willing to keep retrying
        :param watch: stop watch consulted (via ``expired()``) to decide
                      whether another attempt is still allowed
        :param exclusive: passed through to ``_trylock``; ``True`` is used
                          for the writer's lock, ``False`` for a reader's
        :returns: ``True`` when ``_trylock`` reported success, ``False``
                  when it did not and we are non-blocking or ``watch`` has
                  expired
        :raises _utils.RetryAgain: when the attempt failed and another one
                                   should be made
        :raises threading.ThreadError: when ``_trylock`` raised any exception
        """
        try:
            gotten = self._trylock(self.lockfile, exclusive)
        except Exception as e:
            raise threading.ThreadError(
                "Unable to acquire lock on {} due to {}!".format(self.path, e))

        if gotten:
            return True

        if not blocking or watch.expired():
            return False

        raise _utils.RetryAgain()

    def _do_open(self):
        """Create the lock's directory if needed and obtain a file handle."""
        basedir = os.path.dirname(self.path)
        if basedir:
            made_basedir = _ensure_tree(basedir)
            if made_basedir:
                self.logger.log(_utils.BLATHER,
                                'Created lock base path `%s`', basedir)
        # An already obtained handle is reused rather than reopened.
        if self.lockfile is None:
            self.lockfile = self._get_handle(self.path)

    def acquire_read_lock(self, blocking=True,
                          delay=DELAY_INCREMENT, max_delay=MAX_DELAY,
                          timeout=None):

        """Attempt to acquire a reader's lock.

        :param blocking: whether to wait forever to try to acquire the lock
        :type blocking: bool
        :param delay: when blocking this is the delay time in seconds that
                      will be added after each failed acquisition
        :type delay: int/float
        :param max_delay: the maximum delay to have (this limits the
                          accumulated delay(s) added after each failed
                          acquisition)
        :type max_delay: int/float
        :param timeout: an optional timeout (limits how long blocking
                        will occur for)
        :type timeout: int/float
        :returns: whether or not the acquisition succeeded
        :rtype: bool
        """
        return self._acquire(blocking, delay, max_delay, timeout,
                             exclusive=False)

    def acquire_write_lock(self, blocking=True,
                           delay=DELAY_INCREMENT, max_delay=MAX_DELAY,
                           timeout=None):

        """Attempt to acquire a writer's lock.

        :param blocking: whether to wait forever to try to acquire the lock
        :type blocking: bool
        :param delay: when blocking this is the delay time in seconds that
                      will be added after each failed acquisition
        :type delay: int/float
        :param max_delay: the maximum delay to have (this limits the
                          accumulated delay(s) added after each failed
                          acquisition)
        :type max_delay: int/float
        :param timeout: an optional timeout (limits how long blocking
                        will occur for)
        :type timeout: int/float
        :returns: whether or not the acquisition succeeded
        :rtype: bool
        """
        return self._acquire(blocking, delay, max_delay, timeout,
                             exclusive=True)

    def _acquire(self, blocking=True,
                 delay=DELAY_INCREMENT, max_delay=MAX_DELAY,
                 timeout=None, exclusive=True):
        """Shared implementation behind both ``acquire_*_lock`` methods.

        Takes the same parameters as the public methods plus ``exclusive``,
        which is forwarded to ``_try_acquire``.

        :returns: whether or not the acquisition succeeded
        :rtype: bool
        :raises ValueError: when ``delay`` or ``timeout`` is negative
        """

        if delay < 0:
            raise ValueError("Delay must be greater than or equal to zero")
        if timeout is not None and timeout < 0:
            raise ValueError("Timeout must be greater than or equal to zero")
        # The cap may never be smaller than a single delay step.
        if delay >= max_delay:
            max_delay = delay
        self._do_open()
        watch = _utils.StopWatch(duration=timeout)
        r = _utils.Retry(delay, max_delay,
                         sleep_func=self.sleep_func, watch=watch)
        with watch:
            gotten = r(self._try_acquire, blocking, watch, exclusive)
        if not gotten:
            return False
        else:
            self.logger.log(_utils.BLATHER,
                            "Acquired file lock `%s` after waiting %0.3fs [%s"
                            " attempts were required]", self.path,
                            watch.elapsed(), r.attempts)
            return True

    def _do_close(self):
        """Close the lock file handle (if any) and forget about it."""
        if self.lockfile is not None:
            self._close_handle(self.lockfile)
            self.lockfile = None

    def _release(self):
        """Unlock the lock file and, when that worked, close its handle.

        Reader and writer releases are the same operation, so both public
        release methods delegate here. ``IOError`` raised while unlocking or
        while closing the handle is logged rather than propagated.
        """
        try:
            self._unlock(self.lockfile)
        except IOError:
            self.logger.exception("Could not unlock the acquired lock opened"
                                  " on `%s`", self.path)
        else:
            try:
                self._do_close()
            except IOError:
                self.logger.exception("Could not close the file handle"
                                      " opened on `%s`", self.path)
            else:
                self.logger.log(_utils.BLATHER,
                                "Unlocked and closed file lock open on"
                                " `%s`", self.path)

    def release_write_lock(self):
        """Release the writer's lock."""
        self._release()

    def release_read_lock(self):
        """Release the reader's lock."""
        self._release()

    @contextmanager
    def write_lock(self, delay=DELAY_INCREMENT, max_delay=MAX_DELAY):
        """Context manager holding the writer's lock for the ``with`` body.

        Acquisition blocks without a timeout; ``delay`` and ``max_delay``
        are forwarded to :meth:`acquire_write_lock`.

        :raises threading.ThreadError: when the lock was not acquired
        """

        gotten = self.acquire_write_lock(blocking=True, delay=delay,
                                         max_delay=max_delay, timeout=None)

        if not gotten:
            # This shouldn't happen, but just in case...
            raise threading.ThreadError("Unable to acquire a file lock"
                                        " on `%s` (when used as a"
                                        " context manager)" % self.path)
        try:
            yield
        finally:
            self.release_write_lock()

    @contextmanager
    def read_lock(self, delay=DELAY_INCREMENT, max_delay=MAX_DELAY):
        """Context manager holding a reader's lock for the ``with`` body.

        Acquisition blocks without a timeout; ``delay`` and ``max_delay``
        are forwarded to :meth:`acquire_read_lock`.

        :raises threading.ThreadError: when the lock was not acquired
        """

        gotten = self.acquire_read_lock(blocking=True, delay=delay,
                                        max_delay=max_delay, timeout=None)

        if not gotten:
            # This shouldn't happen, but just in case... Never run the
            # protected body (or release) for a lock we do not hold.
            raise threading.ThreadError("Unable to acquire a file lock"
                                        " on `%s` (when used as a"
                                        " context manager)" % self.path)
        try:
            yield
        finally:
            self.release_read_lock()

    @staticmethod
    def _trylock(lockfile, exclusive):
        """Platform hook: try to lock ``lockfile`` (subclasses override)."""
        raise NotImplementedError()

    @staticmethod
    def _unlock(lockfile):
        """Platform hook: unlock ``lockfile`` (subclasses override)."""
        raise NotImplementedError()

    @staticmethod
    def _get_handle(path):
        """Platform hook: open a handle for ``path`` (subclasses override)."""
        raise NotImplementedError()

    @staticmethod
    def _close_handle(lockfile):
        """Platform hook: close ``lockfile`` (subclasses override)."""
        raise NotImplementedError()
411
412
class _WindowsLock(_InterProcessLock):
    """Interprocess lock implementation that works on windows systems."""

    @staticmethod
    def _trylock(lockfile):
        """Request a non-blocking ``msvcrt`` lock on ``lockfile``."""
        # The trailing 1 is the ``nbytes`` argument of ``msvcrt.locking``.
        msvcrt.locking(lockfile.fileno(), msvcrt.LK_NBLCK, 1)

    @staticmethod
    def _unlock(lockfile):
        """Drop the ``msvcrt`` lock taken by ``_trylock``."""
        msvcrt.locking(lockfile.fileno(), msvcrt.LK_UNLCK, 1)
425
426
class _FcntlLock(_InterProcessLock):
    """Interprocess lock implementation that works on posix systems."""

    @staticmethod
    def _trylock(lockfile):
        """Request an exclusive, non-blocking ``fcntl`` lock on ``lockfile``."""
        exclusive_nonblocking = fcntl.LOCK_EX | fcntl.LOCK_NB
        fcntl.lockf(lockfile, exclusive_nonblocking)

    @staticmethod
    def _unlock(lockfile):
        """Drop the ``fcntl`` lock held on ``lockfile``."""
        release_flag = fcntl.LOCK_UN
        fcntl.lockf(lockfile, release_flag)
437
438
class _WindowsInterProcessReaderWriterLock(_InterProcessReaderWriterLock):
    """Interprocess readers writer lock implementation that works on windows
    systems."""

    @staticmethod
    def _trylock(lockfile, exclusive):
        """Try to lock ``lockfile`` through ``LockFileEx`` without waiting.

        :param exclusive: adds ``LOCKFILE_EXCLUSIVE_LOCK`` to the flags when
                          true
        :returns: ``True`` when ``LockFileEx`` reported success, ``False``
                  when it failed with ``ERROR_LOCK_VIOLATION``
        :raises OSError: carrying the last error code for any other failure
        """
        flags = win32con.LOCKFILE_FAIL_IMMEDIATELY
        if exclusive:
            flags = flags | win32con.LOCKFILE_EXCLUSIVE_LOCK

        handle = msvcrt.get_osfhandle(lockfile.fileno())
        overlapped = win32file.pointer(pywintypes.OVERLAPPED())
        # NOTE(review): the ``0, 1, 0`` arguments presumably mean reserved=0
        # and a one byte range -- confirm against the pywin32 wrapper.
        if win32file.LockFileEx(handle, flags, 0, 1, 0, overlapped):
            return True

        last_error = win32file.GetLastError()
        if last_error != win32file.ERROR_LOCK_VIOLATION:
            raise OSError(last_error)
        return False

    @staticmethod
    def _unlock(lockfile):
        """Unlock ``lockfile`` through ``UnlockFileEx``.

        :raises OSError: carrying the last error code when the call fails
        """
        handle = msvcrt.get_osfhandle(lockfile.fileno())
        overlapped = win32file.pointer(pywintypes.OVERLAPPED())
        if win32file.UnlockFileEx(handle, 0, 1, 0, overlapped):
            return
        raise OSError(win32file.GetLastError())

    @staticmethod
    def _get_handle(path):
        """Open ``path`` in ``'a+'`` mode and return the file object."""
        handle = open(path, 'a+')
        return handle

    @staticmethod
    def _close_handle(lockfile):
        """Close the file object returned by ``_get_handle``."""
        lockfile.close()
476
477
class _FcntlInterProcessReaderWriterLock(_InterProcessReaderWriterLock):
    """Interprocess readers writer lock implementation that works on posix
    systems."""

    @staticmethod
    def _trylock(lockfile, exclusive):
        """Try to lock ``lockfile`` through ``fcntl.lockf`` without waiting.

        :param exclusive: request ``LOCK_EX`` when true, ``LOCK_SH``
                          otherwise
        :returns: ``True`` when the lock was taken, ``False`` when the call
                  failed with ``EACCES`` or ``EAGAIN``
        :raises IOError/OSError: for any other errno
        """
        mode = fcntl.LOCK_EX if exclusive else fcntl.LOCK_SH
        flags = mode | fcntl.LOCK_NB

        try:
            fcntl.lockf(lockfile, flags)
        except (IOError, OSError) as e:
            if e.errno not in (errno.EACCES, errno.EAGAIN):
                raise e
            return False
        else:
            return True

    @staticmethod
    def _unlock(lockfile):
        """Drop the ``fcntl`` lock held on ``lockfile``."""
        release_flag = fcntl.LOCK_UN
        fcntl.lockf(lockfile, release_flag)

    @staticmethod
    def _get_handle(path):
        """Open ``path`` in ``'a+'`` mode and return the file object."""
        handle = open(path, 'a+')
        return handle

    @staticmethod
    def _close_handle(lockfile):
        """Close the file object returned by ``_get_handle``."""
        lockfile.close()
510
511
# Bind the public lock names to the implementation for the running platform.
# The platform modules imported here are only referenced inside the method
# bodies of the classes above, so importing them after the class definitions
# is sufficient.
if os.name != 'nt':
    import fcntl

    InterProcessReaderWriterLock = _FcntlInterProcessReaderWriterLock
    InterProcessLock = _FcntlLock

else:
    import msvcrt
    import fasteners.pywin32.pywintypes as pywintypes
    import fasteners.pywin32.win32con as win32con
    import fasteners.pywin32.win32file as win32file

    InterProcessReaderWriterLock = _WindowsInterProcessReaderWriterLock
    InterProcessLock = _WindowsLock
526
527
def interprocess_write_locked(path):
    """Acquires & releases an interprocess write lock around the call into
    the decorated function.

    :param path: lock file path handed to ``InterProcessReaderWriterLock``;
                 one lock object is created per decorator and shared by
                 every call of the decorated function
    """

    rw_lock = InterProcessReaderWriterLock(path)

    def decorator(f):
        def locked_call(*args, **kwargs):
            with rw_lock.write_lock():
                return f(*args, **kwargs)

        return six.wraps(f)(locked_call)

    return decorator
543
544
def interprocess_read_locked(path):
    """Acquires & releases an interprocess read lock around the call into
    the decorated function.

    :param path: lock file path handed to ``InterProcessReaderWriterLock``;
                 one lock object is created per decorator and shared by
                 every call of the decorated function
    """

    rw_lock = InterProcessReaderWriterLock(path)

    def decorator(f):
        def locked_call(*args, **kwargs):
            with rw_lock.read_lock():
                return f(*args, **kwargs)

        return six.wraps(f)(locked_call)

    return decorator
560
561
def interprocess_locked(path):
    """Acquires & releases an interprocess lock around the call into the
    decorated function.

    :param path: lock file path handed to ``InterProcessLock``; one lock
                 object is created per decorator and shared by every call
                 of the decorated function
    """

    file_lock = InterProcessLock(path)

    def decorator(f):
        def locked_call(*args, **kwargs):
            with file_lock:
                return f(*args, **kwargs)

        return six.wraps(f)(locked_call)

    return decorator
577