1# -*- coding: utf-8 -*-
2"""
3Atomic file operations
4"""
5
6import atexit
7import os
8import sys
9import threading
10from contextlib import contextmanager
11
12from plumbum.lib import six
13from plumbum.machines.local import local
14
15if not hasattr(threading, "get_ident"):
16    try:
17        import thread
18    except ImportError:
19        import _thread as thread
20    threading.get_ident = thread.get_ident
21    del thread
22
23try:
24    import fcntl
25except ImportError:
26    import msvcrt
27
28    try:
29        from pywintypes import error as WinError
30        from win32con import LOCKFILE_EXCLUSIVE_LOCK, LOCKFILE_FAIL_IMMEDIATELY
31        from win32file import OVERLAPPED, LockFileEx, UnlockFile
32    except ImportError:
33        raise ImportError(
34            "On Windows, we require Python for Windows Extensions (pywin32)"
35        )
36
37    @contextmanager
38    def locked_file(fileno, blocking=True):
39        hndl = msvcrt.get_osfhandle(fileno)
40        try:
41            LockFileEx(
42                hndl,
43                LOCKFILE_EXCLUSIVE_LOCK
44                | (0 if blocking else LOCKFILE_FAIL_IMMEDIATELY),
45                0xFFFFFFFF,
46                0xFFFFFFFF,
47                OVERLAPPED(),
48            )
49        except WinError:
50            _, ex, _ = sys.exc_info()
51            raise WindowsError(*ex.args)
52        try:
53            yield
54        finally:
55            UnlockFile(hndl, 0, 0, 0xFFFFFFFF, 0xFFFFFFFF)
56
57
58else:
59    if hasattr(fcntl, "lockf"):
60
61        @contextmanager
62        def locked_file(fileno, blocking=True):
63            fcntl.lockf(fileno, fcntl.LOCK_EX | (0 if blocking else fcntl.LOCK_NB))
64            try:
65                yield
66            finally:
67                fcntl.lockf(fileno, fcntl.LOCK_UN)
68
69    else:
70
71        @contextmanager
72        def locked_file(fileno, blocking=True):
73            fcntl.flock(fileno, fcntl.LOCK_EX | (0 if blocking else fcntl.LOCK_NB))
74            try:
75                yield
76            finally:
77                fcntl.flock(fileno, fcntl.LOCK_UN)
78
79
80class AtomicFile(object):
81    """
82    Atomic file operations implemented using file-system advisory locks (``flock`` on POSIX,
83    ``LockFile`` on Windows).
84
85    .. note::
86        On Linux, the manpage says ``flock`` might have issues with NFS mounts. You should
87        take this into account.
88
89    .. versionadded:: 1.3
90    """
91
92    CHUNK_SIZE = 32 * 1024
93
94    def __init__(self, filename, ignore_deletion=False):
95        self.path = local.path(filename)
96        self._ignore_deletion = ignore_deletion
97        self._thdlock = threading.Lock()
98        self._owned_by = None
99        self._fileobj = None
100        self.reopen()
101
102    def __repr__(self):
103        return (
104            "<AtomicFile: {}>".format(self.path)
105            if self._fileobj
106            else "<AtomicFile: closed>"
107        )
108
109    def __del__(self):
110        self.close()
111
112    def __enter__(self):
113        return self
114
115    def __exit__(self, t, v, tb):
116        self.close()
117
118    def close(self):
119        if self._fileobj is not None:
120            self._fileobj.close()
121            self._fileobj = None
122
123    def reopen(self):
124        """
125        Close and reopen the file; useful when the file was deleted from the file system
126        by a different process
127        """
128        self.close()
129        self._fileobj = os.fdopen(
130            os.open(str(self.path), os.O_CREAT | os.O_RDWR, 384), "r+b", 0
131        )
132
133    @contextmanager
134    def locked(self, blocking=True):
135        """
136        A context manager that locks the file; this function is reentrant by the thread currently
137        holding the lock.
138
139        :param blocking: if ``True``, the call will block until we can grab the file system lock.
140                         if ``False``, the call may fail immediately with the underlying exception
141                         (``IOError`` or ``WindowsError``)
142        """
143        if self._owned_by == threading.get_ident():
144            yield
145            return
146        with self._thdlock:
147            with locked_file(self._fileobj.fileno(), blocking):
148                if not self.path.exists() and not self._ignore_deletion:
149                    raise ValueError("Atomic file removed from filesystem")
150                self._owned_by = threading.get_ident()
151                try:
152                    yield
153                finally:
154                    self._owned_by = None
155
156    def delete(self):
157        """
158        Atomically delete the file (holds the lock while doing it)
159        """
160        with self.locked():
161            self.path.delete()
162
163    def _read_all(self):
164        self._fileobj.seek(0)
165        data = []
166        while True:
167            buf = self._fileobj.read(self.CHUNK_SIZE)
168            data.append(buf)
169            if len(buf) < self.CHUNK_SIZE:
170                break
171        return six.b("").join(data)
172
173    def read_atomic(self):
174        """Atomically read the entire file"""
175        with self.locked():
176            return self._read_all()
177
178    def read_shared(self):
179        """Read the file **without** holding the lock"""
180        return self._read_all()
181
182    def write_atomic(self, data):
183        """Writes the given data atomically to the file. Note that it overwrites the entire file;
184        ``write_atomic("foo")`` followed by ``write_atomic("bar")`` will result in only ``"bar"``.
185        """
186        with self.locked():
187            self._fileobj.seek(0)
188            while data:
189                chunk = data[: self.CHUNK_SIZE]
190                self._fileobj.write(chunk)
191                data = data[len(chunk) :]
192            self._fileobj.flush()
193            self._fileobj.truncate()
194
195
196class AtomicCounterFile(object):
197    """
198    An atomic counter based on AtomicFile. Each time you call ``next()``, it will
199    atomically read and increment the counter's value, returning its previous value
200
201    Example::
202
203        acf = AtomicCounterFile.open("/some/file")
204        print acf.next()   # e.g., 7
205        print acf.next()   # 8
206        print acf.next()   # 9
207
208    .. versionadded:: 1.3
209    """
210
211    def __init__(self, atomicfile, initial=0):
212        """
213        :param atomicfile: an :class:`AtomicFile <plumbum.atomic.AtomicFile>` instance
214        :param initial: the initial value (used when the first time the file is created)
215        """
216        self.atomicfile = atomicfile
217        self.initial = initial
218
219    def __enter__(self):
220        return self
221
222    def __exit__(self, t, v, tb):
223        self.close()
224
225    def close(self):
226        self.atomicfile.close()
227
228    @classmethod
229    def open(cls, filename):
230        """
231        Shortcut for ``AtomicCounterFile(AtomicFile(filename))``
232        """
233        return cls(AtomicFile(filename))
234
235    def reset(self, value=None):
236        """
237        Reset the counter's value to the one given. If ``None``, it will default to the
238        initial value provided to the constructor
239        """
240        if value is None:
241            value = self.initial
242        if not isinstance(value, six.integer_types):
243            raise TypeError("value must be an integer, not {!r}".format(type(value)))
244        self.atomicfile.write_atomic(str(value).encode("utf8"))
245
246    def next(self):
247        """
248        Read and increment the counter, returning its previous value
249        """
250        with self.atomicfile.locked():
251            curr = self.atomicfile.read_atomic().decode("utf8")
252            if not curr:
253                curr = self.initial
254            else:
255                curr = int(curr)
256            self.atomicfile.write_atomic(str(curr + 1).encode("utf8"))
257            return curr
258
259
260class PidFileTaken(SystemExit):
261    """
262    This exception is raised when PidFile.acquire fails to lock the pid file. Note that it
263    derives from ``SystemExit``, so unless explicitly handled, it will terminate the process
264    cleanly
265    """
266
267    def __init__(self, msg, pid):
268        SystemExit.__init__(self, msg)
269        self.pid = pid
270
271
272class PidFile(object):
273    """
274    A PID file is a file that's locked by some process from the moment it starts until it dies
275    (the OS will clear the lock when the process exits). It is used to prevent two instances
276    of the same process (normally a daemon) from running concurrently. The PID file holds its
277    process' PID, so you know who's holding it.
278
279    .. versionadded:: 1.3
280    """
281
282    def __init__(self, filename):
283        self.atomicfile = AtomicFile(filename)
284        self._ctx = None
285
286    def __enter__(self):
287        self.acquire()
288
289    def __exit__(self, t, v, tb):
290        self.release()
291
292    def __del__(self):
293        try:
294            self.release()
295        except Exception:
296            pass
297
298    def close(self):
299        self.atomicfile.close()
300
301    def acquire(self):
302        """
303        Attempt to acquire the PID file. If it's already locked, raises
304        :class:`PidFileTaken <plumbum.atomic.PidFileTaken>`. You should normally acquire
305        the file as early as possible when the program starts
306        """
307        if self._ctx is not None:
308            return
309        self._ctx = self.atomicfile.locked(blocking=False)
310        try:
311            self._ctx.__enter__()
312        except (IOError, OSError):
313            self._ctx = None
314            try:
315                pid = self.atomicfile.read_shared().strip().decode("utf8")
316            except (IOError, OSError):
317                pid = "Unknown"
318            raise PidFileTaken(
319                "PID file {!r} taken by process {}".format(self.atomicfile.path, pid),
320                pid,
321            )
322        else:
323            self.atomicfile.write_atomic(str(os.getpid()).encode("utf8"))
324            atexit.register(self.release)
325
326    def release(self):
327        """
328        Release the PID file (should only happen when the program terminates)
329        """
330        if self._ctx is None:
331            return
332        self.atomicfile.delete()
333        try:
334            self._ctx.__exit__(None, None, None)
335        finally:
336            self._ctx = None
337