1"""(disabled by default) support for testing pytest and pytest plugins.""" 2from __future__ import absolute_import, division, print_function 3 4import codecs 5import gc 6import os 7import platform 8import re 9import subprocess 10import six 11import sys 12import time 13import traceback 14from fnmatch import fnmatch 15 16from weakref import WeakKeyDictionary 17 18from _pytest.capture import MultiCapture, SysCapture 19from _pytest._code import Source 20import py 21import pytest 22from _pytest.main import Session, EXIT_OK 23from _pytest.assertion.rewrite import AssertionRewritingHook 24 25 26PYTEST_FULLPATH = os.path.abspath(pytest.__file__.rstrip("oc")).replace( 27 "$py.class", ".py" 28) 29 30 31IGNORE_PAM = [ # filenames added when obtaining details about the current user 32 u"/var/lib/sss/mc/passwd" 33] 34 35 36def pytest_addoption(parser): 37 parser.addoption( 38 "--lsof", 39 action="store_true", 40 dest="lsof", 41 default=False, 42 help=("run FD checks if lsof is available"), 43 ) 44 45 parser.addoption( 46 "--runpytest", 47 default="inprocess", 48 dest="runpytest", 49 choices=("inprocess", "subprocess"), 50 help=( 51 "run pytest sub runs in tests using an 'inprocess' " 52 "or 'subprocess' (python -m main) method" 53 ), 54 ) 55 56 57def pytest_configure(config): 58 if config.getvalue("lsof"): 59 checker = LsofFdLeakChecker() 60 if checker.matching_platform(): 61 config.pluginmanager.register(checker) 62 63 64class LsofFdLeakChecker(object): 65 66 def get_open_files(self): 67 out = self._exec_lsof() 68 open_files = self._parse_lsof_output(out) 69 return open_files 70 71 def _exec_lsof(self): 72 pid = os.getpid() 73 return py.process.cmdexec("lsof -Ffn0 -p %d" % pid) 74 75 def _parse_lsof_output(self, out): 76 77 def isopen(line): 78 return line.startswith("f") and ( 79 "deleted" not in line 80 and "mem" not in line 81 and "txt" not in line 82 and "cwd" not in line 83 ) 84 85 open_files = [] 86 87 for line in out.split("\n"): 88 if isopen(line): 89 fields = line.split("\0") 90 fd = fields[0][1:] 91 filename = fields[1][1:] 92 if filename in IGNORE_PAM: 93 continue 94 if filename.startswith("/"): 95 open_files.append((fd, filename)) 96 97 return open_files 98 99 def matching_platform(self): 100 try: 101 py.process.cmdexec("lsof -v") 102 except (py.process.cmdexec.Error, UnicodeDecodeError): 103 # cmdexec may raise UnicodeDecodeError on Windows systems with 104 # locale other than English: 105 # https://bitbucket.org/pytest-dev/py/issues/66 106 return False 107 else: 108 return True 109 110 @pytest.hookimpl(hookwrapper=True, tryfirst=True) 111 def pytest_runtest_protocol(self, item): 112 lines1 = self.get_open_files() 113 yield 114 if hasattr(sys, "pypy_version_info"): 115 gc.collect() 116 lines2 = self.get_open_files() 117 118 new_fds = {t[0] for t in lines2} - {t[0] for t in lines1} 119 leaked_files = [t for t in lines2 if t[0] in new_fds] 120 if leaked_files: 121 error = [] 122 error.append("***** %s FD leakage detected" % len(leaked_files)) 123 error.extend([str(f) for f in leaked_files]) 124 error.append("*** Before:") 125 error.extend([str(f) for f in lines1]) 126 error.append("*** After:") 127 error.extend([str(f) for f in lines2]) 128 error.append(error[0]) 129 error.append("*** function %s:%s: %s " % item.location) 130 error.append("See issue #2366") 131 item.warn("", "\n".join(error)) 132 133 134# XXX copied from execnet's conftest.py - needs to be merged 135winpymap = { 136 "python2.7": r"C:\Python27\python.exe", 137 "python3.4": r"C:\Python34\python.exe", 138 "python3.5": r"C:\Python35\python.exe", 139 "python3.6": r"C:\Python36\python.exe", 140} 141 142 143def getexecutable(name, cache={}): 144 try: 145 return cache[name] 146 except KeyError: 147 executable = py.path.local.sysfind(name) 148 if executable: 149 import subprocess 150 151 popen = subprocess.Popen( 152 [str(executable), "--version"], 153 universal_newlines=True, 154 stderr=subprocess.PIPE, 155 ) 156 out, err = popen.communicate() 157 if name == "jython": 158 if not err or "2.5" not in err: 159 executable = None 160 if "2.5.2" in err: 161 executable = None # http://bugs.jython.org/issue1790 162 elif popen.returncode != 0: 163 # handle pyenv's 127 164 executable = None 165 cache[name] = executable 166 return executable 167 168 169@pytest.fixture(params=["python2.7", "python3.4", "pypy", "pypy3"]) 170def anypython(request): 171 name = request.param 172 executable = getexecutable(name) 173 if executable is None: 174 if sys.platform == "win32": 175 executable = winpymap.get(name, None) 176 if executable: 177 executable = py.path.local(executable) 178 if executable.check(): 179 return executable 180 pytest.skip("no suitable %s found" % (name,)) 181 return executable 182 183 184# used at least by pytest-xdist plugin 185 186 187@pytest.fixture 188def _pytest(request): 189 """Return a helper which offers a gethookrecorder(hook) method which 190 returns a HookRecorder instance which helps to make assertions about called 191 hooks. 192 193 """ 194 return PytestArg(request) 195 196 197class PytestArg(object): 198 199 def __init__(self, request): 200 self.request = request 201 202 def gethookrecorder(self, hook): 203 hookrecorder = HookRecorder(hook._pm) 204 self.request.addfinalizer(hookrecorder.finish_recording) 205 return hookrecorder 206 207 208def get_public_names(values): 209 """Only return names from iterator values without a leading underscore.""" 210 return [x for x in values if x[0] != "_"] 211 212 213class ParsedCall(object): 214 215 def __init__(self, name, kwargs): 216 self.__dict__.update(kwargs) 217 self._name = name 218 219 def __repr__(self): 220 d = self.__dict__.copy() 221 del d["_name"] 222 return "<ParsedCall %r(**%r)>" % (self._name, d) 223 224 225class HookRecorder(object): 226 """Record all hooks called in a plugin manager. 227 228 This wraps all the hook calls in the plugin manager, recording each call 229 before propagating the normal calls. 230 231 """ 232 233 def __init__(self, pluginmanager): 234 self._pluginmanager = pluginmanager 235 self.calls = [] 236 237 def before(hook_name, hook_impls, kwargs): 238 self.calls.append(ParsedCall(hook_name, kwargs)) 239 240 def after(outcome, hook_name, hook_impls, kwargs): 241 pass 242 243 self._undo_wrapping = pluginmanager.add_hookcall_monitoring(before, after) 244 245 def finish_recording(self): 246 self._undo_wrapping() 247 248 def getcalls(self, names): 249 if isinstance(names, str): 250 names = names.split() 251 return [call for call in self.calls if call._name in names] 252 253 def assert_contains(self, entries): 254 __tracebackhide__ = True 255 i = 0 256 entries = list(entries) 257 backlocals = sys._getframe(1).f_locals 258 while entries: 259 name, check = entries.pop(0) 260 for ind, call in enumerate(self.calls[i:]): 261 if call._name == name: 262 print("NAMEMATCH", name, call) 263 if eval(check, backlocals, call.__dict__): 264 print("CHECKERMATCH", repr(check), "->", call) 265 else: 266 print("NOCHECKERMATCH", repr(check), "-", call) 267 continue 268 i += ind + 1 269 break 270 print("NONAMEMATCH", name, "with", call) 271 else: 272 pytest.fail("could not find %r check %r" % (name, check)) 273 274 def popcall(self, name): 275 __tracebackhide__ = True 276 for i, call in enumerate(self.calls): 277 if call._name == name: 278 del self.calls[i] 279 return call 280 lines = ["could not find call %r, in:" % (name,)] 281 lines.extend([" %s" % str(x) for x in self.calls]) 282 pytest.fail("\n".join(lines)) 283 284 def getcall(self, name): 285 values = self.getcalls(name) 286 assert len(values) == 1, (name, values) 287 return values[0] 288 289 # functionality for test reports 290 291 def getreports(self, names="pytest_runtest_logreport pytest_collectreport"): 292 return [x.report for x in self.getcalls(names)] 293 294 def matchreport( 295 self, 296 inamepart="", 297 names="pytest_runtest_logreport pytest_collectreport", 298 when=None, 299 ): 300 """return a testreport whose dotted import path matches""" 301 values = [] 302 for rep in self.getreports(names=names): 303 try: 304 if not when and rep.when != "call" and rep.passed: 305 # setup/teardown passing reports - let's ignore those 306 continue 307 except AttributeError: 308 pass 309 if when and getattr(rep, "when", None) != when: 310 continue 311 if not inamepart or inamepart in rep.nodeid.split("::"): 312 values.append(rep) 313 if not values: 314 raise ValueError( 315 "could not find test report matching %r: " 316 "no test reports at all!" % (inamepart,) 317 ) 318 if len(values) > 1: 319 raise ValueError( 320 "found 2 or more testreports matching %r: %s" % (inamepart, values) 321 ) 322 return values[0] 323 324 def getfailures(self, names="pytest_runtest_logreport pytest_collectreport"): 325 return [rep for rep in self.getreports(names) if rep.failed] 326 327 def getfailedcollections(self): 328 return self.getfailures("pytest_collectreport") 329 330 def listoutcomes(self): 331 passed = [] 332 skipped = [] 333 failed = [] 334 for rep in self.getreports("pytest_collectreport pytest_runtest_logreport"): 335 if rep.passed: 336 if getattr(rep, "when", None) == "call": 337 passed.append(rep) 338 elif rep.skipped: 339 skipped.append(rep) 340 elif rep.failed: 341 failed.append(rep) 342 return passed, skipped, failed 343 344 def countoutcomes(self): 345 return [len(x) for x in self.listoutcomes()] 346 347 def assertoutcome(self, passed=0, skipped=0, failed=0): 348 realpassed, realskipped, realfailed = self.listoutcomes() 349 assert passed == len(realpassed) 350 assert skipped == len(realskipped) 351 assert failed == len(realfailed) 352 353 def clear(self): 354 self.calls[:] = [] 355 356 357@pytest.fixture 358def linecomp(request): 359 return LineComp() 360 361 362@pytest.fixture(name="LineMatcher") 363def LineMatcher_fixture(request): 364 return LineMatcher 365 366 367@pytest.fixture 368def testdir(request, tmpdir_factory): 369 return Testdir(request, tmpdir_factory) 370 371 372rex_outcome = re.compile(r"(\d+) ([\w-]+)") 373 374 375class RunResult(object): 376 """The result of running a command. 377 378 Attributes: 379 380 :ret: the return value 381 :outlines: list of lines captured from stdout 382 :errlines: list of lines captures from stderr 383 :stdout: :py:class:`LineMatcher` of stdout, use ``stdout.str()`` to 384 reconstruct stdout or the commonly used ``stdout.fnmatch_lines()`` 385 method 386 :stderr: :py:class:`LineMatcher` of stderr 387 :duration: duration in seconds 388 389 """ 390 391 def __init__(self, ret, outlines, errlines, duration): 392 self.ret = ret 393 self.outlines = outlines 394 self.errlines = errlines 395 self.stdout = LineMatcher(outlines) 396 self.stderr = LineMatcher(errlines) 397 self.duration = duration 398 399 def parseoutcomes(self): 400 """Return a dictionary of outcomestring->num from parsing the terminal 401 output that the test process produced. 402 403 """ 404 for line in reversed(self.outlines): 405 if "seconds" in line: 406 outcomes = rex_outcome.findall(line) 407 if outcomes: 408 d = {} 409 for num, cat in outcomes: 410 d[cat] = int(num) 411 return d 412 raise ValueError("Pytest terminal report not found") 413 414 def assert_outcomes(self, passed=0, skipped=0, failed=0, error=0): 415 """Assert that the specified outcomes appear with the respective 416 numbers (0 means it didn't occur) in the text output from a test run. 417 418 """ 419 d = self.parseoutcomes() 420 obtained = { 421 "passed": d.get("passed", 0), 422 "skipped": d.get("skipped", 0), 423 "failed": d.get("failed", 0), 424 "error": d.get("error", 0), 425 } 426 assert ( 427 obtained == dict(passed=passed, skipped=skipped, failed=failed, error=error) 428 ) 429 430 431class CwdSnapshot(object): 432 433 def __init__(self): 434 self.__saved = os.getcwd() 435 436 def restore(self): 437 os.chdir(self.__saved) 438 439 440class SysModulesSnapshot(object): 441 442 def __init__(self, preserve=None): 443 self.__preserve = preserve 444 self.__saved = dict(sys.modules) 445 446 def restore(self): 447 if self.__preserve: 448 self.__saved.update( 449 (k, m) for k, m in sys.modules.items() if self.__preserve(k) 450 ) 451 sys.modules.clear() 452 sys.modules.update(self.__saved) 453 454 455class SysPathsSnapshot(object): 456 457 def __init__(self): 458 self.__saved = list(sys.path), list(sys.meta_path) 459 460 def restore(self): 461 sys.path[:], sys.meta_path[:] = self.__saved 462 463 464class Testdir(object): 465 """Temporary test directory with tools to test/run pytest itself. 466 467 This is based on the ``tmpdir`` fixture but provides a number of methods 468 which aid with testing pytest itself. Unless :py:meth:`chdir` is used all 469 methods will use :py:attr:`tmpdir` as their current working directory. 470 471 Attributes: 472 473 :tmpdir: The :py:class:`py.path.local` instance of the temporary directory. 474 475 :plugins: A list of plugins to use with :py:meth:`parseconfig` and 476 :py:meth:`runpytest`. Initially this is an empty list but plugins can 477 be added to the list. The type of items to add to the list depends on 478 the method using them so refer to them for details. 479 480 """ 481 482 def __init__(self, request, tmpdir_factory): 483 self.request = request 484 self._mod_collections = WeakKeyDictionary() 485 name = request.function.__name__ 486 self.tmpdir = tmpdir_factory.mktemp(name, numbered=True) 487 self.plugins = [] 488 self._cwd_snapshot = CwdSnapshot() 489 self._sys_path_snapshot = SysPathsSnapshot() 490 self._sys_modules_snapshot = self.__take_sys_modules_snapshot() 491 self.chdir() 492 self.request.addfinalizer(self.finalize) 493 method = self.request.config.getoption("--runpytest") 494 if method == "inprocess": 495 self._runpytest_method = self.runpytest_inprocess 496 elif method == "subprocess": 497 self._runpytest_method = self.runpytest_subprocess 498 499 def __repr__(self): 500 return "<Testdir %r>" % (self.tmpdir,) 501 502 def finalize(self): 503 """Clean up global state artifacts. 504 505 Some methods modify the global interpreter state and this tries to 506 clean this up. It does not remove the temporary directory however so 507 it can be looked at after the test run has finished. 508 509 """ 510 self._sys_modules_snapshot.restore() 511 self._sys_path_snapshot.restore() 512 self._cwd_snapshot.restore() 513 514 def __take_sys_modules_snapshot(self): 515 # some zope modules used by twisted-related tests keep internal state 516 # and can't be deleted; we had some trouble in the past with 517 # `zope.interface` for example 518 def preserve_module(name): 519 return name.startswith("zope") 520 521 return SysModulesSnapshot(preserve=preserve_module) 522 523 def make_hook_recorder(self, pluginmanager): 524 """Create a new :py:class:`HookRecorder` for a PluginManager.""" 525 assert not hasattr(pluginmanager, "reprec") 526 pluginmanager.reprec = reprec = HookRecorder(pluginmanager) 527 self.request.addfinalizer(reprec.finish_recording) 528 return reprec 529 530 def chdir(self): 531 """Cd into the temporary directory. 532 533 This is done automatically upon instantiation. 534 535 """ 536 self.tmpdir.chdir() 537 538 def _makefile(self, ext, args, kwargs, encoding="utf-8"): 539 items = list(kwargs.items()) 540 541 def to_text(s): 542 return s.decode(encoding) if isinstance(s, bytes) else six.text_type(s) 543 544 if args: 545 source = u"\n".join(to_text(x) for x in args) 546 basename = self.request.function.__name__ 547 items.insert(0, (basename, source)) 548 549 ret = None 550 for basename, value in items: 551 p = self.tmpdir.join(basename).new(ext=ext) 552 p.dirpath().ensure_dir() 553 source = Source(value) 554 source = u"\n".join(to_text(line) for line in source.lines) 555 p.write(source.strip().encode(encoding), "wb") 556 if ret is None: 557 ret = p 558 return ret 559 560 def makefile(self, ext, *args, **kwargs): 561 """Create a new file in the testdir. 562 563 ext: The extension the file should use, including the dot, e.g. `.py`. 564 565 args: All args will be treated as strings and joined using newlines. 566 The result will be written as contents to the file. The name of the 567 file will be based on the test function requesting this fixture. 568 E.g. "testdir.makefile('.txt', 'line1', 'line2')" 569 570 kwargs: Each keyword is the name of a file, while the value of it will 571 be written as contents of the file. 572 E.g. "testdir.makefile('.ini', pytest='[pytest]\naddopts=-rs\n')" 573 574 """ 575 return self._makefile(ext, args, kwargs) 576 577 def makeconftest(self, source): 578 """Write a contest.py file with 'source' as contents.""" 579 return self.makepyfile(conftest=source) 580 581 def makeini(self, source): 582 """Write a tox.ini file with 'source' as contents.""" 583 return self.makefile(".ini", tox=source) 584 585 def getinicfg(self, source): 586 """Return the pytest section from the tox.ini config file.""" 587 p = self.makeini(source) 588 return py.iniconfig.IniConfig(p)["pytest"] 589 590 def makepyfile(self, *args, **kwargs): 591 """Shortcut for .makefile() with a .py extension.""" 592 return self._makefile(".py", args, kwargs) 593 594 def maketxtfile(self, *args, **kwargs): 595 """Shortcut for .makefile() with a .txt extension.""" 596 return self._makefile(".txt", args, kwargs) 597 598 def syspathinsert(self, path=None): 599 """Prepend a directory to sys.path, defaults to :py:attr:`tmpdir`. 600 601 This is undone automatically when this object dies at the end of each 602 test. 603 604 """ 605 if path is None: 606 path = self.tmpdir 607 sys.path.insert(0, str(path)) 608 # a call to syspathinsert() usually means that the caller wants to 609 # import some dynamically created files, thus with python3 we 610 # invalidate its import caches 611 self._possibly_invalidate_import_caches() 612 613 def _possibly_invalidate_import_caches(self): 614 # invalidate caches if we can (py33 and above) 615 try: 616 import importlib 617 except ImportError: 618 pass 619 else: 620 if hasattr(importlib, "invalidate_caches"): 621 importlib.invalidate_caches() 622 623 def mkdir(self, name): 624 """Create a new (sub)directory.""" 625 return self.tmpdir.mkdir(name) 626 627 def mkpydir(self, name): 628 """Create a new python package. 629 630 This creates a (sub)directory with an empty ``__init__.py`` file so it 631 gets recognised as a python package. 632 633 """ 634 p = self.mkdir(name) 635 p.ensure("__init__.py") 636 return p 637 638 Session = Session 639 640 def getnode(self, config, arg): 641 """Return the collection node of a file. 642 643 :param config: :py:class:`_pytest.config.Config` instance, see 644 :py:meth:`parseconfig` and :py:meth:`parseconfigure` to create the 645 configuration 646 647 :param arg: a :py:class:`py.path.local` instance of the file 648 649 """ 650 session = Session(config) 651 assert "::" not in str(arg) 652 p = py.path.local(arg) 653 config.hook.pytest_sessionstart(session=session) 654 res = session.perform_collect([str(p)], genitems=False)[0] 655 config.hook.pytest_sessionfinish(session=session, exitstatus=EXIT_OK) 656 return res 657 658 def getpathnode(self, path): 659 """Return the collection node of a file. 660 661 This is like :py:meth:`getnode` but uses :py:meth:`parseconfigure` to 662 create the (configured) pytest Config instance. 663 664 :param path: a :py:class:`py.path.local` instance of the file 665 666 """ 667 config = self.parseconfigure(path) 668 session = Session(config) 669 x = session.fspath.bestrelpath(path) 670 config.hook.pytest_sessionstart(session=session) 671 res = session.perform_collect([x], genitems=False)[0] 672 config.hook.pytest_sessionfinish(session=session, exitstatus=EXIT_OK) 673 return res 674 675 def genitems(self, colitems): 676 """Generate all test items from a collection node. 677 678 This recurses into the collection node and returns a list of all the 679 test items contained within. 680 681 """ 682 session = colitems[0].session 683 result = [] 684 for colitem in colitems: 685 result.extend(session.genitems(colitem)) 686 return result 687 688 def runitem(self, source): 689 """Run the "test_func" Item. 690 691 The calling test instance (class containing the test method) must 692 provide a ``.getrunner()`` method which should return a runner which 693 can run the test protocol for a single item, e.g. 694 :py:func:`_pytest.runner.runtestprotocol`. 695 696 """ 697 # used from runner functional tests 698 item = self.getitem(source) 699 # the test class where we are called from wants to provide the runner 700 testclassinstance = self.request.instance 701 runner = testclassinstance.getrunner() 702 return runner(item) 703 704 def inline_runsource(self, source, *cmdlineargs): 705 """Run a test module in process using ``pytest.main()``. 706 707 This run writes "source" into a temporary file and runs 708 ``pytest.main()`` on it, returning a :py:class:`HookRecorder` instance 709 for the result. 710 711 :param source: the source code of the test module 712 713 :param cmdlineargs: any extra command line arguments to use 714 715 :return: :py:class:`HookRecorder` instance of the result 716 717 """ 718 p = self.makepyfile(source) 719 values = list(cmdlineargs) + [p] 720 return self.inline_run(*values) 721 722 def inline_genitems(self, *args): 723 """Run ``pytest.main(['--collectonly'])`` in-process. 724 725 Runs the :py:func:`pytest.main` function to run all of pytest inside 726 the test process itself like :py:meth:`inline_run`, but returns a 727 tuple of the collected items and a :py:class:`HookRecorder` instance. 728 729 """ 730 rec = self.inline_run("--collect-only", *args) 731 items = [x.item for x in rec.getcalls("pytest_itemcollected")] 732 return items, rec 733 734 def inline_run(self, *args, **kwargs): 735 """Run ``pytest.main()`` in-process, returning a HookRecorder. 736 737 Runs the :py:func:`pytest.main` function to run all of pytest inside 738 the test process itself. This means it can return a 739 :py:class:`HookRecorder` instance which gives more detailed results 740 from that run than can be done by matching stdout/stderr from 741 :py:meth:`runpytest`. 742 743 :param args: command line arguments to pass to :py:func:`pytest.main` 744 745 :param plugin: (keyword-only) extra plugin instances the 746 ``pytest.main()`` instance should use 747 748 :return: a :py:class:`HookRecorder` instance 749 750 """ 751 finalizers = [] 752 try: 753 # When running pytest inline any plugins active in the main test 754 # process are already imported. So this disables the warning which 755 # will trigger to say they can no longer be rewritten, which is 756 # fine as they have already been rewritten. 757 orig_warn = AssertionRewritingHook._warn_already_imported 758 759 def revert_warn_already_imported(): 760 AssertionRewritingHook._warn_already_imported = orig_warn 761 762 finalizers.append(revert_warn_already_imported) 763 AssertionRewritingHook._warn_already_imported = lambda *a: None 764 765 # Any sys.module or sys.path changes done while running pytest 766 # inline should be reverted after the test run completes to avoid 767 # clashing with later inline tests run within the same pytest test, 768 # e.g. just because they use matching test module names. 769 finalizers.append(self.__take_sys_modules_snapshot().restore) 770 finalizers.append(SysPathsSnapshot().restore) 771 772 # Important note: 773 # - our tests should not leave any other references/registrations 774 # laying around other than possibly loaded test modules 775 # referenced from sys.modules, as nothing will clean those up 776 # automatically 777 778 rec = [] 779 780 class Collect(object): 781 782 def pytest_configure(x, config): 783 rec.append(self.make_hook_recorder(config.pluginmanager)) 784 785 plugins = kwargs.get("plugins") or [] 786 plugins.append(Collect()) 787 ret = pytest.main(list(args), plugins=plugins) 788 if len(rec) == 1: 789 reprec = rec.pop() 790 else: 791 792 class reprec(object): 793 pass 794 795 reprec.ret = ret 796 797 # typically we reraise keyboard interrupts from the child run 798 # because it's our user requesting interruption of the testing 799 if ret == 2 and not kwargs.get("no_reraise_ctrlc"): 800 calls = reprec.getcalls("pytest_keyboard_interrupt") 801 if calls and calls[-1].excinfo.type == KeyboardInterrupt: 802 raise KeyboardInterrupt() 803 return reprec 804 finally: 805 for finalizer in finalizers: 806 finalizer() 807 808 def runpytest_inprocess(self, *args, **kwargs): 809 """Return result of running pytest in-process, providing a similar 810 interface to what self.runpytest() provides. 811 812 """ 813 if kwargs.get("syspathinsert"): 814 self.syspathinsert() 815 now = time.time() 816 capture = MultiCapture(Capture=SysCapture) 817 capture.start_capturing() 818 try: 819 try: 820 reprec = self.inline_run(*args, **kwargs) 821 except SystemExit as e: 822 823 class reprec(object): 824 ret = e.args[0] 825 826 except Exception: 827 traceback.print_exc() 828 829 class reprec(object): 830 ret = 3 831 832 finally: 833 out, err = capture.readouterr() 834 capture.stop_capturing() 835 sys.stdout.write(out) 836 sys.stderr.write(err) 837 838 res = RunResult(reprec.ret, out.split("\n"), err.split("\n"), time.time() - now) 839 res.reprec = reprec 840 return res 841 842 def runpytest(self, *args, **kwargs): 843 """Run pytest inline or in a subprocess, depending on the command line 844 option "--runpytest" and return a :py:class:`RunResult`. 845 846 """ 847 args = self._ensure_basetemp(args) 848 return self._runpytest_method(*args, **kwargs) 849 850 def _ensure_basetemp(self, args): 851 args = [str(x) for x in args] 852 for x in args: 853 if str(x).startswith("--basetemp"): 854 # print("basedtemp exists: %s" %(args,)) 855 break 856 else: 857 args.append("--basetemp=%s" % self.tmpdir.dirpath("basetemp")) 858 # print("added basetemp: %s" %(args,)) 859 return args 860 861 def parseconfig(self, *args): 862 """Return a new pytest Config instance from given commandline args. 863 864 This invokes the pytest bootstrapping code in _pytest.config to create 865 a new :py:class:`_pytest.core.PluginManager` and call the 866 pytest_cmdline_parse hook to create a new 867 :py:class:`_pytest.config.Config` instance. 868 869 If :py:attr:`plugins` has been populated they should be plugin modules 870 to be registered with the PluginManager. 871 872 """ 873 args = self._ensure_basetemp(args) 874 875 import _pytest.config 876 877 config = _pytest.config._prepareconfig(args, self.plugins) 878 # we don't know what the test will do with this half-setup config 879 # object and thus we make sure it gets unconfigured properly in any 880 # case (otherwise capturing could still be active, for example) 881 self.request.addfinalizer(config._ensure_unconfigure) 882 return config 883 884 def parseconfigure(self, *args): 885 """Return a new pytest configured Config instance. 886 887 This returns a new :py:class:`_pytest.config.Config` instance like 888 :py:meth:`parseconfig`, but also calls the pytest_configure hook. 889 890 """ 891 config = self.parseconfig(*args) 892 config._do_configure() 893 self.request.addfinalizer(config._ensure_unconfigure) 894 return config 895 896 def getitem(self, source, funcname="test_func"): 897 """Return the test item for a test function. 898 899 This writes the source to a python file and runs pytest's collection on 900 the resulting module, returning the test item for the requested 901 function name. 902 903 :param source: the module source 904 905 :param funcname: the name of the test function for which to return a 906 test item 907 908 """ 909 items = self.getitems(source) 910 for item in items: 911 if item.name == funcname: 912 return item 913 assert 0, ( 914 "%r item not found in module:\n%s\nitems: %s" % (funcname, source, items) 915 ) 916 917 def getitems(self, source): 918 """Return all test items collected from the module. 919 920 This writes the source to a python file and runs pytest's collection on 921 the resulting module, returning all test items contained within. 922 923 """ 924 modcol = self.getmodulecol(source) 925 return self.genitems([modcol]) 926 927 def getmodulecol(self, source, configargs=(), withinit=False): 928 """Return the module collection node for ``source``. 929 930 This writes ``source`` to a file using :py:meth:`makepyfile` and then 931 runs the pytest collection on it, returning the collection node for the 932 test module. 933 934 :param source: the source code of the module to collect 935 936 :param configargs: any extra arguments to pass to 937 :py:meth:`parseconfigure` 938 939 :param withinit: whether to also write an ``__init__.py`` file to the 940 same directory to ensure it is a package 941 942 """ 943 kw = {self.request.function.__name__: Source(source).strip()} 944 path = self.makepyfile(**kw) 945 if withinit: 946 self.makepyfile(__init__="#") 947 self.config = config = self.parseconfigure(path, *configargs) 948 node = self.getnode(config, path) 949 950 return node 951 952 def collect_by_name(self, modcol, name): 953 """Return the collection node for name from the module collection. 954 955 This will search a module collection node for a collection node 956 matching the given name. 957 958 :param modcol: a module collection node; see :py:meth:`getmodulecol` 959 960 :param name: the name of the node to return 961 962 """ 963 if modcol not in self._mod_collections: 964 self._mod_collections[modcol] = list(modcol.collect()) 965 for colitem in self._mod_collections[modcol]: 966 if colitem.name == name: 967 return colitem 968 969 def popen(self, cmdargs, stdout, stderr, **kw): 970 """Invoke subprocess.Popen. 971 972 This calls subprocess.Popen making sure the current working directory 973 is in the PYTHONPATH. 974 975 You probably want to use :py:meth:`run` instead. 976 977 """ 978 env = os.environ.copy() 979 env["PYTHONPATH"] = os.pathsep.join( 980 filter(None, [str(os.getcwd()), env.get("PYTHONPATH", "")]) 981 ) 982 kw["env"] = env 983 984 popen = subprocess.Popen( 985 cmdargs, stdin=subprocess.PIPE, stdout=stdout, stderr=stderr, **kw 986 ) 987 popen.stdin.close() 988 989 return popen 990 991 def run(self, *cmdargs): 992 """Run a command with arguments. 993 994 Run a process using subprocess.Popen saving the stdout and stderr. 995 996 Returns a :py:class:`RunResult`. 997 998 """ 999 return self._run(*cmdargs) 1000 1001 def _run(self, *cmdargs): 1002 cmdargs = [str(x) for x in cmdargs] 1003 p1 = self.tmpdir.join("stdout") 1004 p2 = self.tmpdir.join("stderr") 1005 print("running:", " ".join(cmdargs)) 1006 print(" in:", str(py.path.local())) 1007 f1 = codecs.open(str(p1), "w", encoding="utf8") 1008 f2 = codecs.open(str(p2), "w", encoding="utf8") 1009 try: 1010 now = time.time() 1011 popen = self.popen( 1012 cmdargs, stdout=f1, stderr=f2, close_fds=(sys.platform != "win32") 1013 ) 1014 ret = popen.wait() 1015 finally: 1016 f1.close() 1017 f2.close() 1018 f1 = codecs.open(str(p1), "r", encoding="utf8") 1019 f2 = codecs.open(str(p2), "r", encoding="utf8") 1020 try: 1021 out = f1.read().splitlines() 1022 err = f2.read().splitlines() 1023 finally: 1024 f1.close() 1025 f2.close() 1026 self._dump_lines(out, sys.stdout) 1027 self._dump_lines(err, sys.stderr) 1028 return RunResult(ret, out, err, time.time() - now) 1029 1030 def _dump_lines(self, lines, fp): 1031 try: 1032 for line in lines: 1033 print(line, file=fp) 1034 except UnicodeEncodeError: 1035 print("couldn't print to %s because of encoding" % (fp,)) 1036 1037 def _getpytestargs(self): 1038 # we cannot use `(sys.executable, script)` because on Windows the 1039 # script is e.g. `pytest.exe` 1040 return (sys.executable, PYTEST_FULLPATH) # noqa 1041 1042 def runpython(self, script): 1043 """Run a python script using sys.executable as interpreter. 1044 1045 Returns a :py:class:`RunResult`. 1046 1047 """ 1048 return self.run(sys.executable, script) 1049 1050 def runpython_c(self, command): 1051 """Run python -c "command", return a :py:class:`RunResult`.""" 1052 return self.run(sys.executable, "-c", command) 1053 1054 def runpytest_subprocess(self, *args, **kwargs): 1055 """Run pytest as a subprocess with given arguments. 1056 1057 Any plugins added to the :py:attr:`plugins` list will added using the 1058 ``-p`` command line option. Additionally ``--basetemp`` is used put 1059 any temporary files and directories in a numbered directory prefixed 1060 with "runpytest-" so they do not conflict with the normal numbered 1061 pytest location for temporary files and directories. 1062 1063 Returns a :py:class:`RunResult`. 1064 1065 """ 1066 p = py.path.local.make_numbered_dir( 1067 prefix="runpytest-", keep=None, rootdir=self.tmpdir 1068 ) 1069 args = ("--basetemp=%s" % p,) + args 1070 plugins = [x for x in self.plugins if isinstance(x, str)] 1071 if plugins: 1072 args = ("-p", plugins[0]) + args 1073 args = self._getpytestargs() + args 1074 return self.run(*args) 1075 1076 def spawn_pytest(self, string, expect_timeout=10.0): 1077 """Run pytest using pexpect. 1078 1079 This makes sure to use the right pytest and sets up the temporary 1080 directory locations. 1081 1082 The pexpect child is returned. 1083 1084 """ 1085 basetemp = self.tmpdir.mkdir("temp-pexpect") 1086 invoke = " ".join(map(str, self._getpytestargs())) 1087 cmd = "%s --basetemp=%s %s" % (invoke, basetemp, string) 1088 return self.spawn(cmd, expect_timeout=expect_timeout) 1089 1090 def spawn(self, cmd, expect_timeout=10.0): 1091 """Run a command using pexpect. 1092 1093 The pexpect child is returned. 1094 1095 """ 1096 pexpect = pytest.importorskip("pexpect", "3.0") 1097 if hasattr(sys, "pypy_version_info") and "64" in platform.machine(): 1098 pytest.skip("pypy-64 bit not supported") 1099 if sys.platform.startswith("freebsd"): 1100 pytest.xfail("pexpect does not work reliably on freebsd") 1101 logfile = self.tmpdir.join("spawn.out").open("wb") 1102 child = pexpect.spawn(cmd, logfile=logfile) 1103 self.request.addfinalizer(logfile.close) 1104 child.timeout = expect_timeout 1105 return child 1106 1107 1108def getdecoded(out): 1109 try: 1110 return out.decode("utf-8") 1111 except UnicodeDecodeError: 1112 return "INTERNAL not-utf8-decodeable, truncated string:\n%s" % ( 1113 py.io.saferepr(out), 1114 ) 1115 1116 1117class LineComp(object): 1118 1119 def __init__(self): 1120 self.stringio = py.io.TextIO() 1121 1122 def assert_contains_lines(self, lines2): 1123 """Assert that lines2 are contained (linearly) in lines1. 1124 1125 Return a list of extralines found. 1126 1127 """ 1128 __tracebackhide__ = True 1129 val = self.stringio.getvalue() 1130 self.stringio.truncate(0) 1131 self.stringio.seek(0) 1132 lines1 = val.split("\n") 1133 return LineMatcher(lines1).fnmatch_lines(lines2) 1134 1135 1136class LineMatcher(object): 1137 """Flexible matching of text. 1138 1139 This is a convenience class to test large texts like the output of 1140 commands. 1141 1142 The constructor takes a list of lines without their trailing newlines, i.e. 1143 ``text.splitlines()``. 1144 1145 """ 1146 1147 def __init__(self, lines): 1148 self.lines = lines 1149 self._log_output = [] 1150 1151 def str(self): 1152 """Return the entire original text.""" 1153 return "\n".join(self.lines) 1154 1155 def _getlines(self, lines2): 1156 if isinstance(lines2, str): 1157 lines2 = Source(lines2) 1158 if isinstance(lines2, Source): 1159 lines2 = lines2.strip().lines 1160 return lines2 1161 1162 def fnmatch_lines_random(self, lines2): 1163 """Check lines exist in the output using in any order. 1164 1165 Lines are checked using ``fnmatch.fnmatch``. The argument is a list of 1166 lines which have to occur in the output, in any order. 1167 1168 """ 1169 self._match_lines_random(lines2, fnmatch) 1170 1171 def re_match_lines_random(self, lines2): 1172 """Check lines exist in the output using ``re.match``, in any order. 1173 1174 The argument is a list of lines which have to occur in the output, in 1175 any order. 1176 1177 """ 1178 self._match_lines_random(lines2, lambda name, pat: re.match(pat, name)) 1179 1180 def _match_lines_random(self, lines2, match_func): 1181 """Check lines exist in the output. 1182 1183 The argument is a list of lines which have to occur in the output, in 1184 any order. Each line can contain glob whildcards. 1185 1186 """ 1187 lines2 = self._getlines(lines2) 1188 for line in lines2: 1189 for x in self.lines: 1190 if line == x or match_func(x, line): 1191 self._log("matched: ", repr(line)) 1192 break 1193 else: 1194 self._log("line %r not found in output" % line) 1195 raise ValueError(self._log_text) 1196 1197 def get_lines_after(self, fnline): 1198 """Return all lines following the given line in the text. 1199 1200 The given line can contain glob wildcards. 1201 1202 """ 1203 for i, line in enumerate(self.lines): 1204 if fnline == line or fnmatch(line, fnline): 1205 return self.lines[i + 1:] 1206 raise ValueError("line %r not found in output" % fnline) 1207 1208 def _log(self, *args): 1209 self._log_output.append(" ".join((str(x) for x in args))) 1210 1211 @property 1212 def _log_text(self): 1213 return "\n".join(self._log_output) 1214 1215 def fnmatch_lines(self, lines2): 1216 """Search captured text for matching lines using ``fnmatch.fnmatch``. 1217 1218 The argument is a list of lines which have to match and can use glob 1219 wildcards. If they do not match a pytest.fail() is called. The 1220 matches and non-matches are also printed on stdout. 1221 1222 """ 1223 self._match_lines(lines2, fnmatch, "fnmatch") 1224 1225 def re_match_lines(self, lines2): 1226 """Search captured text for matching lines using ``re.match``. 1227 1228 The argument is a list of lines which have to match using ``re.match``. 1229 If they do not match a pytest.fail() is called. 1230 1231 The matches and non-matches are also printed on stdout. 1232 1233 """ 1234 self._match_lines(lines2, lambda name, pat: re.match(pat, name), "re.match") 1235 1236 def _match_lines(self, lines2, match_func, match_nickname): 1237 """Underlying implementation of ``fnmatch_lines`` and ``re_match_lines``. 1238 1239 :param list[str] lines2: list of string patterns to match. The actual 1240 format depends on ``match_func`` 1241 :param match_func: a callable ``match_func(line, pattern)`` where line 1242 is the captured line from stdout/stderr and pattern is the matching 1243 pattern 1244 :param str match_nickname: the nickname for the match function that 1245 will be logged to stdout when a match occurs 1246 1247 """ 1248 lines2 = self._getlines(lines2) 1249 lines1 = self.lines[:] 1250 nextline = None 1251 extralines = [] 1252 __tracebackhide__ = True 1253 for line in lines2: 1254 nomatchprinted = False 1255 while lines1: 1256 nextline = lines1.pop(0) 1257 if line == nextline: 1258 self._log("exact match:", repr(line)) 1259 break 1260 elif match_func(nextline, line): 1261 self._log("%s:" % match_nickname, repr(line)) 1262 self._log(" with:", repr(nextline)) 1263 break 1264 else: 1265 if not nomatchprinted: 1266 self._log("nomatch:", repr(line)) 1267 nomatchprinted = True 1268 self._log(" and:", repr(nextline)) 1269 extralines.append(nextline) 1270 else: 1271 self._log("remains unmatched: %r" % (line,)) 1272 pytest.fail(self._log_text) 1273