1"""(disabled by default) support for testing pytest and pytest plugins."""
2from __future__ import absolute_import, division, print_function
3
4import codecs
5import gc
6import os
7import platform
8import re
9import subprocess
10import six
11import sys
12import time
13import traceback
14from fnmatch import fnmatch
15
16from weakref import WeakKeyDictionary
17
18from _pytest.capture import MultiCapture, SysCapture
19from _pytest._code import Source
20import py
21import pytest
22from _pytest.main import Session, EXIT_OK
23from _pytest.assertion.rewrite import AssertionRewritingHook
24
25
26PYTEST_FULLPATH = os.path.abspath(pytest.__file__.rstrip("oc")).replace(
27    "$py.class", ".py"
28)
29
30
31IGNORE_PAM = [  # filenames added when obtaining details about the current user
32    u"/var/lib/sss/mc/passwd"
33]
34
35
36def pytest_addoption(parser):
37    parser.addoption(
38        "--lsof",
39        action="store_true",
40        dest="lsof",
41        default=False,
42        help=("run FD checks if lsof is available"),
43    )
44
45    parser.addoption(
46        "--runpytest",
47        default="inprocess",
48        dest="runpytest",
49        choices=("inprocess", "subprocess"),
50        help=(
51            "run pytest sub runs in tests using an 'inprocess' "
52            "or 'subprocess' (python -m main) method"
53        ),
54    )
55
56
57def pytest_configure(config):
58    if config.getvalue("lsof"):
59        checker = LsofFdLeakChecker()
60        if checker.matching_platform():
61            config.pluginmanager.register(checker)
62
63
64class LsofFdLeakChecker(object):
65
66    def get_open_files(self):
67        out = self._exec_lsof()
68        open_files = self._parse_lsof_output(out)
69        return open_files
70
71    def _exec_lsof(self):
72        pid = os.getpid()
73        return py.process.cmdexec("lsof -Ffn0 -p %d" % pid)
74
75    def _parse_lsof_output(self, out):
76
77        def isopen(line):
78            return line.startswith("f") and (
79                "deleted" not in line
80                and "mem" not in line
81                and "txt" not in line
82                and "cwd" not in line
83            )
84
85        open_files = []
86
87        for line in out.split("\n"):
88            if isopen(line):
89                fields = line.split("\0")
90                fd = fields[0][1:]
91                filename = fields[1][1:]
92                if filename in IGNORE_PAM:
93                    continue
94                if filename.startswith("/"):
95                    open_files.append((fd, filename))
96
97        return open_files
98
99    def matching_platform(self):
100        try:
101            py.process.cmdexec("lsof -v")
102        except (py.process.cmdexec.Error, UnicodeDecodeError):
103            # cmdexec may raise UnicodeDecodeError on Windows systems with
104            # locale other than English:
105            # https://bitbucket.org/pytest-dev/py/issues/66
106            return False
107        else:
108            return True
109
110    @pytest.hookimpl(hookwrapper=True, tryfirst=True)
111    def pytest_runtest_protocol(self, item):
112        lines1 = self.get_open_files()
113        yield
114        if hasattr(sys, "pypy_version_info"):
115            gc.collect()
116        lines2 = self.get_open_files()
117
118        new_fds = {t[0] for t in lines2} - {t[0] for t in lines1}
119        leaked_files = [t for t in lines2 if t[0] in new_fds]
120        if leaked_files:
121            error = []
122            error.append("***** %s FD leakage detected" % len(leaked_files))
123            error.extend([str(f) for f in leaked_files])
124            error.append("*** Before:")
125            error.extend([str(f) for f in lines1])
126            error.append("*** After:")
127            error.extend([str(f) for f in lines2])
128            error.append(error[0])
129            error.append("*** function %s:%s: %s " % item.location)
130            error.append("See issue #2366")
131            item.warn("", "\n".join(error))
132
133
134# XXX copied from execnet's conftest.py - needs to be merged
135winpymap = {
136    "python2.7": r"C:\Python27\python.exe",
137    "python3.4": r"C:\Python34\python.exe",
138    "python3.5": r"C:\Python35\python.exe",
139    "python3.6": r"C:\Python36\python.exe",
140}
141
142
143def getexecutable(name, cache={}):
144    try:
145        return cache[name]
146    except KeyError:
147        executable = py.path.local.sysfind(name)
148        if executable:
149            import subprocess
150
151            popen = subprocess.Popen(
152                [str(executable), "--version"],
153                universal_newlines=True,
154                stderr=subprocess.PIPE,
155            )
156            out, err = popen.communicate()
157            if name == "jython":
158                if not err or "2.5" not in err:
159                    executable = None
160                if "2.5.2" in err:
161                    executable = None  # http://bugs.jython.org/issue1790
162            elif popen.returncode != 0:
163                # handle pyenv's 127
164                executable = None
165        cache[name] = executable
166        return executable
167
168
169@pytest.fixture(params=["python2.7", "python3.4", "pypy", "pypy3"])
170def anypython(request):
171    name = request.param
172    executable = getexecutable(name)
173    if executable is None:
174        if sys.platform == "win32":
175            executable = winpymap.get(name, None)
176            if executable:
177                executable = py.path.local(executable)
178                if executable.check():
179                    return executable
180        pytest.skip("no suitable %s found" % (name,))
181    return executable
182
183
184# used at least by pytest-xdist plugin
185
186
187@pytest.fixture
188def _pytest(request):
189    """Return a helper which offers a gethookrecorder(hook) method which
190    returns a HookRecorder instance which helps to make assertions about called
191    hooks.
192
193    """
194    return PytestArg(request)
195
196
197class PytestArg(object):
198
199    def __init__(self, request):
200        self.request = request
201
202    def gethookrecorder(self, hook):
203        hookrecorder = HookRecorder(hook._pm)
204        self.request.addfinalizer(hookrecorder.finish_recording)
205        return hookrecorder
206
207
208def get_public_names(values):
209    """Only return names from iterator values without a leading underscore."""
210    return [x for x in values if x[0] != "_"]
211
212
213class ParsedCall(object):
214
215    def __init__(self, name, kwargs):
216        self.__dict__.update(kwargs)
217        self._name = name
218
219    def __repr__(self):
220        d = self.__dict__.copy()
221        del d["_name"]
222        return "<ParsedCall %r(**%r)>" % (self._name, d)
223
224
225class HookRecorder(object):
226    """Record all hooks called in a plugin manager.
227
228    This wraps all the hook calls in the plugin manager, recording each call
229    before propagating the normal calls.
230
231    """
232
233    def __init__(self, pluginmanager):
234        self._pluginmanager = pluginmanager
235        self.calls = []
236
237        def before(hook_name, hook_impls, kwargs):
238            self.calls.append(ParsedCall(hook_name, kwargs))
239
240        def after(outcome, hook_name, hook_impls, kwargs):
241            pass
242
243        self._undo_wrapping = pluginmanager.add_hookcall_monitoring(before, after)
244
245    def finish_recording(self):
246        self._undo_wrapping()
247
248    def getcalls(self, names):
249        if isinstance(names, str):
250            names = names.split()
251        return [call for call in self.calls if call._name in names]
252
253    def assert_contains(self, entries):
254        __tracebackhide__ = True
255        i = 0
256        entries = list(entries)
257        backlocals = sys._getframe(1).f_locals
258        while entries:
259            name, check = entries.pop(0)
260            for ind, call in enumerate(self.calls[i:]):
261                if call._name == name:
262                    print("NAMEMATCH", name, call)
263                    if eval(check, backlocals, call.__dict__):
264                        print("CHECKERMATCH", repr(check), "->", call)
265                    else:
266                        print("NOCHECKERMATCH", repr(check), "-", call)
267                        continue
268                    i += ind + 1
269                    break
270                print("NONAMEMATCH", name, "with", call)
271            else:
272                pytest.fail("could not find %r check %r" % (name, check))
273
274    def popcall(self, name):
275        __tracebackhide__ = True
276        for i, call in enumerate(self.calls):
277            if call._name == name:
278                del self.calls[i]
279                return call
280        lines = ["could not find call %r, in:" % (name,)]
281        lines.extend(["  %s" % str(x) for x in self.calls])
282        pytest.fail("\n".join(lines))
283
284    def getcall(self, name):
285        values = self.getcalls(name)
286        assert len(values) == 1, (name, values)
287        return values[0]
288
289    # functionality for test reports
290
291    def getreports(self, names="pytest_runtest_logreport pytest_collectreport"):
292        return [x.report for x in self.getcalls(names)]
293
294    def matchreport(
295        self,
296        inamepart="",
297        names="pytest_runtest_logreport pytest_collectreport",
298        when=None,
299    ):
300        """return a testreport whose dotted import path matches"""
301        values = []
302        for rep in self.getreports(names=names):
303            try:
304                if not when and rep.when != "call" and rep.passed:
305                    # setup/teardown passing reports - let's ignore those
306                    continue
307            except AttributeError:
308                pass
309            if when and getattr(rep, "when", None) != when:
310                continue
311            if not inamepart or inamepart in rep.nodeid.split("::"):
312                values.append(rep)
313        if not values:
314            raise ValueError(
315                "could not find test report matching %r: "
316                "no test reports at all!" % (inamepart,)
317            )
318        if len(values) > 1:
319            raise ValueError(
320                "found 2 or more testreports matching %r: %s" % (inamepart, values)
321            )
322        return values[0]
323
324    def getfailures(self, names="pytest_runtest_logreport pytest_collectreport"):
325        return [rep for rep in self.getreports(names) if rep.failed]
326
327    def getfailedcollections(self):
328        return self.getfailures("pytest_collectreport")
329
330    def listoutcomes(self):
331        passed = []
332        skipped = []
333        failed = []
334        for rep in self.getreports("pytest_collectreport pytest_runtest_logreport"):
335            if rep.passed:
336                if getattr(rep, "when", None) == "call":
337                    passed.append(rep)
338            elif rep.skipped:
339                skipped.append(rep)
340            elif rep.failed:
341                failed.append(rep)
342        return passed, skipped, failed
343
344    def countoutcomes(self):
345        return [len(x) for x in self.listoutcomes()]
346
347    def assertoutcome(self, passed=0, skipped=0, failed=0):
348        realpassed, realskipped, realfailed = self.listoutcomes()
349        assert passed == len(realpassed)
350        assert skipped == len(realskipped)
351        assert failed == len(realfailed)
352
353    def clear(self):
354        self.calls[:] = []
355
356
357@pytest.fixture
358def linecomp(request):
359    return LineComp()
360
361
362@pytest.fixture(name="LineMatcher")
363def LineMatcher_fixture(request):
364    return LineMatcher
365
366
367@pytest.fixture
368def testdir(request, tmpdir_factory):
369    return Testdir(request, tmpdir_factory)
370
371
372rex_outcome = re.compile(r"(\d+) ([\w-]+)")
373
374
375class RunResult(object):
376    """The result of running a command.
377
378    Attributes:
379
380    :ret: the return value
381    :outlines: list of lines captured from stdout
382    :errlines: list of lines captures from stderr
383    :stdout: :py:class:`LineMatcher` of stdout, use ``stdout.str()`` to
384       reconstruct stdout or the commonly used ``stdout.fnmatch_lines()``
385       method
386    :stderr: :py:class:`LineMatcher` of stderr
387    :duration: duration in seconds
388
389    """
390
391    def __init__(self, ret, outlines, errlines, duration):
392        self.ret = ret
393        self.outlines = outlines
394        self.errlines = errlines
395        self.stdout = LineMatcher(outlines)
396        self.stderr = LineMatcher(errlines)
397        self.duration = duration
398
399    def parseoutcomes(self):
400        """Return a dictionary of outcomestring->num from parsing the terminal
401        output that the test process produced.
402
403        """
404        for line in reversed(self.outlines):
405            if "seconds" in line:
406                outcomes = rex_outcome.findall(line)
407                if outcomes:
408                    d = {}
409                    for num, cat in outcomes:
410                        d[cat] = int(num)
411                    return d
412        raise ValueError("Pytest terminal report not found")
413
414    def assert_outcomes(self, passed=0, skipped=0, failed=0, error=0):
415        """Assert that the specified outcomes appear with the respective
416        numbers (0 means it didn't occur) in the text output from a test run.
417
418        """
419        d = self.parseoutcomes()
420        obtained = {
421            "passed": d.get("passed", 0),
422            "skipped": d.get("skipped", 0),
423            "failed": d.get("failed", 0),
424            "error": d.get("error", 0),
425        }
426        assert (
427            obtained == dict(passed=passed, skipped=skipped, failed=failed, error=error)
428        )
429
430
431class CwdSnapshot(object):
432
433    def __init__(self):
434        self.__saved = os.getcwd()
435
436    def restore(self):
437        os.chdir(self.__saved)
438
439
440class SysModulesSnapshot(object):
441
442    def __init__(self, preserve=None):
443        self.__preserve = preserve
444        self.__saved = dict(sys.modules)
445
446    def restore(self):
447        if self.__preserve:
448            self.__saved.update(
449                (k, m) for k, m in sys.modules.items() if self.__preserve(k)
450            )
451        sys.modules.clear()
452        sys.modules.update(self.__saved)
453
454
455class SysPathsSnapshot(object):
456
457    def __init__(self):
458        self.__saved = list(sys.path), list(sys.meta_path)
459
460    def restore(self):
461        sys.path[:], sys.meta_path[:] = self.__saved
462
463
464class Testdir(object):
465    """Temporary test directory with tools to test/run pytest itself.
466
467    This is based on the ``tmpdir`` fixture but provides a number of methods
468    which aid with testing pytest itself.  Unless :py:meth:`chdir` is used all
469    methods will use :py:attr:`tmpdir` as their current working directory.
470
471    Attributes:
472
473    :tmpdir: The :py:class:`py.path.local` instance of the temporary directory.
474
475    :plugins: A list of plugins to use with :py:meth:`parseconfig` and
476       :py:meth:`runpytest`.  Initially this is an empty list but plugins can
477       be added to the list.  The type of items to add to the list depends on
478       the method using them so refer to them for details.
479
480    """
481
482    def __init__(self, request, tmpdir_factory):
483        self.request = request
484        self._mod_collections = WeakKeyDictionary()
485        name = request.function.__name__
486        self.tmpdir = tmpdir_factory.mktemp(name, numbered=True)
487        self.plugins = []
488        self._cwd_snapshot = CwdSnapshot()
489        self._sys_path_snapshot = SysPathsSnapshot()
490        self._sys_modules_snapshot = self.__take_sys_modules_snapshot()
491        self.chdir()
492        self.request.addfinalizer(self.finalize)
493        method = self.request.config.getoption("--runpytest")
494        if method == "inprocess":
495            self._runpytest_method = self.runpytest_inprocess
496        elif method == "subprocess":
497            self._runpytest_method = self.runpytest_subprocess
498
499    def __repr__(self):
500        return "<Testdir %r>" % (self.tmpdir,)
501
502    def finalize(self):
503        """Clean up global state artifacts.
504
505        Some methods modify the global interpreter state and this tries to
506        clean this up.  It does not remove the temporary directory however so
507        it can be looked at after the test run has finished.
508
509        """
510        self._sys_modules_snapshot.restore()
511        self._sys_path_snapshot.restore()
512        self._cwd_snapshot.restore()
513
514    def __take_sys_modules_snapshot(self):
515        # some zope modules used by twisted-related tests keep internal state
516        # and can't be deleted; we had some trouble in the past with
517        # `zope.interface` for example
518        def preserve_module(name):
519            return name.startswith("zope")
520
521        return SysModulesSnapshot(preserve=preserve_module)
522
523    def make_hook_recorder(self, pluginmanager):
524        """Create a new :py:class:`HookRecorder` for a PluginManager."""
525        assert not hasattr(pluginmanager, "reprec")
526        pluginmanager.reprec = reprec = HookRecorder(pluginmanager)
527        self.request.addfinalizer(reprec.finish_recording)
528        return reprec
529
530    def chdir(self):
531        """Cd into the temporary directory.
532
533        This is done automatically upon instantiation.
534
535        """
536        self.tmpdir.chdir()
537
538    def _makefile(self, ext, args, kwargs, encoding="utf-8"):
539        items = list(kwargs.items())
540
541        def to_text(s):
542            return s.decode(encoding) if isinstance(s, bytes) else six.text_type(s)
543
544        if args:
545            source = u"\n".join(to_text(x) for x in args)
546            basename = self.request.function.__name__
547            items.insert(0, (basename, source))
548
549        ret = None
550        for basename, value in items:
551            p = self.tmpdir.join(basename).new(ext=ext)
552            p.dirpath().ensure_dir()
553            source = Source(value)
554            source = u"\n".join(to_text(line) for line in source.lines)
555            p.write(source.strip().encode(encoding), "wb")
556            if ret is None:
557                ret = p
558        return ret
559
560    def makefile(self, ext, *args, **kwargs):
561        """Create a new file in the testdir.
562
563        ext: The extension the file should use, including the dot, e.g. `.py`.
564
565        args: All args will be treated as strings and joined using newlines.
566           The result will be written as contents to the file.  The name of the
567           file will be based on the test function requesting this fixture.
568           E.g. "testdir.makefile('.txt', 'line1', 'line2')"
569
570        kwargs: Each keyword is the name of a file, while the value of it will
571           be written as contents of the file.
572           E.g. "testdir.makefile('.ini', pytest='[pytest]\naddopts=-rs\n')"
573
574        """
575        return self._makefile(ext, args, kwargs)
576
577    def makeconftest(self, source):
578        """Write a contest.py file with 'source' as contents."""
579        return self.makepyfile(conftest=source)
580
581    def makeini(self, source):
582        """Write a tox.ini file with 'source' as contents."""
583        return self.makefile(".ini", tox=source)
584
585    def getinicfg(self, source):
586        """Return the pytest section from the tox.ini config file."""
587        p = self.makeini(source)
588        return py.iniconfig.IniConfig(p)["pytest"]
589
590    def makepyfile(self, *args, **kwargs):
591        """Shortcut for .makefile() with a .py extension."""
592        return self._makefile(".py", args, kwargs)
593
594    def maketxtfile(self, *args, **kwargs):
595        """Shortcut for .makefile() with a .txt extension."""
596        return self._makefile(".txt", args, kwargs)
597
598    def syspathinsert(self, path=None):
599        """Prepend a directory to sys.path, defaults to :py:attr:`tmpdir`.
600
601        This is undone automatically when this object dies at the end of each
602        test.
603
604        """
605        if path is None:
606            path = self.tmpdir
607        sys.path.insert(0, str(path))
608        # a call to syspathinsert() usually means that the caller wants to
609        # import some dynamically created files, thus with python3 we
610        # invalidate its import caches
611        self._possibly_invalidate_import_caches()
612
613    def _possibly_invalidate_import_caches(self):
614        # invalidate caches if we can (py33 and above)
615        try:
616            import importlib
617        except ImportError:
618            pass
619        else:
620            if hasattr(importlib, "invalidate_caches"):
621                importlib.invalidate_caches()
622
623    def mkdir(self, name):
624        """Create a new (sub)directory."""
625        return self.tmpdir.mkdir(name)
626
627    def mkpydir(self, name):
628        """Create a new python package.
629
630        This creates a (sub)directory with an empty ``__init__.py`` file so it
631        gets recognised as a python package.
632
633        """
634        p = self.mkdir(name)
635        p.ensure("__init__.py")
636        return p
637
638    Session = Session
639
640    def getnode(self, config, arg):
641        """Return the collection node of a file.
642
643        :param config: :py:class:`_pytest.config.Config` instance, see
644           :py:meth:`parseconfig` and :py:meth:`parseconfigure` to create the
645           configuration
646
647        :param arg: a :py:class:`py.path.local` instance of the file
648
649        """
650        session = Session(config)
651        assert "::" not in str(arg)
652        p = py.path.local(arg)
653        config.hook.pytest_sessionstart(session=session)
654        res = session.perform_collect([str(p)], genitems=False)[0]
655        config.hook.pytest_sessionfinish(session=session, exitstatus=EXIT_OK)
656        return res
657
658    def getpathnode(self, path):
659        """Return the collection node of a file.
660
661        This is like :py:meth:`getnode` but uses :py:meth:`parseconfigure` to
662        create the (configured) pytest Config instance.
663
664        :param path: a :py:class:`py.path.local` instance of the file
665
666        """
667        config = self.parseconfigure(path)
668        session = Session(config)
669        x = session.fspath.bestrelpath(path)
670        config.hook.pytest_sessionstart(session=session)
671        res = session.perform_collect([x], genitems=False)[0]
672        config.hook.pytest_sessionfinish(session=session, exitstatus=EXIT_OK)
673        return res
674
675    def genitems(self, colitems):
676        """Generate all test items from a collection node.
677
678        This recurses into the collection node and returns a list of all the
679        test items contained within.
680
681        """
682        session = colitems[0].session
683        result = []
684        for colitem in colitems:
685            result.extend(session.genitems(colitem))
686        return result
687
688    def runitem(self, source):
689        """Run the "test_func" Item.
690
691        The calling test instance (class containing the test method) must
692        provide a ``.getrunner()`` method which should return a runner which
693        can run the test protocol for a single item, e.g.
694        :py:func:`_pytest.runner.runtestprotocol`.
695
696        """
697        # used from runner functional tests
698        item = self.getitem(source)
699        # the test class where we are called from wants to provide the runner
700        testclassinstance = self.request.instance
701        runner = testclassinstance.getrunner()
702        return runner(item)
703
704    def inline_runsource(self, source, *cmdlineargs):
705        """Run a test module in process using ``pytest.main()``.
706
707        This run writes "source" into a temporary file and runs
708        ``pytest.main()`` on it, returning a :py:class:`HookRecorder` instance
709        for the result.
710
711        :param source: the source code of the test module
712
713        :param cmdlineargs: any extra command line arguments to use
714
715        :return: :py:class:`HookRecorder` instance of the result
716
717        """
718        p = self.makepyfile(source)
719        values = list(cmdlineargs) + [p]
720        return self.inline_run(*values)
721
722    def inline_genitems(self, *args):
723        """Run ``pytest.main(['--collectonly'])`` in-process.
724
725        Runs the :py:func:`pytest.main` function to run all of pytest inside
726        the test process itself like :py:meth:`inline_run`, but returns a
727        tuple of the collected items and a :py:class:`HookRecorder` instance.
728
729        """
730        rec = self.inline_run("--collect-only", *args)
731        items = [x.item for x in rec.getcalls("pytest_itemcollected")]
732        return items, rec
733
734    def inline_run(self, *args, **kwargs):
735        """Run ``pytest.main()`` in-process, returning a HookRecorder.
736
737        Runs the :py:func:`pytest.main` function to run all of pytest inside
738        the test process itself.  This means it can return a
739        :py:class:`HookRecorder` instance which gives more detailed results
740        from that run than can be done by matching stdout/stderr from
741        :py:meth:`runpytest`.
742
743        :param args: command line arguments to pass to :py:func:`pytest.main`
744
745        :param plugin: (keyword-only) extra plugin instances the
746           ``pytest.main()`` instance should use
747
748        :return: a :py:class:`HookRecorder` instance
749
750        """
751        finalizers = []
752        try:
753            # When running pytest inline any plugins active in the main test
754            # process are already imported.  So this disables the warning which
755            # will trigger to say they can no longer be rewritten, which is
756            # fine as they have already been rewritten.
757            orig_warn = AssertionRewritingHook._warn_already_imported
758
759            def revert_warn_already_imported():
760                AssertionRewritingHook._warn_already_imported = orig_warn
761
762            finalizers.append(revert_warn_already_imported)
763            AssertionRewritingHook._warn_already_imported = lambda *a: None
764
765            # Any sys.module or sys.path changes done while running pytest
766            # inline should be reverted after the test run completes to avoid
767            # clashing with later inline tests run within the same pytest test,
768            # e.g. just because they use matching test module names.
769            finalizers.append(self.__take_sys_modules_snapshot().restore)
770            finalizers.append(SysPathsSnapshot().restore)
771
772            # Important note:
773            # - our tests should not leave any other references/registrations
774            #   laying around other than possibly loaded test modules
775            #   referenced from sys.modules, as nothing will clean those up
776            #   automatically
777
778            rec = []
779
780            class Collect(object):
781
782                def pytest_configure(x, config):
783                    rec.append(self.make_hook_recorder(config.pluginmanager))
784
785            plugins = kwargs.get("plugins") or []
786            plugins.append(Collect())
787            ret = pytest.main(list(args), plugins=plugins)
788            if len(rec) == 1:
789                reprec = rec.pop()
790            else:
791
792                class reprec(object):
793                    pass
794
795            reprec.ret = ret
796
797            # typically we reraise keyboard interrupts from the child run
798            # because it's our user requesting interruption of the testing
799            if ret == 2 and not kwargs.get("no_reraise_ctrlc"):
800                calls = reprec.getcalls("pytest_keyboard_interrupt")
801                if calls and calls[-1].excinfo.type == KeyboardInterrupt:
802                    raise KeyboardInterrupt()
803            return reprec
804        finally:
805            for finalizer in finalizers:
806                finalizer()
807
808    def runpytest_inprocess(self, *args, **kwargs):
809        """Return result of running pytest in-process, providing a similar
810        interface to what self.runpytest() provides.
811
812        """
813        if kwargs.get("syspathinsert"):
814            self.syspathinsert()
815        now = time.time()
816        capture = MultiCapture(Capture=SysCapture)
817        capture.start_capturing()
818        try:
819            try:
820                reprec = self.inline_run(*args, **kwargs)
821            except SystemExit as e:
822
823                class reprec(object):
824                    ret = e.args[0]
825
826            except Exception:
827                traceback.print_exc()
828
829                class reprec(object):
830                    ret = 3
831
832        finally:
833            out, err = capture.readouterr()
834            capture.stop_capturing()
835            sys.stdout.write(out)
836            sys.stderr.write(err)
837
838        res = RunResult(reprec.ret, out.split("\n"), err.split("\n"), time.time() - now)
839        res.reprec = reprec
840        return res
841
842    def runpytest(self, *args, **kwargs):
843        """Run pytest inline or in a subprocess, depending on the command line
844        option "--runpytest" and return a :py:class:`RunResult`.
845
846        """
847        args = self._ensure_basetemp(args)
848        return self._runpytest_method(*args, **kwargs)
849
850    def _ensure_basetemp(self, args):
851        args = [str(x) for x in args]
852        for x in args:
853            if str(x).startswith("--basetemp"):
854                # print("basedtemp exists: %s" %(args,))
855                break
856        else:
857            args.append("--basetemp=%s" % self.tmpdir.dirpath("basetemp"))
858            # print("added basetemp: %s" %(args,))
859        return args
860
861    def parseconfig(self, *args):
862        """Return a new pytest Config instance from given commandline args.
863
864        This invokes the pytest bootstrapping code in _pytest.config to create
865        a new :py:class:`_pytest.core.PluginManager` and call the
866        pytest_cmdline_parse hook to create a new
867        :py:class:`_pytest.config.Config` instance.
868
869        If :py:attr:`plugins` has been populated they should be plugin modules
870        to be registered with the PluginManager.
871
872        """
873        args = self._ensure_basetemp(args)
874
875        import _pytest.config
876
877        config = _pytest.config._prepareconfig(args, self.plugins)
878        # we don't know what the test will do with this half-setup config
879        # object and thus we make sure it gets unconfigured properly in any
880        # case (otherwise capturing could still be active, for example)
881        self.request.addfinalizer(config._ensure_unconfigure)
882        return config
883
884    def parseconfigure(self, *args):
885        """Return a new pytest configured Config instance.
886
887        This returns a new :py:class:`_pytest.config.Config` instance like
888        :py:meth:`parseconfig`, but also calls the pytest_configure hook.
889
890        """
891        config = self.parseconfig(*args)
892        config._do_configure()
893        self.request.addfinalizer(config._ensure_unconfigure)
894        return config
895
896    def getitem(self, source, funcname="test_func"):
897        """Return the test item for a test function.
898
899        This writes the source to a python file and runs pytest's collection on
900        the resulting module, returning the test item for the requested
901        function name.
902
903        :param source: the module source
904
905        :param funcname: the name of the test function for which to return a
906            test item
907
908        """
909        items = self.getitems(source)
910        for item in items:
911            if item.name == funcname:
912                return item
913        assert 0, (
914            "%r item not found in module:\n%s\nitems: %s" % (funcname, source, items)
915        )
916
917    def getitems(self, source):
918        """Return all test items collected from the module.
919
920        This writes the source to a python file and runs pytest's collection on
921        the resulting module, returning all test items contained within.
922
923        """
924        modcol = self.getmodulecol(source)
925        return self.genitems([modcol])
926
927    def getmodulecol(self, source, configargs=(), withinit=False):
928        """Return the module collection node for ``source``.
929
930        This writes ``source`` to a file using :py:meth:`makepyfile` and then
931        runs the pytest collection on it, returning the collection node for the
932        test module.
933
934        :param source: the source code of the module to collect
935
936        :param configargs: any extra arguments to pass to
937            :py:meth:`parseconfigure`
938
939        :param withinit: whether to also write an ``__init__.py`` file to the
940            same directory to ensure it is a package
941
942        """
943        kw = {self.request.function.__name__: Source(source).strip()}
944        path = self.makepyfile(**kw)
945        if withinit:
946            self.makepyfile(__init__="#")
947        self.config = config = self.parseconfigure(path, *configargs)
948        node = self.getnode(config, path)
949
950        return node
951
952    def collect_by_name(self, modcol, name):
953        """Return the collection node for name from the module collection.
954
955        This will search a module collection node for a collection node
956        matching the given name.
957
958        :param modcol: a module collection node; see :py:meth:`getmodulecol`
959
960        :param name: the name of the node to return
961
962        """
963        if modcol not in self._mod_collections:
964            self._mod_collections[modcol] = list(modcol.collect())
965        for colitem in self._mod_collections[modcol]:
966            if colitem.name == name:
967                return colitem
968
969    def popen(self, cmdargs, stdout, stderr, **kw):
970        """Invoke subprocess.Popen.
971
972        This calls subprocess.Popen making sure the current working directory
973        is in the PYTHONPATH.
974
975        You probably want to use :py:meth:`run` instead.
976
977        """
978        env = os.environ.copy()
979        env["PYTHONPATH"] = os.pathsep.join(
980            filter(None, [str(os.getcwd()), env.get("PYTHONPATH", "")])
981        )
982        kw["env"] = env
983
984        popen = subprocess.Popen(
985            cmdargs, stdin=subprocess.PIPE, stdout=stdout, stderr=stderr, **kw
986        )
987        popen.stdin.close()
988
989        return popen
990
991    def run(self, *cmdargs):
992        """Run a command with arguments.
993
994        Run a process using subprocess.Popen saving the stdout and stderr.
995
996        Returns a :py:class:`RunResult`.
997
998        """
999        return self._run(*cmdargs)
1000
1001    def _run(self, *cmdargs):
1002        cmdargs = [str(x) for x in cmdargs]
1003        p1 = self.tmpdir.join("stdout")
1004        p2 = self.tmpdir.join("stderr")
1005        print("running:", " ".join(cmdargs))
1006        print("     in:", str(py.path.local()))
1007        f1 = codecs.open(str(p1), "w", encoding="utf8")
1008        f2 = codecs.open(str(p2), "w", encoding="utf8")
1009        try:
1010            now = time.time()
1011            popen = self.popen(
1012                cmdargs, stdout=f1, stderr=f2, close_fds=(sys.platform != "win32")
1013            )
1014            ret = popen.wait()
1015        finally:
1016            f1.close()
1017            f2.close()
1018        f1 = codecs.open(str(p1), "r", encoding="utf8")
1019        f2 = codecs.open(str(p2), "r", encoding="utf8")
1020        try:
1021            out = f1.read().splitlines()
1022            err = f2.read().splitlines()
1023        finally:
1024            f1.close()
1025            f2.close()
1026        self._dump_lines(out, sys.stdout)
1027        self._dump_lines(err, sys.stderr)
1028        return RunResult(ret, out, err, time.time() - now)
1029
1030    def _dump_lines(self, lines, fp):
1031        try:
1032            for line in lines:
1033                print(line, file=fp)
1034        except UnicodeEncodeError:
1035            print("couldn't print to %s because of encoding" % (fp,))
1036
1037    def _getpytestargs(self):
1038        # we cannot use `(sys.executable, script)` because on Windows the
1039        # script is e.g. `pytest.exe`
1040        return (sys.executable, PYTEST_FULLPATH)  # noqa
1041
1042    def runpython(self, script):
1043        """Run a python script using sys.executable as interpreter.
1044
1045        Returns a :py:class:`RunResult`.
1046
1047        """
1048        return self.run(sys.executable, script)
1049
1050    def runpython_c(self, command):
1051        """Run python -c "command", return a :py:class:`RunResult`."""
1052        return self.run(sys.executable, "-c", command)
1053
1054    def runpytest_subprocess(self, *args, **kwargs):
1055        """Run pytest as a subprocess with given arguments.
1056
1057        Any plugins added to the :py:attr:`plugins` list will added using the
1058        ``-p`` command line option.  Additionally ``--basetemp`` is used put
1059        any temporary files and directories in a numbered directory prefixed
1060        with "runpytest-" so they do not conflict with the normal numbered
1061        pytest location for temporary files and directories.
1062
1063        Returns a :py:class:`RunResult`.
1064
1065        """
1066        p = py.path.local.make_numbered_dir(
1067            prefix="runpytest-", keep=None, rootdir=self.tmpdir
1068        )
1069        args = ("--basetemp=%s" % p,) + args
1070        plugins = [x for x in self.plugins if isinstance(x, str)]
1071        if plugins:
1072            args = ("-p", plugins[0]) + args
1073        args = self._getpytestargs() + args
1074        return self.run(*args)
1075
1076    def spawn_pytest(self, string, expect_timeout=10.0):
1077        """Run pytest using pexpect.
1078
1079        This makes sure to use the right pytest and sets up the temporary
1080        directory locations.
1081
1082        The pexpect child is returned.
1083
1084        """
1085        basetemp = self.tmpdir.mkdir("temp-pexpect")
1086        invoke = " ".join(map(str, self._getpytestargs()))
1087        cmd = "%s --basetemp=%s %s" % (invoke, basetemp, string)
1088        return self.spawn(cmd, expect_timeout=expect_timeout)
1089
1090    def spawn(self, cmd, expect_timeout=10.0):
1091        """Run a command using pexpect.
1092
1093        The pexpect child is returned.
1094
1095        """
1096        pexpect = pytest.importorskip("pexpect", "3.0")
1097        if hasattr(sys, "pypy_version_info") and "64" in platform.machine():
1098            pytest.skip("pypy-64 bit not supported")
1099        if sys.platform.startswith("freebsd"):
1100            pytest.xfail("pexpect does not work reliably on freebsd")
1101        logfile = self.tmpdir.join("spawn.out").open("wb")
1102        child = pexpect.spawn(cmd, logfile=logfile)
1103        self.request.addfinalizer(logfile.close)
1104        child.timeout = expect_timeout
1105        return child
1106
1107
1108def getdecoded(out):
1109    try:
1110        return out.decode("utf-8")
1111    except UnicodeDecodeError:
1112        return "INTERNAL not-utf8-decodeable, truncated string:\n%s" % (
1113            py.io.saferepr(out),
1114        )
1115
1116
1117class LineComp(object):
1118
1119    def __init__(self):
1120        self.stringio = py.io.TextIO()
1121
1122    def assert_contains_lines(self, lines2):
1123        """Assert that lines2 are contained (linearly) in lines1.
1124
1125        Return a list of extralines found.
1126
1127        """
1128        __tracebackhide__ = True
1129        val = self.stringio.getvalue()
1130        self.stringio.truncate(0)
1131        self.stringio.seek(0)
1132        lines1 = val.split("\n")
1133        return LineMatcher(lines1).fnmatch_lines(lines2)
1134
1135
1136class LineMatcher(object):
1137    """Flexible matching of text.
1138
1139    This is a convenience class to test large texts like the output of
1140    commands.
1141
1142    The constructor takes a list of lines without their trailing newlines, i.e.
1143    ``text.splitlines()``.
1144
1145    """
1146
1147    def __init__(self, lines):
1148        self.lines = lines
1149        self._log_output = []
1150
1151    def str(self):
1152        """Return the entire original text."""
1153        return "\n".join(self.lines)
1154
1155    def _getlines(self, lines2):
1156        if isinstance(lines2, str):
1157            lines2 = Source(lines2)
1158        if isinstance(lines2, Source):
1159            lines2 = lines2.strip().lines
1160        return lines2
1161
1162    def fnmatch_lines_random(self, lines2):
1163        """Check lines exist in the output using in any order.
1164
1165        Lines are checked using ``fnmatch.fnmatch``. The argument is a list of
1166        lines which have to occur in the output, in any order.
1167
1168        """
1169        self._match_lines_random(lines2, fnmatch)
1170
1171    def re_match_lines_random(self, lines2):
1172        """Check lines exist in the output using ``re.match``, in any order.
1173
1174        The argument is a list of lines which have to occur in the output, in
1175        any order.
1176
1177        """
1178        self._match_lines_random(lines2, lambda name, pat: re.match(pat, name))
1179
1180    def _match_lines_random(self, lines2, match_func):
1181        """Check lines exist in the output.
1182
1183        The argument is a list of lines which have to occur in the output, in
1184        any order.  Each line can contain glob whildcards.
1185
1186        """
1187        lines2 = self._getlines(lines2)
1188        for line in lines2:
1189            for x in self.lines:
1190                if line == x or match_func(x, line):
1191                    self._log("matched: ", repr(line))
1192                    break
1193            else:
1194                self._log("line %r not found in output" % line)
1195                raise ValueError(self._log_text)
1196
1197    def get_lines_after(self, fnline):
1198        """Return all lines following the given line in the text.
1199
1200        The given line can contain glob wildcards.
1201
1202        """
1203        for i, line in enumerate(self.lines):
1204            if fnline == line or fnmatch(line, fnline):
1205                return self.lines[i + 1:]
1206        raise ValueError("line %r not found in output" % fnline)
1207
1208    def _log(self, *args):
1209        self._log_output.append(" ".join((str(x) for x in args)))
1210
1211    @property
1212    def _log_text(self):
1213        return "\n".join(self._log_output)
1214
1215    def fnmatch_lines(self, lines2):
1216        """Search captured text for matching lines using ``fnmatch.fnmatch``.
1217
1218        The argument is a list of lines which have to match and can use glob
1219        wildcards.  If they do not match a pytest.fail() is called.  The
1220        matches and non-matches are also printed on stdout.
1221
1222        """
1223        self._match_lines(lines2, fnmatch, "fnmatch")
1224
1225    def re_match_lines(self, lines2):
1226        """Search captured text for matching lines using ``re.match``.
1227
1228        The argument is a list of lines which have to match using ``re.match``.
1229        If they do not match a pytest.fail() is called.
1230
1231        The matches and non-matches are also printed on stdout.
1232
1233        """
1234        self._match_lines(lines2, lambda name, pat: re.match(pat, name), "re.match")
1235
1236    def _match_lines(self, lines2, match_func, match_nickname):
1237        """Underlying implementation of ``fnmatch_lines`` and ``re_match_lines``.
1238
1239        :param list[str] lines2: list of string patterns to match. The actual
1240            format depends on ``match_func``
1241        :param match_func: a callable ``match_func(line, pattern)`` where line
1242            is the captured line from stdout/stderr and pattern is the matching
1243            pattern
1244        :param str match_nickname: the nickname for the match function that
1245            will be logged to stdout when a match occurs
1246
1247        """
1248        lines2 = self._getlines(lines2)
1249        lines1 = self.lines[:]
1250        nextline = None
1251        extralines = []
1252        __tracebackhide__ = True
1253        for line in lines2:
1254            nomatchprinted = False
1255            while lines1:
1256                nextline = lines1.pop(0)
1257                if line == nextline:
1258                    self._log("exact match:", repr(line))
1259                    break
1260                elif match_func(nextline, line):
1261                    self._log("%s:" % match_nickname, repr(line))
1262                    self._log("   with:", repr(nextline))
1263                    break
1264                else:
1265                    if not nomatchprinted:
1266                        self._log("nomatch:", repr(line))
1267                        nomatchprinted = True
1268                    self._log("    and:", repr(nextline))
1269                extralines.append(nextline)
1270            else:
1271                self._log("remains unmatched: %r" % (line,))
1272                pytest.fail(self._log_text)
1273