1""" 2local path implementation. 3""" 4from __future__ import with_statement 5 6from contextlib import contextmanager 7import sys, os, atexit, io, uuid 8import py 9from py._path import common 10from py._path.common import iswin32, fspath 11from stat import S_ISLNK, S_ISDIR, S_ISREG 12 13from os.path import abspath, normpath, isabs, exists, isdir, isfile, islink, dirname 14 15if sys.version_info > (3,0): 16 def map_as_list(func, iter): 17 return list(map(func, iter)) 18else: 19 map_as_list = map 20 21ALLOW_IMPORTLIB_MODE = sys.version_info > (3,5) 22if ALLOW_IMPORTLIB_MODE: 23 import importlib 24 25 26class Stat(object): 27 def __getattr__(self, name): 28 return getattr(self._osstatresult, "st_" + name) 29 30 def __init__(self, path, osstatresult): 31 self.path = path 32 self._osstatresult = osstatresult 33 34 @property 35 def owner(self): 36 if iswin32: 37 raise NotImplementedError("XXX win32") 38 import pwd 39 entry = py.error.checked_call(pwd.getpwuid, self.uid) 40 return entry[0] 41 42 @property 43 def group(self): 44 """ return group name of file. """ 45 if iswin32: 46 raise NotImplementedError("XXX win32") 47 import grp 48 entry = py.error.checked_call(grp.getgrgid, self.gid) 49 return entry[0] 50 51 def isdir(self): 52 return S_ISDIR(self._osstatresult.st_mode) 53 54 def isfile(self): 55 return S_ISREG(self._osstatresult.st_mode) 56 57 def islink(self): 58 st = self.path.lstat() 59 return S_ISLNK(self._osstatresult.st_mode) 60 61class PosixPath(common.PathBase): 62 def chown(self, user, group, rec=0): 63 """ change ownership to the given user and group. 64 user and group may be specified by a number or 65 by a name. if rec is True change ownership 66 recursively. 67 """ 68 uid = getuserid(user) 69 gid = getgroupid(group) 70 if rec: 71 for x in self.visit(rec=lambda x: x.check(link=0)): 72 if x.check(link=0): 73 py.error.checked_call(os.chown, str(x), uid, gid) 74 py.error.checked_call(os.chown, str(self), uid, gid) 75 76 def readlink(self): 77 """ return value of a symbolic link. """ 78 return py.error.checked_call(os.readlink, self.strpath) 79 80 def mklinkto(self, oldname): 81 """ posix style hard link to another name. """ 82 py.error.checked_call(os.link, str(oldname), str(self)) 83 84 def mksymlinkto(self, value, absolute=1): 85 """ create a symbolic link with the given value (pointing to another name). """ 86 if absolute: 87 py.error.checked_call(os.symlink, str(value), self.strpath) 88 else: 89 base = self.common(value) 90 # with posix local paths '/' is always a common base 91 relsource = self.__class__(value).relto(base) 92 reldest = self.relto(base) 93 n = reldest.count(self.sep) 94 target = self.sep.join(('..', )*n + (relsource, )) 95 py.error.checked_call(os.symlink, target, self.strpath) 96 97def getuserid(user): 98 import pwd 99 if not isinstance(user, int): 100 user = pwd.getpwnam(user)[2] 101 return user 102 103def getgroupid(group): 104 import grp 105 if not isinstance(group, int): 106 group = grp.getgrnam(group)[2] 107 return group 108 109FSBase = not iswin32 and PosixPath or common.PathBase 110 111class LocalPath(FSBase): 112 """ object oriented interface to os.path and other local filesystem 113 related information. 114 """ 115 class ImportMismatchError(ImportError): 116 """ raised on pyimport() if there is a mismatch of __file__'s""" 117 118 sep = os.sep 119 class Checkers(common.Checkers): 120 def _stat(self): 121 try: 122 return self._statcache 123 except AttributeError: 124 try: 125 self._statcache = self.path.stat() 126 except py.error.ELOOP: 127 self._statcache = self.path.lstat() 128 return self._statcache 129 130 def dir(self): 131 return S_ISDIR(self._stat().mode) 132 133 def file(self): 134 return S_ISREG(self._stat().mode) 135 136 def exists(self): 137 return self._stat() 138 139 def link(self): 140 st = self.path.lstat() 141 return S_ISLNK(st.mode) 142 143 def __init__(self, path=None, expanduser=False): 144 """ Initialize and return a local Path instance. 145 146 Path can be relative to the current directory. 147 If path is None it defaults to the current working directory. 148 If expanduser is True, tilde-expansion is performed. 149 Note that Path instances always carry an absolute path. 150 Note also that passing in a local path object will simply return 151 the exact same path object. Use new() to get a new copy. 152 """ 153 if path is None: 154 self.strpath = py.error.checked_call(os.getcwd) 155 else: 156 try: 157 path = fspath(path) 158 except TypeError: 159 raise ValueError("can only pass None, Path instances " 160 "or non-empty strings to LocalPath") 161 if expanduser: 162 path = os.path.expanduser(path) 163 self.strpath = abspath(path) 164 165 def __hash__(self): 166 s = self.strpath 167 if iswin32: 168 s = s.lower() 169 return hash(s) 170 171 def __eq__(self, other): 172 s1 = fspath(self) 173 try: 174 s2 = fspath(other) 175 except TypeError: 176 return False 177 if iswin32: 178 s1 = s1.lower() 179 try: 180 s2 = s2.lower() 181 except AttributeError: 182 return False 183 return s1 == s2 184 185 def __ne__(self, other): 186 return not (self == other) 187 188 def __lt__(self, other): 189 return fspath(self) < fspath(other) 190 191 def __gt__(self, other): 192 return fspath(self) > fspath(other) 193 194 def samefile(self, other): 195 """ return True if 'other' references the same file as 'self'. 196 """ 197 other = fspath(other) 198 if not isabs(other): 199 other = abspath(other) 200 if self == other: 201 return True 202 if not hasattr(os.path, "samefile"): 203 return False 204 return py.error.checked_call( 205 os.path.samefile, self.strpath, other) 206 207 def remove(self, rec=1, ignore_errors=False): 208 """ remove a file or directory (or a directory tree if rec=1). 209 if ignore_errors is True, errors while removing directories will 210 be ignored. 211 """ 212 if self.check(dir=1, link=0): 213 if rec: 214 # force remove of readonly files on windows 215 if iswin32: 216 self.chmod(0o700, rec=1) 217 import shutil 218 py.error.checked_call( 219 shutil.rmtree, self.strpath, 220 ignore_errors=ignore_errors) 221 else: 222 py.error.checked_call(os.rmdir, self.strpath) 223 else: 224 if iswin32: 225 self.chmod(0o700) 226 py.error.checked_call(os.remove, self.strpath) 227 228 def computehash(self, hashtype="md5", chunksize=524288): 229 """ return hexdigest of hashvalue for this file. """ 230 try: 231 try: 232 import hashlib as mod 233 except ImportError: 234 if hashtype == "sha1": 235 hashtype = "sha" 236 mod = __import__(hashtype) 237 hash = getattr(mod, hashtype)() 238 except (AttributeError, ImportError): 239 raise ValueError("Don't know how to compute %r hash" %(hashtype,)) 240 f = self.open('rb') 241 try: 242 while 1: 243 buf = f.read(chunksize) 244 if not buf: 245 return hash.hexdigest() 246 hash.update(buf) 247 finally: 248 f.close() 249 250 def new(self, **kw): 251 """ create a modified version of this path. 252 the following keyword arguments modify various path parts:: 253 254 a:/some/path/to/a/file.ext 255 xx drive 256 xxxxxxxxxxxxxxxxx dirname 257 xxxxxxxx basename 258 xxxx purebasename 259 xxx ext 260 """ 261 obj = object.__new__(self.__class__) 262 if not kw: 263 obj.strpath = self.strpath 264 return obj 265 drive, dirname, basename, purebasename,ext = self._getbyspec( 266 "drive,dirname,basename,purebasename,ext") 267 if 'basename' in kw: 268 if 'purebasename' in kw or 'ext' in kw: 269 raise ValueError("invalid specification %r" % kw) 270 else: 271 pb = kw.setdefault('purebasename', purebasename) 272 try: 273 ext = kw['ext'] 274 except KeyError: 275 pass 276 else: 277 if ext and not ext.startswith('.'): 278 ext = '.' + ext 279 kw['basename'] = pb + ext 280 281 if ('dirname' in kw and not kw['dirname']): 282 kw['dirname'] = drive 283 else: 284 kw.setdefault('dirname', dirname) 285 kw.setdefault('sep', self.sep) 286 obj.strpath = normpath( 287 "%(dirname)s%(sep)s%(basename)s" % kw) 288 return obj 289 290 def _getbyspec(self, spec): 291 """ see new for what 'spec' can be. """ 292 res = [] 293 parts = self.strpath.split(self.sep) 294 295 args = filter(None, spec.split(',') ) 296 append = res.append 297 for name in args: 298 if name == 'drive': 299 append(parts[0]) 300 elif name == 'dirname': 301 append(self.sep.join(parts[:-1])) 302 else: 303 basename = parts[-1] 304 if name == 'basename': 305 append(basename) 306 else: 307 i = basename.rfind('.') 308 if i == -1: 309 purebasename, ext = basename, '' 310 else: 311 purebasename, ext = basename[:i], basename[i:] 312 if name == 'purebasename': 313 append(purebasename) 314 elif name == 'ext': 315 append(ext) 316 else: 317 raise ValueError("invalid part specification %r" % name) 318 return res 319 320 def dirpath(self, *args, **kwargs): 321 """ return the directory path joined with any given path arguments. """ 322 if not kwargs: 323 path = object.__new__(self.__class__) 324 path.strpath = dirname(self.strpath) 325 if args: 326 path = path.join(*args) 327 return path 328 return super(LocalPath, self).dirpath(*args, **kwargs) 329 330 def join(self, *args, **kwargs): 331 """ return a new path by appending all 'args' as path 332 components. if abs=1 is used restart from root if any 333 of the args is an absolute path. 334 """ 335 sep = self.sep 336 strargs = [fspath(arg) for arg in args] 337 strpath = self.strpath 338 if kwargs.get('abs'): 339 newargs = [] 340 for arg in reversed(strargs): 341 if isabs(arg): 342 strpath = arg 343 strargs = newargs 344 break 345 newargs.insert(0, arg) 346 # special case for when we have e.g. strpath == "/" 347 actual_sep = "" if strpath.endswith(sep) else sep 348 for arg in strargs: 349 arg = arg.strip(sep) 350 if iswin32: 351 # allow unix style paths even on windows. 352 arg = arg.strip('/') 353 arg = arg.replace('/', sep) 354 strpath = strpath + actual_sep + arg 355 actual_sep = sep 356 obj = object.__new__(self.__class__) 357 obj.strpath = normpath(strpath) 358 return obj 359 360 def open(self, mode='r', ensure=False, encoding=None): 361 """ return an opened file with the given mode. 362 363 If ensure is True, create parent directories if needed. 364 """ 365 if ensure: 366 self.dirpath().ensure(dir=1) 367 if encoding: 368 return py.error.checked_call(io.open, self.strpath, mode, encoding=encoding) 369 return py.error.checked_call(open, self.strpath, mode) 370 371 def _fastjoin(self, name): 372 child = object.__new__(self.__class__) 373 child.strpath = self.strpath + self.sep + name 374 return child 375 376 def islink(self): 377 return islink(self.strpath) 378 379 def check(self, **kw): 380 if not kw: 381 return exists(self.strpath) 382 if len(kw) == 1: 383 if "dir" in kw: 384 return not kw["dir"] ^ isdir(self.strpath) 385 if "file" in kw: 386 return not kw["file"] ^ isfile(self.strpath) 387 return super(LocalPath, self).check(**kw) 388 389 _patternchars = set("*?[" + os.path.sep) 390 def listdir(self, fil=None, sort=None): 391 """ list directory contents, possibly filter by the given fil func 392 and possibly sorted. 393 """ 394 if fil is None and sort is None: 395 names = py.error.checked_call(os.listdir, self.strpath) 396 return map_as_list(self._fastjoin, names) 397 if isinstance(fil, py.builtin._basestring): 398 if not self._patternchars.intersection(fil): 399 child = self._fastjoin(fil) 400 if exists(child.strpath): 401 return [child] 402 return [] 403 fil = common.FNMatcher(fil) 404 names = py.error.checked_call(os.listdir, self.strpath) 405 res = [] 406 for name in names: 407 child = self._fastjoin(name) 408 if fil is None or fil(child): 409 res.append(child) 410 self._sortlist(res, sort) 411 return res 412 413 def size(self): 414 """ return size of the underlying file object """ 415 return self.stat().size 416 417 def mtime(self): 418 """ return last modification time of the path. """ 419 return self.stat().mtime 420 421 def copy(self, target, mode=False, stat=False): 422 """ copy path to target. 423 424 If mode is True, will copy copy permission from path to target. 425 If stat is True, copy permission, last modification 426 time, last access time, and flags from path to target. 427 """ 428 if self.check(file=1): 429 if target.check(dir=1): 430 target = target.join(self.basename) 431 assert self!=target 432 copychunked(self, target) 433 if mode: 434 copymode(self.strpath, target.strpath) 435 if stat: 436 copystat(self, target) 437 else: 438 def rec(p): 439 return p.check(link=0) 440 for x in self.visit(rec=rec): 441 relpath = x.relto(self) 442 newx = target.join(relpath) 443 newx.dirpath().ensure(dir=1) 444 if x.check(link=1): 445 newx.mksymlinkto(x.readlink()) 446 continue 447 elif x.check(file=1): 448 copychunked(x, newx) 449 elif x.check(dir=1): 450 newx.ensure(dir=1) 451 if mode: 452 copymode(x.strpath, newx.strpath) 453 if stat: 454 copystat(x, newx) 455 456 def rename(self, target): 457 """ rename this path to target. """ 458 target = fspath(target) 459 return py.error.checked_call(os.rename, self.strpath, target) 460 461 def dump(self, obj, bin=1): 462 """ pickle object into path location""" 463 f = self.open('wb') 464 import pickle 465 try: 466 py.error.checked_call(pickle.dump, obj, f, bin) 467 finally: 468 f.close() 469 470 def mkdir(self, *args): 471 """ create & return the directory joined with args. """ 472 p = self.join(*args) 473 py.error.checked_call(os.mkdir, fspath(p)) 474 return p 475 476 def write_binary(self, data, ensure=False): 477 """ write binary data into path. If ensure is True create 478 missing parent directories. 479 """ 480 if ensure: 481 self.dirpath().ensure(dir=1) 482 with self.open('wb') as f: 483 f.write(data) 484 485 def write_text(self, data, encoding, ensure=False): 486 """ write text data into path using the specified encoding. 487 If ensure is True create missing parent directories. 488 """ 489 if ensure: 490 self.dirpath().ensure(dir=1) 491 with self.open('w', encoding=encoding) as f: 492 f.write(data) 493 494 def write(self, data, mode='w', ensure=False): 495 """ write data into path. If ensure is True create 496 missing parent directories. 497 """ 498 if ensure: 499 self.dirpath().ensure(dir=1) 500 if 'b' in mode: 501 if not py.builtin._isbytes(data): 502 raise ValueError("can only process bytes") 503 else: 504 if not py.builtin._istext(data): 505 if not py.builtin._isbytes(data): 506 data = str(data) 507 else: 508 data = py.builtin._totext(data, sys.getdefaultencoding()) 509 f = self.open(mode) 510 try: 511 f.write(data) 512 finally: 513 f.close() 514 515 def _ensuredirs(self): 516 parent = self.dirpath() 517 if parent == self: 518 return self 519 if parent.check(dir=0): 520 parent._ensuredirs() 521 if self.check(dir=0): 522 try: 523 self.mkdir() 524 except py.error.EEXIST: 525 # race condition: file/dir created by another thread/process. 526 # complain if it is not a dir 527 if self.check(dir=0): 528 raise 529 return self 530 531 def ensure(self, *args, **kwargs): 532 """ ensure that an args-joined path exists (by default as 533 a file). if you specify a keyword argument 'dir=True' 534 then the path is forced to be a directory path. 535 """ 536 p = self.join(*args) 537 if kwargs.get('dir', 0): 538 return p._ensuredirs() 539 else: 540 p.dirpath()._ensuredirs() 541 if not p.check(file=1): 542 p.open('w').close() 543 return p 544 545 def stat(self, raising=True): 546 """ Return an os.stat() tuple. """ 547 if raising == True: 548 return Stat(self, py.error.checked_call(os.stat, self.strpath)) 549 try: 550 return Stat(self, os.stat(self.strpath)) 551 except KeyboardInterrupt: 552 raise 553 except Exception: 554 return None 555 556 def lstat(self): 557 """ Return an os.lstat() tuple. """ 558 return Stat(self, py.error.checked_call(os.lstat, self.strpath)) 559 560 def setmtime(self, mtime=None): 561 """ set modification time for the given path. if 'mtime' is None 562 (the default) then the file's mtime is set to current time. 563 564 Note that the resolution for 'mtime' is platform dependent. 565 """ 566 if mtime is None: 567 return py.error.checked_call(os.utime, self.strpath, mtime) 568 try: 569 return py.error.checked_call(os.utime, self.strpath, (-1, mtime)) 570 except py.error.EINVAL: 571 return py.error.checked_call(os.utime, self.strpath, (self.atime(), mtime)) 572 573 def chdir(self): 574 """ change directory to self and return old current directory """ 575 try: 576 old = self.__class__() 577 except py.error.ENOENT: 578 old = None 579 py.error.checked_call(os.chdir, self.strpath) 580 return old 581 582 583 @contextmanager 584 def as_cwd(self): 585 """ 586 Return a context manager, which changes to the path's dir during the 587 managed "with" context. 588 On __enter__ it returns the old dir, which might be ``None``. 589 """ 590 old = self.chdir() 591 try: 592 yield old 593 finally: 594 if old is not None: 595 old.chdir() 596 597 def realpath(self): 598 """ return a new path which contains no symbolic links.""" 599 return self.__class__(os.path.realpath(self.strpath)) 600 601 def atime(self): 602 """ return last access time of the path. """ 603 return self.stat().atime 604 605 def __repr__(self): 606 return 'local(%r)' % self.strpath 607 608 def __str__(self): 609 """ return string representation of the Path. """ 610 return self.strpath 611 612 def chmod(self, mode, rec=0): 613 """ change permissions to the given mode. If mode is an 614 integer it directly encodes the os-specific modes. 615 if rec is True perform recursively. 616 """ 617 if not isinstance(mode, int): 618 raise TypeError("mode %r must be an integer" % (mode,)) 619 if rec: 620 for x in self.visit(rec=rec): 621 py.error.checked_call(os.chmod, str(x), mode) 622 py.error.checked_call(os.chmod, self.strpath, mode) 623 624 def pypkgpath(self): 625 """ return the Python package path by looking for the last 626 directory upwards which still contains an __init__.py. 627 Return None if a pkgpath can not be determined. 628 """ 629 pkgpath = None 630 for parent in self.parts(reverse=True): 631 if parent.isdir(): 632 if not parent.join('__init__.py').exists(): 633 break 634 if not isimportable(parent.basename): 635 break 636 pkgpath = parent 637 return pkgpath 638 639 def _ensuresyspath(self, ensuremode, path): 640 if ensuremode: 641 s = str(path) 642 if ensuremode == "append": 643 if s not in sys.path: 644 sys.path.append(s) 645 else: 646 if s != sys.path[0]: 647 sys.path.insert(0, s) 648 649 def pyimport(self, modname=None, ensuresyspath=True): 650 """ return path as an imported python module. 651 652 If modname is None, look for the containing package 653 and construct an according module name. 654 The module will be put/looked up in sys.modules. 655 if ensuresyspath is True then the root dir for importing 656 the file (taking __init__.py files into account) will 657 be prepended to sys.path if it isn't there already. 658 If ensuresyspath=="append" the root dir will be appended 659 if it isn't already contained in sys.path. 660 if ensuresyspath is False no modification of syspath happens. 661 662 Special value of ensuresyspath=="importlib" is intended 663 purely for using in pytest, it is capable only of importing 664 separate .py files outside packages, e.g. for test suite 665 without any __init__.py file. It effectively allows having 666 same-named test modules in different places and offers 667 mild opt-in via this option. Note that it works only in 668 recent versions of python. 669 """ 670 if not self.check(): 671 raise py.error.ENOENT(self) 672 673 if ensuresyspath == 'importlib': 674 if modname is None: 675 modname = self.purebasename 676 if not ALLOW_IMPORTLIB_MODE: 677 raise ImportError( 678 "Can't use importlib due to old version of Python") 679 spec = importlib.util.spec_from_file_location( 680 modname, str(self)) 681 if spec is None: 682 raise ImportError( 683 "Can't find module %s at location %s" % 684 (modname, str(self)) 685 ) 686 mod = importlib.util.module_from_spec(spec) 687 spec.loader.exec_module(mod) 688 return mod 689 690 pkgpath = None 691 if modname is None: 692 pkgpath = self.pypkgpath() 693 if pkgpath is not None: 694 pkgroot = pkgpath.dirpath() 695 names = self.new(ext="").relto(pkgroot).split(self.sep) 696 if names[-1] == "__init__": 697 names.pop() 698 modname = ".".join(names) 699 else: 700 pkgroot = self.dirpath() 701 modname = self.purebasename 702 703 self._ensuresyspath(ensuresyspath, pkgroot) 704 __import__(modname) 705 mod = sys.modules[modname] 706 if self.basename == "__init__.py": 707 return mod # we don't check anything as we might 708 # be in a namespace package ... too icky to check 709 modfile = mod.__file__ 710 if modfile[-4:] in ('.pyc', '.pyo'): 711 modfile = modfile[:-1] 712 elif modfile.endswith('$py.class'): 713 modfile = modfile[:-9] + '.py' 714 if modfile.endswith(os.path.sep + "__init__.py"): 715 if self.basename != "__init__.py": 716 modfile = modfile[:-12] 717 try: 718 issame = self.samefile(modfile) 719 except py.error.ENOENT: 720 issame = False 721 if not issame: 722 ignore = os.getenv('PY_IGNORE_IMPORTMISMATCH') 723 if ignore != '1': 724 raise self.ImportMismatchError(modname, modfile, self) 725 return mod 726 else: 727 try: 728 return sys.modules[modname] 729 except KeyError: 730 # we have a custom modname, do a pseudo-import 731 import types 732 mod = types.ModuleType(modname) 733 mod.__file__ = str(self) 734 sys.modules[modname] = mod 735 try: 736 py.builtin.execfile(str(self), mod.__dict__) 737 except: 738 del sys.modules[modname] 739 raise 740 return mod 741 742 def sysexec(self, *argv, **popen_opts): 743 """ return stdout text from executing a system child process, 744 where the 'self' path points to executable. 745 The process is directly invoked and not through a system shell. 746 """ 747 from subprocess import Popen, PIPE 748 argv = map_as_list(str, argv) 749 popen_opts['stdout'] = popen_opts['stderr'] = PIPE 750 proc = Popen([str(self)] + argv, **popen_opts) 751 stdout, stderr = proc.communicate() 752 ret = proc.wait() 753 if py.builtin._isbytes(stdout): 754 stdout = py.builtin._totext(stdout, sys.getdefaultencoding()) 755 if ret != 0: 756 if py.builtin._isbytes(stderr): 757 stderr = py.builtin._totext(stderr, sys.getdefaultencoding()) 758 raise py.process.cmdexec.Error(ret, ret, str(self), 759 stdout, stderr,) 760 return stdout 761 762 def sysfind(cls, name, checker=None, paths=None): 763 """ return a path object found by looking at the systems 764 underlying PATH specification. If the checker is not None 765 it will be invoked to filter matching paths. If a binary 766 cannot be found, None is returned 767 Note: This is probably not working on plain win32 systems 768 but may work on cygwin. 769 """ 770 if isabs(name): 771 p = py.path.local(name) 772 if p.check(file=1): 773 return p 774 else: 775 if paths is None: 776 if iswin32: 777 paths = os.environ['Path'].split(';') 778 if '' not in paths and '.' not in paths: 779 paths.append('.') 780 try: 781 systemroot = os.environ['SYSTEMROOT'] 782 except KeyError: 783 pass 784 else: 785 paths = [path.replace('%SystemRoot%', systemroot) 786 for path in paths] 787 else: 788 paths = os.environ['PATH'].split(':') 789 tryadd = [] 790 if iswin32: 791 tryadd += os.environ['PATHEXT'].split(os.pathsep) 792 tryadd.append("") 793 794 for x in paths: 795 for addext in tryadd: 796 p = py.path.local(x).join(name, abs=True) + addext 797 try: 798 if p.check(file=1): 799 if checker: 800 if not checker(p): 801 continue 802 return p 803 except py.error.EACCES: 804 pass 805 return None 806 sysfind = classmethod(sysfind) 807 808 def _gethomedir(cls): 809 try: 810 x = os.environ['HOME'] 811 except KeyError: 812 try: 813 x = os.environ["HOMEDRIVE"] + os.environ['HOMEPATH'] 814 except KeyError: 815 return None 816 return cls(x) 817 _gethomedir = classmethod(_gethomedir) 818 819 # """ 820 # special class constructors for local filesystem paths 821 # """ 822 @classmethod 823 def get_temproot(cls): 824 """ return the system's temporary directory 825 (where tempfiles are usually created in) 826 """ 827 import tempfile 828 return py.path.local(tempfile.gettempdir()) 829 830 @classmethod 831 def mkdtemp(cls, rootdir=None): 832 """ return a Path object pointing to a fresh new temporary directory 833 (which we created ourself). 834 """ 835 import tempfile 836 if rootdir is None: 837 rootdir = cls.get_temproot() 838 return cls(py.error.checked_call(tempfile.mkdtemp, dir=str(rootdir))) 839 840 def make_numbered_dir(cls, prefix='session-', rootdir=None, keep=3, 841 lock_timeout=172800): # two days 842 """ return unique directory with a number greater than the current 843 maximum one. The number is assumed to start directly after prefix. 844 if keep is true directories with a number less than (maxnum-keep) 845 will be removed. If .lock files are used (lock_timeout non-zero), 846 algorithm is multi-process safe. 847 """ 848 if rootdir is None: 849 rootdir = cls.get_temproot() 850 851 nprefix = prefix.lower() 852 def parse_num(path): 853 """ parse the number out of a path (if it matches the prefix) """ 854 nbasename = path.basename.lower() 855 if nbasename.startswith(nprefix): 856 try: 857 return int(nbasename[len(nprefix):]) 858 except ValueError: 859 pass 860 861 def create_lockfile(path): 862 """ exclusively create lockfile. Throws when failed """ 863 mypid = os.getpid() 864 lockfile = path.join('.lock') 865 if hasattr(lockfile, 'mksymlinkto'): 866 lockfile.mksymlinkto(str(mypid)) 867 else: 868 fd = py.error.checked_call(os.open, str(lockfile), os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644) 869 with os.fdopen(fd, 'w') as f: 870 f.write(str(mypid)) 871 return lockfile 872 873 def atexit_remove_lockfile(lockfile): 874 """ ensure lockfile is removed at process exit """ 875 mypid = os.getpid() 876 def try_remove_lockfile(): 877 # in a fork() situation, only the last process should 878 # remove the .lock, otherwise the other processes run the 879 # risk of seeing their temporary dir disappear. For now 880 # we remove the .lock in the parent only (i.e. we assume 881 # that the children finish before the parent). 882 if os.getpid() != mypid: 883 return 884 try: 885 lockfile.remove() 886 except py.error.Error: 887 pass 888 atexit.register(try_remove_lockfile) 889 890 # compute the maximum number currently in use with the prefix 891 lastmax = None 892 while True: 893 maxnum = -1 894 for path in rootdir.listdir(): 895 num = parse_num(path) 896 if num is not None: 897 maxnum = max(maxnum, num) 898 899 # make the new directory 900 try: 901 udir = rootdir.mkdir(prefix + str(maxnum+1)) 902 if lock_timeout: 903 lockfile = create_lockfile(udir) 904 atexit_remove_lockfile(lockfile) 905 except (py.error.EEXIST, py.error.ENOENT, py.error.EBUSY): 906 # race condition (1): another thread/process created the dir 907 # in the meantime - try again 908 # race condition (2): another thread/process spuriously acquired 909 # lock treating empty directory as candidate 910 # for removal - try again 911 # race condition (3): another thread/process tried to create the lock at 912 # the same time (happened in Python 3.3 on Windows) 913 # https://ci.appveyor.com/project/pytestbot/py/build/1.0.21/job/ffi85j4c0lqwsfwa 914 if lastmax == maxnum: 915 raise 916 lastmax = maxnum 917 continue 918 break 919 920 def get_mtime(path): 921 """ read file modification time """ 922 try: 923 return path.lstat().mtime 924 except py.error.Error: 925 pass 926 927 garbage_prefix = prefix + 'garbage-' 928 929 def is_garbage(path): 930 """ check if path denotes directory scheduled for removal """ 931 bn = path.basename 932 return bn.startswith(garbage_prefix) 933 934 # prune old directories 935 udir_time = get_mtime(udir) 936 if keep and udir_time: 937 for path in rootdir.listdir(): 938 num = parse_num(path) 939 if num is not None and num <= (maxnum - keep): 940 try: 941 # try acquiring lock to remove directory as exclusive user 942 if lock_timeout: 943 create_lockfile(path) 944 except (py.error.EEXIST, py.error.ENOENT, py.error.EBUSY): 945 path_time = get_mtime(path) 946 if not path_time: 947 # assume directory doesn't exist now 948 continue 949 if abs(udir_time - path_time) < lock_timeout: 950 # assume directory with lockfile exists 951 # and lock timeout hasn't expired yet 952 continue 953 954 # path dir locked for exclusive use 955 # and scheduled for removal to avoid another thread/process 956 # treating it as a new directory or removal candidate 957 garbage_path = rootdir.join(garbage_prefix + str(uuid.uuid4())) 958 try: 959 path.rename(garbage_path) 960 garbage_path.remove(rec=1) 961 except KeyboardInterrupt: 962 raise 963 except: # this might be py.error.Error, WindowsError ... 964 pass 965 if is_garbage(path): 966 try: 967 path.remove(rec=1) 968 except KeyboardInterrupt: 969 raise 970 except: # this might be py.error.Error, WindowsError ... 971 pass 972 973 # make link... 974 try: 975 username = os.environ['USER'] #linux, et al 976 except KeyError: 977 try: 978 username = os.environ['USERNAME'] #windows 979 except KeyError: 980 username = 'current' 981 982 src = str(udir) 983 dest = src[:src.rfind('-')] + '-' + username 984 try: 985 os.unlink(dest) 986 except OSError: 987 pass 988 try: 989 os.symlink(src, dest) 990 except (OSError, AttributeError, NotImplementedError): 991 pass 992 993 return udir 994 make_numbered_dir = classmethod(make_numbered_dir) 995 996 997def copymode(src, dest): 998 """ copy permission from src to dst. """ 999 import shutil 1000 shutil.copymode(src, dest) 1001 1002 1003def copystat(src, dest): 1004 """ copy permission, last modification time, 1005 last access time, and flags from src to dst.""" 1006 import shutil 1007 shutil.copystat(str(src), str(dest)) 1008 1009 1010def copychunked(src, dest): 1011 chunksize = 524288 # half a meg of bytes 1012 fsrc = src.open('rb') 1013 try: 1014 fdest = dest.open('wb') 1015 try: 1016 while 1: 1017 buf = fsrc.read(chunksize) 1018 if not buf: 1019 break 1020 fdest.write(buf) 1021 finally: 1022 fdest.close() 1023 finally: 1024 fsrc.close() 1025 1026 1027def isimportable(name): 1028 if name and (name[0].isalpha() or name[0] == '_'): 1029 name = name.replace("_", '') 1030 return not name or name.isalnum() 1031