# -*- coding: utf-8 -*-
"""
Atomic file operations
"""

import atexit
import os
import sys
import threading
from contextlib import contextmanager

from plumbum.lib import six
from plumbum.machines.local import local

# Older Pythons lack ``threading.get_ident``; borrow it from the low-level
# thread module so the rest of this file can use a single spelling.
if not hasattr(threading, "get_ident"):
    try:
        import thread
    except ImportError:
        import _thread as thread
    threading.get_ident = thread.get_ident
    del thread

try:
    import fcntl
except ImportError:
    # No ``fcntl`` available: fall back to the pywin32-based implementation.
    import msvcrt

    try:
        from pywintypes import error as WinError
        from win32con import LOCKFILE_EXCLUSIVE_LOCK, LOCKFILE_FAIL_IMMEDIATELY
        from win32file import OVERLAPPED, LockFileEx, UnlockFile
    except ImportError:
        raise ImportError(
            "On Windows, we require Python for Windows Extensions (pywin32)"
        )

    @contextmanager
    def locked_file(fileno, blocking=True):
        """
        Hold an exclusive lock on the file behind ``fileno`` for the duration of the
        ``with`` block (implemented with ``LockFileEx``/``UnlockFile``).

        :param fileno: an open file descriptor
        :param blocking: if ``False``, ``LOCKFILE_FAIL_IMMEDIATELY`` is passed so the call
                         does not wait for the lock

        :raises WindowsError: if ``LockFileEx`` fails
        """
        hndl = msvcrt.get_osfhandle(fileno)
        try:
            LockFileEx(
                hndl,
                LOCKFILE_EXCLUSIVE_LOCK
                | (0 if blocking else LOCKFILE_FAIL_IMMEDIATELY),
                0xFFFFFFFF,
                0xFFFFFFFF,
                OVERLAPPED(),
            )
        except WinError:
            # Re-raise pywin32's error as the builtin WindowsError, keeping its args
            _, ex, _ = sys.exc_info()
            raise WindowsError(*ex.args)
        try:
            yield
        finally:
            UnlockFile(hndl, 0, 0, 0xFFFFFFFF, 0xFFFFFFFF)


else:
    if hasattr(fcntl, "lockf"):

        @contextmanager
        def locked_file(fileno, blocking=True):
            """
            Hold an exclusive ``fcntl.lockf`` lock on ``fileno`` for the duration of the
            ``with`` block.

            :param fileno: an open file descriptor
            :param blocking: if ``False``, ``LOCK_NB`` is added so the call does not wait
                             for the lock
            """
            fcntl.lockf(fileno, fcntl.LOCK_EX | (0 if blocking else fcntl.LOCK_NB))
            try:
                yield
            finally:
                fcntl.lockf(fileno, fcntl.LOCK_UN)

    else:

        @contextmanager
        def locked_file(fileno, blocking=True):
            """
            Hold an exclusive ``fcntl.flock`` lock on ``fileno`` for the duration of the
            ``with`` block (used only when ``fcntl.lockf`` is unavailable).

            :param fileno: an open file descriptor
            :param blocking: if ``False``, ``LOCK_NB`` is added so the call does not wait
                             for the lock
            """
            fcntl.flock(fileno, fcntl.LOCK_EX | (0 if blocking else fcntl.LOCK_NB))
            try:
                yield
            finally:
                fcntl.flock(fileno, fcntl.LOCK_UN)


class AtomicFile(object):
    """
    Atomic file operations implemented using file-system advisory locks (``lockf``, or
    ``flock`` when ``lockf`` is unavailable, on POSIX; ``LockFileEx`` on Windows).

    .. note::
        On Linux, the manpage says ``flock`` might have issues with NFS mounts. You should
        take this into account.

    .. versionadded:: 1.3
    """

    CHUNK_SIZE = 32 * 1024

    def __init__(self, filename, ignore_deletion=False):
        """
        :param filename: path of the file to open (it is created if it does not exist)
        :param ignore_deletion: if ``True``, :func:`locked` does not raise when the file
                                has been removed from the file system
        """
        # Set first so that ``close()``/``__del__`` work even if a later step raises
        self._fileobj = None
        self.path = local.path(filename)
        self._ignore_deletion = ignore_deletion
        self._thdlock = threading.Lock()
        self._owned_by = None
        self.reopen()

    def __repr__(self):
        return (
            "<AtomicFile: {}>".format(self.path)
            if self._fileobj
            else "<AtomicFile: closed>"
        )

    def __del__(self):
        self.close()

    def __enter__(self):
        return self

    def __exit__(self, t, v, tb):
        self.close()

    def close(self):
        """Close the underlying file object; does nothing if it is already closed"""
        if self._fileobj is not None:
            self._fileobj.close()
            self._fileobj = None

    def reopen(self):
        """
        Close and reopen the file; useful when the file was deleted from the file system
        by a different process
        """
        self.close()
        # Opened unbuffered (buffering=0); created with mode 0o600 if it does not exist
        self._fileobj = os.fdopen(
            os.open(str(self.path), os.O_CREAT | os.O_RDWR, 0o600), "r+b", 0
        )

    @contextmanager
    def locked(self, blocking=True):
        """
        A context manager that locks the file; this function is reentrant by the thread currently
        holding the lock.

        :param blocking: if ``True``, the call will block until we can grab the file system lock.
                         if ``False``, the call may fail immediately with the underlying exception
                         (``IOError`` or ``WindowsError``)

        :raises ValueError: if the file no longer exists on the file system and
                            ``ignore_deletion`` was not set
        """
        if self._owned_by == threading.get_ident():
            # Reentrant call from the thread that already holds both locks
            yield
            return
        with self._thdlock:
            with locked_file(self._fileobj.fileno(), blocking):
                if not self.path.exists() and not self._ignore_deletion:
                    raise ValueError("Atomic file removed from filesystem")
                self._owned_by = threading.get_ident()
                try:
                    yield
                finally:
                    self._owned_by = None

    def delete(self):
        """
        Atomically delete the file (holds the lock while doing it)
        """
        with self.locked():
            self.path.delete()

    def _read_all(self):
        """Read and return the whole file, from offset 0, as a byte string"""
        self._fileobj.seek(0)
        data = []
        while True:
            buf = self._fileobj.read(self.CHUNK_SIZE)
            # The file is unbuffered, so a read may legitimately return fewer bytes
            # than requested before EOF; only an empty read marks the end of the file.
            if not buf:
                break
            data.append(buf)
        return b"".join(data)

    def read_atomic(self):
        """Atomically read the entire file"""
        with self.locked():
            return self._read_all()

    def read_shared(self):
        """Read the file **without** holding the lock"""
        return self._read_all()

    def write_atomic(self, data):
        """Writes the given data atomically to the file. Note that it overwrites the entire file;
        ``write_atomic("foo")`` followed by ``write_atomic("bar")`` will result in only ``"bar"``.

        :param data: the bytes to write
        """
        with self.locked():
            self._fileobj.seek(0)
            offset = 0
            total = len(data)
            while offset < total:
                chunk = data[offset : offset + self.CHUNK_SIZE]
                written = self._fileobj.write(chunk)
                # An unbuffered Python 3 file returns the number of bytes actually
                # written, which may be fewer than requested; Python 2 file objects
                # return None and write the whole chunk.
                offset += len(chunk) if written is None else written
            self._fileobj.flush()
            # Drop whatever was left over from previous, longer contents
            self._fileobj.truncate()


class AtomicCounterFile(object):
    """
    An atomic counter based on AtomicFile. Each time you call ``next()``, it will
    atomically read and increment the counter's value, returning its previous value

    Example::

        acf = AtomicCounterFile.open("/some/file")
        print(acf.next())   # e.g., 7
        print(acf.next())   # 8
        print(acf.next())   # 9

    .. versionadded:: 1.3
    """

    def __init__(self, atomicfile, initial=0):
        """
        :param atomicfile: an :class:`AtomicFile <plumbum.atomic.AtomicFile>` instance
        :param initial: the initial value (used when the first time the file is created)
        """
        self.atomicfile = atomicfile
        self.initial = initial

    def __enter__(self):
        return self

    def __exit__(self, t, v, tb):
        self.close()

    def close(self):
        """Close the underlying atomic file"""
        self.atomicfile.close()

    @classmethod
    def open(cls, filename, initial=0):
        """
        Shortcut for ``AtomicCounterFile(AtomicFile(filename), initial)``

        :param filename: path of the counter file
        :param initial: the initial value, forwarded to the constructor
        """
        return cls(AtomicFile(filename), initial)

    def reset(self, value=None):
        """
        Reset the counter's value to the one given. If ``None``, it will default to the
        initial value provided to the constructor

        :raises TypeError: if the value is not an integer
        """
        if value is None:
            value = self.initial
        if not isinstance(value, six.integer_types):
            raise TypeError("value must be an integer, not {!r}".format(type(value)))
        self.atomicfile.write_atomic(str(value).encode("utf8"))

    def next(self):
        """
        Read and increment the counter, returning its previous value
        """
        # The outer lock keeps the read-modify-write sequence atomic; the nested
        # read_atomic/write_atomic calls re-enter it from the same thread.
        with self.atomicfile.locked():
            curr = self.atomicfile.read_atomic().decode("utf8").strip()
            if not curr:
                # Empty (e.g. freshly created) file: start from the initial value
                curr = self.initial
            else:
                curr = int(curr)
            self.atomicfile.write_atomic(str(curr + 1).encode("utf8"))
            return curr


class PidFileTaken(SystemExit):
    """
    This exception is raised when PidFile.acquire fails to lock the pid file. Note that it
    derives from ``SystemExit``, so unless explicitly handled, it will terminate the process
    cleanly
    """

    def __init__(self, msg, pid):
        """
        :param msg: the exit message
        :param pid: the PID read from the pid file (``"Unknown"`` if it could not be read)
        """
        SystemExit.__init__(self, msg)
        self.pid = pid


class PidFile(object):
    """
    A PID file is a file that's locked by some process from the moment it starts until it dies
    (the OS will clear the lock when the process exits). It is used to prevent two instances
    of the same process (normally a daemon) from running concurrently. The PID file holds its
    process' PID, so you know who's holding it.

    .. versionadded:: 1.3
    """

    def __init__(self, filename):
        """
        :param filename: path of the PID file
        """
        self.atomicfile = AtomicFile(filename)
        # The entered ``AtomicFile.locked()`` context while we hold the file, else None
        self._ctx = None

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, t, v, tb):
        self.release()

    def __del__(self):
        # Best effort only: never let an exception escape a finalizer
        try:
            self.release()
        except Exception:
            pass

    def close(self):
        """Close the underlying atomic file"""
        self.atomicfile.close()

    def acquire(self):
        """
        Attempt to acquire the PID file. If it's already locked, raises
        :class:`PidFileTaken <plumbum.atomic.PidFileTaken>`. You should normally acquire
        the file as early as possible when the program starts
        """
        if self._ctx is not None:
            return
        ctx = self.atomicfile.locked(blocking=False)
        try:
            ctx.__enter__()
        except (IOError, OSError):
            try:
                pid = self.atomicfile.read_shared().strip().decode("utf8")
            except (IOError, OSError):
                pid = "Unknown"
            raise PidFileTaken(
                "PID file {!r} taken by process {}".format(self.atomicfile.path, pid),
                pid,
            )
        # Remember the context only once the lock is really held, so that any other
        # failure while entering it cannot leave a stale context behind.
        self._ctx = ctx
        self.atomicfile.write_atomic(str(os.getpid()).encode("utf8"))
        atexit.register(self.release)

    def release(self):
        """
        Release the PID file (should only happen when the program terminates)
        """
        if self._ctx is None:
            return
        ctx, self._ctx = self._ctx, None
        try:
            self.atomicfile.delete()
        finally:
            # Give the lock back even if deleting the file failed
            ctx.__exit__(None, None, None)