1"""(Disabled by default) support for testing pytest and pytest plugins.""" 2import collections.abc 3import gc 4import importlib 5import os 6import platform 7import re 8import subprocess 9import sys 10import traceback 11from fnmatch import fnmatch 12from io import StringIO 13from typing import Callable 14from typing import Dict 15from typing import Generator 16from typing import Iterable 17from typing import List 18from typing import Optional 19from typing import Sequence 20from typing import Tuple 21from typing import Union 22from weakref import WeakKeyDictionary 23 24import py 25from iniconfig import IniConfig 26 27import pytest 28from _pytest import timing 29from _pytest._code import Source 30from _pytest.capture import _get_multicapture 31from _pytest.compat import final 32from _pytest.compat import overload 33from _pytest.compat import TYPE_CHECKING 34from _pytest.config import _PluggyPlugin 35from _pytest.config import Config 36from _pytest.config import ExitCode 37from _pytest.config import PytestPluginManager 38from _pytest.config.argparsing import Parser 39from _pytest.fixtures import FixtureRequest 40from _pytest.main import Session 41from _pytest.monkeypatch import MonkeyPatch 42from _pytest.nodes import Collector 43from _pytest.nodes import Item 44from _pytest.pathlib import make_numbered_dir 45from _pytest.pathlib import Path 46from _pytest.python import Module 47from _pytest.reports import CollectReport 48from _pytest.reports import TestReport 49from _pytest.tmpdir import TempdirFactory 50 51if TYPE_CHECKING: 52 from typing import Type 53 from typing_extensions import Literal 54 55 import pexpect 56 57 58IGNORE_PAM = [ # filenames added when obtaining details about the current user 59 "/var/lib/sss/mc/passwd" 60] 61 62 63def pytest_addoption(parser: Parser) -> None: 64 parser.addoption( 65 "--lsof", 66 action="store_true", 67 dest="lsof", 68 default=False, 69 help="run FD checks if lsof is available", 70 ) 71 72 parser.addoption( 73 "--runpytest", 74 default="inprocess", 75 dest="runpytest", 76 choices=("inprocess", "subprocess"), 77 help=( 78 "run pytest sub runs in tests using an 'inprocess' " 79 "or 'subprocess' (python -m main) method" 80 ), 81 ) 82 83 parser.addini( 84 "pytester_example_dir", help="directory to take the pytester example files from" 85 ) 86 87 88def pytest_configure(config: Config) -> None: 89 if config.getvalue("lsof"): 90 checker = LsofFdLeakChecker() 91 if checker.matching_platform(): 92 config.pluginmanager.register(checker) 93 94 config.addinivalue_line( 95 "markers", 96 "pytester_example_path(*path_segments): join the given path " 97 "segments to `pytester_example_dir` for this test.", 98 ) 99 100 101class LsofFdLeakChecker: 102 def get_open_files(self) -> List[Tuple[str, str]]: 103 out = subprocess.run( 104 ("lsof", "-Ffn0", "-p", str(os.getpid())), 105 stdout=subprocess.PIPE, 106 stderr=subprocess.DEVNULL, 107 check=True, 108 universal_newlines=True, 109 ).stdout 110 111 def isopen(line: str) -> bool: 112 return line.startswith("f") and ( 113 "deleted" not in line 114 and "mem" not in line 115 and "txt" not in line 116 and "cwd" not in line 117 ) 118 119 open_files = [] 120 121 for line in out.split("\n"): 122 if isopen(line): 123 fields = line.split("\0") 124 fd = fields[0][1:] 125 filename = fields[1][1:] 126 if filename in IGNORE_PAM: 127 continue 128 if filename.startswith("/"): 129 open_files.append((fd, filename)) 130 131 return open_files 132 133 def matching_platform(self) -> bool: 134 try: 135 subprocess.run(("lsof", "-v"), check=True) 136 except (OSError, subprocess.CalledProcessError): 137 return False 138 else: 139 return True 140 141 @pytest.hookimpl(hookwrapper=True, tryfirst=True) 142 def pytest_runtest_protocol(self, item: Item) -> Generator[None, None, None]: 143 lines1 = self.get_open_files() 144 yield 145 if hasattr(sys, "pypy_version_info"): 146 gc.collect() 147 lines2 = self.get_open_files() 148 149 new_fds = {t[0] for t in lines2} - {t[0] for t in lines1} 150 leaked_files = [t for t in lines2 if t[0] in new_fds] 151 if leaked_files: 152 error = [ 153 "***** %s FD leakage detected" % len(leaked_files), 154 *(str(f) for f in leaked_files), 155 "*** Before:", 156 *(str(f) for f in lines1), 157 "*** After:", 158 *(str(f) for f in lines2), 159 "***** %s FD leakage detected" % len(leaked_files), 160 "*** function %s:%s: %s " % item.location, 161 "See issue #2366", 162 ] 163 item.warn(pytest.PytestWarning("\n".join(error))) 164 165 166# used at least by pytest-xdist plugin 167 168 169@pytest.fixture 170def _pytest(request: FixtureRequest) -> "PytestArg": 171 """Return a helper which offers a gethookrecorder(hook) method which 172 returns a HookRecorder instance which helps to make assertions about called 173 hooks.""" 174 return PytestArg(request) 175 176 177class PytestArg: 178 def __init__(self, request: FixtureRequest) -> None: 179 self.request = request 180 181 def gethookrecorder(self, hook) -> "HookRecorder": 182 hookrecorder = HookRecorder(hook._pm) 183 self.request.addfinalizer(hookrecorder.finish_recording) 184 return hookrecorder 185 186 187def get_public_names(values: Iterable[str]) -> List[str]: 188 """Only return names from iterator values without a leading underscore.""" 189 return [x for x in values if x[0] != "_"] 190 191 192class ParsedCall: 193 def __init__(self, name: str, kwargs) -> None: 194 self.__dict__.update(kwargs) 195 self._name = name 196 197 def __repr__(self) -> str: 198 d = self.__dict__.copy() 199 del d["_name"] 200 return "<ParsedCall {!r}(**{!r})>".format(self._name, d) 201 202 if TYPE_CHECKING: 203 # The class has undetermined attributes, this tells mypy about it. 204 def __getattr__(self, key: str): 205 ... 206 207 208class HookRecorder: 209 """Record all hooks called in a plugin manager. 210 211 This wraps all the hook calls in the plugin manager, recording each call 212 before propagating the normal calls. 213 """ 214 215 def __init__(self, pluginmanager: PytestPluginManager) -> None: 216 self._pluginmanager = pluginmanager 217 self.calls = [] # type: List[ParsedCall] 218 self.ret = None # type: Optional[Union[int, ExitCode]] 219 220 def before(hook_name: str, hook_impls, kwargs) -> None: 221 self.calls.append(ParsedCall(hook_name, kwargs)) 222 223 def after(outcome, hook_name: str, hook_impls, kwargs) -> None: 224 pass 225 226 self._undo_wrapping = pluginmanager.add_hookcall_monitoring(before, after) 227 228 def finish_recording(self) -> None: 229 self._undo_wrapping() 230 231 def getcalls(self, names: Union[str, Iterable[str]]) -> List[ParsedCall]: 232 if isinstance(names, str): 233 names = names.split() 234 return [call for call in self.calls if call._name in names] 235 236 def assert_contains(self, entries: Sequence[Tuple[str, str]]) -> None: 237 __tracebackhide__ = True 238 i = 0 239 entries = list(entries) 240 backlocals = sys._getframe(1).f_locals 241 while entries: 242 name, check = entries.pop(0) 243 for ind, call in enumerate(self.calls[i:]): 244 if call._name == name: 245 print("NAMEMATCH", name, call) 246 if eval(check, backlocals, call.__dict__): 247 print("CHECKERMATCH", repr(check), "->", call) 248 else: 249 print("NOCHECKERMATCH", repr(check), "-", call) 250 continue 251 i += ind + 1 252 break 253 print("NONAMEMATCH", name, "with", call) 254 else: 255 pytest.fail("could not find {!r} check {!r}".format(name, check)) 256 257 def popcall(self, name: str) -> ParsedCall: 258 __tracebackhide__ = True 259 for i, call in enumerate(self.calls): 260 if call._name == name: 261 del self.calls[i] 262 return call 263 lines = ["could not find call {!r}, in:".format(name)] 264 lines.extend([" %s" % x for x in self.calls]) 265 pytest.fail("\n".join(lines)) 266 267 def getcall(self, name: str) -> ParsedCall: 268 values = self.getcalls(name) 269 assert len(values) == 1, (name, values) 270 return values[0] 271 272 # functionality for test reports 273 274 @overload 275 def getreports( 276 self, names: "Literal['pytest_collectreport']", 277 ) -> Sequence[CollectReport]: 278 ... 279 280 @overload # noqa: F811 281 def getreports( # noqa: F811 282 self, names: "Literal['pytest_runtest_logreport']", 283 ) -> Sequence[TestReport]: 284 ... 285 286 @overload # noqa: F811 287 def getreports( # noqa: F811 288 self, 289 names: Union[str, Iterable[str]] = ( 290 "pytest_collectreport", 291 "pytest_runtest_logreport", 292 ), 293 ) -> Sequence[Union[CollectReport, TestReport]]: 294 ... 295 296 def getreports( # noqa: F811 297 self, 298 names: Union[str, Iterable[str]] = ( 299 "pytest_collectreport", 300 "pytest_runtest_logreport", 301 ), 302 ) -> Sequence[Union[CollectReport, TestReport]]: 303 return [x.report for x in self.getcalls(names)] 304 305 def matchreport( 306 self, 307 inamepart: str = "", 308 names: Union[str, Iterable[str]] = ( 309 "pytest_runtest_logreport", 310 "pytest_collectreport", 311 ), 312 when: Optional[str] = None, 313 ) -> Union[CollectReport, TestReport]: 314 """Return a testreport whose dotted import path matches.""" 315 values = [] 316 for rep in self.getreports(names=names): 317 if not when and rep.when != "call" and rep.passed: 318 # setup/teardown passing reports - let's ignore those 319 continue 320 if when and rep.when != when: 321 continue 322 if not inamepart or inamepart in rep.nodeid.split("::"): 323 values.append(rep) 324 if not values: 325 raise ValueError( 326 "could not find test report matching %r: " 327 "no test reports at all!" % (inamepart,) 328 ) 329 if len(values) > 1: 330 raise ValueError( 331 "found 2 or more testreports matching {!r}: {}".format( 332 inamepart, values 333 ) 334 ) 335 return values[0] 336 337 @overload 338 def getfailures( 339 self, names: "Literal['pytest_collectreport']", 340 ) -> Sequence[CollectReport]: 341 ... 342 343 @overload # noqa: F811 344 def getfailures( # noqa: F811 345 self, names: "Literal['pytest_runtest_logreport']", 346 ) -> Sequence[TestReport]: 347 ... 348 349 @overload # noqa: F811 350 def getfailures( # noqa: F811 351 self, 352 names: Union[str, Iterable[str]] = ( 353 "pytest_collectreport", 354 "pytest_runtest_logreport", 355 ), 356 ) -> Sequence[Union[CollectReport, TestReport]]: 357 ... 358 359 def getfailures( # noqa: F811 360 self, 361 names: Union[str, Iterable[str]] = ( 362 "pytest_collectreport", 363 "pytest_runtest_logreport", 364 ), 365 ) -> Sequence[Union[CollectReport, TestReport]]: 366 return [rep for rep in self.getreports(names) if rep.failed] 367 368 def getfailedcollections(self) -> Sequence[CollectReport]: 369 return self.getfailures("pytest_collectreport") 370 371 def listoutcomes( 372 self, 373 ) -> Tuple[ 374 Sequence[TestReport], 375 Sequence[Union[CollectReport, TestReport]], 376 Sequence[Union[CollectReport, TestReport]], 377 ]: 378 passed = [] 379 skipped = [] 380 failed = [] 381 for rep in self.getreports( 382 ("pytest_collectreport", "pytest_runtest_logreport") 383 ): 384 if rep.passed: 385 if rep.when == "call": 386 assert isinstance(rep, TestReport) 387 passed.append(rep) 388 elif rep.skipped: 389 skipped.append(rep) 390 else: 391 assert rep.failed, "Unexpected outcome: {!r}".format(rep) 392 failed.append(rep) 393 return passed, skipped, failed 394 395 def countoutcomes(self) -> List[int]: 396 return [len(x) for x in self.listoutcomes()] 397 398 def assertoutcome(self, passed: int = 0, skipped: int = 0, failed: int = 0) -> None: 399 __tracebackhide__ = True 400 401 outcomes = self.listoutcomes() 402 realpassed, realskipped, realfailed = outcomes 403 obtained = { 404 "passed": len(realpassed), 405 "skipped": len(realskipped), 406 "failed": len(realfailed), 407 } 408 expected = {"passed": passed, "skipped": skipped, "failed": failed} 409 assert obtained == expected, outcomes 410 411 def clear(self) -> None: 412 self.calls[:] = [] 413 414 415@pytest.fixture 416def linecomp() -> "LineComp": 417 """A :class: `LineComp` instance for checking that an input linearly 418 contains a sequence of strings.""" 419 return LineComp() 420 421 422@pytest.fixture(name="LineMatcher") 423def LineMatcher_fixture(request: FixtureRequest) -> "Type[LineMatcher]": 424 """A reference to the :class: `LineMatcher`. 425 426 This is instantiable with a list of lines (without their trailing newlines). 427 This is useful for testing large texts, such as the output of commands. 428 """ 429 return LineMatcher 430 431 432@pytest.fixture 433def testdir(request: FixtureRequest, tmpdir_factory: TempdirFactory) -> "Testdir": 434 """A :class: `TestDir` instance, that can be used to run and test pytest itself. 435 436 It is particularly useful for testing plugins. It is similar to the `tmpdir` fixture 437 but provides methods which aid in testing pytest itself. 438 """ 439 return Testdir(request, tmpdir_factory) 440 441 442@pytest.fixture 443def _sys_snapshot() -> Generator[None, None, None]: 444 snappaths = SysPathsSnapshot() 445 snapmods = SysModulesSnapshot() 446 yield 447 snapmods.restore() 448 snappaths.restore() 449 450 451@pytest.fixture 452def _config_for_test() -> Generator[Config, None, None]: 453 from _pytest.config import get_config 454 455 config = get_config() 456 yield config 457 config._ensure_unconfigure() # cleanup, e.g. capman closing tmpfiles. 458 459 460# Regex to match the session duration string in the summary: "74.34s". 461rex_session_duration = re.compile(r"\d+\.\d\ds") 462# Regex to match all the counts and phrases in the summary line: "34 passed, 111 skipped". 463rex_outcome = re.compile(r"(\d+) (\w+)") 464 465 466class RunResult: 467 """The result of running a command.""" 468 469 def __init__( 470 self, 471 ret: Union[int, ExitCode], 472 outlines: List[str], 473 errlines: List[str], 474 duration: float, 475 ) -> None: 476 try: 477 self.ret = pytest.ExitCode(ret) # type: Union[int, ExitCode] 478 """The return value.""" 479 except ValueError: 480 self.ret = ret 481 self.outlines = outlines 482 """List of lines captured from stdout.""" 483 self.errlines = errlines 484 """List of lines captured from stderr.""" 485 self.stdout = LineMatcher(outlines) 486 """:class:`LineMatcher` of stdout. 487 488 Use e.g. :func:`stdout.str() <LineMatcher.str()>` to reconstruct stdout, or the commonly used 489 :func:`stdout.fnmatch_lines() <LineMatcher.fnmatch_lines()>` method. 490 """ 491 self.stderr = LineMatcher(errlines) 492 """:class:`LineMatcher` of stderr.""" 493 self.duration = duration 494 """Duration in seconds.""" 495 496 def __repr__(self) -> str: 497 return ( 498 "<RunResult ret=%s len(stdout.lines)=%d len(stderr.lines)=%d duration=%.2fs>" 499 % (self.ret, len(self.stdout.lines), len(self.stderr.lines), self.duration) 500 ) 501 502 def parseoutcomes(self) -> Dict[str, int]: 503 """Return a dictionary of outcome noun -> count from parsing the terminal 504 output that the test process produced. 505 506 The returned nouns will always be in plural form:: 507 508 ======= 1 failed, 1 passed, 1 warning, 1 error in 0.13s ==== 509 510 Will return ``{"failed": 1, "passed": 1, "warnings": 1, "errors": 1}``. 511 """ 512 return self.parse_summary_nouns(self.outlines) 513 514 @classmethod 515 def parse_summary_nouns(cls, lines) -> Dict[str, int]: 516 """Extract the nouns from a pytest terminal summary line. 517 518 It always returns the plural noun for consistency:: 519 520 ======= 1 failed, 1 passed, 1 warning, 1 error in 0.13s ==== 521 522 Will return ``{"failed": 1, "passed": 1, "warnings": 1, "errors": 1}``. 523 """ 524 for line in reversed(lines): 525 if rex_session_duration.search(line): 526 outcomes = rex_outcome.findall(line) 527 ret = {noun: int(count) for (count, noun) in outcomes} 528 break 529 else: 530 raise ValueError("Pytest terminal summary report not found") 531 532 to_plural = { 533 "warning": "warnings", 534 "error": "errors", 535 } 536 return {to_plural.get(k, k): v for k, v in ret.items()} 537 538 def assert_outcomes( 539 self, 540 passed: int = 0, 541 skipped: int = 0, 542 failed: int = 0, 543 errors: int = 0, 544 xpassed: int = 0, 545 xfailed: int = 0, 546 ) -> None: 547 """Assert that the specified outcomes appear with the respective 548 numbers (0 means it didn't occur) in the text output from a test run.""" 549 __tracebackhide__ = True 550 551 d = self.parseoutcomes() 552 obtained = { 553 "passed": d.get("passed", 0), 554 "skipped": d.get("skipped", 0), 555 "failed": d.get("failed", 0), 556 "errors": d.get("errors", 0), 557 "xpassed": d.get("xpassed", 0), 558 "xfailed": d.get("xfailed", 0), 559 } 560 expected = { 561 "passed": passed, 562 "skipped": skipped, 563 "failed": failed, 564 "errors": errors, 565 "xpassed": xpassed, 566 "xfailed": xfailed, 567 } 568 assert obtained == expected 569 570 571class CwdSnapshot: 572 def __init__(self) -> None: 573 self.__saved = os.getcwd() 574 575 def restore(self) -> None: 576 os.chdir(self.__saved) 577 578 579class SysModulesSnapshot: 580 def __init__(self, preserve: Optional[Callable[[str], bool]] = None) -> None: 581 self.__preserve = preserve 582 self.__saved = dict(sys.modules) 583 584 def restore(self) -> None: 585 if self.__preserve: 586 self.__saved.update( 587 (k, m) for k, m in sys.modules.items() if self.__preserve(k) 588 ) 589 sys.modules.clear() 590 sys.modules.update(self.__saved) 591 592 593class SysPathsSnapshot: 594 def __init__(self) -> None: 595 self.__saved = list(sys.path), list(sys.meta_path) 596 597 def restore(self) -> None: 598 sys.path[:], sys.meta_path[:] = self.__saved 599 600 601@final 602class Testdir: 603 """Temporary test directory with tools to test/run pytest itself. 604 605 This is based on the :fixture:`tmpdir` fixture but provides a number of methods 606 which aid with testing pytest itself. Unless :py:meth:`chdir` is used all 607 methods will use :py:attr:`tmpdir` as their current working directory. 608 609 Attributes: 610 611 :ivar tmpdir: The :py:class:`py.path.local` instance of the temporary directory. 612 613 :ivar plugins: 614 A list of plugins to use with :py:meth:`parseconfig` and 615 :py:meth:`runpytest`. Initially this is an empty list but plugins can 616 be added to the list. The type of items to add to the list depends on 617 the method using them so refer to them for details. 618 """ 619 620 __test__ = False 621 622 CLOSE_STDIN = object 623 624 class TimeoutExpired(Exception): 625 pass 626 627 def __init__(self, request: FixtureRequest, tmpdir_factory: TempdirFactory) -> None: 628 self.request = request 629 self._mod_collections = ( 630 WeakKeyDictionary() 631 ) # type: WeakKeyDictionary[Module, List[Union[Item, Collector]]] 632 if request.function: 633 name = request.function.__name__ # type: str 634 else: 635 name = request.node.name 636 self._name = name 637 self.tmpdir = tmpdir_factory.mktemp(name, numbered=True) 638 self.test_tmproot = tmpdir_factory.mktemp("tmp-" + name, numbered=True) 639 self.plugins = [] # type: List[Union[str, _PluggyPlugin]] 640 self._cwd_snapshot = CwdSnapshot() 641 self._sys_path_snapshot = SysPathsSnapshot() 642 self._sys_modules_snapshot = self.__take_sys_modules_snapshot() 643 self.chdir() 644 self.request.addfinalizer(self.finalize) 645 self._method = self.request.config.getoption("--runpytest") 646 647 mp = self.monkeypatch = MonkeyPatch() 648 mp.setenv("PYTEST_DEBUG_TEMPROOT", str(self.test_tmproot)) 649 # Ensure no unexpected caching via tox. 650 mp.delenv("TOX_ENV_DIR", raising=False) 651 # Discard outer pytest options. 652 mp.delenv("PYTEST_ADDOPTS", raising=False) 653 # Ensure no user config is used. 654 tmphome = str(self.tmpdir) 655 mp.setenv("HOME", tmphome) 656 mp.setenv("USERPROFILE", tmphome) 657 # Do not use colors for inner runs by default. 658 mp.setenv("PY_COLORS", "0") 659 660 def __repr__(self) -> str: 661 return "<Testdir {!r}>".format(self.tmpdir) 662 663 def __str__(self) -> str: 664 return str(self.tmpdir) 665 666 def finalize(self) -> None: 667 """Clean up global state artifacts. 668 669 Some methods modify the global interpreter state and this tries to 670 clean this up. It does not remove the temporary directory however so 671 it can be looked at after the test run has finished. 672 """ 673 self._sys_modules_snapshot.restore() 674 self._sys_path_snapshot.restore() 675 self._cwd_snapshot.restore() 676 self.monkeypatch.undo() 677 678 def __take_sys_modules_snapshot(self) -> SysModulesSnapshot: 679 # Some zope modules used by twisted-related tests keep internal state 680 # and can't be deleted; we had some trouble in the past with 681 # `zope.interface` for example. 682 def preserve_module(name): 683 return name.startswith("zope") 684 685 return SysModulesSnapshot(preserve=preserve_module) 686 687 def make_hook_recorder(self, pluginmanager: PytestPluginManager) -> HookRecorder: 688 """Create a new :py:class:`HookRecorder` for a PluginManager.""" 689 pluginmanager.reprec = reprec = HookRecorder(pluginmanager) 690 self.request.addfinalizer(reprec.finish_recording) 691 return reprec 692 693 def chdir(self) -> None: 694 """Cd into the temporary directory. 695 696 This is done automatically upon instantiation. 697 """ 698 self.tmpdir.chdir() 699 700 def _makefile(self, ext: str, lines, files, encoding: str = "utf-8"): 701 items = list(files.items()) 702 703 def to_text(s): 704 return s.decode(encoding) if isinstance(s, bytes) else str(s) 705 706 if lines: 707 source = "\n".join(to_text(x) for x in lines) 708 basename = self._name 709 items.insert(0, (basename, source)) 710 711 ret = None 712 for basename, value in items: 713 p = self.tmpdir.join(basename).new(ext=ext) 714 p.dirpath().ensure_dir() 715 source_ = Source(value) 716 source = "\n".join(to_text(line) for line in source_.lines) 717 p.write(source.strip().encode(encoding), "wb") 718 if ret is None: 719 ret = p 720 return ret 721 722 def makefile(self, ext: str, *args: str, **kwargs): 723 r"""Create new file(s) in the testdir. 724 725 :param str ext: 726 The extension the file(s) should use, including the dot, e.g. `.py`. 727 :param args: 728 All args are treated as strings and joined using newlines. 729 The result is written as contents to the file. The name of the 730 file is based on the test function requesting this fixture. 731 :param kwargs: 732 Each keyword is the name of a file, while the value of it will 733 be written as contents of the file. 734 735 Examples: 736 737 .. code-block:: python 738 739 testdir.makefile(".txt", "line1", "line2") 740 741 testdir.makefile(".ini", pytest="[pytest]\naddopts=-rs\n") 742 743 """ 744 return self._makefile(ext, args, kwargs) 745 746 def makeconftest(self, source): 747 """Write a contest.py file with 'source' as contents.""" 748 return self.makepyfile(conftest=source) 749 750 def makeini(self, source): 751 """Write a tox.ini file with 'source' as contents.""" 752 return self.makefile(".ini", tox=source) 753 754 def getinicfg(self, source) -> IniConfig: 755 """Return the pytest section from the tox.ini config file.""" 756 p = self.makeini(source) 757 return IniConfig(p)["pytest"] 758 759 def makepyprojecttoml(self, source): 760 """Write a pyproject.toml file with 'source' as contents. 761 762 .. versionadded:: 6.0 763 """ 764 return self.makefile(".toml", pyproject=source) 765 766 def makepyfile(self, *args, **kwargs): 767 r"""Shortcut for .makefile() with a .py extension. 768 769 Defaults to the test name with a '.py' extension, e.g test_foobar.py, overwriting 770 existing files. 771 772 Examples: 773 774 .. code-block:: python 775 776 def test_something(testdir): 777 # Initial file is created test_something.py. 778 testdir.makepyfile("foobar") 779 # To create multiple files, pass kwargs accordingly. 780 testdir.makepyfile(custom="foobar") 781 # At this point, both 'test_something.py' & 'custom.py' exist in the test directory. 782 783 """ 784 return self._makefile(".py", args, kwargs) 785 786 def maketxtfile(self, *args, **kwargs): 787 r"""Shortcut for .makefile() with a .txt extension. 788 789 Defaults to the test name with a '.txt' extension, e.g test_foobar.txt, overwriting 790 existing files. 791 792 Examples: 793 794 .. code-block:: python 795 796 def test_something(testdir): 797 # Initial file is created test_something.txt. 798 testdir.maketxtfile("foobar") 799 # To create multiple files, pass kwargs accordingly. 800 testdir.maketxtfile(custom="foobar") 801 # At this point, both 'test_something.txt' & 'custom.txt' exist in the test directory. 802 803 """ 804 return self._makefile(".txt", args, kwargs) 805 806 def syspathinsert(self, path=None) -> None: 807 """Prepend a directory to sys.path, defaults to :py:attr:`tmpdir`. 808 809 This is undone automatically when this object dies at the end of each 810 test. 811 """ 812 if path is None: 813 path = self.tmpdir 814 815 self.monkeypatch.syspath_prepend(str(path)) 816 817 def mkdir(self, name) -> py.path.local: 818 """Create a new (sub)directory.""" 819 return self.tmpdir.mkdir(name) 820 821 def mkpydir(self, name) -> py.path.local: 822 """Create a new Python package. 823 824 This creates a (sub)directory with an empty ``__init__.py`` file so it 825 gets recognised as a Python package. 826 """ 827 p = self.mkdir(name) 828 p.ensure("__init__.py") 829 return p 830 831 def copy_example(self, name=None) -> py.path.local: 832 """Copy file from project's directory into the testdir. 833 834 :param str name: The name of the file to copy. 835 :returns: Path to the copied directory (inside ``self.tmpdir``). 836 """ 837 import warnings 838 from _pytest.warning_types import PYTESTER_COPY_EXAMPLE 839 840 warnings.warn(PYTESTER_COPY_EXAMPLE, stacklevel=2) 841 example_dir = self.request.config.getini("pytester_example_dir") 842 if example_dir is None: 843 raise ValueError("pytester_example_dir is unset, can't copy examples") 844 example_dir = self.request.config.rootdir.join(example_dir) 845 846 for extra_element in self.request.node.iter_markers("pytester_example_path"): 847 assert extra_element.args 848 example_dir = example_dir.join(*extra_element.args) 849 850 if name is None: 851 func_name = self._name 852 maybe_dir = example_dir / func_name 853 maybe_file = example_dir / (func_name + ".py") 854 855 if maybe_dir.isdir(): 856 example_path = maybe_dir 857 elif maybe_file.isfile(): 858 example_path = maybe_file 859 else: 860 raise LookupError( 861 "{} cant be found as module or package in {}".format( 862 func_name, example_dir.bestrelpath(self.request.config.rootdir) 863 ) 864 ) 865 else: 866 example_path = example_dir.join(name) 867 868 if example_path.isdir() and not example_path.join("__init__.py").isfile(): 869 example_path.copy(self.tmpdir) 870 return self.tmpdir 871 elif example_path.isfile(): 872 result = self.tmpdir.join(example_path.basename) 873 example_path.copy(result) 874 return result 875 else: 876 raise LookupError( 877 'example "{}" is not found as a file or directory'.format(example_path) 878 ) 879 880 Session = Session 881 882 def getnode(self, config: Config, arg): 883 """Return the collection node of a file. 884 885 :param _pytest.config.Config config: 886 A pytest config. 887 See :py:meth:`parseconfig` and :py:meth:`parseconfigure` for creating it. 888 :param py.path.local arg: 889 Path to the file. 890 """ 891 session = Session.from_config(config) 892 assert "::" not in str(arg) 893 p = py.path.local(arg) 894 config.hook.pytest_sessionstart(session=session) 895 res = session.perform_collect([str(p)], genitems=False)[0] 896 config.hook.pytest_sessionfinish(session=session, exitstatus=ExitCode.OK) 897 return res 898 899 def getpathnode(self, path): 900 """Return the collection node of a file. 901 902 This is like :py:meth:`getnode` but uses :py:meth:`parseconfigure` to 903 create the (configured) pytest Config instance. 904 905 :param py.path.local path: Path to the file. 906 """ 907 config = self.parseconfigure(path) 908 session = Session.from_config(config) 909 x = session.fspath.bestrelpath(path) 910 config.hook.pytest_sessionstart(session=session) 911 res = session.perform_collect([x], genitems=False)[0] 912 config.hook.pytest_sessionfinish(session=session, exitstatus=ExitCode.OK) 913 return res 914 915 def genitems(self, colitems: Sequence[Union[Item, Collector]]) -> List[Item]: 916 """Generate all test items from a collection node. 917 918 This recurses into the collection node and returns a list of all the 919 test items contained within. 920 """ 921 session = colitems[0].session 922 result = [] # type: List[Item] 923 for colitem in colitems: 924 result.extend(session.genitems(colitem)) 925 return result 926 927 def runitem(self, source): 928 """Run the "test_func" Item. 929 930 The calling test instance (class containing the test method) must 931 provide a ``.getrunner()`` method which should return a runner which 932 can run the test protocol for a single item, e.g. 933 :py:func:`_pytest.runner.runtestprotocol`. 934 """ 935 # used from runner functional tests 936 item = self.getitem(source) 937 # the test class where we are called from wants to provide the runner 938 testclassinstance = self.request.instance 939 runner = testclassinstance.getrunner() 940 return runner(item) 941 942 def inline_runsource(self, source, *cmdlineargs) -> HookRecorder: 943 """Run a test module in process using ``pytest.main()``. 944 945 This run writes "source" into a temporary file and runs 946 ``pytest.main()`` on it, returning a :py:class:`HookRecorder` instance 947 for the result. 948 949 :param source: The source code of the test module. 950 951 :param cmdlineargs: Any extra command line arguments to use. 952 953 :returns: :py:class:`HookRecorder` instance of the result. 954 """ 955 p = self.makepyfile(source) 956 values = list(cmdlineargs) + [p] 957 return self.inline_run(*values) 958 959 def inline_genitems(self, *args) -> Tuple[List[Item], HookRecorder]: 960 """Run ``pytest.main(['--collectonly'])`` in-process. 961 962 Runs the :py:func:`pytest.main` function to run all of pytest inside 963 the test process itself like :py:meth:`inline_run`, but returns a 964 tuple of the collected items and a :py:class:`HookRecorder` instance. 965 """ 966 rec = self.inline_run("--collect-only", *args) 967 items = [x.item for x in rec.getcalls("pytest_itemcollected")] 968 return items, rec 969 970 def inline_run( 971 self, *args, plugins=(), no_reraise_ctrlc: bool = False 972 ) -> HookRecorder: 973 """Run ``pytest.main()`` in-process, returning a HookRecorder. 974 975 Runs the :py:func:`pytest.main` function to run all of pytest inside 976 the test process itself. This means it can return a 977 :py:class:`HookRecorder` instance which gives more detailed results 978 from that run than can be done by matching stdout/stderr from 979 :py:meth:`runpytest`. 980 981 :param args: 982 Command line arguments to pass to :py:func:`pytest.main`. 983 :param plugins: 984 Extra plugin instances the ``pytest.main()`` instance should use. 985 :param no_reraise_ctrlc: 986 Typically we reraise keyboard interrupts from the child run. If 987 True, the KeyboardInterrupt exception is captured. 988 989 :returns: A :py:class:`HookRecorder` instance. 990 """ 991 # (maybe a cpython bug?) the importlib cache sometimes isn't updated 992 # properly between file creation and inline_run (especially if imports 993 # are interspersed with file creation) 994 importlib.invalidate_caches() 995 996 plugins = list(plugins) 997 finalizers = [] 998 try: 999 # Any sys.module or sys.path changes done while running pytest 1000 # inline should be reverted after the test run completes to avoid 1001 # clashing with later inline tests run within the same pytest test, 1002 # e.g. just because they use matching test module names. 1003 finalizers.append(self.__take_sys_modules_snapshot().restore) 1004 finalizers.append(SysPathsSnapshot().restore) 1005 1006 # Important note: 1007 # - our tests should not leave any other references/registrations 1008 # laying around other than possibly loaded test modules 1009 # referenced from sys.modules, as nothing will clean those up 1010 # automatically 1011 1012 rec = [] 1013 1014 class Collect: 1015 def pytest_configure(x, config: Config) -> None: 1016 rec.append(self.make_hook_recorder(config.pluginmanager)) 1017 1018 plugins.append(Collect()) 1019 ret = pytest.main(list(args), plugins=plugins) 1020 if len(rec) == 1: 1021 reprec = rec.pop() 1022 else: 1023 1024 class reprec: # type: ignore 1025 pass 1026 1027 reprec.ret = ret 1028 1029 # Typically we reraise keyboard interrupts from the child run 1030 # because it's our user requesting interruption of the testing. 1031 if ret == ExitCode.INTERRUPTED and not no_reraise_ctrlc: 1032 calls = reprec.getcalls("pytest_keyboard_interrupt") 1033 if calls and calls[-1].excinfo.type == KeyboardInterrupt: 1034 raise KeyboardInterrupt() 1035 return reprec 1036 finally: 1037 for finalizer in finalizers: 1038 finalizer() 1039 1040 def runpytest_inprocess(self, *args, **kwargs) -> RunResult: 1041 """Return result of running pytest in-process, providing a similar 1042 interface to what self.runpytest() provides.""" 1043 syspathinsert = kwargs.pop("syspathinsert", False) 1044 1045 if syspathinsert: 1046 self.syspathinsert() 1047 now = timing.time() 1048 capture = _get_multicapture("sys") 1049 capture.start_capturing() 1050 try: 1051 try: 1052 reprec = self.inline_run(*args, **kwargs) 1053 except SystemExit as e: 1054 ret = e.args[0] 1055 try: 1056 ret = ExitCode(e.args[0]) 1057 except ValueError: 1058 pass 1059 1060 class reprec: # type: ignore 1061 ret = ret 1062 1063 except Exception: 1064 traceback.print_exc() 1065 1066 class reprec: # type: ignore 1067 ret = ExitCode(3) 1068 1069 finally: 1070 out, err = capture.readouterr() 1071 capture.stop_capturing() 1072 sys.stdout.write(out) 1073 sys.stderr.write(err) 1074 1075 assert reprec.ret is not None 1076 res = RunResult( 1077 reprec.ret, out.splitlines(), err.splitlines(), timing.time() - now 1078 ) 1079 res.reprec = reprec # type: ignore 1080 return res 1081 1082 def runpytest(self, *args, **kwargs) -> RunResult: 1083 """Run pytest inline or in a subprocess, depending on the command line 1084 option "--runpytest" and return a :py:class:`RunResult`.""" 1085 args = self._ensure_basetemp(args) 1086 if self._method == "inprocess": 1087 return self.runpytest_inprocess(*args, **kwargs) 1088 elif self._method == "subprocess": 1089 return self.runpytest_subprocess(*args, **kwargs) 1090 raise RuntimeError("Unrecognized runpytest option: {}".format(self._method)) 1091 1092 def _ensure_basetemp(self, args): 1093 args = list(args) 1094 for x in args: 1095 if str(x).startswith("--basetemp"): 1096 break 1097 else: 1098 args.append("--basetemp=%s" % self.tmpdir.dirpath("basetemp")) 1099 return args 1100 1101 def parseconfig(self, *args) -> Config: 1102 """Return a new pytest Config instance from given commandline args. 1103 1104 This invokes the pytest bootstrapping code in _pytest.config to create 1105 a new :py:class:`_pytest.core.PluginManager` and call the 1106 pytest_cmdline_parse hook to create a new 1107 :py:class:`_pytest.config.Config` instance. 1108 1109 If :py:attr:`plugins` has been populated they should be plugin modules 1110 to be registered with the PluginManager. 1111 """ 1112 args = self._ensure_basetemp(args) 1113 1114 import _pytest.config 1115 1116 config = _pytest.config._prepareconfig(args, self.plugins) # type: ignore[arg-type] 1117 # we don't know what the test will do with this half-setup config 1118 # object and thus we make sure it gets unconfigured properly in any 1119 # case (otherwise capturing could still be active, for example) 1120 self.request.addfinalizer(config._ensure_unconfigure) 1121 return config 1122 1123 def parseconfigure(self, *args) -> Config: 1124 """Return a new pytest configured Config instance. 1125 1126 Returns a new :py:class:`_pytest.config.Config` instance like 1127 :py:meth:`parseconfig`, but also calls the pytest_configure hook. 1128 """ 1129 config = self.parseconfig(*args) 1130 config._do_configure() 1131 return config 1132 1133 def getitem(self, source, funcname: str = "test_func") -> Item: 1134 """Return the test item for a test function. 1135 1136 Writes the source to a python file and runs pytest's collection on 1137 the resulting module, returning the test item for the requested 1138 function name. 1139 1140 :param source: 1141 The module source. 1142 :param funcname: 1143 The name of the test function for which to return a test item. 1144 """ 1145 items = self.getitems(source) 1146 for item in items: 1147 if item.name == funcname: 1148 return item 1149 assert 0, "{!r} item not found in module:\n{}\nitems: {}".format( 1150 funcname, source, items 1151 ) 1152 1153 def getitems(self, source) -> List[Item]: 1154 """Return all test items collected from the module. 1155 1156 Writes the source to a Python file and runs pytest's collection on 1157 the resulting module, returning all test items contained within. 1158 """ 1159 modcol = self.getmodulecol(source) 1160 return self.genitems([modcol]) 1161 1162 def getmodulecol(self, source, configargs=(), withinit: bool = False): 1163 """Return the module collection node for ``source``. 1164 1165 Writes ``source`` to a file using :py:meth:`makepyfile` and then 1166 runs the pytest collection on it, returning the collection node for the 1167 test module. 1168 1169 :param source: 1170 The source code of the module to collect. 1171 1172 :param configargs: 1173 Any extra arguments to pass to :py:meth:`parseconfigure`. 1174 1175 :param withinit: 1176 Whether to also write an ``__init__.py`` file to the same 1177 directory to ensure it is a package. 1178 """ 1179 if isinstance(source, Path): 1180 path = self.tmpdir.join(str(source)) 1181 assert not withinit, "not supported for paths" 1182 else: 1183 kw = {self._name: Source(source).strip()} 1184 path = self.makepyfile(**kw) 1185 if withinit: 1186 self.makepyfile(__init__="#") 1187 self.config = config = self.parseconfigure(path, *configargs) 1188 return self.getnode(config, path) 1189 1190 def collect_by_name( 1191 self, modcol: Module, name: str 1192 ) -> Optional[Union[Item, Collector]]: 1193 """Return the collection node for name from the module collection. 1194 1195 Searchs a module collection node for a collection node matching the 1196 given name. 1197 1198 :param modcol: A module collection node; see :py:meth:`getmodulecol`. 1199 :param name: The name of the node to return. 1200 """ 1201 if modcol not in self._mod_collections: 1202 self._mod_collections[modcol] = list(modcol.collect()) 1203 for colitem in self._mod_collections[modcol]: 1204 if colitem.name == name: 1205 return colitem 1206 return None 1207 1208 def popen( 1209 self, 1210 cmdargs, 1211 stdout=subprocess.PIPE, 1212 stderr=subprocess.PIPE, 1213 stdin=CLOSE_STDIN, 1214 **kw 1215 ): 1216 """Invoke subprocess.Popen. 1217 1218 Calls subprocess.Popen making sure the current working directory is 1219 in the PYTHONPATH. 1220 1221 You probably want to use :py:meth:`run` instead. 1222 """ 1223 env = os.environ.copy() 1224 env["PYTHONPATH"] = os.pathsep.join( 1225 filter(None, [os.getcwd(), env.get("PYTHONPATH", "")]) 1226 ) 1227 kw["env"] = env 1228 1229 if stdin is Testdir.CLOSE_STDIN: 1230 kw["stdin"] = subprocess.PIPE 1231 elif isinstance(stdin, bytes): 1232 kw["stdin"] = subprocess.PIPE 1233 else: 1234 kw["stdin"] = stdin 1235 1236 popen = subprocess.Popen(cmdargs, stdout=stdout, stderr=stderr, **kw) 1237 if stdin is Testdir.CLOSE_STDIN: 1238 assert popen.stdin is not None 1239 popen.stdin.close() 1240 elif isinstance(stdin, bytes): 1241 assert popen.stdin is not None 1242 popen.stdin.write(stdin) 1243 1244 return popen 1245 1246 def run( 1247 self, *cmdargs, timeout: Optional[float] = None, stdin=CLOSE_STDIN 1248 ) -> RunResult: 1249 """Run a command with arguments. 1250 1251 Run a process using subprocess.Popen saving the stdout and stderr. 1252 1253 :param args: 1254 The sequence of arguments to pass to `subprocess.Popen()`. 1255 :param timeout: 1256 The period in seconds after which to timeout and raise 1257 :py:class:`Testdir.TimeoutExpired`. 1258 :param stdin: 1259 Optional standard input. Bytes are being send, closing 1260 the pipe, otherwise it is passed through to ``popen``. 1261 Defaults to ``CLOSE_STDIN``, which translates to using a pipe 1262 (``subprocess.PIPE``) that gets closed. 1263 1264 :rtype: RunResult 1265 """ 1266 __tracebackhide__ = True 1267 1268 cmdargs = tuple( 1269 str(arg) if isinstance(arg, py.path.local) else arg for arg in cmdargs 1270 ) 1271 p1 = self.tmpdir.join("stdout") 1272 p2 = self.tmpdir.join("stderr") 1273 print("running:", *cmdargs) 1274 print(" in:", py.path.local()) 1275 f1 = open(str(p1), "w", encoding="utf8") 1276 f2 = open(str(p2), "w", encoding="utf8") 1277 try: 1278 now = timing.time() 1279 popen = self.popen( 1280 cmdargs, 1281 stdin=stdin, 1282 stdout=f1, 1283 stderr=f2, 1284 close_fds=(sys.platform != "win32"), 1285 ) 1286 if isinstance(stdin, bytes): 1287 popen.stdin.close() 1288 1289 def handle_timeout() -> None: 1290 __tracebackhide__ = True 1291 1292 timeout_message = ( 1293 "{seconds} second timeout expired running:" 1294 " {command}".format(seconds=timeout, command=cmdargs) 1295 ) 1296 1297 popen.kill() 1298 popen.wait() 1299 raise self.TimeoutExpired(timeout_message) 1300 1301 if timeout is None: 1302 ret = popen.wait() 1303 else: 1304 try: 1305 ret = popen.wait(timeout) 1306 except subprocess.TimeoutExpired: 1307 handle_timeout() 1308 finally: 1309 f1.close() 1310 f2.close() 1311 f1 = open(str(p1), encoding="utf8") 1312 f2 = open(str(p2), encoding="utf8") 1313 try: 1314 out = f1.read().splitlines() 1315 err = f2.read().splitlines() 1316 finally: 1317 f1.close() 1318 f2.close() 1319 self._dump_lines(out, sys.stdout) 1320 self._dump_lines(err, sys.stderr) 1321 try: 1322 ret = ExitCode(ret) 1323 except ValueError: 1324 pass 1325 return RunResult(ret, out, err, timing.time() - now) 1326 1327 def _dump_lines(self, lines, fp): 1328 try: 1329 for line in lines: 1330 print(line, file=fp) 1331 except UnicodeEncodeError: 1332 print("couldn't print to {} because of encoding".format(fp)) 1333 1334 def _getpytestargs(self) -> Tuple[str, ...]: 1335 return sys.executable, "-mpytest" 1336 1337 def runpython(self, script) -> RunResult: 1338 """Run a python script using sys.executable as interpreter. 1339 1340 :rtype: RunResult 1341 """ 1342 return self.run(sys.executable, script) 1343 1344 def runpython_c(self, command): 1345 """Run python -c "command". 1346 1347 :rtype: RunResult 1348 """ 1349 return self.run(sys.executable, "-c", command) 1350 1351 def runpytest_subprocess(self, *args, timeout: Optional[float] = None) -> RunResult: 1352 """Run pytest as a subprocess with given arguments. 1353 1354 Any plugins added to the :py:attr:`plugins` list will be added using the 1355 ``-p`` command line option. Additionally ``--basetemp`` is used to put 1356 any temporary files and directories in a numbered directory prefixed 1357 with "runpytest-" to not conflict with the normal numbered pytest 1358 location for temporary files and directories. 1359 1360 :param args: 1361 The sequence of arguments to pass to the pytest subprocess. 1362 :param timeout: 1363 The period in seconds after which to timeout and raise 1364 :py:class:`Testdir.TimeoutExpired`. 1365 1366 :rtype: RunResult 1367 """ 1368 __tracebackhide__ = True 1369 p = make_numbered_dir(root=Path(str(self.tmpdir)), prefix="runpytest-") 1370 args = ("--basetemp=%s" % p,) + args 1371 plugins = [x for x in self.plugins if isinstance(x, str)] 1372 if plugins: 1373 args = ("-p", plugins[0]) + args 1374 args = self._getpytestargs() + args 1375 return self.run(*args, timeout=timeout) 1376 1377 def spawn_pytest( 1378 self, string: str, expect_timeout: float = 10.0 1379 ) -> "pexpect.spawn": 1380 """Run pytest using pexpect. 1381 1382 This makes sure to use the right pytest and sets up the temporary 1383 directory locations. 1384 1385 The pexpect child is returned. 1386 """ 1387 basetemp = self.tmpdir.mkdir("temp-pexpect") 1388 invoke = " ".join(map(str, self._getpytestargs())) 1389 cmd = "{} --basetemp={} {}".format(invoke, basetemp, string) 1390 return self.spawn(cmd, expect_timeout=expect_timeout) 1391 1392 def spawn(self, cmd: str, expect_timeout: float = 10.0) -> "pexpect.spawn": 1393 """Run a command using pexpect. 1394 1395 The pexpect child is returned. 1396 """ 1397 pexpect = pytest.importorskip("pexpect", "3.0") 1398 if hasattr(sys, "pypy_version_info") and "64" in platform.machine(): 1399 pytest.skip("pypy-64 bit not supported") 1400 if not hasattr(pexpect, "spawn"): 1401 pytest.skip("pexpect.spawn not available") 1402 logfile = self.tmpdir.join("spawn.out").open("wb") 1403 1404 child = pexpect.spawn(cmd, logfile=logfile) 1405 self.request.addfinalizer(logfile.close) 1406 child.timeout = expect_timeout 1407 return child 1408 1409 1410class LineComp: 1411 def __init__(self) -> None: 1412 self.stringio = StringIO() 1413 """:class:`python:io.StringIO()` instance used for input.""" 1414 1415 def assert_contains_lines(self, lines2: Sequence[str]) -> None: 1416 """Assert that ``lines2`` are contained (linearly) in :attr:`stringio`'s value. 1417 1418 Lines are matched using :func:`LineMatcher.fnmatch_lines`. 1419 """ 1420 __tracebackhide__ = True 1421 val = self.stringio.getvalue() 1422 self.stringio.truncate(0) 1423 self.stringio.seek(0) 1424 lines1 = val.split("\n") 1425 LineMatcher(lines1).fnmatch_lines(lines2) 1426 1427 1428class LineMatcher: 1429 """Flexible matching of text. 1430 1431 This is a convenience class to test large texts like the output of 1432 commands. 1433 1434 The constructor takes a list of lines without their trailing newlines, i.e. 1435 ``text.splitlines()``. 1436 """ 1437 1438 def __init__(self, lines: List[str]) -> None: 1439 self.lines = lines 1440 self._log_output = [] # type: List[str] 1441 1442 def _getlines(self, lines2: Union[str, Sequence[str], Source]) -> Sequence[str]: 1443 if isinstance(lines2, str): 1444 lines2 = Source(lines2) 1445 if isinstance(lines2, Source): 1446 lines2 = lines2.strip().lines 1447 return lines2 1448 1449 def fnmatch_lines_random(self, lines2: Sequence[str]) -> None: 1450 """Check lines exist in the output in any order (using :func:`python:fnmatch.fnmatch`).""" 1451 __tracebackhide__ = True 1452 self._match_lines_random(lines2, fnmatch) 1453 1454 def re_match_lines_random(self, lines2: Sequence[str]) -> None: 1455 """Check lines exist in the output in any order (using :func:`python:re.match`).""" 1456 __tracebackhide__ = True 1457 self._match_lines_random(lines2, lambda name, pat: bool(re.match(pat, name))) 1458 1459 def _match_lines_random( 1460 self, lines2: Sequence[str], match_func: Callable[[str, str], bool] 1461 ) -> None: 1462 __tracebackhide__ = True 1463 lines2 = self._getlines(lines2) 1464 for line in lines2: 1465 for x in self.lines: 1466 if line == x or match_func(x, line): 1467 self._log("matched: ", repr(line)) 1468 break 1469 else: 1470 msg = "line %r not found in output" % line 1471 self._log(msg) 1472 self._fail(msg) 1473 1474 def get_lines_after(self, fnline: str) -> Sequence[str]: 1475 """Return all lines following the given line in the text. 1476 1477 The given line can contain glob wildcards. 1478 """ 1479 for i, line in enumerate(self.lines): 1480 if fnline == line or fnmatch(line, fnline): 1481 return self.lines[i + 1 :] 1482 raise ValueError("line %r not found in output" % fnline) 1483 1484 def _log(self, *args) -> None: 1485 self._log_output.append(" ".join(str(x) for x in args)) 1486 1487 @property 1488 def _log_text(self) -> str: 1489 return "\n".join(self._log_output) 1490 1491 def fnmatch_lines( 1492 self, lines2: Sequence[str], *, consecutive: bool = False 1493 ) -> None: 1494 """Check lines exist in the output (using :func:`python:fnmatch.fnmatch`). 1495 1496 The argument is a list of lines which have to match and can use glob 1497 wildcards. If they do not match a pytest.fail() is called. The 1498 matches and non-matches are also shown as part of the error message. 1499 1500 :param lines2: String patterns to match. 1501 :param consecutive: Match lines consecutively? 1502 """ 1503 __tracebackhide__ = True 1504 self._match_lines(lines2, fnmatch, "fnmatch", consecutive=consecutive) 1505 1506 def re_match_lines( 1507 self, lines2: Sequence[str], *, consecutive: bool = False 1508 ) -> None: 1509 """Check lines exist in the output (using :func:`python:re.match`). 1510 1511 The argument is a list of lines which have to match using ``re.match``. 1512 If they do not match a pytest.fail() is called. 1513 1514 The matches and non-matches are also shown as part of the error message. 1515 1516 :param lines2: string patterns to match. 1517 :param consecutive: match lines consecutively? 1518 """ 1519 __tracebackhide__ = True 1520 self._match_lines( 1521 lines2, 1522 lambda name, pat: bool(re.match(pat, name)), 1523 "re.match", 1524 consecutive=consecutive, 1525 ) 1526 1527 def _match_lines( 1528 self, 1529 lines2: Sequence[str], 1530 match_func: Callable[[str, str], bool], 1531 match_nickname: str, 1532 *, 1533 consecutive: bool = False 1534 ) -> None: 1535 """Underlying implementation of ``fnmatch_lines`` and ``re_match_lines``. 1536 1537 :param Sequence[str] lines2: 1538 List of string patterns to match. The actual format depends on 1539 ``match_func``. 1540 :param match_func: 1541 A callable ``match_func(line, pattern)`` where line is the 1542 captured line from stdout/stderr and pattern is the matching 1543 pattern. 1544 :param str match_nickname: 1545 The nickname for the match function that will be logged to stdout 1546 when a match occurs. 1547 :param consecutive: 1548 Match lines consecutively? 1549 """ 1550 if not isinstance(lines2, collections.abc.Sequence): 1551 raise TypeError("invalid type for lines2: {}".format(type(lines2).__name__)) 1552 lines2 = self._getlines(lines2) 1553 lines1 = self.lines[:] 1554 nextline = None 1555 extralines = [] 1556 __tracebackhide__ = True 1557 wnick = len(match_nickname) + 1 1558 started = False 1559 for line in lines2: 1560 nomatchprinted = False 1561 while lines1: 1562 nextline = lines1.pop(0) 1563 if line == nextline: 1564 self._log("exact match:", repr(line)) 1565 started = True 1566 break 1567 elif match_func(nextline, line): 1568 self._log("%s:" % match_nickname, repr(line)) 1569 self._log( 1570 "{:>{width}}".format("with:", width=wnick), repr(nextline) 1571 ) 1572 started = True 1573 break 1574 else: 1575 if consecutive and started: 1576 msg = "no consecutive match: {!r}".format(line) 1577 self._log(msg) 1578 self._log( 1579 "{:>{width}}".format("with:", width=wnick), repr(nextline) 1580 ) 1581 self._fail(msg) 1582 if not nomatchprinted: 1583 self._log( 1584 "{:>{width}}".format("nomatch:", width=wnick), repr(line) 1585 ) 1586 nomatchprinted = True 1587 self._log("{:>{width}}".format("and:", width=wnick), repr(nextline)) 1588 extralines.append(nextline) 1589 else: 1590 msg = "remains unmatched: {!r}".format(line) 1591 self._log(msg) 1592 self._fail(msg) 1593 self._log_output = [] 1594 1595 def no_fnmatch_line(self, pat: str) -> None: 1596 """Ensure captured lines do not match the given pattern, using ``fnmatch.fnmatch``. 1597 1598 :param str pat: The pattern to match lines. 1599 """ 1600 __tracebackhide__ = True 1601 self._no_match_line(pat, fnmatch, "fnmatch") 1602 1603 def no_re_match_line(self, pat: str) -> None: 1604 """Ensure captured lines do not match the given pattern, using ``re.match``. 1605 1606 :param str pat: The regular expression to match lines. 1607 """ 1608 __tracebackhide__ = True 1609 self._no_match_line( 1610 pat, lambda name, pat: bool(re.match(pat, name)), "re.match" 1611 ) 1612 1613 def _no_match_line( 1614 self, pat: str, match_func: Callable[[str, str], bool], match_nickname: str 1615 ) -> None: 1616 """Ensure captured lines does not have a the given pattern, using ``fnmatch.fnmatch``. 1617 1618 :param str pat: The pattern to match lines. 1619 """ 1620 __tracebackhide__ = True 1621 nomatch_printed = False 1622 wnick = len(match_nickname) + 1 1623 for line in self.lines: 1624 if match_func(line, pat): 1625 msg = "{}: {!r}".format(match_nickname, pat) 1626 self._log(msg) 1627 self._log("{:>{width}}".format("with:", width=wnick), repr(line)) 1628 self._fail(msg) 1629 else: 1630 if not nomatch_printed: 1631 self._log("{:>{width}}".format("nomatch:", width=wnick), repr(pat)) 1632 nomatch_printed = True 1633 self._log("{:>{width}}".format("and:", width=wnick), repr(line)) 1634 self._log_output = [] 1635 1636 def _fail(self, msg: str) -> None: 1637 __tracebackhide__ = True 1638 log_text = self._log_text 1639 self._log_output = [] 1640 pytest.fail(log_text) 1641 1642 def str(self) -> str: 1643 """Return the entire original text.""" 1644 return "\n".join(self.lines) 1645