""" (disabled by default) support for testing pytest and pytest plugins. """
import codecs
import gc
import os
import platform
import re
import subprocess
import sys
import time
import traceback
from fnmatch import fnmatch

from py.builtin import print_

from _pytest._code import Source
import py
import pytest
from _pytest.main import Session, EXIT_OK


def pytest_addoption(parser):
    """Register the ``--lsof`` and ``--runpytest`` command line options."""
    # group = parser.getgroup("pytester", "pytester (self-tests) options")
    parser.addoption('--lsof',
                     action="store_true", dest="lsof", default=False,
                     help=("run FD checks if lsof is available"))

    parser.addoption('--runpytest', default="inprocess", dest="runpytest",
                     choices=("inprocess", "subprocess", ),
                     help=("run pytest sub runs in tests using an 'inprocess' "
                           "or 'subprocess' (python -m main) method"))


def pytest_configure(config):
    """Remember the path of the pytest module and set up the FD leak checker.

    The leak checker is only registered when ``--lsof`` was given and the
    ``lsof`` command can actually be executed.
    """
    # This might be called multiple times. Only take the first.
    global _pytest_fullpath
    try:
        _pytest_fullpath
    except NameError:
        # strip a trailing "c"/"o" so a .pyc/.pyo path points at the .py file
        _pytest_fullpath = os.path.abspath(pytest.__file__.rstrip("oc"))
        _pytest_fullpath = _pytest_fullpath.replace("$py.class", ".py")

    if config.getvalue("lsof"):
        checker = LsofFdLeakChecker()
        if checker.matching_platform():
            config.pluginmanager.register(checker)


class LsofFdLeakChecker(object):
    """Plugin which uses ``lsof`` to detect file descriptors leaked by a test."""

    def get_open_files(self):
        """Return a list of ``(fd, filename)`` tuples for this process."""
        out = self._exec_lsof()
        open_files = self._parse_lsof_output(out)
        return open_files

    def _exec_lsof(self):
        """Run ``lsof`` for the current process id and return its output."""
        pid = os.getpid()
        return py.process.cmdexec("lsof -Ffn0 -p %d" % pid)

    def _parse_lsof_output(self, out):
        """Parse NUL-separated ``lsof -F`` output into ``(fd, filename)`` tuples.

        Only records whose file name is an absolute path are kept; records
        mentioning "deleted", "mem", "txt" or "cwd" are ignored.
        """
        def isopen(line):
            return line.startswith('f') and (
                "deleted" not in line and
                'mem' not in line and
                "txt" not in line and
                'cwd' not in line)

        open_files = []

        for line in out.split("\n"):
            if isopen(line):
                fields = line.split('\0')
                # each field starts with a one-character type marker
                fd = fields[0][1:]
                filename = fields[1][1:]
                if filename.startswith('/'):
                    open_files.append((fd, filename))

        return open_files

    def matching_platform(self):
        """Return True if ``lsof -v`` can be executed on this platform."""
        try:
            py.process.cmdexec("lsof -v")
        except (py.process.cmdexec.Error, UnicodeDecodeError):
            # cmdexec may raise UnicodeDecodeError on Windows systems
            # with locale other than english:
            # https://bitbucket.org/pytest-dev/py/issues/66
            return False
        else:
            return True

    @pytest.hookimpl(hookwrapper=True, tryfirst=True)
    def pytest_runtest_item(self, item):
        """Compare open files before and after the item; fail on new FDs."""
        lines1 = self.get_open_files()
        yield
        if hasattr(sys, "pypy_version_info"):
            # collect garbage first on PyPy before looking at open files
            gc.collect()
        lines2 = self.get_open_files()

        new_fds = set([t[0] for t in lines2]) - set([t[0] for t in lines1])
        leaked_files = [t for t in lines2 if t[0] in new_fds]
        if leaked_files:
            error = []
            error.append("***** %s FD leakage detected" % len(leaked_files))
            error.extend([str(f) for f in leaked_files])
            error.append("*** Before:")
            error.extend([str(f) for f in lines1])
            error.append("*** After:")
            error.extend([str(f) for f in lines2])
            error.append(error[0])
            error.append("*** function %s:%s: %s " % item.location)
            pytest.fail("\n".join(error), pytrace=False)


# XXX copied from execnet's conftest.py - needs to be merged
winpymap = {
    'python2.7': r'C:\Python27\python.exe',
    'python2.6': r'C:\Python26\python.exe',
    'python3.1': r'C:\Python31\python.exe',
    'python3.2': r'C:\Python32\python.exe',
    'python3.3': r'C:\Python33\python.exe',
    'python3.4': r'C:\Python34\python.exe',
    'python3.5': r'C:\Python35\python.exe',
}


def getexecutable(name, cache={}):
    """Return the path of the interpreter ``name`` found on the system path.

    Returns ``None`` if no executable is found.  A "jython" executable is
    only accepted if its ``--version`` output on stderr mentions "2.5" but
    not "2.5.2".

    The mutable default ``cache`` is intentional: it memoizes lookups
    (including negative results) across calls.
    """
    try:
        return cache[name]
    except KeyError:
        executable = py.path.local.sysfind(name)
        if executable:
            if name == "jython":
                popen = subprocess.Popen(
                    [str(executable), "--version"],
                    universal_newlines=True, stderr=subprocess.PIPE)
                out, err = popen.communicate()
                if not err or "2.5" not in err:
                    executable = None
                if "2.5.2" in err:
                    executable = None  # http://bugs.jython.org/issue1790
        cache[name] = executable
        return executable


@pytest.fixture(params=['python2.6', 'python2.7', 'python3.3', "python3.4",
                        'pypy', 'pypy3'])
def anypython(request):
    """Return the executable for the parametrized interpreter name.

    The test is skipped if no suitable executable is found.  On win32 the
    well-known install locations from ``winpymap`` are tried as a fallback.
    """
    name = request.param
    executable = getexecutable(name)
    if executable is None:
        if sys.platform == "win32":
            executable = winpymap.get(name, None)
            if executable:
                executable = py.path.local(executable)
                if executable.check():
                    return executable
        pytest.skip("no suitable %s found" % (name,))
    return executable


# used at least by pytest-xdist plugin
@pytest.fixture
def _pytest(request):
    """ Return a helper which offers a gethookrecorder(hook)
    method which returns a HookRecorder instance which helps
    to make assertions about called hooks.
    """
    return PytestArg(request)


class PytestArg:
    """Helper returned by the ``_pytest`` fixture."""

    def __init__(self, request):
        self.request = request

    def gethookrecorder(self, hook):
        """Return a HookRecorder for ``hook._pm``.

        Recording is stopped by a finalizer registered on the request.
        """
        hookrecorder = HookRecorder(hook._pm)
        self.request.addfinalizer(hookrecorder.finish_recording)
        return hookrecorder


def get_public_names(l):
    """Only return names from iterator l without a leading underscore."""
    return [x for x in l if x[0] != "_"]


class ParsedCall:
    """A recorded hook call; the call's keyword arguments become attributes."""

    def __init__(self, name, kwargs):
        self.__dict__.update(kwargs)
        self._name = name

    def __repr__(self):
        d = self.__dict__.copy()
        del d['_name']
        return "<ParsedCall %r(**%r)>" % (self._name, d)


class HookRecorder:
    """Record all hooks called in a plugin manager.

    This wraps all the hook calls in the plugin manager, recording
    each call before propagating the normal calls.

    """

    def __init__(self, pluginmanager):
        self._pluginmanager = pluginmanager
        self.calls = []

        def before(hook_name, hook_impls, kwargs):
            self.calls.append(ParsedCall(hook_name, kwargs))

        def after(outcome, hook_name, hook_impls, kwargs):
            pass

        self._undo_wrapping = pluginmanager.add_hookcall_monitoring(before, after)

    def finish_recording(self):
        """Stop recording by undoing the hook call monitoring."""
        self._undo_wrapping()

    def getcalls(self, names):
        """Return the recorded calls whose hook name is in ``names``.

        ``names`` is either a whitespace separated string or a container
        of hook names.
        """
        if isinstance(names, str):
            names = names.split()
        return [call for call in self.calls if call._name in names]

    def assert_contains(self, entries):
        """Assert that the ``(name, check)`` entries occur in order.

        ``check`` is a string which is evaluated with the caller's locals
        as globals and the call's attributes as locals; an entry matches
        the first call at or after the current position with the given
        hook name for which the check is true.
        """
        __tracebackhide__ = True
        i = 0
        entries = list(entries)
        backlocals = sys._getframe(1).f_locals
        while entries:
            name, check = entries.pop(0)
            for ind, call in enumerate(self.calls[i:]):
                if call._name == name:
                    print_("NAMEMATCH", name, call)
                    if eval(check, backlocals, call.__dict__):
                        print_("CHECKERMATCH", repr(check), "->", call)
                    else:
                        print_("NOCHECKERMATCH", repr(check), "-", call)
                        continue
                    # continue searching after the matched call
                    i += ind + 1
                    break
                print_("NONAMEMATCH", name, "with", call)
            else:
                pytest.fail("could not find %r check %r" % (name, check))

    def popcall(self, name):
        """Remove and return the first recorded call named ``name``.

        Calls ``pytest.fail()`` listing all recorded calls if there is none.
        """
        __tracebackhide__ = True
        for i, call in enumerate(self.calls):
            if call._name == name:
                del self.calls[i]
                return call
        lines = ["could not find call %r, in:" % (name,)]
        lines.extend([" %s" % str(x) for x in self.calls])
        pytest.fail("\n".join(lines))

    def getcall(self, name):
        """Return the single recorded call named ``name``.

        Asserts that exactly one such call was recorded.
        """
        l = self.getcalls(name)
        assert len(l) == 1, (name, l)
        return l[0]

    # functionality for test reports

    def getreports(self,
                   names="pytest_runtest_logreport pytest_collectreport"):
        """Return the ``report`` argument of all recorded ``names`` calls."""
        return [x.report for x in self.getcalls(names)]

    def matchreport(self, inamepart="",
                    names="pytest_runtest_logreport pytest_collectreport",
                    when=None):
        """ return a testreport whose dotted import path matches

        A report matches if ``inamepart`` is empty or equals one of the
        "::"-separated parts of its nodeid.  Unless ``when`` is given,
        passing reports of phases other than "call" are ignored.

        Raises ValueError if no report or more than one report matches.
        """
        l = []
        for rep in self.getreports(names=names):
            try:
                if not when and rep.when != "call" and rep.passed:
                    # setup/teardown passing reports - let's ignore those
                    continue
            except AttributeError:
                # the report has no "when" attribute
                pass
            if when and getattr(rep, 'when', None) != when:
                continue
            if not inamepart or inamepart in rep.nodeid.split("::"):
                l.append(rep)
        if not l:
            raise ValueError("could not find test report matching %r: "
                             "no test reports at all!" % (inamepart,))
        if len(l) > 1:
            raise ValueError(
                "found 2 or more testreports matching %r: %s" % (inamepart, l))
        return l[0]

    def getfailures(self,
                    names='pytest_runtest_logreport pytest_collectreport'):
        """Return all failed reports of the given hook ``names``."""
        return [rep for rep in self.getreports(names) if rep.failed]

    def getfailedcollections(self):
        """Return all failed collection reports."""
        return self.getfailures('pytest_collectreport')

    def listoutcomes(self):
        """Return a ``(passed, skipped, failed)`` tuple of report lists.

        Passed reports are only counted if their ``when`` is "call".
        """
        passed = []
        skipped = []
        failed = []
        for rep in self.getreports(
                "pytest_collectreport pytest_runtest_logreport"):
            if rep.passed:
                if getattr(rep, "when", None) == "call":
                    passed.append(rep)
            elif rep.skipped:
                skipped.append(rep)
            elif rep.failed:
                failed.append(rep)
        return passed, skipped, failed

    def countoutcomes(self):
        """Return ``[num_passed, num_skipped, num_failed]``."""
        return [len(x) for x in self.listoutcomes()]

    def assertoutcome(self, passed=0, skipped=0, failed=0):
        """Assert the number of passed, skipped and failed reports."""
        realpassed, realskipped, realfailed = self.listoutcomes()
        assert passed == len(realpassed)
        assert skipped == len(realskipped)
        assert failed == len(realfailed)

    def clear(self):
        """Forget all recorded calls (the list object itself is kept)."""
        self.calls[:] = []


@pytest.fixture
def linecomp(request):
    """Return a fresh :py:class:`LineComp` instance."""
    return LineComp()


def pytest_funcarg__LineMatcher(request):
    """Provide the :py:class:`LineMatcher` class itself as a funcarg."""
    return LineMatcher


@pytest.fixture
def testdir(request, tmpdir_factory):
    """Return a :py:class:`Testdir` instance for the requesting test."""
    return Testdir(request, tmpdir_factory)


# matches "<number> <category>" pairs; raw string so that \d and \w are
# regex escapes instead of (invalid) string escapes
rex_outcome = re.compile(r"(\d+) ([\w-]+)")


class RunResult:
    """The result of running a command.

    Attributes:

    :ret: The return value.
    :outlines: List of lines captured from stdout.
    :errlines: List of lines captured from stderr.
    :stdout: :py:class:`LineMatcher` of stdout, use ``stdout.str()`` to
       reconstruct stdout or the commonly used
       ``stdout.fnmatch_lines()`` method.
    :stderr: :py:class:`LineMatcher` of stderr.
    :duration: Duration in seconds.

    """
    def __init__(self, ret, outlines, errlines, duration):
        self.ret = ret
        self.outlines = outlines
        self.errlines = errlines
        self.stdout = LineMatcher(outlines)
        self.stderr = LineMatcher(errlines)
        self.duration = duration

    def parseoutcomes(self):
        """ Return a dictionary of outcomestring->num from parsing
        the terminal output that the test process produced.

        The last stdout line containing 'seconds' and at least one
        "<num> <outcome>" pair is used.  Returns None if there is no
        such line.
        """
        for line in reversed(self.outlines):
            if 'seconds' in line:
                outcomes = rex_outcome.findall(line)
                if outcomes:
                    d = {}
                    for num, cat in outcomes:
                        d[cat] = int(num)
                    return d
        return None

    def assert_outcomes(self, passed=0, skipped=0, failed=0):
        """ assert that the specified outcomes appear with the respective
        numbers (0 means it didn't occur) in the text output from a test run.

        Raises ValueError if the output contains no summary line at all.
        """
        d = self.parseoutcomes()
        if d is None:
            # fail with a clear message instead of an AttributeError on None
            raise ValueError("Pytest terminal report not found")
        assert passed == d.get("passed", 0)
        assert skipped == d.get("skipped", 0)
        assert failed == d.get("failed", 0)


class Testdir:
    """Temporary test directory with tools to test/run py.test itself.

    This is based on the ``tmpdir`` fixture but provides a number of
    methods which aid with testing py.test itself.  Unless
    :py:meth:`chdir` is used all methods will use :py:attr:`tmpdir` as
    current working directory.

    Attributes:

    :tmpdir: The :py:class:`py.path.local` instance of the temporary
       directory.

    :plugins: A list of plugins to use with :py:meth:`parseconfig` and
       :py:meth:`runpytest`.  Initially this is an empty list but
       plugins can be added to the list.  The type of items to add to
       the list depend on the method which uses them so refer to them
       for details.

    """

    def __init__(self, request, tmpdir_factory):
        self.request = request
        # XXX remove duplication with tmpdir plugin
        basetmp = tmpdir_factory.ensuretemp("testdir")
        name = request.function.__name__
        for i in range(100):
            try:
                tmpdir = basetmp.mkdir(name + str(i))
            except py.error.EEXIST:
                continue
            break
        else:
            # without this, "tmpdir" would be unbound below
            raise RuntimeError(
                "could not create a numbered directory for %r in %s"
                % (name, basetmp))
        self.tmpdir = tmpdir
        self.plugins = []
        self._savesyspath = (list(sys.path), list(sys.meta_path))
        self._savemodulekeys = set(sys.modules)
        self.chdir()  # always chdir
        self.request.addfinalizer(self.finalize)
        method = self.request.config.getoption("--runpytest")
        if method == "inprocess":
            self._runpytest_method = self.runpytest_inprocess
        elif method == "subprocess":
            self._runpytest_method = self.runpytest_subprocess

    def __repr__(self):
        return "<Testdir %r>" % (self.tmpdir,)

    def finalize(self):
        """Clean up global state artifacts.

        Some methods modify the global interpreter state and this
        tries to clean this up.  It does not remove the temporary
        directory however so it can be looked at after the test run
        has finished.

        """
        sys.path[:], sys.meta_path[:] = self._savesyspath
        if hasattr(self, '_olddir'):
            self._olddir.chdir()
        self.delete_loaded_modules()

    def delete_loaded_modules(self):
        """Delete modules that have been loaded during a test.

        This allows the interpreter to catch module changes in case
        the module is re-imported.
        """
        for name in set(sys.modules).difference(self._savemodulekeys):
            # it seems zope.interfaces is keeping some state
            # (used by twisted related tests)
            if name != "zope.interface":
                del sys.modules[name]

    def make_hook_recorder(self, pluginmanager):
        """Create a new :py:class:`HookRecorder` for a PluginManager."""
        assert not hasattr(pluginmanager, "reprec")
        pluginmanager.reprec = reprec = HookRecorder(pluginmanager)
        self.request.addfinalizer(reprec.finish_recording)
        return reprec

    def chdir(self):
        """Cd into the temporary directory.

        This is done automatically upon instantiation.

        """
        old = self.tmpdir.chdir()
        # only remember the very first directory we came from
        if not hasattr(self, '_olddir'):
            self._olddir = old

    def _makefile(self, ext, args, kwargs):
        """Write the files described by ``args``/``kwargs``.

        The joined ``args`` (if any) are written to a file named after the
        requesting test function; each keyword is written to a file of
        that name.  Contents are stripped and written UTF-8 encoded.
        Returns the path of the first file written (None if there is none).
        """
        def my_totext(s, encoding="utf-8"):
            if py.builtin._isbytes(s):
                s = py.builtin._totext(s, encoding=encoding)
            return s

        items = list(kwargs.items())
        if args:
            source = py.builtin._totext("\n").join(
                map(py.builtin._totext, args)) + py.builtin._totext("\n")
            basename = self.request.function.__name__
            items.insert(0, (basename, source))
        ret = None
        for name, value in items:
            p = self.tmpdir.join(name).new(ext=ext)
            source = Source(value)
            source_unicode = "\n".join(
                [my_totext(line) for line in source.lines])
            source = py.builtin._totext(source_unicode)
            content = source.strip().encode("utf-8")  # + "\n"
            #content = content.rstrip() + "\n"
            p.write(content, "wb")
            if ret is None:
                ret = p
        return ret

    def makefile(self, ext, *args, **kwargs):
        """Create a new file in the testdir.

        ext: The extension the file should use, including the dot.
           E.g. ".py".

        args: All args will be treated as strings and joined using
           newlines.  The result will be written as contents to the
           file.  The name of the file will be based on the test
           function requesting this fixture.
           E.g. "testdir.makefile('.txt', 'line1', 'line2')"

        kwargs: Each keyword is the name of a file, while the value of
           it will be written as contents of the file.
           E.g. "testdir.makefile('.ini', pytest='[pytest]\\naddopts=-rs\\n')"

        """
        return self._makefile(ext, args, kwargs)

    def makeconftest(self, source):
        """Write a conftest.py file with 'source' as contents."""
        return self.makepyfile(conftest=source)

    def makeini(self, source):
        """Write a tox.ini file with 'source' as contents."""
        return self.makefile('.ini', tox=source)

    def getinicfg(self, source):
        """Return the pytest section from the tox.ini config file."""
        p = self.makeini(source)
        return py.iniconfig.IniConfig(p)['pytest']

    def makepyfile(self, *args, **kwargs):
        """Shortcut for .makefile() with a .py extension."""
        return self._makefile('.py', args, kwargs)

    def maketxtfile(self, *args, **kwargs):
        """Shortcut for .makefile() with a .txt extension."""
        return self._makefile('.txt', args, kwargs)

    def syspathinsert(self, path=None):
        """Prepend a directory to sys.path, defaults to :py:attr:`tmpdir`.

        This is undone automatically after the test.
        """
        if path is None:
            path = self.tmpdir
        sys.path.insert(0, str(path))
        # a call to syspathinsert() usually means that the caller
        # wants to import some dynamically created files.
        # with python3 we thus invalidate import caches.
        self._possibly_invalidate_import_caches()

    def _possibly_invalidate_import_caches(self):
        """Call ``importlib.invalidate_caches()`` where it is available."""
        # invalidate caches if we can (py33 and above)
        try:
            import importlib
        except ImportError:
            pass
        else:
            if hasattr(importlib, "invalidate_caches"):
                importlib.invalidate_caches()

    def mkdir(self, name):
        """Create a new (sub)directory."""
        return self.tmpdir.mkdir(name)

    def mkpydir(self, name):
        """Create a new python package.

        This creates a (sub)directory with an empty ``__init__.py``
        file so that it is recognised as a python package.

        """
        p = self.mkdir(name)
        p.ensure("__init__.py")
        return p

    Session = Session

    def getnode(self, config, arg):
        """Return the collection node of a file.

        :param config: :py:class:`_pytest.config.Config` instance, see
           :py:meth:`parseconfig` and :py:meth:`parseconfigure` to
           create the configuration.

        :param arg: A :py:class:`py.path.local` instance of the file.

        """
        session = Session(config)
        assert '::' not in str(arg)
        p = py.path.local(arg)
        config.hook.pytest_sessionstart(session=session)
        res = session.perform_collect([str(p)], genitems=False)[0]
        config.hook.pytest_sessionfinish(session=session, exitstatus=EXIT_OK)
        return res

    def getpathnode(self, path):
        """Return the collection node of a file.

        This is like :py:meth:`getnode` but uses
        :py:meth:`parseconfigure` to create the (configured) py.test
        Config instance.

        :param path: A :py:class:`py.path.local` instance of the file.

        """
        config = self.parseconfigure(path)
        session = Session(config)
        x = session.fspath.bestrelpath(path)
        config.hook.pytest_sessionstart(session=session)
        res = session.perform_collect([x], genitems=False)[0]
        config.hook.pytest_sessionfinish(session=session, exitstatus=EXIT_OK)
        return res

    def genitems(self, colitems):
        """Generate all test items from a collection node.

        This recurses into the collection node and returns a list of
        all the test items contained within.

        """
        session = colitems[0].session
        result = []
        for colitem in colitems:
            result.extend(session.genitems(colitem))
        return result

    def runitem(self, source):
        """Run the "test_func" Item.

        The calling test instance (the class which contains the test
        method) must provide a ``.getrunner()`` method which should
        return a runner which can run the test protocol for a single
        item, like e.g. :py:func:`_pytest.runner.runtestprotocol`.

        """
        # used from runner functional tests
        item = self.getitem(source)
        # the test class where we are called from wants to provide the runner
        testclassinstance = self.request.instance
        runner = testclassinstance.getrunner()
        return runner(item)

    def inline_runsource(self, source, *cmdlineargs):
        """Run a test module in process using ``pytest.main()``.

        This run writes "source" into a temporary file and runs
        ``pytest.main()`` on it, returning a :py:class:`HookRecorder`
        instance for the result.

        :param source: The source code of the test module.

        :param cmdlineargs: Any extra command line arguments to use.

        :return: :py:class:`HookRecorder` instance of the result.

        """
        p = self.makepyfile(source)
        l = list(cmdlineargs) + [p]
        return self.inline_run(*l)

    def inline_genitems(self, *args):
        """Run ``pytest.main(['--collectonly'])`` in-process.

        Returns a tuple of the collected items and a
        :py:class:`HookRecorder` instance.

        This runs the :py:func:`pytest.main` function to run all of
        py.test inside the test process itself like
        :py:meth:`inline_run`.  However the return value is a tuple of
        the collection items and a :py:class:`HookRecorder` instance.

        """
        rec = self.inline_run("--collect-only", *args)
        items = [x.item for x in rec.getcalls("pytest_itemcollected")]
        return items, rec

    def inline_run(self, *args, **kwargs):
        """Run ``pytest.main()`` in-process, returning a HookRecorder.

        This runs the :py:func:`pytest.main` function to run all of
        py.test inside the test process itself.  This means it can
        return a :py:class:`HookRecorder` instance which gives more
        detailed results from the run than can be done by matching
        stdout/stderr from :py:meth:`runpytest`.

        :param args: Any command line arguments to pass to
           :py:func:`pytest.main`.

        :param plugins: (keyword-only) Extra plugin instances the
           ``pytest.main()`` instance should use.  The passed list is
           not modified.

        :param no_reraise_ctrlc: (keyword-only) If true, a keyboard
           interrupt of the inner run is not re-raised.

        :return: A :py:class:`HookRecorder` instance.

        """
        rec = []

        class Collect:
            def pytest_configure(x, config):
                rec.append(self.make_hook_recorder(config.pluginmanager))

        # copy so that our helper plugin does not leak into the caller's list
        plugins = list(kwargs.get("plugins") or [])
        plugins.append(Collect())
        ret = pytest.main(list(args), plugins=plugins)
        self.delete_loaded_modules()
        if len(rec) == 1:
            reprec = rec.pop()
        else:
            # no (or more than one) recorder: return a bare result holder
            class reprec:
                pass
        reprec.ret = ret

        # typically we reraise keyboard interrupts from the child run
        # because it's our user requesting interruption of the testing
        if ret == 2 and not kwargs.get("no_reraise_ctrlc"):
            calls = reprec.getcalls("pytest_keyboard_interrupt")
            if calls and calls[-1].excinfo.type == KeyboardInterrupt:
                raise KeyboardInterrupt()
        return reprec

    def runpytest_inprocess(self, *args, **kwargs):
        """ Return result of running pytest in-process, providing a similar
        interface to what self.runpytest() provides.

        stdout/stderr are captured during the run, written to the real
        streams afterwards and made available on the returned
        :py:class:`RunResult`, whose ``reprec`` attribute holds the
        result of :py:meth:`inline_run`.
        """
        if kwargs.get("syspathinsert"):
            self.syspathinsert()
        now = time.time()
        capture = py.io.StdCapture()
        try:
            try:
                reprec = self.inline_run(*args, **kwargs)
            except SystemExit as e:
                class reprec:
                    ret = e.args[0]
            except Exception:
                traceback.print_exc()

                class reprec:
                    ret = 3
        finally:
            out, err = capture.reset()
            sys.stdout.write(out)
            sys.stderr.write(err)

        res = RunResult(reprec.ret,
                        out.split("\n"), err.split("\n"),
                        time.time()-now)
        res.reprec = reprec
        return res

    def runpytest(self, *args, **kwargs):
        """ Run pytest inline or in a subprocess, depending on the command line
        option "--runpytest" and return a :py:class:`RunResult`.

        """
        args = self._ensure_basetemp(args)
        return self._runpytest_method(*args, **kwargs)

    def _ensure_basetemp(self, args):
        """Return ``args`` as a list of strings with a ``--basetemp`` option.

        The option is only appended if no argument starts with
        ``--basetemp`` already.
        """
        args = [str(x) for x in args]
        for x in args:
            if str(x).startswith('--basetemp'):
                #print ("basedtemp exists: %s" %(args,))
                break
        else:
            args.append("--basetemp=%s" % self.tmpdir.dirpath('basetemp'))
            #print ("added basetemp: %s" %(args,))
        return args

    def parseconfig(self, *args):
        """Return a new py.test Config instance from given commandline args.

        This invokes the py.test bootstrapping code in _pytest.config
        to create a new :py:class:`_pytest.core.PluginManager` and
        call the pytest_cmdline_parse hook to create new
        :py:class:`_pytest.config.Config` instance.

        If :py:attr:`plugins` has been populated they should be plugin
        modules which will be registered with the PluginManager.

        """
        args = self._ensure_basetemp(args)

        import _pytest.config
        config = _pytest.config._prepareconfig(args, self.plugins)
        # we don't know what the test will do with this half-setup config
        # object and thus we make sure it gets unconfigured properly in any
        # case (otherwise capturing could still be active, for example)
        self.request.addfinalizer(config._ensure_unconfigure)
        return config

    def parseconfigure(self, *args):
        """Return a new py.test configured Config instance.

        This returns a new :py:class:`_pytest.config.Config` instance
        like :py:meth:`parseconfig`, but also calls the
        pytest_configure hook.

        """
        config = self.parseconfig(*args)
        config._do_configure()
        self.request.addfinalizer(config._ensure_unconfigure)
        return config

    def getitem(self, source, funcname="test_func"):
        """Return the test item for a test function.

        This writes the source to a python file and runs py.test's
        collection on the resulting module, returning the test item
        for the requested function name.

        :param source: The module source.

        :param funcname: The name of the test function for which the
           Item must be returned.

        """
        items = self.getitems(source)
        for item in items:
            if item.name == funcname:
                return item
        assert 0, "%r item not found in module:\n%s\nitems: %s" % (
            funcname, source, items)

    def getitems(self, source):
        """Return all test items collected from the module.

        This writes the source to a python file and runs py.test's
        collection on the resulting module, returning all test items
        contained within.

        """
        modcol = self.getmodulecol(source)
        return self.genitems([modcol])

    def getmodulecol(self, source, configargs=(), withinit=False):
        """Return the module collection node for ``source``.

        This writes ``source`` to a file using :py:meth:`makepyfile`
        and then runs the py.test collection on it, returning the
        collection node for the test module.

        :param source: The source code of the module to collect.

        :param configargs: Any extra arguments to pass to
           :py:meth:`parseconfigure`.

        :param withinit: Whether to also write a ``__init__.py`` file
           to the temporary directory to ensure it is a package.

        """
        kw = {self.request.function.__name__: Source(source).strip()}
        path = self.makepyfile(**kw)
        if withinit:
            self.makepyfile(__init__="#")
        self.config = config = self.parseconfigure(path, *configargs)
        node = self.getnode(config, path)
        return node

    def collect_by_name(self, modcol, name):
        """Return the collection node for name from the module collection.

        This will search a module collection node for a collection
        node matching the given name.  Returns None if there is no
        such node.

        :param modcol: A module collection node, see
           :py:meth:`getmodulecol`.

        :param name: The name of the node to return.

        """
        for colitem in modcol._memocollect():
            if colitem.name == name:
                return colitem

    def popen(self, cmdargs, stdout, stderr, **kw):
        """Invoke subprocess.Popen.

        This calls subprocess.Popen making sure the current working
        directory is the first entry of PYTHONPATH.

        You probably want to use :py:meth:`run` instead.

        """
        env = os.environ.copy()
        env['PYTHONPATH'] = os.pathsep.join(filter(None, [
            str(os.getcwd()), env.get('PYTHONPATH', '')]))
        kw['env'] = env
        return subprocess.Popen(cmdargs,
                                stdout=stdout, stderr=stderr, **kw)

    def run(self, *cmdargs):
        """Run a command with arguments.

        Run a process using subprocess.Popen saving the stdout and
        stderr.

        Returns a :py:class:`RunResult`.

        """
        return self._run(*cmdargs)

    def _run(self, *cmdargs):
        """Run the command, redirecting its output to files in tmpdir.

        The output is written to the "stdout" and "stderr" files, read
        back as UTF-8, echoed to this process' streams and returned in
        a :py:class:`RunResult`.
        """
        cmdargs = [str(x) for x in cmdargs]
        p1 = self.tmpdir.join("stdout")
        p2 = self.tmpdir.join("stderr")
        print_("running:", ' '.join(cmdargs))
        print_(" in:", str(py.path.local()))
        f1 = codecs.open(str(p1), "w", encoding="utf8")
        f2 = codecs.open(str(p2), "w", encoding="utf8")
        try:
            now = time.time()
            popen = self.popen(cmdargs, stdout=f1, stderr=f2,
                               close_fds=(sys.platform != "win32"))
            ret = popen.wait()
        finally:
            f1.close()
            f2.close()
        f1 = codecs.open(str(p1), "r", encoding="utf8")
        f2 = codecs.open(str(p2), "r", encoding="utf8")
        try:
            out = f1.read().splitlines()
            err = f2.read().splitlines()
        finally:
            f1.close()
            f2.close()
        self._dump_lines(out, sys.stdout)
        self._dump_lines(err, sys.stderr)
        return RunResult(ret, out, err, time.time()-now)

    def _dump_lines(self, lines, fp):
        """Print ``lines`` to ``fp``, tolerating encoding errors."""
        try:
            for line in lines:
                py.builtin.print_(line, file=fp)
        except UnicodeEncodeError:
            print("couldn't print to %s because of encoding" % (fp,))

    def _getpytestargs(self):
        """Return the command prefix used to invoke pytest in a subprocess."""
        # we cannot use "(sys.executable,script)"
        # because on windows the script is e.g. a py.test.exe
        return (sys.executable, _pytest_fullpath,)  # noqa

    def runpython(self, script):
        """Run a python script using sys.executable as interpreter.

        Returns a :py:class:`RunResult`.
        """
        return self.run(sys.executable, script)

    def runpython_c(self, command):
        """Run python -c "command", return a :py:class:`RunResult`."""
        return self.run(sys.executable, "-c", command)

    def runpytest_subprocess(self, *args, **kwargs):
        """Run py.test as a subprocess with given arguments.

        Of the string entries in the :py:attr:`plugins` list only the
        first one is passed on, using the ``-p`` command line option.
        Additionally ``--basetemp`` is used to put any temporary files
        and directories in a numbered directory prefixed with
        "runpytest-" so they do not conflict with the normal numbered
        pytest location for temporary files and directories.

        Returns a :py:class:`RunResult`.

        """
        p = py.path.local.make_numbered_dir(prefix="runpytest-",
                                            keep=None, rootdir=self.tmpdir)
        args = ('--basetemp=%s' % p, ) + args
        #for x in args:
        #    if '--confcutdir' in str(x):
        #        break
        #else:
        #    pass
        #    args = ('--confcutdir=.',) + args
        plugins = [x for x in self.plugins if isinstance(x, str)]
        if plugins:
            args = ('-p', plugins[0]) + args
        args = self._getpytestargs() + args
        return self.run(*args)

    def spawn_pytest(self, string, expect_timeout=10.0):
        """Run py.test using pexpect.

        This makes sure to use the right py.test and sets up the
        temporary directory locations.

        The pexpect child is returned.

        """
        basetemp = self.tmpdir.mkdir("pexpect")
        invoke = " ".join(map(str, self._getpytestargs()))
        cmd = "%s --basetemp=%s %s" % (invoke, basetemp, string)
        return self.spawn(cmd, expect_timeout=expect_timeout)

    def spawn(self, cmd, expect_timeout=10.0):
        """Run a command using pexpect.

        The pexpect child is returned.  The child's output is logged
        to the "spawn.out" file in the temporary directory.
        """
        pexpect = pytest.importorskip("pexpect", "3.0")
        if hasattr(sys, 'pypy_version_info') and '64' in platform.machine():
            pytest.skip("pypy-64 bit not supported")
        if sys.platform == "darwin":
            pytest.xfail("pexpect does not work reliably on darwin?!")
        if sys.platform.startswith("freebsd"):
            pytest.xfail("pexpect does not work reliably on freebsd")
        logfile = self.tmpdir.join("spawn.out").open("wb")
        # register the cleanup first so the file is closed even if
        # spawning the child fails
        self.request.addfinalizer(logfile.close)
        child = pexpect.spawn(cmd, logfile=logfile)
        child.timeout = expect_timeout
        return child


def getdecoded(out):
    """Return ``out`` decoded as UTF-8, or a repr-based fallback message."""
    try:
        return out.decode("utf-8")
    except UnicodeDecodeError:
        return "INTERNAL not-utf8-decodeable, truncated string:\n%s" % (
            py.io.saferepr(out),)


class LineComp:
    """Collects text in ``stringio`` and matches it against expected lines."""

    def __init__(self):
        self.stringio = py.io.TextIO()

    def assert_contains_lines(self, lines2):
        """ assert that lines2 are contained (linearly) in lines1.

        lines1 is the text written to ``stringio`` so far; the buffer
        is emptied by this call.  Returns the result of
        :py:meth:`LineMatcher.fnmatch_lines`.
        """
        __tracebackhide__ = True
        val = self.stringio.getvalue()
        self.stringio.truncate(0)
        self.stringio.seek(0)
        lines1 = val.split("\n")
        return LineMatcher(lines1).fnmatch_lines(lines2)


class LineMatcher:
    """Flexible matching of text.

    This is a convenience class to test large texts like the output of
    commands.

    The constructor takes a list of lines without their trailing
    newlines, i.e. ``text.splitlines()``.

    """

    def __init__(self, lines):
        self.lines = lines

    def str(self):
        """Return the entire original text."""
        return "\n".join(self.lines)

    def _getlines(self, lines2):
        """Normalize ``lines2`` (string, Source or list) to a list of lines."""
        if isinstance(lines2, str):
            lines2 = Source(lines2)
        if isinstance(lines2, Source):
            lines2 = lines2.strip().lines
        return lines2

    def fnmatch_lines_random(self, lines2):
        """Check lines exist in the output.

        The argument is a list of lines which have to occur in the
        output, in any order.  Each line can contain glob wildcards.

        Raises ValueError for the first line which is not found.

        """
        lines2 = self._getlines(lines2)
        for line in lines2:
            for x in self.lines:
                if line == x or fnmatch(x, line):
                    print_("matched: ", repr(line))
                    break
            else:
                raise ValueError("line %r not found in output" % line)

    def get_lines_after(self, fnline):
        """Return all lines following the given line in the text.

        The given line can contain glob wildcards.  Raises ValueError
        if no line matches.
        """
        for i, line in enumerate(self.lines):
            if fnline == line or fnmatch(line, fnline):
                return self.lines[i+1:]
        raise ValueError("line %r not found in output" % fnline)

    def fnmatch_lines(self, lines2):
        """Search the text for matching lines.

        The argument is a list of lines which have to match, in the
        given order, and can use glob wildcards.  If they do not match
        a pytest.fail() is called.  The matches and non-matches are
        also printed on stderr.

        """
        def show(arg1, arg2):
            py.builtin.print_(arg1, arg2, file=sys.stderr)
        lines2 = self._getlines(lines2)
        lines1 = self.lines[:]
        nextline = None
        extralines = []
        __tracebackhide__ = True
        for line in lines2:
            nomatchprinted = False
            # consume output lines until one matches the expected line
            while lines1:
                nextline = lines1.pop(0)
                if line == nextline:
                    show("exact match:", repr(line))
                    break
                elif fnmatch(nextline, line):
                    show("fnmatch:", repr(line))
                    show(" with:", repr(nextline))
                    break
                else:
                    if not nomatchprinted:
                        show("nomatch:", repr(line))
                        nomatchprinted = True
                    show(" and:", repr(nextline))
                extralines.append(nextline)
            else:
                pytest.fail("remains unmatched: %r, see stderr" % (line,))