import py
import pytest

__version__ = '1.0'


def pytest_addoption(parser):
    """Register the cache related command line options.

    All options are added to the "general" option group of ``parser``.
    """
    group = parser.getgroup("general")
    group.addoption(
        '--lf', action='store_true', dest="lf",
        help="rerun only the tests that failed at the last run (or all if none failed)")
    group.addoption(
        '--ff', action='store_true', dest="failedfirst",
        help="run all tests but run the last failures first. This may re-order "
             "tests and thus lead to repeated fixture setup/teardown")
    group.addoption(
        '--cache', action='store_true', dest="showcache",
        help="show cache contents, don't perform collection or tests")
    group.addoption(
        '--clearcache', action='store_true', dest="clearcache",
        help="remove all cache contents at start of test run.")


def pytest_cmdline_main(config):
    """Run ``showcache`` inside a session when ``--cache`` was given.

    Returns whatever ``wrap_session`` returns in that case and ``None``
    (implicitly) otherwise.
    """
    if config.option.showcache:
        from _pytest.main import wrap_session
        return wrap_session(config, showcache)


@pytest.mark.tryfirst
def pytest_configure(config):
    """Attach a ``Cache`` instance to ``config`` and register ``LFPlugin``."""
    config.cache = Cache(config)
    config.pluginmanager.register(LFPlugin(config), "lfplugin")


def pytest_report_header(config):
    """Return a ``cachedir: ...`` header line when the verbose option is set."""
    if config.option.verbose:
        return "cachedir: %s" % config.cache._cachedir


class Cache:
    """File based key/value and directory store below a ``.cache`` directory.

    Values are kept below ``<cachedir>/v/<key>`` and directories handed
    out by ``makedir`` below ``<cachedir>/d/<name>``.
    """

    def __init__(self, config):
        """Locate the cache directory and clear it if ``clearcache`` is set.

        :param config: the config object; it is asked for the
            ``clearcache`` value and provides the trace facility.
        """
        self.config = config
        self._cachedir = getrootdir(config, ".cache")
        self.trace = config.trace.root.get("cache")
        if config.getvalue("clearcache"):
            self.trace("clearing cachedir")
            if self._cachedir.check():
                self._cachedir.remove()
            self._cachedir.mkdir()

    def makedir(self, name):
        """ return a directory path object with the given name.  If the
        directory does not yet exist, it will be created.  You can use it
        to manage files, e.g. to store/retrieve database dumps across
        test sessions.

        :param name: must be a string not containing a ``/`` separator.
             Make sure the name contains your plugin or application
             identifiers to prevent clashes with other cache users.
        :raises ValueError: if ``name`` contains a ``/``.
        """
        if "/" in name:
            raise ValueError("name is not allowed to contain '/'")
        p = self._cachedir.join("d/" + name)
        p.ensure(dir=1)
        return p

    def _getpath(self, key):
        """Return the path below the cache directory for ``key``.

        :param key: must contain at least two ``/`` separators.
        :raises KeyError: if ``key`` contains fewer than two ``/``.
        """
        if key.count("/") <= 1:
            raise KeyError("Key must be of format 'dir/.../subname'")
        return self._cachedir.join(key)

    def _getvaluepath(self, key):
        """Return the value file path for ``key``, creating its parent
        directory if necessary.

        The key is prefixed with ``v/`` before validation, so a user
        supplied key needs at least one ``/`` itself.
        """
        p = self._getpath("v/" + key)
        p.dirpath().ensure(dir=1)
        return p

    def get(self, key, default):
        """ return cached value for the given key.  If no value
        was yet cached or the value cannot be read, the specified
        default is returned.

        :param key: must be a ``/`` separated value. Usually the first
             name is the name of your plugin or your application.
        :param default: must be provided in case of a cache-miss or
             invalid cache values.

        """
        from execnet import loads, DataFormatError
        path = self._getvaluepath(key)
        if path.check():
            f = path.open("rb")
            try:
                try:
                    return loads(f.read())
                finally:
                    f.close()
            except DataFormatError:
                # Unreadable content is traced and treated as a miss.
                self.trace("cache-invalid at %s" % (key,))
        return default

    def set(self, key, value):
        """ save value for the given key.

        :param key: must be a ``/`` separated value. Usually the first
             name is the name of your plugin or your application.
        :param value: must be of any combination of basic
               python types, including nested types
               like e. g. lists of dictionaries.
        :raises ValueError: if ``value`` cannot be serialized.
        """
        from execnet import dumps, DataFormatError
        path = self._getvaluepath(key)
        self.trace("cache-write %s: %r" % (key, value,))
        # Serialize before opening the file: opening with "wb" truncates
        # it, so a value that fails to serialize must not be allowed to
        # wipe out the previously cached content.
        try:
            data = dumps(value)
        except DataFormatError:
            raise ValueError("cannot serialize a builtin python type")
        f = path.open("wb")
        try:
            return f.write(data)
        finally:
            f.close()


class LFPlugin:
    """ Plugin which implements the --lf (run last-failing) option """

    def __init__(self, config):
        """Load the recorded failures when ``--lf`` or ``--ff`` is active.

        When neither option is given the set of failures starts out empty.
        """
        self.config = config
        self.active = config.getvalue("lf") or config.getvalue("failedfirst")
        if self.active:
            self.lastfailed = config.cache.get("cache/lastfailed", set())
        else:
            self.lastfailed = set()

    def pytest_report_header(self):
        """Return a ``run-last-failure: ...`` header line if active."""
        if self.active:
            if not self.lastfailed:
                mode = "run all (no recorded failures)"
            else:
                mode = "rerun last %d failures%s" % (
                    len(self.lastfailed),
                    " first" if self.config.getvalue("failedfirst") else "")
            return "run-last-failure: %s" % mode

    def pytest_runtest_logreport(self, report):
        """Keep ``self.lastfailed`` in sync with an incoming test report.

        A failed report without the ``xfail`` keyword adds its node id.
        A non-failed report of the "call" phase removes the node id again.
        """
        if report.failed and "xfail" not in report.keywords:
            self.lastfailed.add(report.nodeid)
        elif not report.failed:
            if report.when == "call":
                # discard() is the no-error equivalent of remove().
                self.lastfailed.discard(report.nodeid)

    def pytest_collection_modifyitems(self, session, config, items):
        """Reorder or filter ``items`` according to the recorded failures.

        Does nothing unless the plugin is active and failures are recorded.
        With ``failedfirst`` the previously failed items are moved to the
        front; otherwise only they are kept and the remaining items are
        reported through the ``pytest_deselected`` hook.
        """
        if self.active and self.lastfailed:
            previously_failed = []
            previously_passed = []
            for item in items:
                if item.nodeid in self.lastfailed:
                    previously_failed.append(item)
                else:
                    previously_passed.append(item)
            if self.config.getvalue("failedfirst"):
                items[:] = previously_failed + previously_passed
            else:
                items[:] = previously_failed
                config.hook.pytest_deselected(items=previously_passed)

    def pytest_sessionfinish(self, session):
        """Store the set of failures in the cache at the end of the session.

        Nothing is written when ``--cache`` was given or when the config
        carries a ``slaveinput`` attribute.
        """
        config = self.config
        if config.getvalue("showcache") or hasattr(config, "slaveinput"):
            return
        config.cache.set("cache/lastfailed", self.lastfailed)


def showcache(config, session):
    """Write the contents of the cache directory to the terminal.

    Lists every cached value (pretty-printed) and every file found in the
    cache directories.  Returns 0 if the cache directory does not exist
    and ``None`` otherwise.
    """
    from pprint import pprint
    tw = py.io.TerminalWriter()
    tw.line("cachedir: " + str(config.cache._cachedir))
    if not config.cache._cachedir.check():
        tw.line("cache is empty")
        return 0
    dummy = object()
    basedir = config.cache._cachedir
    vdir = basedir.join("v")
    tw.sep("-", "cache values")
    # The "v" directory is only created when a value is accessed (see
    # Cache._getvaluepath), so it can be absent while the cache directory
    # itself exists; only walk it when it is really there.
    if vdir.check(dir=1):
        for valpath in vdir.visit(lambda x: x.check(file=1)):
            key = valpath.relto(vdir).replace(valpath.sep, "/")
            val = config.cache.get(key, dummy)
            if val is dummy:
                tw.line("%s contains unreadable content, "
                        "will be ignored" % key)
            else:
                tw.line("%s contains:" % key)
                stream = py.io.TextIO()
                pprint(val, stream=stream)
                for line in stream.getvalue().splitlines():
                    tw.line("  " + line)

    ddir = basedir.join("d")
    if ddir.check(dir=1) and ddir.listdir():
        tw.sep("-", "cache directories")
        for p in ddir.visit():
            if p.check(file=1):
                key = p.relto(basedir)
                tw.line("%s is a file of length %d" % (
                        key, p.size()))


### XXX consider shifting part of the below to pytest config object

def getrootdir(config, name):
    """ return a best-effort root subdir for this test run.

    If the config has ini-file information, the directory of that
    ini-file is used.  Otherwise, starting from files specified at the
    command line (or cwd), search starts upward for the first "setup.py",
    "setup.cfg", "tox.ini" or "pytest.ini" file.  The first directory
    containing such a file will be used to return a named subdirectory
    (py.path.local object).  If no such file is found, the current
    working directory is used and a warning is traced.

    """
    if config.inicfg:
        p = py.path.local(config.inicfg.config.path).dirpath()
    else:
        inibasenames = ["setup.py", "setup.cfg", "tox.ini", "pytest.ini"]
        for x in getroot(config.args, inibasenames):
            p = x.dirpath()
            break
        else:
            p = py.path.local()
            config.trace.get("warn")("no rootdir found, using %s" % p)
    subdir = p.join(name)
    config.trace("root %s: %s" % (name, subdir))
    return subdir


def getroot(args, inibasenames):
    """Yield existing paths named like one of ``inibasenames``.

    For each entry of ``args`` that does not start with ``-`` (or for the
    current working directory if there is none) the path and all of its
    parents are searched, starting with the path itself.

    :param args: command line arguments.
    :param inibasenames: base names of the files to look for.
    """
    args = [x for x in args if not str(x).startswith("-")]
    if not args:
        args = [py.path.local()]
    for arg in args:
        arg = py.path.local(arg)
        for base in arg.parts(reverse=True):
            for inibasename in inibasenames:
                p = base.join(inibasename)
                if p.check():
                    yield p