1# Copyright 2011 OpenStack Foundation.
2# All Rights Reserved.
3#
4#    Licensed under the Apache License, Version 2.0 (the "License"); you may
5#    not use this file except in compliance with the License. You may obtain
6#    a copy of the License at
7#
8#         http://www.apache.org/licenses/LICENSE-2.0
9#
10#    Unless required by applicable law or agreed to in writing, software
11#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13#    License for the specific language governing permissions and limitations
14#    under the License.
15
16import contextlib
17import errno
18import functools
19import logging
20import os
21import shutil
22import subprocess
23import sys
24import tempfile
25import threading
26import weakref
27
28import fasteners
29from oslo_config import cfg
30from oslo_utils import reflection
31from oslo_utils import timeutils
32
33from oslo_concurrency._i18n import _
34
35
36LOG = logging.getLogger(__name__)
37
38
39_opts = [
40    cfg.BoolOpt('disable_process_locking', default=False,
41                help='Enables or disables inter-process locks.',
42                deprecated_group='DEFAULT'),
43    cfg.StrOpt('lock_path',
44               default=os.environ.get("OSLO_LOCK_PATH"),
45               help='Directory to use for lock files.  For security, the '
46                    'specified directory should only be writable by the user '
47                    'running the processes that need locking. '
48                    'Defaults to environment variable OSLO_LOCK_PATH. '
49                    'If external locks are used, a lock path must be set.',
50               deprecated_group='DEFAULT')
51]
52
53
54def _register_opts(conf):
55    conf.register_opts(_opts, group='oslo_concurrency')
56
57
58CONF = cfg.CONF
59_register_opts(CONF)
60
61
62def set_defaults(lock_path):
63    """Set value for lock_path.
64
65    This can be used by tests to set lock_path to a temporary directory.
66    """
67    cfg.set_defaults(_opts, lock_path=lock_path)
68
69
70def get_lock_path(conf):
71    """Return the path used for external file-based locks.
72
73    :param conf: Configuration object
74    :type conf: oslo_config.cfg.ConfigOpts
75
76    .. versionadded:: 1.8
77    """
78    _register_opts(conf)
79    return conf.oslo_concurrency.lock_path
80
81
82InterProcessLock = fasteners.InterProcessLock
83ReaderWriterLock = fasteners.ReaderWriterLock
84"""A reader/writer lock.
85
86.. versionadded:: 0.4
87"""
88
89
90class FairLocks(object):
91    """A garbage collected container of fair locks.
92
93    With a fair lock, contending lockers will get the lock in the order in
94    which they tried to acquire it.
95
96    This collection internally uses a weak value dictionary so that when a
97    lock is no longer in use (by any threads) it will automatically be
98    removed from this container by the garbage collector.
99    """
100
101    def __init__(self):
102        self._locks = weakref.WeakValueDictionary()
103        self._lock = threading.Lock()
104
105    def get(self, name):
106        """Gets (or creates) a lock with a given name.
107
108        :param name: The lock name to get/create (used to associate
109                     previously created names with the same lock).
110
111        Returns an newly constructed lock (or an existing one if it was
112        already created for the given name).
113        """
114        with self._lock:
115            try:
116                return self._locks[name]
117            except KeyError:
118                # The fasteners module specifies that
119                # ReaderWriterLock.write_lock() will give FIFO behaviour,
120                # so we don't need to do anything special ourselves.
121                rwlock = ReaderWriterLock()
122                self._locks[name] = rwlock
123                return rwlock
124
125
126_fair_locks = FairLocks()
127
128
129def internal_fair_lock(name):
130    return _fair_locks.get(name)
131
132
133class Semaphores(object):
134    """A garbage collected container of semaphores.
135
136    This collection internally uses a weak value dictionary so that when a
137    semaphore is no longer in use (by any threads) it will automatically be
138    removed from this container by the garbage collector.
139
140    .. versionadded:: 0.3
141    """
142
143    def __init__(self):
144        self._semaphores = weakref.WeakValueDictionary()
145        self._lock = threading.Lock()
146
147    def get(self, name):
148        """Gets (or creates) a semaphore with a given name.
149
150        :param name: The semaphore name to get/create (used to associate
151                     previously created names with the same semaphore).
152
153        Returns an newly constructed semaphore (or an existing one if it was
154        already created for the given name).
155        """
156        with self._lock:
157            try:
158                return self._semaphores[name]
159            except KeyError:
160                sem = threading.Semaphore()
161                self._semaphores[name] = sem
162                return sem
163
164    def __len__(self):
165        """Returns how many semaphores exist at the current time."""
166        return len(self._semaphores)
167
168
169_semaphores = Semaphores()
170
171
172def _get_lock_path(name, lock_file_prefix, lock_path=None):
173    # NOTE(mikal): the lock name cannot contain directory
174    # separators
175    name = name.replace(os.sep, '_')
176    if lock_file_prefix:
177        sep = '' if lock_file_prefix.endswith('-') else '-'
178        name = '%s%s%s' % (lock_file_prefix, sep, name)
179
180    local_lock_path = lock_path or CONF.oslo_concurrency.lock_path
181
182    if not local_lock_path:
183        raise cfg.RequiredOptError('lock_path')
184
185    return os.path.join(local_lock_path, name)
186
187
188def external_lock(name, lock_file_prefix=None, lock_path=None):
189    lock_file_path = _get_lock_path(name, lock_file_prefix, lock_path)
190
191    return InterProcessLock(lock_file_path)
192
193
194def remove_external_lock_file(name, lock_file_prefix=None, lock_path=None,
195                              semaphores=None):
196    """Remove an external lock file when it's not used anymore
197    This will be helpful when we have a lot of lock files
198    """
199    with internal_lock(name, semaphores=semaphores):
200        lock_file_path = _get_lock_path(name, lock_file_prefix, lock_path)
201        try:
202            os.remove(lock_file_path)
203        except OSError as exc:
204            if exc.errno != errno.ENOENT:
205                LOG.warning('Failed to remove file %(file)s',
206                            {'file': lock_file_path})
207
208
209class AcquireLockFailedException(Exception):
210    def __init__(self, lock_name):
211        self.message = "Failed to acquire the lock %s" % lock_name
212
213    def __str__(self):
214        return self.message
215
216
217def internal_lock(name, semaphores=None, blocking=True):
218    @contextlib.contextmanager
219    def nonblocking(lock):
220        """Try to acquire the internal lock without blocking."""
221        if not lock.acquire(blocking=False):
222            raise AcquireLockFailedException(name)
223        try:
224            yield lock
225        finally:
226            lock.release()
227
228    if semaphores is None:
229        semaphores = _semaphores
230    lock = semaphores.get(name)
231
232    return nonblocking(lock) if not blocking else lock
233
234
235@contextlib.contextmanager
236def lock(name, lock_file_prefix=None, external=False, lock_path=None,
237         do_log=True, semaphores=None, delay=0.01, fair=False, blocking=True):
238    """Context based lock
239
240    This function yields a `threading.Semaphore` instance (if we don't use
241    eventlet.monkey_patch(), else `semaphore.Semaphore`) unless external is
242    True, in which case, it'll yield an InterProcessLock instance.
243
244    :param lock_file_prefix: The lock_file_prefix argument is used to provide
245      lock files on disk with a meaningful prefix.
246
247    :param external: The external keyword argument denotes whether this lock
248      should work across multiple processes. This means that if two different
249      workers both run a method decorated with @synchronized('mylock',
250      external=True), only one of them will execute at a time.
251
252    :param lock_path: The path in which to store external lock files.  For
253      external locking to work properly, this must be the same for all
254      references to the lock.
255
256    :param do_log: Whether to log acquire/release messages.  This is primarily
257      intended to reduce log message duplication when `lock` is used from the
258      `synchronized` decorator.
259
260    :param semaphores: Container that provides semaphores to use when locking.
261        This ensures that threads inside the same application can not collide,
262        due to the fact that external process locks are unaware of a processes
263        active threads.
264
265    :param delay: Delay between acquisition attempts (in seconds).
266
267    :param fair: Whether or not we want a "fair" lock where contending lockers
268        will get the lock in the order in which they tried to acquire it.
269
270    :param blocking: Whether to wait forever to try to acquire the lock.
271        Incompatible with fair locks because those provided by the fasteners
272        module doesn't implements a non-blocking behavior.
273
274    .. versionchanged:: 0.2
275       Added *do_log* optional parameter.
276
277    .. versionchanged:: 0.3
278       Added *delay* and *semaphores* optional parameters.
279    """
280    if fair:
281        if semaphores is not None:
282            raise NotImplementedError(_('Specifying semaphores is not '
283                                        'supported when using fair locks.'))
284        if blocking is not True:
285            raise NotImplementedError(_('Disabling blocking is not supported '
286                                        'when using fair locks.'))
287        # The fasteners module specifies that write_lock() provides fairness.
288        int_lock = internal_fair_lock(name).write_lock()
289    else:
290        int_lock = internal_lock(name, semaphores=semaphores,
291                                 blocking=blocking)
292    with int_lock:
293        if do_log:
294            LOG.debug('Acquired lock "%(lock)s"', {'lock': name})
295        try:
296            if external and not CONF.oslo_concurrency.disable_process_locking:
297                ext_lock = external_lock(name, lock_file_prefix, lock_path)
298                gotten = ext_lock.acquire(delay=delay, blocking=blocking)
299                if not gotten:
300                    raise AcquireLockFailedException(name)
301                if do_log:
302                    LOG.debug('Acquired external semaphore "%(lock)s"',
303                              {'lock': name})
304                try:
305                    yield ext_lock
306                finally:
307                    ext_lock.release()
308            else:
309                yield int_lock
310        finally:
311            if do_log:
312                LOG.debug('Releasing lock "%(lock)s"', {'lock': name})
313
314
315def lock_with_prefix(lock_file_prefix):
316    """Partial object generator for the lock context manager.
317
318    Redefine lock in each project like so::
319
320        (in nova/utils.py)
321        from oslo_concurrency import lockutils
322
323        _prefix = 'nova'
324        lock = lockutils.lock_with_prefix(_prefix)
325        lock_cleanup = lockutils.remove_external_lock_file_with_prefix(_prefix)
326
327
328        (in nova/foo.py)
329        from nova import utils
330
331        with utils.lock('mylock'):
332           ...
333
334    Eventually clean up with::
335
336        lock_cleanup('mylock')
337
338    :param lock_file_prefix: A string used to provide lock files on disk with a
339        meaningful prefix. Will be separated from the lock name with a hyphen,
340        which may optionally be included in the lock_file_prefix (e.g.
341        ``'nova'`` and ``'nova-'`` are equivalent).
342    """
343    return functools.partial(lock, lock_file_prefix=lock_file_prefix)
344
345
346def synchronized(name, lock_file_prefix=None, external=False, lock_path=None,
347                 semaphores=None, delay=0.01, fair=False, blocking=True):
348    """Synchronization decorator.
349
350    Decorating a method like so::
351
352        @synchronized('mylock')
353        def foo(self, *args):
354           ...
355
356    ensures that only one thread will execute the foo method at a time.
357
358    Different methods can share the same lock::
359
360        @synchronized('mylock')
361        def foo(self, *args):
362           ...
363
364        @synchronized('mylock')
365        def bar(self, *args):
366           ...
367
368    This way only one of either foo or bar can be executing at a time.
369
370    .. versionchanged:: 0.3
371       Added *delay* and *semaphores* optional parameter.
372    """
373
374    def wrap(f):
375
376        @functools.wraps(f)
377        def inner(*args, **kwargs):
378            t1 = timeutils.now()
379            t2 = None
380            gotten = True
381            try:
382                with lock(name, lock_file_prefix, external, lock_path,
383                          do_log=False, semaphores=semaphores, delay=delay,
384                          fair=fair, blocking=blocking):
385                    t2 = timeutils.now()
386                    LOG.debug('Lock "%(name)s" acquired by "%(function)s" :: '
387                              'waited %(wait_secs)0.3fs',
388                              {'name': name,
389                               'function': reflection.get_callable_name(f),
390                               'wait_secs': (t2 - t1)})
391                    return f(*args, **kwargs)
392            except AcquireLockFailedException:
393                gotten = False
394            finally:
395                t3 = timeutils.now()
396                if t2 is None:
397                    held_secs = "N/A"
398                else:
399                    held_secs = "%0.3fs" % (t3 - t2)
400                LOG.debug('Lock "%(name)s" "%(gotten)s" by "%(function)s" ::'
401                          ' held %(held_secs)s',
402                          {'name': name,
403                           'gotten': 'released' if gotten else 'unacquired',
404                           'function': reflection.get_callable_name(f),
405                           'held_secs': held_secs})
406        return inner
407
408    return wrap
409
410
411def synchronized_with_prefix(lock_file_prefix):
412    """Partial object generator for the synchronization decorator.
413
414    Redefine @synchronized in each project like so::
415
416        (in nova/utils.py)
417        from oslo_concurrency import lockutils
418
419        _prefix = 'nova'
420        synchronized = lockutils.synchronized_with_prefix(_prefix)
421        lock_cleanup = lockutils.remove_external_lock_file_with_prefix(_prefix)
422
423
424        (in nova/foo.py)
425        from nova import utils
426
427        @utils.synchronized('mylock')
428        def bar(self, *args):
429           ...
430
431    Eventually clean up with::
432
433        lock_cleanup('mylock')
434
435    :param lock_file_prefix: A string used to provide lock files on disk with a
436        meaningful prefix. Will be separated from the lock name with a hyphen,
437        which may optionally be included in the lock_file_prefix (e.g.
438        ``'nova'`` and ``'nova-'`` are equivalent).
439    """
440
441    return functools.partial(synchronized, lock_file_prefix=lock_file_prefix)
442
443
444def remove_external_lock_file_with_prefix(lock_file_prefix):
445    """Partial object generator for the remove lock file function.
446
447    Redefine remove_external_lock_file_with_prefix in each project like so::
448
449        (in nova/utils.py)
450        from oslo_concurrency import lockutils
451
452        _prefix = 'nova'
453        synchronized = lockutils.synchronized_with_prefix(_prefix)
454        lock = lockutils.lock_with_prefix(_prefix)
455        lock_cleanup = lockutils.remove_external_lock_file_with_prefix(_prefix)
456
457        (in nova/foo.py)
458        from nova import utils
459
460        @utils.synchronized('mylock')
461        def bar(self, *args):
462            ...
463
464        def baz(self, *args):
465            ...
466            with utils.lock('mylock'):
467                ...
468            ...
469
470        <eventually call lock_cleanup('mylock') to clean up>
471
472    The lock_file_prefix argument is used to provide lock files on disk with a
473    meaningful prefix.
474    """
475    return functools.partial(remove_external_lock_file,
476                             lock_file_prefix=lock_file_prefix)
477
478
479def _lock_wrapper(argv):
480    """Create a dir for locks and pass it to command from arguments
481
482    This is exposed as a console script entry point named
483    lockutils-wrapper
484
485    If you run this:
486        lockutils-wrapper stestr run <etc>
487
488    a temporary directory will be created for all your locks and passed to all
489    your tests in an environment variable. The temporary dir will be deleted
490    afterwards and the return value will be preserved.
491    """
492
493    lock_dir = tempfile.mkdtemp()
494    os.environ["OSLO_LOCK_PATH"] = lock_dir
495    try:
496        ret_val = subprocess.call(argv[1:])
497    finally:
498        shutil.rmtree(lock_dir, ignore_errors=True)
499    return ret_val
500
501
502def main():
503    sys.exit(_lock_wrapper(sys.argv))
504
505
506if __name__ == '__main__':
507    raise NotImplementedError(_('Calling lockutils directly is no longer '
508                                'supported.  Please use the '
509                                'lockutils-wrapper console script instead.'))
510