1""" core implementation of testing process: init, session, runtest loop. """
2from __future__ import absolute_import, division, print_function
3
4import contextlib
5import functools
6import os
7import pkgutil
8import six
9import sys
10
11import _pytest
12from _pytest import nodes
13import _pytest._code
14import py
15
16from _pytest.config import directory_arg, UsageError, hookimpl
17from _pytest.outcomes import exit
18from _pytest.runner import collect_one_node
19
20
# exitcodes for the command line
EXIT_OK = 0  # initial session exit status; kept when nothing else overrides it
EXIT_TESTSFAILED = 1  # tests failed, or the run was stopped through ``Failed``
EXIT_INTERRUPTED = 2  # a KeyboardInterrupt (incl. ``Interrupted``) reached wrap_session
EXIT_INTERNALERROR = 3  # any other exception escaped the session body
EXIT_USAGEERROR = 4  # not used in this module; presumably set by the caller on UsageError -- confirm
EXIT_NOTESTSCOLLECTED = 5  # the run completed but no items were collected
28
29
def pytest_addoption(parser):
    """Register the ini values and command line options used by this module.

    Adds the ``norecursedirs`` and ``testpaths`` ini settings and three
    option groups: "general" (running and selection), "collect"
    (collection) and "debugconfig" (session debugging and configuration).

    :param parser: the option parser offering ``addini`` and ``getgroup``.
    """
    parser.addini(
        "norecursedirs",
        "directory patterns to avoid for recursion",
        type="args",
        default=[".*", "build", "dist", "CVS", "_darcs", "{arch}", "*.egg", "venv"],
    )
    parser.addini(
        "testpaths",
        "directories to search for tests when no files or directories are given in the "
        "command line.",
        type="args",
        default=[],
    )
    # parser.addini("dirpatterns",
    #    "patterns specifying possible locations of test files",
    #    type="linelist", default=["**/test_*.txt",
    #            "**/test_*.py", "**/*_test.py"]
    # )
    group = parser.getgroup("general", "running and selection options")
    # -x is a shortcut that stores 1 into the same destination as --maxfail.
    group._addoption(
        "-x",
        "--exitfirst",
        action="store_const",
        dest="maxfail",
        const=1,
        help="exit instantly on first error or failed test.",
    )
    group._addoption(
        "--maxfail",
        metavar="num",
        action="store",
        type=int,
        dest="maxfail",
        default=0,
        help="exit after first num failures or errors.",
    )
    group._addoption(
        "--strict",
        action="store_true",
        help="marks not registered in configuration file raise errors.",
    )
    group._addoption(
        "-c",
        metavar="file",
        type=str,
        dest="inifilename",
        help="load configuration from `file` instead of trying to locate one of the implicit "
        "configuration files.",
    )
    group._addoption(
        "--continue-on-collection-errors",
        action="store_true",
        default=False,
        dest="continue_on_collection_errors",
        help="Force test execution even if collection errors occur.",
    )
    group._addoption(
        "--rootdir",
        action="store",
        dest="rootdir",
        help="Define root directory for tests. Can be relative path: 'root_dir', './root_dir', "
        "'root_dir/another_dir/'; absolute path: '/home/user/root_dir'; path with variables: "
        "'$HOME/root_dir'.",
    )

    group = parser.getgroup("collect", "collection")
    group.addoption(
        "--collectonly",
        "--collect-only",
        action="store_true",
        help="only collect tests, don't execute them.",
    )
    group.addoption(
        "--pyargs",
        action="store_true",
        help="try to interpret all arguments as python packages.",
    )
    group.addoption(
        "--ignore",
        action="append",
        metavar="path",
        help="ignore path during collection (multi-allowed).",
    )
    group.addoption(
        "--deselect",
        action="append",
        metavar="nodeid_prefix",
        help="deselect item during collection (multi-allowed).",
    )
    # when changing this to --conf-cut-dir, config.py Conftest.setinitial
    # needs upgrading as well
    group.addoption(
        "--confcutdir",
        dest="confcutdir",
        default=None,
        metavar="dir",
        type=functools.partial(directory_arg, optname="--confcutdir"),
        help="only load conftest.py's relative to specified dir.",
    )
    group.addoption(
        "--noconftest",
        action="store_true",
        dest="noconftest",
        default=False,
        help="Don't load any conftest.py files.",
    )
    group.addoption(
        "--keepduplicates",
        "--keep-duplicates",
        action="store_true",
        dest="keepduplicates",
        default=False,
        help="Keep duplicate tests.",
    )
    group.addoption(
        "--collect-in-virtualenv",
        action="store_true",
        dest="collect_in_virtualenv",
        default=False,
        help="Don't ignore tests in a local virtualenv directory",
    )

    group = parser.getgroup("debugconfig", "test session debugging and configuration")
    group.addoption(
        "--basetemp",
        dest="basetemp",
        default=None,
        metavar="dir",
        help="base temporary directory for this test run.",
    )
161
162
def pytest_configure(config):
    """Expose the active config object as the ``config`` attribute of the ``pytest`` module."""
    pytest_module = __import__("pytest")
    pytest_module.config = config  # compatibility
165
166
def wrap_session(config, doit):
    """Skeleton command line program.

    Creates a ``Session`` for ``config``, configures it, fires the
    ``pytest_sessionstart`` hook and then runs ``doit(config, session)``,
    storing its result (or 0 when falsy) as the session exit status.
    Exceptions raised along the way are translated into exit codes, except
    ``UsageError`` which is re-raised to the caller.

    The ``finally`` clause always changes back to the session start
    directory, fires ``pytest_sessionfinish`` if the session start hook had
    completed, and unconfigures ``config``.

    :param config: the configuration object driving the run.
    :param doit: callable taking ``(config, session)`` and returning an exit
        status (a falsy value counts as 0).
    :return: the final ``session.exitstatus``.
    """
    session = Session(config)
    session.exitstatus = EXIT_OK
    # Progress marker: 0 = nothing done yet, 1 = config configured,
    # 2 = pytest_sessionstart hook has returned.
    initstate = 0
    try:
        try:
            config._do_configure()
            initstate = 1
            config.hook.pytest_sessionstart(session=session)
            initstate = 2
            session.exitstatus = doit(config, session) or 0
        except UsageError:
            # Usage errors are not converted to an exit code here.
            raise
        except Failed:
            session.exitstatus = EXIT_TESTSFAILED
        except KeyboardInterrupt:
            # Also reached for ``Interrupted``, which subclasses KeyboardInterrupt.
            excinfo = _pytest._code.ExceptionInfo()
            # Before the session has fully started, an ``exit.Exception`` gets
            # its message written straight to stderr.
            if initstate < 2 and isinstance(excinfo.value, exit.Exception):
                sys.stderr.write("{}: {}\n".format(excinfo.typename, excinfo.value.msg))
            config.hook.pytest_keyboard_interrupt(excinfo=excinfo)
            session.exitstatus = EXIT_INTERRUPTED
        except:  # noqa
            # Deliberately bare: everything else, SystemExit included, is
            # reported as an internal error.
            excinfo = _pytest._code.ExceptionInfo()
            config.notify_exception(excinfo, config.option)
            session.exitstatus = EXIT_INTERNALERROR
            if excinfo.errisinstance(SystemExit):
                sys.stderr.write("mainloop: caught Spurious SystemExit!\n")

    finally:
        excinfo = None  # Explicitly break reference cycle.
        session.startdir.chdir()
        # Only announce the end of a session whose start hook completed.
        if initstate >= 2:
            config.hook.pytest_sessionfinish(
                session=session, exitstatus=session.exitstatus
            )
        config._ensure_unconfigure()
    return session.exitstatus
205
206
def pytest_cmdline_main(config):
    """Run the default ``_main`` protocol inside ``wrap_session`` and return its exit status."""
    exitstatus = wrap_session(config, _main)
    return exitstatus
209
210
211def _main(config, session):
212    """ default command line protocol for initialization, session,
213    running tests and reporting. """
214    config.hook.pytest_collection(session=session)
215    config.hook.pytest_runtestloop(session=session)
216
217    if session.testsfailed:
218        return EXIT_TESTSFAILED
219    elif session.testscollected == 0:
220        return EXIT_NOTESTSCOLLECTED
221
222
def pytest_collection(session):
    """Delegate collection to the session and hand back what it collected."""
    collected = session.perform_collect()
    return collected
225
226
def pytest_runtestloop(session):
    """Run the runtest protocol for every collected item, in order.

    Raises ``session.Interrupted`` when collection errors were recorded and
    continuing was not requested, or when ``session.shouldstop`` is set
    after an item; raises ``session.Failed`` when ``session.shouldfail`` is
    set after an item.  Returns True otherwise, including in collect-only
    mode where nothing is run.
    """
    options = session.config.option
    if session.testsfailed and not options.continue_on_collection_errors:
        raise session.Interrupted("%d errors during collection" % session.testsfailed)

    if options.collectonly:
        return True

    for index, item in enumerate(session.items):
        following = index + 1
        # Each item's protocol is told which item runs next (None for the last).
        if following < len(session.items):
            nextitem = session.items[following]
        else:
            nextitem = None
        item.config.hook.pytest_runtest_protocol(item=item, nextitem=nextitem)
        # A failure request takes precedence over a stop request.
        if session.shouldfail:
            raise session.Failed(session.shouldfail)
        if session.shouldstop:
            raise session.Interrupted(session.shouldstop)
    return True
242
243
244def _in_venv(path):
245    """Attempts to detect if ``path`` is the root of a Virtual Environment by
246    checking for the existence of the appropriate activate script"""
247    bindir = path.join("Scripts" if sys.platform.startswith("win") else "bin")
248    if not bindir.isdir():
249        return False
250    activates = (
251        "activate",
252        "activate.csh",
253        "activate.fish",
254        "Activate",
255        "Activate.bat",
256        "Activate.ps1",
257    )
258    return any([fname.basename in activates for fname in bindir.listdir()])
259
260
def pytest_ignore_collect(path, config):
    """Decide whether ``path`` should be skipped during collection.

    Returns True when ``path`` is listed in a ``collect_ignore`` conftest
    path list or in the ``--ignore`` option, when it is the root of a
    virtualenv (unless ``--collect-in-virtualenv`` is given), or when it
    was already seen before (unless ``--keepduplicates`` is given).
    Returns False otherwise, recording ``path`` as seen.
    """
    ignore_paths = config._getconftest_pathlist("collect_ignore", path=path.dirpath())
    # Work on a copy so the list handed out by the config object is never
    # extended in place with the command line entries.
    ignore_paths = list(ignore_paths or [])
    excludeopt = config.getoption("ignore")
    if excludeopt:
        ignore_paths.extend(py.path.local(x) for x in excludeopt)

    if py.path.local(path) in ignore_paths:
        return True

    # Test the option first: _in_venv() has to look at the filesystem and
    # its answer is irrelevant when virtualenv collection is allowed.
    allow_in_venv = config.getoption("collect_in_virtualenv")
    if not allow_in_venv and _in_venv(path):
        return True

    # Skip duplicate paths.
    keepduplicates = config.getoption("keepduplicates")
    duplicate_paths = config.pluginmanager._duplicatepaths
    if not keepduplicates:
        if path in duplicate_paths:
            return True
        else:
            duplicate_paths.add(path)

    return False
285
286
def pytest_collection_modifyitems(items, config):
    """Drop items whose node id starts with one of the ``--deselect`` prefixes.

    When anything is deselected, the ``pytest_deselected`` hook is called
    with the removed items and ``items`` is updated in place.
    """
    prefixes = tuple(config.getoption("deselect") or [])
    if not prefixes:
        return

    # Partition in one pass: True -> deselected, False -> kept.
    buckets = {True: [], False: []}
    for candidate in items:
        buckets[candidate.nodeid.startswith(prefixes)].append(candidate)

    dropped = buckets[True]
    if not dropped:
        return
    config.hook.pytest_deselected(items=dropped)
    items[:] = buckets[False]
303
304
@contextlib.contextmanager
def _patched_find_module():
    """Patch bug in pkgutil.ImpImporter.find_module

    When using pkgutil.find_loader on python<3.4 it removes symlinks
    from the path due to a call to os.path.realpath. This is not consistent
    with actually doing the import (in these versions, pkgutil and __import__
    did not share the same underlying code). This can break conftest
    discovery for pytest where symlinks are involved.

    The only supported python<3.4 by pytest is python 2.7, so the patch is
    applied on python 2 only and undone again when the context exits.
    """
    if not six.PY2:  # python 3.4+ uses importlib instead
        yield
        return

    def find_module_patched(self, fullname, path=None):
        # Note: we ignore 'path' argument since it is only used via meta_path
        subname = fullname.split(".")[-1]
        if subname != fullname and self.path is None:
            return None
        # original: path = [os.path.realpath(self.path)]
        search_path = None if self.path is None else [self.path]
        try:
            fileobj, filename, details = pkgutil.imp.find_module(subname, search_path)
        except ImportError:
            return None
        return pkgutil.ImpLoader(fullname, fileobj, filename, details)

    original_find_module = pkgutil.ImpImporter.find_module
    pkgutil.ImpImporter.find_module = find_module_patched
    try:
        yield
    finally:
        # Always put the stock implementation back.
        pkgutil.ImpImporter.find_module = original_find_module
343
344
class FSHookProxy(object):
    """Hook namespace that calls hooks with a given set of plugins left out.

    Each hook caller is created on first attribute access through the
    plugin manager's ``subset_hook_caller`` and then cached on the instance.
    """

    def __init__(self, fspath, pm, remove_mods):
        self.fspath, self.pm, self.remove_mods = fspath, pm, remove_mods

    def __getattr__(self, name):
        # Only reached when ``name`` is not cached yet; store the caller in
        # the instance dict so later lookups bypass __getattr__.
        caller = self.pm.subset_hook_caller(name, remove_plugins=self.remove_mods)
        vars(self)[name] = caller
        return caller
356
357
class NoMatch(Exception):
    """ raised if matching cannot locate any node for the given names. """
360
361
class Interrupted(KeyboardInterrupt):
    """ signals an interrupted test run.

    Being a KeyboardInterrupt subclass, it is handled by
    ``except KeyboardInterrupt`` clauses.
    """
    # Overrides the module name recorded on the class; presumably so it is
    # displayed like a builtin exception -- confirm.
    __module__ = "builtins"  # for py3
365
366
class Failed(Exception):
    """ signals that the test run is being stopped because of failures. """
369
370
371class Session(nodes.FSCollector):
372    Interrupted = Interrupted
373    Failed = Failed
374
375    def __init__(self, config):
376        nodes.FSCollector.__init__(
377            self, config.rootdir, parent=None, config=config, session=self, nodeid=""
378        )
379        self.testsfailed = 0
380        self.testscollected = 0
381        self.shouldstop = False
382        self.shouldfail = False
383        self.trace = config.trace.root.get("collection")
384        self._norecursepatterns = config.getini("norecursedirs")
385        self.startdir = py.path.local()
386
387        self.config.pluginmanager.register(self, name="session")
388
389    @hookimpl(tryfirst=True)
390    def pytest_collectstart(self):
391        if self.shouldfail:
392            raise self.Failed(self.shouldfail)
393        if self.shouldstop:
394            raise self.Interrupted(self.shouldstop)
395
396    @hookimpl(tryfirst=True)
397    def pytest_runtest_logreport(self, report):
398        if report.failed and not hasattr(report, "wasxfail"):
399            self.testsfailed += 1
400            maxfail = self.config.getvalue("maxfail")
401            if maxfail and self.testsfailed >= maxfail:
402                self.shouldfail = "stopping after %d failures" % (self.testsfailed)
403
404    pytest_collectreport = pytest_runtest_logreport
405
406    def isinitpath(self, path):
407        return path in self._initialpaths
408
409    def gethookproxy(self, fspath):
410        # check if we have the common case of running
411        # hooks with all conftest.py files
412        pm = self.config.pluginmanager
413        my_conftestmodules = pm._getconftestmodules(fspath)
414        remove_mods = pm._conftest_plugins.difference(my_conftestmodules)
415        if remove_mods:
416            # one or more conftests are not in use at this fspath
417            proxy = FSHookProxy(fspath, pm, remove_mods)
418        else:
419            # all plugis are active for this fspath
420            proxy = self.config.hook
421        return proxy
422
423    def perform_collect(self, args=None, genitems=True):
424        hook = self.config.hook
425        try:
426            items = self._perform_collect(args, genitems)
427            self.config.pluginmanager.check_pending()
428            hook.pytest_collection_modifyitems(
429                session=self, config=self.config, items=items
430            )
431        finally:
432            hook.pytest_collection_finish(session=self)
433        self.testscollected = len(items)
434        return items
435
436    def _perform_collect(self, args, genitems):
437        if args is None:
438            args = self.config.args
439        self.trace("perform_collect", self, args)
440        self.trace.root.indent += 1
441        self._notfound = []
442        self._initialpaths = set()
443        self._initialparts = []
444        self.items = items = []
445        for arg in args:
446            parts = self._parsearg(arg)
447            self._initialparts.append(parts)
448            self._initialpaths.add(parts[0])
449        rep = collect_one_node(self)
450        self.ihook.pytest_collectreport(report=rep)
451        self.trace.root.indent -= 1
452        if self._notfound:
453            errors = []
454            for arg, exc in self._notfound:
455                line = "(no name %r in any of %r)" % (arg, exc.args[0])
456                errors.append("not found: %s\n%s" % (arg, line))
457                # XXX: test this
458            raise UsageError(*errors)
459        if not genitems:
460            return rep.result
461        else:
462            if rep.passed:
463                for node in rep.result:
464                    self.items.extend(self.genitems(node))
465            return items
466
467    def collect(self):
468        for parts in self._initialparts:
469            arg = "::".join(map(str, parts))
470            self.trace("processing argument", arg)
471            self.trace.root.indent += 1
472            try:
473                for x in self._collect(arg):
474                    yield x
475            except NoMatch:
476                # we are inside a make_report hook so
477                # we cannot directly pass through the exception
478                self._notfound.append((arg, sys.exc_info()[1]))
479
480            self.trace.root.indent -= 1
481
482    def _collect(self, arg):
483        names = self._parsearg(arg)
484        path = names.pop(0)
485        if path.check(dir=1):
486            assert not names, "invalid arg %r" % (arg,)
487            for path in path.visit(
488                fil=lambda x: x.check(file=1), rec=self._recurse, bf=True, sort=True
489            ):
490                for x in self._collectfile(path):
491                    yield x
492        else:
493            assert path.check(file=1)
494            for x in self.matchnodes(self._collectfile(path), names):
495                yield x
496
497    def _collectfile(self, path):
498        ihook = self.gethookproxy(path)
499        if not self.isinitpath(path):
500            if ihook.pytest_ignore_collect(path=path, config=self.config):
501                return ()
502        return ihook.pytest_collect_file(path=path, parent=self)
503
504    def _recurse(self, path):
505        ihook = self.gethookproxy(path.dirpath())
506        if ihook.pytest_ignore_collect(path=path, config=self.config):
507            return
508        for pat in self._norecursepatterns:
509            if path.check(fnmatch=pat):
510                return False
511        ihook = self.gethookproxy(path)
512        ihook.pytest_collect_directory(path=path, parent=self)
513        return True
514
515    def _tryconvertpyarg(self, x):
516        """Convert a dotted module name to path.
517
518        """
519
520        try:
521            with _patched_find_module():
522                loader = pkgutil.find_loader(x)
523        except ImportError:
524            return x
525        if loader is None:
526            return x
527        # This method is sometimes invoked when AssertionRewritingHook, which
528        # does not define a get_filename method, is already in place:
529        try:
530            with _patched_find_module():
531                path = loader.get_filename(x)
532        except AttributeError:
533            # Retrieve path from AssertionRewritingHook:
534            path = loader.modules[x][0].co_filename
535        if loader.is_package(x):
536            path = os.path.dirname(path)
537        return path
538
539    def _parsearg(self, arg):
540        """ return (fspath, names) tuple after checking the file exists. """
541        parts = str(arg).split("::")
542        if self.config.option.pyargs:
543            parts[0] = self._tryconvertpyarg(parts[0])
544        relpath = parts[0].replace("/", os.sep)
545        path = self.config.invocation_dir.join(relpath, abs=True)
546        if not path.check():
547            if self.config.option.pyargs:
548                raise UsageError(
549                    "file or package not found: " + arg + " (missing __init__.py?)"
550                )
551            else:
552                raise UsageError("file not found: " + arg)
553        parts[0] = path
554        return parts
555
556    def matchnodes(self, matching, names):
557        self.trace("matchnodes", matching, names)
558        self.trace.root.indent += 1
559        nodes = self._matchnodes(matching, names)
560        num = len(nodes)
561        self.trace("matchnodes finished -> ", num, "nodes")
562        self.trace.root.indent -= 1
563        if num == 0:
564            raise NoMatch(matching, names[:1])
565        return nodes
566
567    def _matchnodes(self, matching, names):
568        if not matching or not names:
569            return matching
570        name = names[0]
571        assert name
572        nextnames = names[1:]
573        resultnodes = []
574        for node in matching:
575            if isinstance(node, nodes.Item):
576                if not names:
577                    resultnodes.append(node)
578                continue
579            assert isinstance(node, nodes.Collector)
580            rep = collect_one_node(node)
581            if rep.passed:
582                has_matched = False
583                for x in rep.result:
584                    # TODO: remove parametrized workaround once collection structure contains parametrization
585                    if x.name == name or x.name.split("[")[0] == name:
586                        resultnodes.extend(self.matchnodes([x], nextnames))
587                        has_matched = True
588                # XXX accept IDs that don't have "()" for class instances
589                if not has_matched and len(rep.result) == 1 and x.name == "()":
590                    nextnames.insert(0, name)
591                    resultnodes.extend(self.matchnodes([x], nextnames))
592            else:
593                # report collection failures here to avoid failing to run some test
594                # specified in the command line because the module could not be
595                # imported (#134)
596                node.ihook.pytest_collectreport(report=rep)
597        return resultnodes
598
599    def genitems(self, node):
600        self.trace("genitems", node)
601        if isinstance(node, nodes.Item):
602            node.ihook.pytest_itemcollected(item=node)
603            yield node
604        else:
605            assert isinstance(node, nodes.Collector)
606            rep = collect_one_node(node)
607            if rep.passed:
608                for subnode in rep.result:
609                    for x in self.genitems(subnode):
610                        yield x
611            node.ihook.pytest_collectreport(report=rep)
612