1import py
2import pytest
3
4__version__ = '1.0'
5
6def pytest_addoption(parser):
7    group = parser.getgroup("general")
8    group.addoption('--lf', action='store_true', dest="lf",
9        help="rerun only the tests that failed at the last run (or all if none failed)")
10    group.addoption('--ff', action='store_true', dest="failedfirst",
11        help="run all tests but run the last failures first.  This may re-order "
12             "tests and thus lead to repeated fixture setup/teardown")
13    group.addoption('--cache', action='store_true', dest="showcache",
14        help="show cache contents, don't perform collection or tests")
15    group.addoption('--clearcache', action='store_true', dest="clearcache",
16        help="remove all cache contents at start of test run.")
17
18def pytest_cmdline_main(config):
19    if config.option.showcache:
20        from _pytest.main import wrap_session
21        return wrap_session(config, showcache)
22
23@pytest.mark.tryfirst
24def pytest_configure(config):
25    config.cache = cache = Cache(config)
26    config.pluginmanager.register(LFPlugin(config), "lfplugin")
27
28def pytest_report_header(config):
29    if config.option.verbose:
30        relpath = py.path.local().bestrelpath(config.cache._cachedir)
31        return "cachedir: %s" % config.cache._cachedir
32
33class Cache:
34    def __init__(self, config):
35        self.config = config
36        self._cachedir = getrootdir(config, ".cache")
37        self.trace = config.trace.root.get("cache")
38        if config.getvalue("clearcache"):
39            self.trace("clearing cachedir")
40            if self._cachedir.check():
41                self._cachedir.remove()
42            self._cachedir.mkdir()
43
44    def makedir(self, name):
45        """ return a directory path object with the given name.  If the
46        directory does not yet exist, it will be created.  You can use it
47        to manage files likes e. g. store/retrieve database
48        dumps across test sessions.
49
50        :param name: must be a string not containing a ``/`` separator.
51             Make sure the name contains your plugin or application
52             identifiers to prevent clashes with other cache users.
53        """
54        if name.count("/") != 0:
55            raise ValueError("name is not allowed to contain '/'")
56        p = self._cachedir.join("d/" + name)
57        p.ensure(dir=1)
58        return p
59
60    def _getpath(self, key):
61        if not key.count("/") > 1:
62            raise KeyError("Key must be of format 'dir/.../subname")
63        return self._cachedir.join(key)
64
65    def _getvaluepath(self, key):
66        p = self._getpath("v/" + key)
67        p.dirpath().ensure(dir=1)
68        return p
69
70    def get(self, key, default):
71        """ return cached value for the given key.  If no value
72        was yet cached or the value cannot be read, the specified
73        default is returned.
74
75        :param key: must be a ``/`` separated value. Usually the first
76             name is the name of your plugin or your application.
77        :param default: must be provided in case of a cache-miss or
78             invalid cache values.
79
80        """
81        from execnet import loads, DataFormatError
82        path = self._getvaluepath(key)
83        if path.check():
84            f = path.open("rb")
85            try:
86                try:
87                    return loads(f.read())
88                finally:
89                    f.close()
90            except DataFormatError:
91                self.trace("cache-invalid at %s" % (key,))
92        return default
93
94    def set(self, key, value):
95        """ save value for the given key.
96
97        :param key: must be a ``/`` separated value. Usually the first
98             name is the name of your plugin or your application.
99        :param value: must be of any combination of basic
100               python types, including nested types
101               like e. g. lists of dictionaries.
102        """
103        from execnet import dumps, DataFormatError
104        path = self._getvaluepath(key)
105        f = path.open("wb")
106        try:
107            try:
108                self.trace("cache-write %s: %r" % (key, value,))
109                return f.write(dumps(value))
110            finally:
111                f.close()
112        except DataFormatError:
113            raise ValueError("cannot serialize a builtin python type")
114
115
116class LFPlugin:
117    """ Plugin which implements the --lf (run last-failing) option """
118    def __init__(self, config):
119        self.config = config
120        self.active = config.getvalue("lf") or config.getvalue("failedfirst")
121        if self.active:
122            self.lastfailed = config.cache.get("cache/lastfailed", set())
123        else:
124            self.lastfailed = set()
125
126    def pytest_report_header(self):
127        if self.active:
128            if not self.lastfailed:
129                mode = "run all (no recorded failures)"
130            else:
131                mode = "rerun last %d failures%s" % (
132                    len(self.lastfailed),
133                    " first" if self.config.getvalue("failedfirst") else "")
134            return "run-last-failure: %s" % mode
135
136    def pytest_runtest_logreport(self, report):
137        if report.failed and "xfail" not in report.keywords:
138            self.lastfailed.add(report.nodeid)
139        elif not report.failed:
140            if report.when == "call":
141                try:
142                    self.lastfailed.remove(report.nodeid)
143                except KeyError:
144                    pass
145
146    def pytest_collection_modifyitems(self, session, config, items):
147        if self.active and self.lastfailed:
148            previously_failed = []
149            previously_passed = []
150            for item in items:
151                if item.nodeid in self.lastfailed:
152                    previously_failed.append(item)
153                else:
154                    previously_passed.append(item)
155            if self.config.getvalue("failedfirst"):
156                items[:] = previously_failed + previously_passed
157            else:
158                items[:] = previously_failed
159                config.hook.pytest_deselected(items=previously_passed)
160
161    def pytest_sessionfinish(self, session):
162        config = self.config
163        if config.getvalue("showcache") or hasattr(config, "slaveinput"):
164            return
165        config.cache.set("cache/lastfailed", self.lastfailed)
166
167
168def showcache(config, session):
169    from pprint import pprint
170    tw = py.io.TerminalWriter()
171    tw.line("cachedir: " + str(config.cache._cachedir))
172    if not config.cache._cachedir.check():
173        tw.line("cache is empty")
174        return 0
175    dummy = object()
176    basedir = config.cache._cachedir
177    vdir = basedir.join("v")
178    tw.sep("-", "cache values")
179    for valpath in vdir.visit(lambda x: x.check(file=1)):
180        key = valpath.relto(vdir).replace(valpath.sep, "/")
181        val = config.cache.get(key, dummy)
182        if val is dummy:
183            tw.line("%s contains unreadable content, "
184                  "will be ignored" % key)
185        else:
186            tw.line("%s contains:" % key)
187            stream = py.io.TextIO()
188            pprint(val, stream=stream)
189            for line in stream.getvalue().splitlines():
190                tw.line("  " + line)
191
192    ddir = basedir.join("d")
193    if ddir.check(dir=1) and ddir.listdir():
194        tw.sep("-", "cache directories")
195        for p in basedir.join("d").visit():
196            #if p.check(dir=1):
197            #    print("%s/" % p.relto(basedir))
198            if p.check(file=1):
199                key = p.relto(basedir)
200                tw.line("%s is a file of length %d" % (
201                        key, p.size()))
202
203
204### XXX consider shifting part of the below to pytest config object
205
206def getrootdir(config, name):
207    """ return a best-effort root subdir for this test run.
208
209    Starting from files specified at the command line (or cwd)
210    search starts upward for the first "tox.ini", "pytest.ini",
211    "setup.cfg" or "setup.py" file.  The first directory containing
212    such a file will be used to return a named subdirectory
213    (py.path.local object).
214
215    """
216    if config.inicfg:
217        p = py.path.local(config.inicfg.config.path).dirpath()
218    else:
219        inibasenames = ["setup.py", "setup.cfg", "tox.ini", "pytest.ini"]
220        for x in getroot(config.args, inibasenames):
221            p = x.dirpath()
222            break
223        else:
224            p = py.path.local()
225            config.trace.get("warn")("no rootdir found, using %s" % p)
226    subdir = p.join(name)
227    config.trace("root %s: %s" % (name, subdir))
228    return subdir
229
230def getroot(args, inibasenames):
231    args = [x for x in args if not str(x).startswith("-")]
232    if not args:
233        args = [py.path.local()]
234    for arg in args:
235        arg = py.path.local(arg)
236        for base in arg.parts(reverse=True):
237            for inibasename in inibasenames:
238                p = base.join(inibasename)
239                if p.check():
240                    yield p
241