1""" (disabled by default) support for testing pytest and pytest plugins. """ 2from __future__ import absolute_import, division, print_function 3 4import codecs 5import gc 6import os 7import platform 8import re 9import subprocess 10import sys 11import time 12import traceback 13from fnmatch import fnmatch 14 15from weakref import WeakKeyDictionary 16 17from _pytest.capture import MultiCapture, SysCapture 18from _pytest._code import Source 19import py 20import pytest 21from _pytest.main import Session, EXIT_OK 22from _pytest.assertion.rewrite import AssertionRewritingHook 23 24 25def pytest_addoption(parser): 26 # group = parser.getgroup("pytester", "pytester (self-tests) options") 27 parser.addoption('--lsof', 28 action="store_true", dest="lsof", default=False, 29 help=("run FD checks if lsof is available")) 30 31 parser.addoption('--runpytest', default="inprocess", dest="runpytest", 32 choices=("inprocess", "subprocess", ), 33 help=("run pytest sub runs in tests using an 'inprocess' " 34 "or 'subprocess' (python -m main) method")) 35 36 37def pytest_configure(config): 38 # This might be called multiple times. Only take the first. 39 global _pytest_fullpath 40 try: 41 _pytest_fullpath 42 except NameError: 43 _pytest_fullpath = os.path.abspath(pytest.__file__.rstrip("oc")) 44 _pytest_fullpath = _pytest_fullpath.replace("$py.class", ".py") 45 46 if config.getvalue("lsof"): 47 checker = LsofFdLeakChecker() 48 if checker.matching_platform(): 49 config.pluginmanager.register(checker) 50 51 52class LsofFdLeakChecker(object): 53 def get_open_files(self): 54 out = self._exec_lsof() 55 open_files = self._parse_lsof_output(out) 56 return open_files 57 58 def _exec_lsof(self): 59 pid = os.getpid() 60 return py.process.cmdexec("lsof -Ffn0 -p %d" % pid) 61 62 def _parse_lsof_output(self, out): 63 def isopen(line): 64 return line.startswith('f') and ("deleted" not in line and 65 'mem' not in line and "txt" not in line and 'cwd' not in line) 66 67 open_files = [] 68 69 for line in out.split("\n"): 70 if isopen(line): 71 fields = line.split('\0') 72 fd = fields[0][1:] 73 filename = fields[1][1:] 74 if filename.startswith('/'): 75 open_files.append((fd, filename)) 76 77 return open_files 78 79 def matching_platform(self): 80 try: 81 py.process.cmdexec("lsof -v") 82 except (py.process.cmdexec.Error, UnicodeDecodeError): 83 # cmdexec may raise UnicodeDecodeError on Windows systems 84 # with locale other than english: 85 # https://bitbucket.org/pytest-dev/py/issues/66 86 return False 87 else: 88 return True 89 90 @pytest.hookimpl(hookwrapper=True, tryfirst=True) 91 def pytest_runtest_protocol(self, item): 92 lines1 = self.get_open_files() 93 yield 94 if hasattr(sys, "pypy_version_info"): 95 gc.collect() 96 lines2 = self.get_open_files() 97 98 new_fds = set([t[0] for t in lines2]) - set([t[0] for t in lines1]) 99 leaked_files = [t for t in lines2 if t[0] in new_fds] 100 if leaked_files: 101 error = [] 102 error.append("***** %s FD leakage detected" % len(leaked_files)) 103 error.extend([str(f) for f in leaked_files]) 104 error.append("*** Before:") 105 error.extend([str(f) for f in lines1]) 106 error.append("*** After:") 107 error.extend([str(f) for f in lines2]) 108 error.append(error[0]) 109 error.append("*** function %s:%s: %s " % item.location) 110 error.append("See issue #2366") 111 item.warn('', "\n".join(error)) 112 113 114# XXX copied from execnet's conftest.py - needs to be merged 115winpymap = { 116 'python2.7': r'C:\Python27\python.exe', 117 'python2.6': r'C:\Python26\python.exe', 118 'python3.1': r'C:\Python31\python.exe', 119 'python3.2': r'C:\Python32\python.exe', 120 'python3.3': r'C:\Python33\python.exe', 121 'python3.4': r'C:\Python34\python.exe', 122 'python3.5': r'C:\Python35\python.exe', 123} 124 125def getexecutable(name, cache={}): 126 try: 127 return cache[name] 128 except KeyError: 129 executable = py.path.local.sysfind(name) 130 if executable: 131 import subprocess 132 popen = subprocess.Popen([str(executable), "--version"], 133 universal_newlines=True, stderr=subprocess.PIPE) 134 out, err = popen.communicate() 135 if name == "jython": 136 if not err or "2.5" not in err: 137 executable = None 138 if "2.5.2" in err: 139 executable = None # http://bugs.jython.org/issue1790 140 elif popen.returncode != 0: 141 # Handle pyenv's 127. 142 executable = None 143 cache[name] = executable 144 return executable 145 146@pytest.fixture(params=['python2.6', 'python2.7', 'python3.3', "python3.4", 147 'pypy', 'pypy3']) 148def anypython(request): 149 name = request.param 150 executable = getexecutable(name) 151 if executable is None: 152 if sys.platform == "win32": 153 executable = winpymap.get(name, None) 154 if executable: 155 executable = py.path.local(executable) 156 if executable.check(): 157 return executable 158 pytest.skip("no suitable %s found" % (name,)) 159 return executable 160 161# used at least by pytest-xdist plugin 162@pytest.fixture 163def _pytest(request): 164 """ Return a helper which offers a gethookrecorder(hook) 165 method which returns a HookRecorder instance which helps 166 to make assertions about called hooks. 167 """ 168 return PytestArg(request) 169 170class PytestArg: 171 def __init__(self, request): 172 self.request = request 173 174 def gethookrecorder(self, hook): 175 hookrecorder = HookRecorder(hook._pm) 176 self.request.addfinalizer(hookrecorder.finish_recording) 177 return hookrecorder 178 179 180def get_public_names(l): 181 """Only return names from iterator l without a leading underscore.""" 182 return [x for x in l if x[0] != "_"] 183 184 185class ParsedCall: 186 def __init__(self, name, kwargs): 187 self.__dict__.update(kwargs) 188 self._name = name 189 190 def __repr__(self): 191 d = self.__dict__.copy() 192 del d['_name'] 193 return "<ParsedCall %r(**%r)>" %(self._name, d) 194 195 196class HookRecorder: 197 """Record all hooks called in a plugin manager. 198 199 This wraps all the hook calls in the plugin manager, recording 200 each call before propagating the normal calls. 201 202 """ 203 204 def __init__(self, pluginmanager): 205 self._pluginmanager = pluginmanager 206 self.calls = [] 207 208 def before(hook_name, hook_impls, kwargs): 209 self.calls.append(ParsedCall(hook_name, kwargs)) 210 211 def after(outcome, hook_name, hook_impls, kwargs): 212 pass 213 214 self._undo_wrapping = pluginmanager.add_hookcall_monitoring(before, after) 215 216 def finish_recording(self): 217 self._undo_wrapping() 218 219 def getcalls(self, names): 220 if isinstance(names, str): 221 names = names.split() 222 return [call for call in self.calls if call._name in names] 223 224 def assert_contains(self, entries): 225 __tracebackhide__ = True 226 i = 0 227 entries = list(entries) 228 backlocals = sys._getframe(1).f_locals 229 while entries: 230 name, check = entries.pop(0) 231 for ind, call in enumerate(self.calls[i:]): 232 if call._name == name: 233 print("NAMEMATCH", name, call) 234 if eval(check, backlocals, call.__dict__): 235 print("CHECKERMATCH", repr(check), "->", call) 236 else: 237 print("NOCHECKERMATCH", repr(check), "-", call) 238 continue 239 i += ind + 1 240 break 241 print("NONAMEMATCH", name, "with", call) 242 else: 243 pytest.fail("could not find %r check %r" % (name, check)) 244 245 def popcall(self, name): 246 __tracebackhide__ = True 247 for i, call in enumerate(self.calls): 248 if call._name == name: 249 del self.calls[i] 250 return call 251 lines = ["could not find call %r, in:" % (name,)] 252 lines.extend([" %s" % str(x) for x in self.calls]) 253 pytest.fail("\n".join(lines)) 254 255 def getcall(self, name): 256 l = self.getcalls(name) 257 assert len(l) == 1, (name, l) 258 return l[0] 259 260 # functionality for test reports 261 262 def getreports(self, 263 names="pytest_runtest_logreport pytest_collectreport"): 264 return [x.report for x in self.getcalls(names)] 265 266 def matchreport(self, inamepart="", 267 names="pytest_runtest_logreport pytest_collectreport", when=None): 268 """ return a testreport whose dotted import path matches """ 269 l = [] 270 for rep in self.getreports(names=names): 271 try: 272 if not when and rep.when != "call" and rep.passed: 273 # setup/teardown passing reports - let's ignore those 274 continue 275 except AttributeError: 276 pass 277 if when and getattr(rep, 'when', None) != when: 278 continue 279 if not inamepart or inamepart in rep.nodeid.split("::"): 280 l.append(rep) 281 if not l: 282 raise ValueError("could not find test report matching %r: " 283 "no test reports at all!" % (inamepart,)) 284 if len(l) > 1: 285 raise ValueError( 286 "found 2 or more testreports matching %r: %s" %(inamepart, l)) 287 return l[0] 288 289 def getfailures(self, 290 names='pytest_runtest_logreport pytest_collectreport'): 291 return [rep for rep in self.getreports(names) if rep.failed] 292 293 def getfailedcollections(self): 294 return self.getfailures('pytest_collectreport') 295 296 def listoutcomes(self): 297 passed = [] 298 skipped = [] 299 failed = [] 300 for rep in self.getreports( 301 "pytest_collectreport pytest_runtest_logreport"): 302 if rep.passed: 303 if getattr(rep, "when", None) == "call": 304 passed.append(rep) 305 elif rep.skipped: 306 skipped.append(rep) 307 elif rep.failed: 308 failed.append(rep) 309 return passed, skipped, failed 310 311 def countoutcomes(self): 312 return [len(x) for x in self.listoutcomes()] 313 314 def assertoutcome(self, passed=0, skipped=0, failed=0): 315 realpassed, realskipped, realfailed = self.listoutcomes() 316 assert passed == len(realpassed) 317 assert skipped == len(realskipped) 318 assert failed == len(realfailed) 319 320 def clear(self): 321 self.calls[:] = [] 322 323 324@pytest.fixture 325def linecomp(request): 326 return LineComp() 327 328 329@pytest.fixture(name='LineMatcher') 330def LineMatcher_fixture(request): 331 return LineMatcher 332 333 334@pytest.fixture 335def testdir(request, tmpdir_factory): 336 return Testdir(request, tmpdir_factory) 337 338 339rex_outcome = re.compile(r"(\d+) ([\w-]+)") 340class RunResult: 341 """The result of running a command. 342 343 Attributes: 344 345 :ret: The return value. 346 :outlines: List of lines captured from stdout. 347 :errlines: List of lines captures from stderr. 348 :stdout: :py:class:`LineMatcher` of stdout, use ``stdout.str()`` to 349 reconstruct stdout or the commonly used 350 ``stdout.fnmatch_lines()`` method. 351 :stderrr: :py:class:`LineMatcher` of stderr. 352 :duration: Duration in seconds. 353 354 """ 355 def __init__(self, ret, outlines, errlines, duration): 356 self.ret = ret 357 self.outlines = outlines 358 self.errlines = errlines 359 self.stdout = LineMatcher(outlines) 360 self.stderr = LineMatcher(errlines) 361 self.duration = duration 362 363 def parseoutcomes(self): 364 """ Return a dictionary of outcomestring->num from parsing 365 the terminal output that the test process produced.""" 366 for line in reversed(self.outlines): 367 if 'seconds' in line: 368 outcomes = rex_outcome.findall(line) 369 if outcomes: 370 d = {} 371 for num, cat in outcomes: 372 d[cat] = int(num) 373 return d 374 raise ValueError("Pytest terminal report not found") 375 376 def assert_outcomes(self, passed=0, skipped=0, failed=0): 377 """ assert that the specified outcomes appear with the respective 378 numbers (0 means it didn't occur) in the text output from a test run.""" 379 d = self.parseoutcomes() 380 assert passed == d.get("passed", 0) 381 assert skipped == d.get("skipped", 0) 382 assert failed == d.get("failed", 0) 383 384 385 386class Testdir: 387 """Temporary test directory with tools to test/run pytest itself. 388 389 This is based on the ``tmpdir`` fixture but provides a number of 390 methods which aid with testing pytest itself. Unless 391 :py:meth:`chdir` is used all methods will use :py:attr:`tmpdir` as 392 current working directory. 393 394 Attributes: 395 396 :tmpdir: The :py:class:`py.path.local` instance of the temporary 397 directory. 398 399 :plugins: A list of plugins to use with :py:meth:`parseconfig` and 400 :py:meth:`runpytest`. Initially this is an empty list but 401 plugins can be added to the list. The type of items to add to 402 the list depend on the method which uses them so refer to them 403 for details. 404 405 """ 406 407 def __init__(self, request, tmpdir_factory): 408 self.request = request 409 self._mod_collections = WeakKeyDictionary() 410 # XXX remove duplication with tmpdir plugin 411 basetmp = tmpdir_factory.ensuretemp("testdir") 412 name = request.function.__name__ 413 for i in range(100): 414 try: 415 tmpdir = basetmp.mkdir(name + str(i)) 416 except py.error.EEXIST: 417 continue 418 break 419 self.tmpdir = tmpdir 420 self.plugins = [] 421 self._savesyspath = (list(sys.path), list(sys.meta_path)) 422 self._savemodulekeys = set(sys.modules) 423 self.chdir() # always chdir 424 self.request.addfinalizer(self.finalize) 425 method = self.request.config.getoption("--runpytest") 426 if method == "inprocess": 427 self._runpytest_method = self.runpytest_inprocess 428 elif method == "subprocess": 429 self._runpytest_method = self.runpytest_subprocess 430 431 def __repr__(self): 432 return "<Testdir %r>" % (self.tmpdir,) 433 434 def finalize(self): 435 """Clean up global state artifacts. 436 437 Some methods modify the global interpreter state and this 438 tries to clean this up. It does not remove the temporary 439 directory however so it can be looked at after the test run 440 has finished. 441 442 """ 443 sys.path[:], sys.meta_path[:] = self._savesyspath 444 if hasattr(self, '_olddir'): 445 self._olddir.chdir() 446 self.delete_loaded_modules() 447 448 def delete_loaded_modules(self): 449 """Delete modules that have been loaded during a test. 450 451 This allows the interpreter to catch module changes in case 452 the module is re-imported. 453 """ 454 for name in set(sys.modules).difference(self._savemodulekeys): 455 # some zope modules used by twisted-related tests keeps internal 456 # state and can't be deleted; we had some trouble in the past 457 # with zope.interface for example 458 if not name.startswith("zope"): 459 del sys.modules[name] 460 461 def make_hook_recorder(self, pluginmanager): 462 """Create a new :py:class:`HookRecorder` for a PluginManager.""" 463 assert not hasattr(pluginmanager, "reprec") 464 pluginmanager.reprec = reprec = HookRecorder(pluginmanager) 465 self.request.addfinalizer(reprec.finish_recording) 466 return reprec 467 468 def chdir(self): 469 """Cd into the temporary directory. 470 471 This is done automatically upon instantiation. 472 473 """ 474 old = self.tmpdir.chdir() 475 if not hasattr(self, '_olddir'): 476 self._olddir = old 477 478 def _makefile(self, ext, args, kwargs, encoding="utf-8"): 479 items = list(kwargs.items()) 480 if args: 481 source = py.builtin._totext("\n").join( 482 map(py.builtin._totext, args)) + py.builtin._totext("\n") 483 basename = self.request.function.__name__ 484 items.insert(0, (basename, source)) 485 ret = None 486 for name, value in items: 487 p = self.tmpdir.join(name).new(ext=ext) 488 p.dirpath().ensure_dir() 489 source = Source(value) 490 491 def my_totext(s, encoding="utf-8"): 492 if py.builtin._isbytes(s): 493 s = py.builtin._totext(s, encoding=encoding) 494 return s 495 496 source_unicode = "\n".join([my_totext(line) for line in source.lines]) 497 source = py.builtin._totext(source_unicode) 498 content = source.strip().encode(encoding) # + "\n" 499 #content = content.rstrip() + "\n" 500 p.write(content, "wb") 501 if ret is None: 502 ret = p 503 return ret 504 505 def makefile(self, ext, *args, **kwargs): 506 """Create a new file in the testdir. 507 508 ext: The extension the file should use, including the dot. 509 E.g. ".py". 510 511 args: All args will be treated as strings and joined using 512 newlines. The result will be written as contents to the 513 file. The name of the file will be based on the test 514 function requesting this fixture. 515 E.g. "testdir.makefile('.txt', 'line1', 'line2')" 516 517 kwargs: Each keyword is the name of a file, while the value of 518 it will be written as contents of the file. 519 E.g. "testdir.makefile('.ini', pytest='[pytest]\naddopts=-rs\n')" 520 521 """ 522 return self._makefile(ext, args, kwargs) 523 524 def makeconftest(self, source): 525 """Write a contest.py file with 'source' as contents.""" 526 return self.makepyfile(conftest=source) 527 528 def makeini(self, source): 529 """Write a tox.ini file with 'source' as contents.""" 530 return self.makefile('.ini', tox=source) 531 532 def getinicfg(self, source): 533 """Return the pytest section from the tox.ini config file.""" 534 p = self.makeini(source) 535 return py.iniconfig.IniConfig(p)['pytest'] 536 537 def makepyfile(self, *args, **kwargs): 538 """Shortcut for .makefile() with a .py extension.""" 539 return self._makefile('.py', args, kwargs) 540 541 def maketxtfile(self, *args, **kwargs): 542 """Shortcut for .makefile() with a .txt extension.""" 543 return self._makefile('.txt', args, kwargs) 544 545 def syspathinsert(self, path=None): 546 """Prepend a directory to sys.path, defaults to :py:attr:`tmpdir`. 547 548 This is undone automatically after the test. 549 """ 550 if path is None: 551 path = self.tmpdir 552 sys.path.insert(0, str(path)) 553 # a call to syspathinsert() usually means that the caller 554 # wants to import some dynamically created files. 555 # with python3 we thus invalidate import caches. 556 self._possibly_invalidate_import_caches() 557 558 def _possibly_invalidate_import_caches(self): 559 # invalidate caches if we can (py33 and above) 560 try: 561 import importlib 562 except ImportError: 563 pass 564 else: 565 if hasattr(importlib, "invalidate_caches"): 566 importlib.invalidate_caches() 567 568 def mkdir(self, name): 569 """Create a new (sub)directory.""" 570 return self.tmpdir.mkdir(name) 571 572 def mkpydir(self, name): 573 """Create a new python package. 574 575 This creates a (sub)directory with an empty ``__init__.py`` 576 file so that is recognised as a python package. 577 578 """ 579 p = self.mkdir(name) 580 p.ensure("__init__.py") 581 return p 582 583 Session = Session 584 def getnode(self, config, arg): 585 """Return the collection node of a file. 586 587 :param config: :py:class:`_pytest.config.Config` instance, see 588 :py:meth:`parseconfig` and :py:meth:`parseconfigure` to 589 create the configuration. 590 591 :param arg: A :py:class:`py.path.local` instance of the file. 592 593 """ 594 session = Session(config) 595 assert '::' not in str(arg) 596 p = py.path.local(arg) 597 config.hook.pytest_sessionstart(session=session) 598 res = session.perform_collect([str(p)], genitems=False)[0] 599 config.hook.pytest_sessionfinish(session=session, exitstatus=EXIT_OK) 600 return res 601 602 def getpathnode(self, path): 603 """Return the collection node of a file. 604 605 This is like :py:meth:`getnode` but uses 606 :py:meth:`parseconfigure` to create the (configured) pytest 607 Config instance. 608 609 :param path: A :py:class:`py.path.local` instance of the file. 610 611 """ 612 config = self.parseconfigure(path) 613 session = Session(config) 614 x = session.fspath.bestrelpath(path) 615 config.hook.pytest_sessionstart(session=session) 616 res = session.perform_collect([x], genitems=False)[0] 617 config.hook.pytest_sessionfinish(session=session, exitstatus=EXIT_OK) 618 return res 619 620 def genitems(self, colitems): 621 """Generate all test items from a collection node. 622 623 This recurses into the collection node and returns a list of 624 all the test items contained within. 625 626 """ 627 session = colitems[0].session 628 result = [] 629 for colitem in colitems: 630 result.extend(session.genitems(colitem)) 631 return result 632 633 def runitem(self, source): 634 """Run the "test_func" Item. 635 636 The calling test instance (the class which contains the test 637 method) must provide a ``.getrunner()`` method which should 638 return a runner which can run the test protocol for a single 639 item, like e.g. :py:func:`_pytest.runner.runtestprotocol`. 640 641 """ 642 # used from runner functional tests 643 item = self.getitem(source) 644 # the test class where we are called from wants to provide the runner 645 testclassinstance = self.request.instance 646 runner = testclassinstance.getrunner() 647 return runner(item) 648 649 def inline_runsource(self, source, *cmdlineargs): 650 """Run a test module in process using ``pytest.main()``. 651 652 This run writes "source" into a temporary file and runs 653 ``pytest.main()`` on it, returning a :py:class:`HookRecorder` 654 instance for the result. 655 656 :param source: The source code of the test module. 657 658 :param cmdlineargs: Any extra command line arguments to use. 659 660 :return: :py:class:`HookRecorder` instance of the result. 661 662 """ 663 p = self.makepyfile(source) 664 l = list(cmdlineargs) + [p] 665 return self.inline_run(*l) 666 667 def inline_genitems(self, *args): 668 """Run ``pytest.main(['--collectonly'])`` in-process. 669 670 Returns a tuple of the collected items and a 671 :py:class:`HookRecorder` instance. 672 673 This runs the :py:func:`pytest.main` function to run all of 674 pytest inside the test process itself like 675 :py:meth:`inline_run`. However the return value is a tuple of 676 the collection items and a :py:class:`HookRecorder` instance. 677 678 """ 679 rec = self.inline_run("--collect-only", *args) 680 items = [x.item for x in rec.getcalls("pytest_itemcollected")] 681 return items, rec 682 683 def inline_run(self, *args, **kwargs): 684 """Run ``pytest.main()`` in-process, returning a HookRecorder. 685 686 This runs the :py:func:`pytest.main` function to run all of 687 pytest inside the test process itself. This means it can 688 return a :py:class:`HookRecorder` instance which gives more 689 detailed results from then run then can be done by matching 690 stdout/stderr from :py:meth:`runpytest`. 691 692 :param args: Any command line arguments to pass to 693 :py:func:`pytest.main`. 694 695 :param plugin: (keyword-only) Extra plugin instances the 696 ``pytest.main()`` instance should use. 697 698 :return: A :py:class:`HookRecorder` instance. 699 """ 700 # When running py.test inline any plugins active in the main 701 # test process are already imported. So this disables the 702 # warning which will trigger to say they can no longer be 703 # re-written, which is fine as they are already re-written. 704 orig_warn = AssertionRewritingHook._warn_already_imported 705 706 def revert(): 707 AssertionRewritingHook._warn_already_imported = orig_warn 708 709 self.request.addfinalizer(revert) 710 AssertionRewritingHook._warn_already_imported = lambda *a: None 711 712 rec = [] 713 714 class Collect: 715 def pytest_configure(x, config): 716 rec.append(self.make_hook_recorder(config.pluginmanager)) 717 718 plugins = kwargs.get("plugins") or [] 719 plugins.append(Collect()) 720 ret = pytest.main(list(args), plugins=plugins) 721 self.delete_loaded_modules() 722 if len(rec) == 1: 723 reprec = rec.pop() 724 else: 725 class reprec: 726 pass 727 reprec.ret = ret 728 729 # typically we reraise keyboard interrupts from the child run 730 # because it's our user requesting interruption of the testing 731 if ret == 2 and not kwargs.get("no_reraise_ctrlc"): 732 calls = reprec.getcalls("pytest_keyboard_interrupt") 733 if calls and calls[-1].excinfo.type == KeyboardInterrupt: 734 raise KeyboardInterrupt() 735 return reprec 736 737 def runpytest_inprocess(self, *args, **kwargs): 738 """ Return result of running pytest in-process, providing a similar 739 interface to what self.runpytest() provides. """ 740 if kwargs.get("syspathinsert"): 741 self.syspathinsert() 742 now = time.time() 743 capture = MultiCapture(Capture=SysCapture) 744 capture.start_capturing() 745 try: 746 try: 747 reprec = self.inline_run(*args, **kwargs) 748 except SystemExit as e: 749 750 class reprec: 751 ret = e.args[0] 752 753 except Exception: 754 traceback.print_exc() 755 756 class reprec: 757 ret = 3 758 finally: 759 out, err = capture.readouterr() 760 capture.stop_capturing() 761 sys.stdout.write(out) 762 sys.stderr.write(err) 763 764 res = RunResult(reprec.ret, 765 out.split("\n"), err.split("\n"), 766 time.time()-now) 767 res.reprec = reprec 768 return res 769 770 def runpytest(self, *args, **kwargs): 771 """ Run pytest inline or in a subprocess, depending on the command line 772 option "--runpytest" and return a :py:class:`RunResult`. 773 774 """ 775 args = self._ensure_basetemp(args) 776 return self._runpytest_method(*args, **kwargs) 777 778 def _ensure_basetemp(self, args): 779 args = [str(x) for x in args] 780 for x in args: 781 if str(x).startswith('--basetemp'): 782 #print ("basedtemp exists: %s" %(args,)) 783 break 784 else: 785 args.append("--basetemp=%s" % self.tmpdir.dirpath('basetemp')) 786 #print ("added basetemp: %s" %(args,)) 787 return args 788 789 def parseconfig(self, *args): 790 """Return a new pytest Config instance from given commandline args. 791 792 This invokes the pytest bootstrapping code in _pytest.config 793 to create a new :py:class:`_pytest.core.PluginManager` and 794 call the pytest_cmdline_parse hook to create new 795 :py:class:`_pytest.config.Config` instance. 796 797 If :py:attr:`plugins` has been populated they should be plugin 798 modules which will be registered with the PluginManager. 799 800 """ 801 args = self._ensure_basetemp(args) 802 803 import _pytest.config 804 config = _pytest.config._prepareconfig(args, self.plugins) 805 # we don't know what the test will do with this half-setup config 806 # object and thus we make sure it gets unconfigured properly in any 807 # case (otherwise capturing could still be active, for example) 808 self.request.addfinalizer(config._ensure_unconfigure) 809 return config 810 811 def parseconfigure(self, *args): 812 """Return a new pytest configured Config instance. 813 814 This returns a new :py:class:`_pytest.config.Config` instance 815 like :py:meth:`parseconfig`, but also calls the 816 pytest_configure hook. 817 818 """ 819 config = self.parseconfig(*args) 820 config._do_configure() 821 self.request.addfinalizer(config._ensure_unconfigure) 822 return config 823 824 def getitem(self, source, funcname="test_func"): 825 """Return the test item for a test function. 826 827 This writes the source to a python file and runs pytest's 828 collection on the resulting module, returning the test item 829 for the requested function name. 830 831 :param source: The module source. 832 833 :param funcname: The name of the test function for which the 834 Item must be returned. 835 836 """ 837 items = self.getitems(source) 838 for item in items: 839 if item.name == funcname: 840 return item 841 assert 0, "%r item not found in module:\n%s\nitems: %s" %( 842 funcname, source, items) 843 844 def getitems(self, source): 845 """Return all test items collected from the module. 846 847 This writes the source to a python file and runs pytest's 848 collection on the resulting module, returning all test items 849 contained within. 850 851 """ 852 modcol = self.getmodulecol(source) 853 return self.genitems([modcol]) 854 855 def getmodulecol(self, source, configargs=(), withinit=False): 856 """Return the module collection node for ``source``. 857 858 This writes ``source`` to a file using :py:meth:`makepyfile` 859 and then runs the pytest collection on it, returning the 860 collection node for the test module. 861 862 :param source: The source code of the module to collect. 863 864 :param configargs: Any extra arguments to pass to 865 :py:meth:`parseconfigure`. 866 867 :param withinit: Whether to also write a ``__init__.py`` file 868 to the temporary directory to ensure it is a package. 869 870 """ 871 kw = {self.request.function.__name__: Source(source).strip()} 872 path = self.makepyfile(**kw) 873 if withinit: 874 self.makepyfile(__init__ = "#") 875 self.config = config = self.parseconfigure(path, *configargs) 876 node = self.getnode(config, path) 877 878 return node 879 880 def collect_by_name(self, modcol, name): 881 """Return the collection node for name from the module collection. 882 883 This will search a module collection node for a collection 884 node matching the given name. 885 886 :param modcol: A module collection node, see 887 :py:meth:`getmodulecol`. 888 889 :param name: The name of the node to return. 890 891 """ 892 if modcol not in self._mod_collections: 893 self._mod_collections[modcol] = list(modcol.collect()) 894 for colitem in self._mod_collections[modcol]: 895 if colitem.name == name: 896 return colitem 897 898 def popen(self, cmdargs, stdout, stderr, **kw): 899 """Invoke subprocess.Popen. 900 901 This calls subprocess.Popen making sure the current working 902 directory is the PYTHONPATH. 903 904 You probably want to use :py:meth:`run` instead. 905 906 """ 907 env = os.environ.copy() 908 env['PYTHONPATH'] = os.pathsep.join(filter(None, [ 909 str(os.getcwd()), env.get('PYTHONPATH', '')])) 910 kw['env'] = env 911 return subprocess.Popen(cmdargs, 912 stdout=stdout, stderr=stderr, **kw) 913 914 def run(self, *cmdargs): 915 """Run a command with arguments. 916 917 Run a process using subprocess.Popen saving the stdout and 918 stderr. 919 920 Returns a :py:class:`RunResult`. 921 922 """ 923 return self._run(*cmdargs) 924 925 def _run(self, *cmdargs): 926 cmdargs = [str(x) for x in cmdargs] 927 p1 = self.tmpdir.join("stdout") 928 p2 = self.tmpdir.join("stderr") 929 print("running:", ' '.join(cmdargs)) 930 print(" in:", str(py.path.local())) 931 f1 = codecs.open(str(p1), "w", encoding="utf8") 932 f2 = codecs.open(str(p2), "w", encoding="utf8") 933 try: 934 now = time.time() 935 popen = self.popen(cmdargs, stdout=f1, stderr=f2, 936 close_fds=(sys.platform != "win32")) 937 ret = popen.wait() 938 finally: 939 f1.close() 940 f2.close() 941 f1 = codecs.open(str(p1), "r", encoding="utf8") 942 f2 = codecs.open(str(p2), "r", encoding="utf8") 943 try: 944 out = f1.read().splitlines() 945 err = f2.read().splitlines() 946 finally: 947 f1.close() 948 f2.close() 949 self._dump_lines(out, sys.stdout) 950 self._dump_lines(err, sys.stderr) 951 return RunResult(ret, out, err, time.time()-now) 952 953 def _dump_lines(self, lines, fp): 954 try: 955 for line in lines: 956 print(line, file=fp) 957 except UnicodeEncodeError: 958 print("couldn't print to %s because of encoding" % (fp,)) 959 960 def _getpytestargs(self): 961 # we cannot use "(sys.executable,script)" 962 # because on windows the script is e.g. a pytest.exe 963 return (sys.executable, _pytest_fullpath,) # noqa 964 965 def runpython(self, script): 966 """Run a python script using sys.executable as interpreter. 967 968 Returns a :py:class:`RunResult`. 969 """ 970 return self.run(sys.executable, script) 971 972 def runpython_c(self, command): 973 """Run python -c "command", return a :py:class:`RunResult`.""" 974 return self.run(sys.executable, "-c", command) 975 976 def runpytest_subprocess(self, *args, **kwargs): 977 """Run pytest as a subprocess with given arguments. 978 979 Any plugins added to the :py:attr:`plugins` list will added 980 using the ``-p`` command line option. Addtionally 981 ``--basetemp`` is used put any temporary files and directories 982 in a numbered directory prefixed with "runpytest-" so they do 983 not conflict with the normal numberd pytest location for 984 temporary files and directories. 985 986 Returns a :py:class:`RunResult`. 987 988 """ 989 p = py.path.local.make_numbered_dir(prefix="runpytest-", 990 keep=None, rootdir=self.tmpdir) 991 args = ('--basetemp=%s' % p, ) + args 992 #for x in args: 993 # if '--confcutdir' in str(x): 994 # break 995 #else: 996 # pass 997 # args = ('--confcutdir=.',) + args 998 plugins = [x for x in self.plugins if isinstance(x, str)] 999 if plugins: 1000 args = ('-p', plugins[0]) + args 1001 args = self._getpytestargs() + args 1002 return self.run(*args) 1003 1004 def spawn_pytest(self, string, expect_timeout=10.0): 1005 """Run pytest using pexpect. 1006 1007 This makes sure to use the right pytest and sets up the 1008 temporary directory locations. 1009 1010 The pexpect child is returned. 1011 1012 """ 1013 basetemp = self.tmpdir.mkdir("temp-pexpect") 1014 invoke = " ".join(map(str, self._getpytestargs())) 1015 cmd = "%s --basetemp=%s %s" % (invoke, basetemp, string) 1016 return self.spawn(cmd, expect_timeout=expect_timeout) 1017 1018 def spawn(self, cmd, expect_timeout=10.0): 1019 """Run a command using pexpect. 1020 1021 The pexpect child is returned. 1022 """ 1023 pexpect = pytest.importorskip("pexpect", "3.0") 1024 if hasattr(sys, 'pypy_version_info') and '64' in platform.machine(): 1025 pytest.skip("pypy-64 bit not supported") 1026 if sys.platform.startswith("freebsd"): 1027 pytest.xfail("pexpect does not work reliably on freebsd") 1028 logfile = self.tmpdir.join("spawn.out").open("wb") 1029 child = pexpect.spawn(cmd, logfile=logfile) 1030 self.request.addfinalizer(logfile.close) 1031 child.timeout = expect_timeout 1032 return child 1033 1034def getdecoded(out): 1035 try: 1036 return out.decode("utf-8") 1037 except UnicodeDecodeError: 1038 return "INTERNAL not-utf8-decodeable, truncated string:\n%s" % ( 1039 py.io.saferepr(out),) 1040 1041 1042class LineComp: 1043 def __init__(self): 1044 self.stringio = py.io.TextIO() 1045 1046 def assert_contains_lines(self, lines2): 1047 """ assert that lines2 are contained (linearly) in lines1. 1048 return a list of extralines found. 1049 """ 1050 __tracebackhide__ = True 1051 val = self.stringio.getvalue() 1052 self.stringio.truncate(0) 1053 self.stringio.seek(0) 1054 lines1 = val.split("\n") 1055 return LineMatcher(lines1).fnmatch_lines(lines2) 1056 1057 1058class LineMatcher: 1059 """Flexible matching of text. 1060 1061 This is a convenience class to test large texts like the output of 1062 commands. 1063 1064 The constructor takes a list of lines without their trailing 1065 newlines, i.e. ``text.splitlines()``. 1066 1067 """ 1068 1069 def __init__(self, lines): 1070 self.lines = lines 1071 self._log_output = [] 1072 1073 def str(self): 1074 """Return the entire original text.""" 1075 return "\n".join(self.lines) 1076 1077 def _getlines(self, lines2): 1078 if isinstance(lines2, str): 1079 lines2 = Source(lines2) 1080 if isinstance(lines2, Source): 1081 lines2 = lines2.strip().lines 1082 return lines2 1083 1084 def fnmatch_lines_random(self, lines2): 1085 """Check lines exist in the output. 1086 1087 The argument is a list of lines which have to occur in the 1088 output, in any order. Each line can contain glob whildcards. 1089 1090 """ 1091 lines2 = self._getlines(lines2) 1092 for line in lines2: 1093 for x in self.lines: 1094 if line == x or fnmatch(x, line): 1095 self._log("matched: ", repr(line)) 1096 break 1097 else: 1098 self._log("line %r not found in output" % line) 1099 raise ValueError(self._log_text) 1100 1101 def get_lines_after(self, fnline): 1102 """Return all lines following the given line in the text. 1103 1104 The given line can contain glob wildcards. 1105 """ 1106 for i, line in enumerate(self.lines): 1107 if fnline == line or fnmatch(line, fnline): 1108 return self.lines[i+1:] 1109 raise ValueError("line %r not found in output" % fnline) 1110 1111 def _log(self, *args): 1112 self._log_output.append(' '.join((str(x) for x in args))) 1113 1114 @property 1115 def _log_text(self): 1116 return '\n'.join(self._log_output) 1117 1118 def fnmatch_lines(self, lines2): 1119 """Search the text for matching lines. 1120 1121 The argument is a list of lines which have to match and can 1122 use glob wildcards. If they do not match an pytest.fail() is 1123 called. The matches and non-matches are also printed on 1124 stdout. 1125 1126 """ 1127 lines2 = self._getlines(lines2) 1128 lines1 = self.lines[:] 1129 nextline = None 1130 extralines = [] 1131 __tracebackhide__ = True 1132 for line in lines2: 1133 nomatchprinted = False 1134 while lines1: 1135 nextline = lines1.pop(0) 1136 if line == nextline: 1137 self._log("exact match:", repr(line)) 1138 break 1139 elif fnmatch(nextline, line): 1140 self._log("fnmatch:", repr(line)) 1141 self._log(" with:", repr(nextline)) 1142 break 1143 else: 1144 if not nomatchprinted: 1145 self._log("nomatch:", repr(line)) 1146 nomatchprinted = True 1147 self._log(" and:", repr(nextline)) 1148 extralines.append(nextline) 1149 else: 1150 self._log("remains unmatched: %r" % (line,)) 1151 pytest.fail(self._log_text) 1152