1"""(Disabled by default) support for testing pytest and pytest plugins."""
2import collections.abc
3import gc
4import importlib
5import os
6import platform
7import re
8import subprocess
9import sys
10import traceback
11from fnmatch import fnmatch
12from io import StringIO
13from typing import Callable
14from typing import Dict
15from typing import Generator
16from typing import Iterable
17from typing import List
18from typing import Optional
19from typing import Sequence
20from typing import Tuple
21from typing import Union
22from weakref import WeakKeyDictionary
23
24import py
25from iniconfig import IniConfig
26
27import pytest
28from _pytest import timing
29from _pytest._code import Source
30from _pytest.capture import _get_multicapture
31from _pytest.compat import final
32from _pytest.compat import overload
33from _pytest.compat import TYPE_CHECKING
34from _pytest.config import _PluggyPlugin
35from _pytest.config import Config
36from _pytest.config import ExitCode
37from _pytest.config import PytestPluginManager
38from _pytest.config.argparsing import Parser
39from _pytest.fixtures import FixtureRequest
40from _pytest.main import Session
41from _pytest.monkeypatch import MonkeyPatch
42from _pytest.nodes import Collector
43from _pytest.nodes import Item
44from _pytest.pathlib import make_numbered_dir
45from _pytest.pathlib import Path
46from _pytest.python import Module
47from _pytest.reports import CollectReport
48from _pytest.reports import TestReport
49from _pytest.tmpdir import TempdirFactory
50
51if TYPE_CHECKING:
52    from typing import Type
53    from typing_extensions import Literal
54
55    import pexpect
56
57
58IGNORE_PAM = [  # filenames added when obtaining details about the current user
59    "/var/lib/sss/mc/passwd"
60]
61
62
63def pytest_addoption(parser: Parser) -> None:
64    parser.addoption(
65        "--lsof",
66        action="store_true",
67        dest="lsof",
68        default=False,
69        help="run FD checks if lsof is available",
70    )
71
72    parser.addoption(
73        "--runpytest",
74        default="inprocess",
75        dest="runpytest",
76        choices=("inprocess", "subprocess"),
77        help=(
78            "run pytest sub runs in tests using an 'inprocess' "
79            "or 'subprocess' (python -m main) method"
80        ),
81    )
82
83    parser.addini(
84        "pytester_example_dir", help="directory to take the pytester example files from"
85    )
86
87
88def pytest_configure(config: Config) -> None:
89    if config.getvalue("lsof"):
90        checker = LsofFdLeakChecker()
91        if checker.matching_platform():
92            config.pluginmanager.register(checker)
93
94    config.addinivalue_line(
95        "markers",
96        "pytester_example_path(*path_segments): join the given path "
97        "segments to `pytester_example_dir` for this test.",
98    )
99
100
101class LsofFdLeakChecker:
102    def get_open_files(self) -> List[Tuple[str, str]]:
103        out = subprocess.run(
104            ("lsof", "-Ffn0", "-p", str(os.getpid())),
105            stdout=subprocess.PIPE,
106            stderr=subprocess.DEVNULL,
107            check=True,
108            universal_newlines=True,
109        ).stdout
110
111        def isopen(line: str) -> bool:
112            return line.startswith("f") and (
113                "deleted" not in line
114                and "mem" not in line
115                and "txt" not in line
116                and "cwd" not in line
117            )
118
119        open_files = []
120
121        for line in out.split("\n"):
122            if isopen(line):
123                fields = line.split("\0")
124                fd = fields[0][1:]
125                filename = fields[1][1:]
126                if filename in IGNORE_PAM:
127                    continue
128                if filename.startswith("/"):
129                    open_files.append((fd, filename))
130
131        return open_files
132
133    def matching_platform(self) -> bool:
134        try:
135            subprocess.run(("lsof", "-v"), check=True)
136        except (OSError, subprocess.CalledProcessError):
137            return False
138        else:
139            return True
140
141    @pytest.hookimpl(hookwrapper=True, tryfirst=True)
142    def pytest_runtest_protocol(self, item: Item) -> Generator[None, None, None]:
143        lines1 = self.get_open_files()
144        yield
145        if hasattr(sys, "pypy_version_info"):
146            gc.collect()
147        lines2 = self.get_open_files()
148
149        new_fds = {t[0] for t in lines2} - {t[0] for t in lines1}
150        leaked_files = [t for t in lines2 if t[0] in new_fds]
151        if leaked_files:
152            error = [
153                "***** %s FD leakage detected" % len(leaked_files),
154                *(str(f) for f in leaked_files),
155                "*** Before:",
156                *(str(f) for f in lines1),
157                "*** After:",
158                *(str(f) for f in lines2),
159                "***** %s FD leakage detected" % len(leaked_files),
160                "*** function %s:%s: %s " % item.location,
161                "See issue #2366",
162            ]
163            item.warn(pytest.PytestWarning("\n".join(error)))
164
165
166# used at least by pytest-xdist plugin
167
168
169@pytest.fixture
170def _pytest(request: FixtureRequest) -> "PytestArg":
171    """Return a helper which offers a gethookrecorder(hook) method which
172    returns a HookRecorder instance which helps to make assertions about called
173    hooks."""
174    return PytestArg(request)
175
176
177class PytestArg:
178    def __init__(self, request: FixtureRequest) -> None:
179        self.request = request
180
181    def gethookrecorder(self, hook) -> "HookRecorder":
182        hookrecorder = HookRecorder(hook._pm)
183        self.request.addfinalizer(hookrecorder.finish_recording)
184        return hookrecorder
185
186
187def get_public_names(values: Iterable[str]) -> List[str]:
188    """Only return names from iterator values without a leading underscore."""
189    return [x for x in values if x[0] != "_"]
190
191
192class ParsedCall:
193    def __init__(self, name: str, kwargs) -> None:
194        self.__dict__.update(kwargs)
195        self._name = name
196
197    def __repr__(self) -> str:
198        d = self.__dict__.copy()
199        del d["_name"]
200        return "<ParsedCall {!r}(**{!r})>".format(self._name, d)
201
202    if TYPE_CHECKING:
203        # The class has undetermined attributes, this tells mypy about it.
204        def __getattr__(self, key: str):
205            ...
206
207
208class HookRecorder:
209    """Record all hooks called in a plugin manager.
210
211    This wraps all the hook calls in the plugin manager, recording each call
212    before propagating the normal calls.
213    """
214
215    def __init__(self, pluginmanager: PytestPluginManager) -> None:
216        self._pluginmanager = pluginmanager
217        self.calls = []  # type: List[ParsedCall]
218        self.ret = None  # type: Optional[Union[int, ExitCode]]
219
220        def before(hook_name: str, hook_impls, kwargs) -> None:
221            self.calls.append(ParsedCall(hook_name, kwargs))
222
223        def after(outcome, hook_name: str, hook_impls, kwargs) -> None:
224            pass
225
226        self._undo_wrapping = pluginmanager.add_hookcall_monitoring(before, after)
227
228    def finish_recording(self) -> None:
229        self._undo_wrapping()
230
231    def getcalls(self, names: Union[str, Iterable[str]]) -> List[ParsedCall]:
232        if isinstance(names, str):
233            names = names.split()
234        return [call for call in self.calls if call._name in names]
235
236    def assert_contains(self, entries: Sequence[Tuple[str, str]]) -> None:
237        __tracebackhide__ = True
238        i = 0
239        entries = list(entries)
240        backlocals = sys._getframe(1).f_locals
241        while entries:
242            name, check = entries.pop(0)
243            for ind, call in enumerate(self.calls[i:]):
244                if call._name == name:
245                    print("NAMEMATCH", name, call)
246                    if eval(check, backlocals, call.__dict__):
247                        print("CHECKERMATCH", repr(check), "->", call)
248                    else:
249                        print("NOCHECKERMATCH", repr(check), "-", call)
250                        continue
251                    i += ind + 1
252                    break
253                print("NONAMEMATCH", name, "with", call)
254            else:
255                pytest.fail("could not find {!r} check {!r}".format(name, check))
256
257    def popcall(self, name: str) -> ParsedCall:
258        __tracebackhide__ = True
259        for i, call in enumerate(self.calls):
260            if call._name == name:
261                del self.calls[i]
262                return call
263        lines = ["could not find call {!r}, in:".format(name)]
264        lines.extend(["  %s" % x for x in self.calls])
265        pytest.fail("\n".join(lines))
266
267    def getcall(self, name: str) -> ParsedCall:
268        values = self.getcalls(name)
269        assert len(values) == 1, (name, values)
270        return values[0]
271
272    # functionality for test reports
273
274    @overload
275    def getreports(
276        self, names: "Literal['pytest_collectreport']",
277    ) -> Sequence[CollectReport]:
278        ...
279
280    @overload  # noqa: F811
281    def getreports(  # noqa: F811
282        self, names: "Literal['pytest_runtest_logreport']",
283    ) -> Sequence[TestReport]:
284        ...
285
286    @overload  # noqa: F811
287    def getreports(  # noqa: F811
288        self,
289        names: Union[str, Iterable[str]] = (
290            "pytest_collectreport",
291            "pytest_runtest_logreport",
292        ),
293    ) -> Sequence[Union[CollectReport, TestReport]]:
294        ...
295
296    def getreports(  # noqa: F811
297        self,
298        names: Union[str, Iterable[str]] = (
299            "pytest_collectreport",
300            "pytest_runtest_logreport",
301        ),
302    ) -> Sequence[Union[CollectReport, TestReport]]:
303        return [x.report for x in self.getcalls(names)]
304
305    def matchreport(
306        self,
307        inamepart: str = "",
308        names: Union[str, Iterable[str]] = (
309            "pytest_runtest_logreport",
310            "pytest_collectreport",
311        ),
312        when: Optional[str] = None,
313    ) -> Union[CollectReport, TestReport]:
314        """Return a testreport whose dotted import path matches."""
315        values = []
316        for rep in self.getreports(names=names):
317            if not when and rep.when != "call" and rep.passed:
318                # setup/teardown passing reports - let's ignore those
319                continue
320            if when and rep.when != when:
321                continue
322            if not inamepart or inamepart in rep.nodeid.split("::"):
323                values.append(rep)
324        if not values:
325            raise ValueError(
326                "could not find test report matching %r: "
327                "no test reports at all!" % (inamepart,)
328            )
329        if len(values) > 1:
330            raise ValueError(
331                "found 2 or more testreports matching {!r}: {}".format(
332                    inamepart, values
333                )
334            )
335        return values[0]
336
337    @overload
338    def getfailures(
339        self, names: "Literal['pytest_collectreport']",
340    ) -> Sequence[CollectReport]:
341        ...
342
343    @overload  # noqa: F811
344    def getfailures(  # noqa: F811
345        self, names: "Literal['pytest_runtest_logreport']",
346    ) -> Sequence[TestReport]:
347        ...
348
349    @overload  # noqa: F811
350    def getfailures(  # noqa: F811
351        self,
352        names: Union[str, Iterable[str]] = (
353            "pytest_collectreport",
354            "pytest_runtest_logreport",
355        ),
356    ) -> Sequence[Union[CollectReport, TestReport]]:
357        ...
358
359    def getfailures(  # noqa: F811
360        self,
361        names: Union[str, Iterable[str]] = (
362            "pytest_collectreport",
363            "pytest_runtest_logreport",
364        ),
365    ) -> Sequence[Union[CollectReport, TestReport]]:
366        return [rep for rep in self.getreports(names) if rep.failed]
367
368    def getfailedcollections(self) -> Sequence[CollectReport]:
369        return self.getfailures("pytest_collectreport")
370
371    def listoutcomes(
372        self,
373    ) -> Tuple[
374        Sequence[TestReport],
375        Sequence[Union[CollectReport, TestReport]],
376        Sequence[Union[CollectReport, TestReport]],
377    ]:
378        passed = []
379        skipped = []
380        failed = []
381        for rep in self.getreports(
382            ("pytest_collectreport", "pytest_runtest_logreport")
383        ):
384            if rep.passed:
385                if rep.when == "call":
386                    assert isinstance(rep, TestReport)
387                    passed.append(rep)
388            elif rep.skipped:
389                skipped.append(rep)
390            else:
391                assert rep.failed, "Unexpected outcome: {!r}".format(rep)
392                failed.append(rep)
393        return passed, skipped, failed
394
395    def countoutcomes(self) -> List[int]:
396        return [len(x) for x in self.listoutcomes()]
397
398    def assertoutcome(self, passed: int = 0, skipped: int = 0, failed: int = 0) -> None:
399        __tracebackhide__ = True
400
401        outcomes = self.listoutcomes()
402        realpassed, realskipped, realfailed = outcomes
403        obtained = {
404            "passed": len(realpassed),
405            "skipped": len(realskipped),
406            "failed": len(realfailed),
407        }
408        expected = {"passed": passed, "skipped": skipped, "failed": failed}
409        assert obtained == expected, outcomes
410
411    def clear(self) -> None:
412        self.calls[:] = []
413
414
415@pytest.fixture
416def linecomp() -> "LineComp":
417    """A :class: `LineComp` instance for checking that an input linearly
418    contains a sequence of strings."""
419    return LineComp()
420
421
422@pytest.fixture(name="LineMatcher")
423def LineMatcher_fixture(request: FixtureRequest) -> "Type[LineMatcher]":
424    """A reference to the :class: `LineMatcher`.
425
426    This is instantiable with a list of lines (without their trailing newlines).
427    This is useful for testing large texts, such as the output of commands.
428    """
429    return LineMatcher
430
431
432@pytest.fixture
433def testdir(request: FixtureRequest, tmpdir_factory: TempdirFactory) -> "Testdir":
434    """A :class: `TestDir` instance, that can be used to run and test pytest itself.
435
436    It is particularly useful for testing plugins. It is similar to the `tmpdir` fixture
437    but provides methods which aid in testing pytest itself.
438    """
439    return Testdir(request, tmpdir_factory)
440
441
442@pytest.fixture
443def _sys_snapshot() -> Generator[None, None, None]:
444    snappaths = SysPathsSnapshot()
445    snapmods = SysModulesSnapshot()
446    yield
447    snapmods.restore()
448    snappaths.restore()
449
450
451@pytest.fixture
452def _config_for_test() -> Generator[Config, None, None]:
453    from _pytest.config import get_config
454
455    config = get_config()
456    yield config
457    config._ensure_unconfigure()  # cleanup, e.g. capman closing tmpfiles.
458
459
460# Regex to match the session duration string in the summary: "74.34s".
461rex_session_duration = re.compile(r"\d+\.\d\ds")
462# Regex to match all the counts and phrases in the summary line: "34 passed, 111 skipped".
463rex_outcome = re.compile(r"(\d+) (\w+)")
464
465
466class RunResult:
467    """The result of running a command."""
468
469    def __init__(
470        self,
471        ret: Union[int, ExitCode],
472        outlines: List[str],
473        errlines: List[str],
474        duration: float,
475    ) -> None:
476        try:
477            self.ret = pytest.ExitCode(ret)  # type: Union[int, ExitCode]
478            """The return value."""
479        except ValueError:
480            self.ret = ret
481        self.outlines = outlines
482        """List of lines captured from stdout."""
483        self.errlines = errlines
484        """List of lines captured from stderr."""
485        self.stdout = LineMatcher(outlines)
486        """:class:`LineMatcher` of stdout.
487
488        Use e.g. :func:`stdout.str() <LineMatcher.str()>` to reconstruct stdout, or the commonly used
489        :func:`stdout.fnmatch_lines() <LineMatcher.fnmatch_lines()>` method.
490        """
491        self.stderr = LineMatcher(errlines)
492        """:class:`LineMatcher` of stderr."""
493        self.duration = duration
494        """Duration in seconds."""
495
496    def __repr__(self) -> str:
497        return (
498            "<RunResult ret=%s len(stdout.lines)=%d len(stderr.lines)=%d duration=%.2fs>"
499            % (self.ret, len(self.stdout.lines), len(self.stderr.lines), self.duration)
500        )
501
502    def parseoutcomes(self) -> Dict[str, int]:
503        """Return a dictionary of outcome noun -> count from parsing the terminal
504        output that the test process produced.
505
506        The returned nouns will always be in plural form::
507
508            ======= 1 failed, 1 passed, 1 warning, 1 error in 0.13s ====
509
510        Will return ``{"failed": 1, "passed": 1, "warnings": 1, "errors": 1}``.
511        """
512        return self.parse_summary_nouns(self.outlines)
513
514    @classmethod
515    def parse_summary_nouns(cls, lines) -> Dict[str, int]:
516        """Extract the nouns from a pytest terminal summary line.
517
518        It always returns the plural noun for consistency::
519
520            ======= 1 failed, 1 passed, 1 warning, 1 error in 0.13s ====
521
522        Will return ``{"failed": 1, "passed": 1, "warnings": 1, "errors": 1}``.
523        """
524        for line in reversed(lines):
525            if rex_session_duration.search(line):
526                outcomes = rex_outcome.findall(line)
527                ret = {noun: int(count) for (count, noun) in outcomes}
528                break
529        else:
530            raise ValueError("Pytest terminal summary report not found")
531
532        to_plural = {
533            "warning": "warnings",
534            "error": "errors",
535        }
536        return {to_plural.get(k, k): v for k, v in ret.items()}
537
538    def assert_outcomes(
539        self,
540        passed: int = 0,
541        skipped: int = 0,
542        failed: int = 0,
543        errors: int = 0,
544        xpassed: int = 0,
545        xfailed: int = 0,
546    ) -> None:
547        """Assert that the specified outcomes appear with the respective
548        numbers (0 means it didn't occur) in the text output from a test run."""
549        __tracebackhide__ = True
550
551        d = self.parseoutcomes()
552        obtained = {
553            "passed": d.get("passed", 0),
554            "skipped": d.get("skipped", 0),
555            "failed": d.get("failed", 0),
556            "errors": d.get("errors", 0),
557            "xpassed": d.get("xpassed", 0),
558            "xfailed": d.get("xfailed", 0),
559        }
560        expected = {
561            "passed": passed,
562            "skipped": skipped,
563            "failed": failed,
564            "errors": errors,
565            "xpassed": xpassed,
566            "xfailed": xfailed,
567        }
568        assert obtained == expected
569
570
571class CwdSnapshot:
572    def __init__(self) -> None:
573        self.__saved = os.getcwd()
574
575    def restore(self) -> None:
576        os.chdir(self.__saved)
577
578
579class SysModulesSnapshot:
580    def __init__(self, preserve: Optional[Callable[[str], bool]] = None) -> None:
581        self.__preserve = preserve
582        self.__saved = dict(sys.modules)
583
584    def restore(self) -> None:
585        if self.__preserve:
586            self.__saved.update(
587                (k, m) for k, m in sys.modules.items() if self.__preserve(k)
588            )
589        sys.modules.clear()
590        sys.modules.update(self.__saved)
591
592
593class SysPathsSnapshot:
594    def __init__(self) -> None:
595        self.__saved = list(sys.path), list(sys.meta_path)
596
597    def restore(self) -> None:
598        sys.path[:], sys.meta_path[:] = self.__saved
599
600
601@final
602class Testdir:
603    """Temporary test directory with tools to test/run pytest itself.
604
605    This is based on the :fixture:`tmpdir` fixture but provides a number of methods
606    which aid with testing pytest itself.  Unless :py:meth:`chdir` is used all
607    methods will use :py:attr:`tmpdir` as their current working directory.
608
609    Attributes:
610
611    :ivar tmpdir: The :py:class:`py.path.local` instance of the temporary directory.
612
613    :ivar plugins:
614       A list of plugins to use with :py:meth:`parseconfig` and
615       :py:meth:`runpytest`.  Initially this is an empty list but plugins can
616       be added to the list.  The type of items to add to the list depends on
617       the method using them so refer to them for details.
618    """
619
620    __test__ = False
621
622    CLOSE_STDIN = object
623
624    class TimeoutExpired(Exception):
625        pass
626
627    def __init__(self, request: FixtureRequest, tmpdir_factory: TempdirFactory) -> None:
628        self.request = request
629        self._mod_collections = (
630            WeakKeyDictionary()
631        )  # type: WeakKeyDictionary[Module, List[Union[Item, Collector]]]
632        if request.function:
633            name = request.function.__name__  # type: str
634        else:
635            name = request.node.name
636        self._name = name
637        self.tmpdir = tmpdir_factory.mktemp(name, numbered=True)
638        self.test_tmproot = tmpdir_factory.mktemp("tmp-" + name, numbered=True)
639        self.plugins = []  # type: List[Union[str, _PluggyPlugin]]
640        self._cwd_snapshot = CwdSnapshot()
641        self._sys_path_snapshot = SysPathsSnapshot()
642        self._sys_modules_snapshot = self.__take_sys_modules_snapshot()
643        self.chdir()
644        self.request.addfinalizer(self.finalize)
645        self._method = self.request.config.getoption("--runpytest")
646
647        mp = self.monkeypatch = MonkeyPatch()
648        mp.setenv("PYTEST_DEBUG_TEMPROOT", str(self.test_tmproot))
649        # Ensure no unexpected caching via tox.
650        mp.delenv("TOX_ENV_DIR", raising=False)
651        # Discard outer pytest options.
652        mp.delenv("PYTEST_ADDOPTS", raising=False)
653        # Ensure no user config is used.
654        tmphome = str(self.tmpdir)
655        mp.setenv("HOME", tmphome)
656        mp.setenv("USERPROFILE", tmphome)
657        # Do not use colors for inner runs by default.
658        mp.setenv("PY_COLORS", "0")
659
660    def __repr__(self) -> str:
661        return "<Testdir {!r}>".format(self.tmpdir)
662
663    def __str__(self) -> str:
664        return str(self.tmpdir)
665
666    def finalize(self) -> None:
667        """Clean up global state artifacts.
668
669        Some methods modify the global interpreter state and this tries to
670        clean this up.  It does not remove the temporary directory however so
671        it can be looked at after the test run has finished.
672        """
673        self._sys_modules_snapshot.restore()
674        self._sys_path_snapshot.restore()
675        self._cwd_snapshot.restore()
676        self.monkeypatch.undo()
677
678    def __take_sys_modules_snapshot(self) -> SysModulesSnapshot:
679        # Some zope modules used by twisted-related tests keep internal state
680        # and can't be deleted; we had some trouble in the past with
681        # `zope.interface` for example.
682        def preserve_module(name):
683            return name.startswith("zope")
684
685        return SysModulesSnapshot(preserve=preserve_module)
686
687    def make_hook_recorder(self, pluginmanager: PytestPluginManager) -> HookRecorder:
688        """Create a new :py:class:`HookRecorder` for a PluginManager."""
689        pluginmanager.reprec = reprec = HookRecorder(pluginmanager)
690        self.request.addfinalizer(reprec.finish_recording)
691        return reprec
692
693    def chdir(self) -> None:
694        """Cd into the temporary directory.
695
696        This is done automatically upon instantiation.
697        """
698        self.tmpdir.chdir()
699
700    def _makefile(self, ext: str, lines, files, encoding: str = "utf-8"):
701        items = list(files.items())
702
703        def to_text(s):
704            return s.decode(encoding) if isinstance(s, bytes) else str(s)
705
706        if lines:
707            source = "\n".join(to_text(x) for x in lines)
708            basename = self._name
709            items.insert(0, (basename, source))
710
711        ret = None
712        for basename, value in items:
713            p = self.tmpdir.join(basename).new(ext=ext)
714            p.dirpath().ensure_dir()
715            source_ = Source(value)
716            source = "\n".join(to_text(line) for line in source_.lines)
717            p.write(source.strip().encode(encoding), "wb")
718            if ret is None:
719                ret = p
720        return ret
721
722    def makefile(self, ext: str, *args: str, **kwargs):
723        r"""Create new file(s) in the testdir.
724
725        :param str ext:
726            The extension the file(s) should use, including the dot, e.g. `.py`.
727        :param args:
728            All args are treated as strings and joined using newlines.
729            The result is written as contents to the file.  The name of the
730            file is based on the test function requesting this fixture.
731        :param kwargs:
732            Each keyword is the name of a file, while the value of it will
733            be written as contents of the file.
734
735        Examples:
736
737        .. code-block:: python
738
739            testdir.makefile(".txt", "line1", "line2")
740
741            testdir.makefile(".ini", pytest="[pytest]\naddopts=-rs\n")
742
743        """
744        return self._makefile(ext, args, kwargs)
745
746    def makeconftest(self, source):
747        """Write a contest.py file with 'source' as contents."""
748        return self.makepyfile(conftest=source)
749
750    def makeini(self, source):
751        """Write a tox.ini file with 'source' as contents."""
752        return self.makefile(".ini", tox=source)
753
754    def getinicfg(self, source) -> IniConfig:
755        """Return the pytest section from the tox.ini config file."""
756        p = self.makeini(source)
757        return IniConfig(p)["pytest"]
758
759    def makepyprojecttoml(self, source):
760        """Write a pyproject.toml file with 'source' as contents.
761
762        .. versionadded:: 6.0
763        """
764        return self.makefile(".toml", pyproject=source)
765
766    def makepyfile(self, *args, **kwargs):
767        r"""Shortcut for .makefile() with a .py extension.
768
769        Defaults to the test name with a '.py' extension, e.g test_foobar.py, overwriting
770        existing files.
771
772        Examples:
773
774        .. code-block:: python
775
776            def test_something(testdir):
777                # Initial file is created test_something.py.
778                testdir.makepyfile("foobar")
779                # To create multiple files, pass kwargs accordingly.
780                testdir.makepyfile(custom="foobar")
781                # At this point, both 'test_something.py' & 'custom.py' exist in the test directory.
782
783        """
784        return self._makefile(".py", args, kwargs)
785
786    def maketxtfile(self, *args, **kwargs):
787        r"""Shortcut for .makefile() with a .txt extension.
788
789        Defaults to the test name with a '.txt' extension, e.g test_foobar.txt, overwriting
790        existing files.
791
792        Examples:
793
794        .. code-block:: python
795
796            def test_something(testdir):
797                # Initial file is created test_something.txt.
798                testdir.maketxtfile("foobar")
799                # To create multiple files, pass kwargs accordingly.
800                testdir.maketxtfile(custom="foobar")
801                # At this point, both 'test_something.txt' & 'custom.txt' exist in the test directory.
802
803        """
804        return self._makefile(".txt", args, kwargs)
805
806    def syspathinsert(self, path=None) -> None:
807        """Prepend a directory to sys.path, defaults to :py:attr:`tmpdir`.
808
809        This is undone automatically when this object dies at the end of each
810        test.
811        """
812        if path is None:
813            path = self.tmpdir
814
815        self.monkeypatch.syspath_prepend(str(path))
816
817    def mkdir(self, name) -> py.path.local:
818        """Create a new (sub)directory."""
819        return self.tmpdir.mkdir(name)
820
821    def mkpydir(self, name) -> py.path.local:
822        """Create a new Python package.
823
824        This creates a (sub)directory with an empty ``__init__.py`` file so it
825        gets recognised as a Python package.
826        """
827        p = self.mkdir(name)
828        p.ensure("__init__.py")
829        return p
830
831    def copy_example(self, name=None) -> py.path.local:
832        """Copy file from project's directory into the testdir.
833
834        :param str name: The name of the file to copy.
835        :returns: Path to the copied directory (inside ``self.tmpdir``).
836        """
837        import warnings
838        from _pytest.warning_types import PYTESTER_COPY_EXAMPLE
839
840        warnings.warn(PYTESTER_COPY_EXAMPLE, stacklevel=2)
841        example_dir = self.request.config.getini("pytester_example_dir")
842        if example_dir is None:
843            raise ValueError("pytester_example_dir is unset, can't copy examples")
844        example_dir = self.request.config.rootdir.join(example_dir)
845
846        for extra_element in self.request.node.iter_markers("pytester_example_path"):
847            assert extra_element.args
848            example_dir = example_dir.join(*extra_element.args)
849
850        if name is None:
851            func_name = self._name
852            maybe_dir = example_dir / func_name
853            maybe_file = example_dir / (func_name + ".py")
854
855            if maybe_dir.isdir():
856                example_path = maybe_dir
857            elif maybe_file.isfile():
858                example_path = maybe_file
859            else:
860                raise LookupError(
861                    "{} cant be found as module or package in {}".format(
862                        func_name, example_dir.bestrelpath(self.request.config.rootdir)
863                    )
864                )
865        else:
866            example_path = example_dir.join(name)
867
868        if example_path.isdir() and not example_path.join("__init__.py").isfile():
869            example_path.copy(self.tmpdir)
870            return self.tmpdir
871        elif example_path.isfile():
872            result = self.tmpdir.join(example_path.basename)
873            example_path.copy(result)
874            return result
875        else:
876            raise LookupError(
877                'example "{}" is not found as a file or directory'.format(example_path)
878            )
879
880    Session = Session
881
882    def getnode(self, config: Config, arg):
883        """Return the collection node of a file.
884
885        :param _pytest.config.Config config:
886           A pytest config.
887           See :py:meth:`parseconfig` and :py:meth:`parseconfigure` for creating it.
888        :param py.path.local arg:
889            Path to the file.
890        """
891        session = Session.from_config(config)
892        assert "::" not in str(arg)
893        p = py.path.local(arg)
894        config.hook.pytest_sessionstart(session=session)
895        res = session.perform_collect([str(p)], genitems=False)[0]
896        config.hook.pytest_sessionfinish(session=session, exitstatus=ExitCode.OK)
897        return res
898
899    def getpathnode(self, path):
900        """Return the collection node of a file.
901
902        This is like :py:meth:`getnode` but uses :py:meth:`parseconfigure` to
903        create the (configured) pytest Config instance.
904
905        :param py.path.local path: Path to the file.
906        """
907        config = self.parseconfigure(path)
908        session = Session.from_config(config)
909        x = session.fspath.bestrelpath(path)
910        config.hook.pytest_sessionstart(session=session)
911        res = session.perform_collect([x], genitems=False)[0]
912        config.hook.pytest_sessionfinish(session=session, exitstatus=ExitCode.OK)
913        return res
914
915    def genitems(self, colitems: Sequence[Union[Item, Collector]]) -> List[Item]:
916        """Generate all test items from a collection node.
917
918        This recurses into the collection node and returns a list of all the
919        test items contained within.
920        """
921        session = colitems[0].session
922        result = []  # type: List[Item]
923        for colitem in colitems:
924            result.extend(session.genitems(colitem))
925        return result
926
927    def runitem(self, source):
928        """Run the "test_func" Item.
929
930        The calling test instance (class containing the test method) must
931        provide a ``.getrunner()`` method which should return a runner which
932        can run the test protocol for a single item, e.g.
933        :py:func:`_pytest.runner.runtestprotocol`.
934        """
935        # used from runner functional tests
936        item = self.getitem(source)
937        # the test class where we are called from wants to provide the runner
938        testclassinstance = self.request.instance
939        runner = testclassinstance.getrunner()
940        return runner(item)
941
942    def inline_runsource(self, source, *cmdlineargs) -> HookRecorder:
943        """Run a test module in process using ``pytest.main()``.
944
945        This run writes "source" into a temporary file and runs
946        ``pytest.main()`` on it, returning a :py:class:`HookRecorder` instance
947        for the result.
948
949        :param source: The source code of the test module.
950
951        :param cmdlineargs: Any extra command line arguments to use.
952
953        :returns: :py:class:`HookRecorder` instance of the result.
954        """
955        p = self.makepyfile(source)
956        values = list(cmdlineargs) + [p]
957        return self.inline_run(*values)
958
959    def inline_genitems(self, *args) -> Tuple[List[Item], HookRecorder]:
960        """Run ``pytest.main(['--collectonly'])`` in-process.
961
962        Runs the :py:func:`pytest.main` function to run all of pytest inside
963        the test process itself like :py:meth:`inline_run`, but returns a
964        tuple of the collected items and a :py:class:`HookRecorder` instance.
965        """
966        rec = self.inline_run("--collect-only", *args)
967        items = [x.item for x in rec.getcalls("pytest_itemcollected")]
968        return items, rec
969
970    def inline_run(
971        self, *args, plugins=(), no_reraise_ctrlc: bool = False
972    ) -> HookRecorder:
973        """Run ``pytest.main()`` in-process, returning a HookRecorder.
974
975        Runs the :py:func:`pytest.main` function to run all of pytest inside
976        the test process itself.  This means it can return a
977        :py:class:`HookRecorder` instance which gives more detailed results
978        from that run than can be done by matching stdout/stderr from
979        :py:meth:`runpytest`.
980
981        :param args:
982            Command line arguments to pass to :py:func:`pytest.main`.
983        :param plugins:
984            Extra plugin instances the ``pytest.main()`` instance should use.
985        :param no_reraise_ctrlc:
986            Typically we reraise keyboard interrupts from the child run. If
987            True, the KeyboardInterrupt exception is captured.
988
989        :returns: A :py:class:`HookRecorder` instance.
990        """
991        # (maybe a cpython bug?) the importlib cache sometimes isn't updated
992        # properly between file creation and inline_run (especially if imports
993        # are interspersed with file creation)
994        importlib.invalidate_caches()
995
996        plugins = list(plugins)
997        finalizers = []
998        try:
999            # Any sys.module or sys.path changes done while running pytest
1000            # inline should be reverted after the test run completes to avoid
1001            # clashing with later inline tests run within the same pytest test,
1002            # e.g. just because they use matching test module names.
1003            finalizers.append(self.__take_sys_modules_snapshot().restore)
1004            finalizers.append(SysPathsSnapshot().restore)
1005
1006            # Important note:
1007            # - our tests should not leave any other references/registrations
1008            #   laying around other than possibly loaded test modules
1009            #   referenced from sys.modules, as nothing will clean those up
1010            #   automatically
1011
1012            rec = []
1013
1014            class Collect:
1015                def pytest_configure(x, config: Config) -> None:
1016                    rec.append(self.make_hook_recorder(config.pluginmanager))
1017
1018            plugins.append(Collect())
1019            ret = pytest.main(list(args), plugins=plugins)
1020            if len(rec) == 1:
1021                reprec = rec.pop()
1022            else:
1023
1024                class reprec:  # type: ignore
1025                    pass
1026
1027            reprec.ret = ret
1028
1029            # Typically we reraise keyboard interrupts from the child run
1030            # because it's our user requesting interruption of the testing.
1031            if ret == ExitCode.INTERRUPTED and not no_reraise_ctrlc:
1032                calls = reprec.getcalls("pytest_keyboard_interrupt")
1033                if calls and calls[-1].excinfo.type == KeyboardInterrupt:
1034                    raise KeyboardInterrupt()
1035            return reprec
1036        finally:
1037            for finalizer in finalizers:
1038                finalizer()
1039
1040    def runpytest_inprocess(self, *args, **kwargs) -> RunResult:
1041        """Return result of running pytest in-process, providing a similar
1042        interface to what self.runpytest() provides."""
1043        syspathinsert = kwargs.pop("syspathinsert", False)
1044
1045        if syspathinsert:
1046            self.syspathinsert()
1047        now = timing.time()
1048        capture = _get_multicapture("sys")
1049        capture.start_capturing()
1050        try:
1051            try:
1052                reprec = self.inline_run(*args, **kwargs)
1053            except SystemExit as e:
1054                ret = e.args[0]
1055                try:
1056                    ret = ExitCode(e.args[0])
1057                except ValueError:
1058                    pass
1059
1060                class reprec:  # type: ignore
1061                    ret = ret
1062
1063            except Exception:
1064                traceback.print_exc()
1065
1066                class reprec:  # type: ignore
1067                    ret = ExitCode(3)
1068
1069        finally:
1070            out, err = capture.readouterr()
1071            capture.stop_capturing()
1072            sys.stdout.write(out)
1073            sys.stderr.write(err)
1074
1075        assert reprec.ret is not None
1076        res = RunResult(
1077            reprec.ret, out.splitlines(), err.splitlines(), timing.time() - now
1078        )
1079        res.reprec = reprec  # type: ignore
1080        return res
1081
1082    def runpytest(self, *args, **kwargs) -> RunResult:
1083        """Run pytest inline or in a subprocess, depending on the command line
1084        option "--runpytest" and return a :py:class:`RunResult`."""
1085        args = self._ensure_basetemp(args)
1086        if self._method == "inprocess":
1087            return self.runpytest_inprocess(*args, **kwargs)
1088        elif self._method == "subprocess":
1089            return self.runpytest_subprocess(*args, **kwargs)
1090        raise RuntimeError("Unrecognized runpytest option: {}".format(self._method))
1091
1092    def _ensure_basetemp(self, args):
1093        args = list(args)
1094        for x in args:
1095            if str(x).startswith("--basetemp"):
1096                break
1097        else:
1098            args.append("--basetemp=%s" % self.tmpdir.dirpath("basetemp"))
1099        return args
1100
1101    def parseconfig(self, *args) -> Config:
1102        """Return a new pytest Config instance from given commandline args.
1103
1104        This invokes the pytest bootstrapping code in _pytest.config to create
1105        a new :py:class:`_pytest.core.PluginManager` and call the
1106        pytest_cmdline_parse hook to create a new
1107        :py:class:`_pytest.config.Config` instance.
1108
1109        If :py:attr:`plugins` has been populated they should be plugin modules
1110        to be registered with the PluginManager.
1111        """
1112        args = self._ensure_basetemp(args)
1113
1114        import _pytest.config
1115
1116        config = _pytest.config._prepareconfig(args, self.plugins)  # type: ignore[arg-type]
1117        # we don't know what the test will do with this half-setup config
1118        # object and thus we make sure it gets unconfigured properly in any
1119        # case (otherwise capturing could still be active, for example)
1120        self.request.addfinalizer(config._ensure_unconfigure)
1121        return config
1122
1123    def parseconfigure(self, *args) -> Config:
1124        """Return a new pytest configured Config instance.
1125
1126        Returns a new :py:class:`_pytest.config.Config` instance like
1127        :py:meth:`parseconfig`, but also calls the pytest_configure hook.
1128        """
1129        config = self.parseconfig(*args)
1130        config._do_configure()
1131        return config
1132
1133    def getitem(self, source, funcname: str = "test_func") -> Item:
1134        """Return the test item for a test function.
1135
1136        Writes the source to a python file and runs pytest's collection on
1137        the resulting module, returning the test item for the requested
1138        function name.
1139
1140        :param source:
1141            The module source.
1142        :param funcname:
1143            The name of the test function for which to return a test item.
1144        """
1145        items = self.getitems(source)
1146        for item in items:
1147            if item.name == funcname:
1148                return item
1149        assert 0, "{!r} item not found in module:\n{}\nitems: {}".format(
1150            funcname, source, items
1151        )
1152
1153    def getitems(self, source) -> List[Item]:
1154        """Return all test items collected from the module.
1155
1156        Writes the source to a Python file and runs pytest's collection on
1157        the resulting module, returning all test items contained within.
1158        """
1159        modcol = self.getmodulecol(source)
1160        return self.genitems([modcol])
1161
1162    def getmodulecol(self, source, configargs=(), withinit: bool = False):
1163        """Return the module collection node for ``source``.
1164
1165        Writes ``source`` to a file using :py:meth:`makepyfile` and then
1166        runs the pytest collection on it, returning the collection node for the
1167        test module.
1168
1169        :param source:
1170            The source code of the module to collect.
1171
1172        :param configargs:
1173            Any extra arguments to pass to :py:meth:`parseconfigure`.
1174
1175        :param withinit:
1176            Whether to also write an ``__init__.py`` file to the same
1177            directory to ensure it is a package.
1178        """
1179        if isinstance(source, Path):
1180            path = self.tmpdir.join(str(source))
1181            assert not withinit, "not supported for paths"
1182        else:
1183            kw = {self._name: Source(source).strip()}
1184            path = self.makepyfile(**kw)
1185        if withinit:
1186            self.makepyfile(__init__="#")
1187        self.config = config = self.parseconfigure(path, *configargs)
1188        return self.getnode(config, path)
1189
1190    def collect_by_name(
1191        self, modcol: Module, name: str
1192    ) -> Optional[Union[Item, Collector]]:
1193        """Return the collection node for name from the module collection.
1194
1195        Searchs a module collection node for a collection node matching the
1196        given name.
1197
1198        :param modcol: A module collection node; see :py:meth:`getmodulecol`.
1199        :param name: The name of the node to return.
1200        """
1201        if modcol not in self._mod_collections:
1202            self._mod_collections[modcol] = list(modcol.collect())
1203        for colitem in self._mod_collections[modcol]:
1204            if colitem.name == name:
1205                return colitem
1206        return None
1207
1208    def popen(
1209        self,
1210        cmdargs,
1211        stdout=subprocess.PIPE,
1212        stderr=subprocess.PIPE,
1213        stdin=CLOSE_STDIN,
1214        **kw
1215    ):
1216        """Invoke subprocess.Popen.
1217
1218        Calls subprocess.Popen making sure the current working directory is
1219        in the PYTHONPATH.
1220
1221        You probably want to use :py:meth:`run` instead.
1222        """
1223        env = os.environ.copy()
1224        env["PYTHONPATH"] = os.pathsep.join(
1225            filter(None, [os.getcwd(), env.get("PYTHONPATH", "")])
1226        )
1227        kw["env"] = env
1228
1229        if stdin is Testdir.CLOSE_STDIN:
1230            kw["stdin"] = subprocess.PIPE
1231        elif isinstance(stdin, bytes):
1232            kw["stdin"] = subprocess.PIPE
1233        else:
1234            kw["stdin"] = stdin
1235
1236        popen = subprocess.Popen(cmdargs, stdout=stdout, stderr=stderr, **kw)
1237        if stdin is Testdir.CLOSE_STDIN:
1238            assert popen.stdin is not None
1239            popen.stdin.close()
1240        elif isinstance(stdin, bytes):
1241            assert popen.stdin is not None
1242            popen.stdin.write(stdin)
1243
1244        return popen
1245
1246    def run(
1247        self, *cmdargs, timeout: Optional[float] = None, stdin=CLOSE_STDIN
1248    ) -> RunResult:
1249        """Run a command with arguments.
1250
1251        Run a process using subprocess.Popen saving the stdout and stderr.
1252
1253        :param args:
1254            The sequence of arguments to pass to `subprocess.Popen()`.
1255        :param timeout:
1256            The period in seconds after which to timeout and raise
1257            :py:class:`Testdir.TimeoutExpired`.
1258        :param stdin:
1259            Optional standard input.  Bytes are being send, closing
1260            the pipe, otherwise it is passed through to ``popen``.
1261            Defaults to ``CLOSE_STDIN``, which translates to using a pipe
1262            (``subprocess.PIPE``) that gets closed.
1263
1264        :rtype: RunResult
1265        """
1266        __tracebackhide__ = True
1267
1268        cmdargs = tuple(
1269            str(arg) if isinstance(arg, py.path.local) else arg for arg in cmdargs
1270        )
1271        p1 = self.tmpdir.join("stdout")
1272        p2 = self.tmpdir.join("stderr")
1273        print("running:", *cmdargs)
1274        print("     in:", py.path.local())
1275        f1 = open(str(p1), "w", encoding="utf8")
1276        f2 = open(str(p2), "w", encoding="utf8")
1277        try:
1278            now = timing.time()
1279            popen = self.popen(
1280                cmdargs,
1281                stdin=stdin,
1282                stdout=f1,
1283                stderr=f2,
1284                close_fds=(sys.platform != "win32"),
1285            )
1286            if isinstance(stdin, bytes):
1287                popen.stdin.close()
1288
1289            def handle_timeout() -> None:
1290                __tracebackhide__ = True
1291
1292                timeout_message = (
1293                    "{seconds} second timeout expired running:"
1294                    " {command}".format(seconds=timeout, command=cmdargs)
1295                )
1296
1297                popen.kill()
1298                popen.wait()
1299                raise self.TimeoutExpired(timeout_message)
1300
1301            if timeout is None:
1302                ret = popen.wait()
1303            else:
1304                try:
1305                    ret = popen.wait(timeout)
1306                except subprocess.TimeoutExpired:
1307                    handle_timeout()
1308        finally:
1309            f1.close()
1310            f2.close()
1311        f1 = open(str(p1), encoding="utf8")
1312        f2 = open(str(p2), encoding="utf8")
1313        try:
1314            out = f1.read().splitlines()
1315            err = f2.read().splitlines()
1316        finally:
1317            f1.close()
1318            f2.close()
1319        self._dump_lines(out, sys.stdout)
1320        self._dump_lines(err, sys.stderr)
1321        try:
1322            ret = ExitCode(ret)
1323        except ValueError:
1324            pass
1325        return RunResult(ret, out, err, timing.time() - now)
1326
1327    def _dump_lines(self, lines, fp):
1328        try:
1329            for line in lines:
1330                print(line, file=fp)
1331        except UnicodeEncodeError:
1332            print("couldn't print to {} because of encoding".format(fp))
1333
1334    def _getpytestargs(self) -> Tuple[str, ...]:
1335        return sys.executable, "-mpytest"
1336
1337    def runpython(self, script) -> RunResult:
1338        """Run a python script using sys.executable as interpreter.
1339
1340        :rtype: RunResult
1341        """
1342        return self.run(sys.executable, script)
1343
1344    def runpython_c(self, command):
1345        """Run python -c "command".
1346
1347        :rtype: RunResult
1348        """
1349        return self.run(sys.executable, "-c", command)
1350
1351    def runpytest_subprocess(self, *args, timeout: Optional[float] = None) -> RunResult:
1352        """Run pytest as a subprocess with given arguments.
1353
1354        Any plugins added to the :py:attr:`plugins` list will be added using the
1355        ``-p`` command line option.  Additionally ``--basetemp`` is used to put
1356        any temporary files and directories in a numbered directory prefixed
1357        with "runpytest-" to not conflict with the normal numbered pytest
1358        location for temporary files and directories.
1359
1360        :param args:
1361            The sequence of arguments to pass to the pytest subprocess.
1362        :param timeout:
1363            The period in seconds after which to timeout and raise
1364            :py:class:`Testdir.TimeoutExpired`.
1365
1366        :rtype: RunResult
1367        """
1368        __tracebackhide__ = True
1369        p = make_numbered_dir(root=Path(str(self.tmpdir)), prefix="runpytest-")
1370        args = ("--basetemp=%s" % p,) + args
1371        plugins = [x for x in self.plugins if isinstance(x, str)]
1372        if plugins:
1373            args = ("-p", plugins[0]) + args
1374        args = self._getpytestargs() + args
1375        return self.run(*args, timeout=timeout)
1376
1377    def spawn_pytest(
1378        self, string: str, expect_timeout: float = 10.0
1379    ) -> "pexpect.spawn":
1380        """Run pytest using pexpect.
1381
1382        This makes sure to use the right pytest and sets up the temporary
1383        directory locations.
1384
1385        The pexpect child is returned.
1386        """
1387        basetemp = self.tmpdir.mkdir("temp-pexpect")
1388        invoke = " ".join(map(str, self._getpytestargs()))
1389        cmd = "{} --basetemp={} {}".format(invoke, basetemp, string)
1390        return self.spawn(cmd, expect_timeout=expect_timeout)
1391
1392    def spawn(self, cmd: str, expect_timeout: float = 10.0) -> "pexpect.spawn":
1393        """Run a command using pexpect.
1394
1395        The pexpect child is returned.
1396        """
1397        pexpect = pytest.importorskip("pexpect", "3.0")
1398        if hasattr(sys, "pypy_version_info") and "64" in platform.machine():
1399            pytest.skip("pypy-64 bit not supported")
1400        if not hasattr(pexpect, "spawn"):
1401            pytest.skip("pexpect.spawn not available")
1402        logfile = self.tmpdir.join("spawn.out").open("wb")
1403
1404        child = pexpect.spawn(cmd, logfile=logfile)
1405        self.request.addfinalizer(logfile.close)
1406        child.timeout = expect_timeout
1407        return child
1408
1409
1410class LineComp:
1411    def __init__(self) -> None:
1412        self.stringio = StringIO()
1413        """:class:`python:io.StringIO()` instance used for input."""
1414
1415    def assert_contains_lines(self, lines2: Sequence[str]) -> None:
1416        """Assert that ``lines2`` are contained (linearly) in :attr:`stringio`'s value.
1417
1418        Lines are matched using :func:`LineMatcher.fnmatch_lines`.
1419        """
1420        __tracebackhide__ = True
1421        val = self.stringio.getvalue()
1422        self.stringio.truncate(0)
1423        self.stringio.seek(0)
1424        lines1 = val.split("\n")
1425        LineMatcher(lines1).fnmatch_lines(lines2)
1426
1427
1428class LineMatcher:
1429    """Flexible matching of text.
1430
1431    This is a convenience class to test large texts like the output of
1432    commands.
1433
1434    The constructor takes a list of lines without their trailing newlines, i.e.
1435    ``text.splitlines()``.
1436    """
1437
1438    def __init__(self, lines: List[str]) -> None:
1439        self.lines = lines
1440        self._log_output = []  # type: List[str]
1441
1442    def _getlines(self, lines2: Union[str, Sequence[str], Source]) -> Sequence[str]:
1443        if isinstance(lines2, str):
1444            lines2 = Source(lines2)
1445        if isinstance(lines2, Source):
1446            lines2 = lines2.strip().lines
1447        return lines2
1448
1449    def fnmatch_lines_random(self, lines2: Sequence[str]) -> None:
1450        """Check lines exist in the output in any order (using :func:`python:fnmatch.fnmatch`)."""
1451        __tracebackhide__ = True
1452        self._match_lines_random(lines2, fnmatch)
1453
1454    def re_match_lines_random(self, lines2: Sequence[str]) -> None:
1455        """Check lines exist in the output in any order (using :func:`python:re.match`)."""
1456        __tracebackhide__ = True
1457        self._match_lines_random(lines2, lambda name, pat: bool(re.match(pat, name)))
1458
1459    def _match_lines_random(
1460        self, lines2: Sequence[str], match_func: Callable[[str, str], bool]
1461    ) -> None:
1462        __tracebackhide__ = True
1463        lines2 = self._getlines(lines2)
1464        for line in lines2:
1465            for x in self.lines:
1466                if line == x or match_func(x, line):
1467                    self._log("matched: ", repr(line))
1468                    break
1469            else:
1470                msg = "line %r not found in output" % line
1471                self._log(msg)
1472                self._fail(msg)
1473
1474    def get_lines_after(self, fnline: str) -> Sequence[str]:
1475        """Return all lines following the given line in the text.
1476
1477        The given line can contain glob wildcards.
1478        """
1479        for i, line in enumerate(self.lines):
1480            if fnline == line or fnmatch(line, fnline):
1481                return self.lines[i + 1 :]
1482        raise ValueError("line %r not found in output" % fnline)
1483
1484    def _log(self, *args) -> None:
1485        self._log_output.append(" ".join(str(x) for x in args))
1486
1487    @property
1488    def _log_text(self) -> str:
1489        return "\n".join(self._log_output)
1490
1491    def fnmatch_lines(
1492        self, lines2: Sequence[str], *, consecutive: bool = False
1493    ) -> None:
1494        """Check lines exist in the output (using :func:`python:fnmatch.fnmatch`).
1495
1496        The argument is a list of lines which have to match and can use glob
1497        wildcards.  If they do not match a pytest.fail() is called.  The
1498        matches and non-matches are also shown as part of the error message.
1499
1500        :param lines2: String patterns to match.
1501        :param consecutive: Match lines consecutively?
1502        """
1503        __tracebackhide__ = True
1504        self._match_lines(lines2, fnmatch, "fnmatch", consecutive=consecutive)
1505
1506    def re_match_lines(
1507        self, lines2: Sequence[str], *, consecutive: bool = False
1508    ) -> None:
1509        """Check lines exist in the output (using :func:`python:re.match`).
1510
1511        The argument is a list of lines which have to match using ``re.match``.
1512        If they do not match a pytest.fail() is called.
1513
1514        The matches and non-matches are also shown as part of the error message.
1515
1516        :param lines2: string patterns to match.
1517        :param consecutive: match lines consecutively?
1518        """
1519        __tracebackhide__ = True
1520        self._match_lines(
1521            lines2,
1522            lambda name, pat: bool(re.match(pat, name)),
1523            "re.match",
1524            consecutive=consecutive,
1525        )
1526
1527    def _match_lines(
1528        self,
1529        lines2: Sequence[str],
1530        match_func: Callable[[str, str], bool],
1531        match_nickname: str,
1532        *,
1533        consecutive: bool = False
1534    ) -> None:
1535        """Underlying implementation of ``fnmatch_lines`` and ``re_match_lines``.
1536
1537        :param Sequence[str] lines2:
1538            List of string patterns to match. The actual format depends on
1539            ``match_func``.
1540        :param match_func:
1541            A callable ``match_func(line, pattern)`` where line is the
1542            captured line from stdout/stderr and pattern is the matching
1543            pattern.
1544        :param str match_nickname:
1545            The nickname for the match function that will be logged to stdout
1546            when a match occurs.
1547        :param consecutive:
1548            Match lines consecutively?
1549        """
1550        if not isinstance(lines2, collections.abc.Sequence):
1551            raise TypeError("invalid type for lines2: {}".format(type(lines2).__name__))
1552        lines2 = self._getlines(lines2)
1553        lines1 = self.lines[:]
1554        nextline = None
1555        extralines = []
1556        __tracebackhide__ = True
1557        wnick = len(match_nickname) + 1
1558        started = False
1559        for line in lines2:
1560            nomatchprinted = False
1561            while lines1:
1562                nextline = lines1.pop(0)
1563                if line == nextline:
1564                    self._log("exact match:", repr(line))
1565                    started = True
1566                    break
1567                elif match_func(nextline, line):
1568                    self._log("%s:" % match_nickname, repr(line))
1569                    self._log(
1570                        "{:>{width}}".format("with:", width=wnick), repr(nextline)
1571                    )
1572                    started = True
1573                    break
1574                else:
1575                    if consecutive and started:
1576                        msg = "no consecutive match: {!r}".format(line)
1577                        self._log(msg)
1578                        self._log(
1579                            "{:>{width}}".format("with:", width=wnick), repr(nextline)
1580                        )
1581                        self._fail(msg)
1582                    if not nomatchprinted:
1583                        self._log(
1584                            "{:>{width}}".format("nomatch:", width=wnick), repr(line)
1585                        )
1586                        nomatchprinted = True
1587                    self._log("{:>{width}}".format("and:", width=wnick), repr(nextline))
1588                extralines.append(nextline)
1589            else:
1590                msg = "remains unmatched: {!r}".format(line)
1591                self._log(msg)
1592                self._fail(msg)
1593        self._log_output = []
1594
1595    def no_fnmatch_line(self, pat: str) -> None:
1596        """Ensure captured lines do not match the given pattern, using ``fnmatch.fnmatch``.
1597
1598        :param str pat: The pattern to match lines.
1599        """
1600        __tracebackhide__ = True
1601        self._no_match_line(pat, fnmatch, "fnmatch")
1602
1603    def no_re_match_line(self, pat: str) -> None:
1604        """Ensure captured lines do not match the given pattern, using ``re.match``.
1605
1606        :param str pat: The regular expression to match lines.
1607        """
1608        __tracebackhide__ = True
1609        self._no_match_line(
1610            pat, lambda name, pat: bool(re.match(pat, name)), "re.match"
1611        )
1612
1613    def _no_match_line(
1614        self, pat: str, match_func: Callable[[str, str], bool], match_nickname: str
1615    ) -> None:
1616        """Ensure captured lines does not have a the given pattern, using ``fnmatch.fnmatch``.
1617
1618        :param str pat: The pattern to match lines.
1619        """
1620        __tracebackhide__ = True
1621        nomatch_printed = False
1622        wnick = len(match_nickname) + 1
1623        for line in self.lines:
1624            if match_func(line, pat):
1625                msg = "{}: {!r}".format(match_nickname, pat)
1626                self._log(msg)
1627                self._log("{:>{width}}".format("with:", width=wnick), repr(line))
1628                self._fail(msg)
1629            else:
1630                if not nomatch_printed:
1631                    self._log("{:>{width}}".format("nomatch:", width=wnick), repr(pat))
1632                    nomatch_printed = True
1633                self._log("{:>{width}}".format("and:", width=wnick), repr(line))
1634        self._log_output = []
1635
1636    def _fail(self, msg: str) -> None:
1637        __tracebackhide__ = True
1638        log_text = self._log_text
1639        self._log_output = []
1640        pytest.fail(log_text)
1641
1642    def str(self) -> str:
1643        """Return the entire original text."""
1644        return "\n".join(self.lines)
1645