1""" (disabled by default) support for testing pytest and pytest plugins. """ 2from __future__ import absolute_import, division, print_function 3 4import codecs 5import gc 6import os 7import platform 8import re 9import subprocess 10import six 11import sys 12import time 13import traceback 14from fnmatch import fnmatch 15 16from weakref import WeakKeyDictionary 17 18from _pytest.capture import MultiCapture, SysCapture 19from _pytest._code import Source 20import py 21import pytest 22from _pytest.main import Session, EXIT_OK 23from _pytest.assertion.rewrite import AssertionRewritingHook 24 25 26PYTEST_FULLPATH = os.path.abspath(pytest.__file__.rstrip("oc")).replace("$py.class", ".py") 27 28 29def pytest_addoption(parser): 30 # group = parser.getgroup("pytester", "pytester (self-tests) options") 31 parser.addoption('--lsof', 32 action="store_true", dest="lsof", default=False, 33 help=("run FD checks if lsof is available")) 34 35 parser.addoption('--runpytest', default="inprocess", dest="runpytest", 36 choices=("inprocess", "subprocess", ), 37 help=("run pytest sub runs in tests using an 'inprocess' " 38 "or 'subprocess' (python -m main) method")) 39 40 41def pytest_configure(config): 42 if config.getvalue("lsof"): 43 checker = LsofFdLeakChecker() 44 if checker.matching_platform(): 45 config.pluginmanager.register(checker) 46 47 48class LsofFdLeakChecker(object): 49 def get_open_files(self): 50 out = self._exec_lsof() 51 open_files = self._parse_lsof_output(out) 52 return open_files 53 54 def _exec_lsof(self): 55 pid = os.getpid() 56 return py.process.cmdexec("lsof -Ffn0 -p %d" % pid) 57 58 def _parse_lsof_output(self, out): 59 def isopen(line): 60 return line.startswith('f') and ("deleted" not in line and 61 'mem' not in line and "txt" not in line and 'cwd' not in line) 62 63 open_files = [] 64 65 for line in out.split("\n"): 66 if isopen(line): 67 fields = line.split('\0') 68 fd = fields[0][1:] 69 filename = fields[1][1:] 70 if filename.startswith('/'): 71 open_files.append((fd, filename)) 72 73 return open_files 74 75 def matching_platform(self): 76 try: 77 py.process.cmdexec("lsof -v") 78 except (py.process.cmdexec.Error, UnicodeDecodeError): 79 # cmdexec may raise UnicodeDecodeError on Windows systems 80 # with locale other than english: 81 # https://bitbucket.org/pytest-dev/py/issues/66 82 return False 83 else: 84 return True 85 86 @pytest.hookimpl(hookwrapper=True, tryfirst=True) 87 def pytest_runtest_protocol(self, item): 88 lines1 = self.get_open_files() 89 yield 90 if hasattr(sys, "pypy_version_info"): 91 gc.collect() 92 lines2 = self.get_open_files() 93 94 new_fds = set([t[0] for t in lines2]) - set([t[0] for t in lines1]) 95 leaked_files = [t for t in lines2 if t[0] in new_fds] 96 if leaked_files: 97 error = [] 98 error.append("***** %s FD leakage detected" % len(leaked_files)) 99 error.extend([str(f) for f in leaked_files]) 100 error.append("*** Before:") 101 error.extend([str(f) for f in lines1]) 102 error.append("*** After:") 103 error.extend([str(f) for f in lines2]) 104 error.append(error[0]) 105 error.append("*** function %s:%s: %s " % item.location) 106 error.append("See issue #2366") 107 item.warn('', "\n".join(error)) 108 109 110# XXX copied from execnet's conftest.py - needs to be merged 111winpymap = { 112 'python2.7': r'C:\Python27\python.exe', 113 'python3.4': r'C:\Python34\python.exe', 114 'python3.5': r'C:\Python35\python.exe', 115 'python3.6': r'C:\Python36\python.exe', 116} 117 118 119def getexecutable(name, cache={}): 120 try: 121 return cache[name] 122 except KeyError: 123 executable = py.path.local.sysfind(name) 124 if executable: 125 import subprocess 126 popen = subprocess.Popen([str(executable), "--version"], 127 universal_newlines=True, stderr=subprocess.PIPE) 128 out, err = popen.communicate() 129 if name == "jython": 130 if not err or "2.5" not in err: 131 executable = None 132 if "2.5.2" in err: 133 executable = None # http://bugs.jython.org/issue1790 134 elif popen.returncode != 0: 135 # Handle pyenv's 127. 136 executable = None 137 cache[name] = executable 138 return executable 139 140 141@pytest.fixture(params=['python2.7', 'python3.4', 'pypy', 'pypy3']) 142def anypython(request): 143 name = request.param 144 executable = getexecutable(name) 145 if executable is None: 146 if sys.platform == "win32": 147 executable = winpymap.get(name, None) 148 if executable: 149 executable = py.path.local(executable) 150 if executable.check(): 151 return executable 152 pytest.skip("no suitable %s found" % (name,)) 153 return executable 154 155# used at least by pytest-xdist plugin 156 157 158@pytest.fixture 159def _pytest(request): 160 """ Return a helper which offers a gethookrecorder(hook) 161 method which returns a HookRecorder instance which helps 162 to make assertions about called hooks. 163 """ 164 return PytestArg(request) 165 166 167class PytestArg: 168 def __init__(self, request): 169 self.request = request 170 171 def gethookrecorder(self, hook): 172 hookrecorder = HookRecorder(hook._pm) 173 self.request.addfinalizer(hookrecorder.finish_recording) 174 return hookrecorder 175 176 177def get_public_names(values): 178 """Only return names from iterator values without a leading underscore.""" 179 return [x for x in values if x[0] != "_"] 180 181 182class ParsedCall: 183 def __init__(self, name, kwargs): 184 self.__dict__.update(kwargs) 185 self._name = name 186 187 def __repr__(self): 188 d = self.__dict__.copy() 189 del d['_name'] 190 return "<ParsedCall %r(**%r)>" % (self._name, d) 191 192 193class HookRecorder: 194 """Record all hooks called in a plugin manager. 195 196 This wraps all the hook calls in the plugin manager, recording 197 each call before propagating the normal calls. 198 199 """ 200 201 def __init__(self, pluginmanager): 202 self._pluginmanager = pluginmanager 203 self.calls = [] 204 205 def before(hook_name, hook_impls, kwargs): 206 self.calls.append(ParsedCall(hook_name, kwargs)) 207 208 def after(outcome, hook_name, hook_impls, kwargs): 209 pass 210 211 self._undo_wrapping = pluginmanager.add_hookcall_monitoring(before, after) 212 213 def finish_recording(self): 214 self._undo_wrapping() 215 216 def getcalls(self, names): 217 if isinstance(names, str): 218 names = names.split() 219 return [call for call in self.calls if call._name in names] 220 221 def assert_contains(self, entries): 222 __tracebackhide__ = True 223 i = 0 224 entries = list(entries) 225 backlocals = sys._getframe(1).f_locals 226 while entries: 227 name, check = entries.pop(0) 228 for ind, call in enumerate(self.calls[i:]): 229 if call._name == name: 230 print("NAMEMATCH", name, call) 231 if eval(check, backlocals, call.__dict__): 232 print("CHECKERMATCH", repr(check), "->", call) 233 else: 234 print("NOCHECKERMATCH", repr(check), "-", call) 235 continue 236 i += ind + 1 237 break 238 print("NONAMEMATCH", name, "with", call) 239 else: 240 pytest.fail("could not find %r check %r" % (name, check)) 241 242 def popcall(self, name): 243 __tracebackhide__ = True 244 for i, call in enumerate(self.calls): 245 if call._name == name: 246 del self.calls[i] 247 return call 248 lines = ["could not find call %r, in:" % (name,)] 249 lines.extend([" %s" % str(x) for x in self.calls]) 250 pytest.fail("\n".join(lines)) 251 252 def getcall(self, name): 253 values = self.getcalls(name) 254 assert len(values) == 1, (name, values) 255 return values[0] 256 257 # functionality for test reports 258 259 def getreports(self, 260 names="pytest_runtest_logreport pytest_collectreport"): 261 return [x.report for x in self.getcalls(names)] 262 263 def matchreport(self, inamepart="", 264 names="pytest_runtest_logreport pytest_collectreport", when=None): 265 """ return a testreport whose dotted import path matches """ 266 values = [] 267 for rep in self.getreports(names=names): 268 try: 269 if not when and rep.when != "call" and rep.passed: 270 # setup/teardown passing reports - let's ignore those 271 continue 272 except AttributeError: 273 pass 274 if when and getattr(rep, 'when', None) != when: 275 continue 276 if not inamepart or inamepart in rep.nodeid.split("::"): 277 values.append(rep) 278 if not values: 279 raise ValueError("could not find test report matching %r: " 280 "no test reports at all!" % (inamepart,)) 281 if len(values) > 1: 282 raise ValueError( 283 "found 2 or more testreports matching %r: %s" % (inamepart, values)) 284 return values[0] 285 286 def getfailures(self, 287 names='pytest_runtest_logreport pytest_collectreport'): 288 return [rep for rep in self.getreports(names) if rep.failed] 289 290 def getfailedcollections(self): 291 return self.getfailures('pytest_collectreport') 292 293 def listoutcomes(self): 294 passed = [] 295 skipped = [] 296 failed = [] 297 for rep in self.getreports( 298 "pytest_collectreport pytest_runtest_logreport"): 299 if rep.passed: 300 if getattr(rep, "when", None) == "call": 301 passed.append(rep) 302 elif rep.skipped: 303 skipped.append(rep) 304 elif rep.failed: 305 failed.append(rep) 306 return passed, skipped, failed 307 308 def countoutcomes(self): 309 return [len(x) for x in self.listoutcomes()] 310 311 def assertoutcome(self, passed=0, skipped=0, failed=0): 312 realpassed, realskipped, realfailed = self.listoutcomes() 313 assert passed == len(realpassed) 314 assert skipped == len(realskipped) 315 assert failed == len(realfailed) 316 317 def clear(self): 318 self.calls[:] = [] 319 320 321@pytest.fixture 322def linecomp(request): 323 return LineComp() 324 325 326@pytest.fixture(name='LineMatcher') 327def LineMatcher_fixture(request): 328 return LineMatcher 329 330 331@pytest.fixture 332def testdir(request, tmpdir_factory): 333 return Testdir(request, tmpdir_factory) 334 335 336rex_outcome = re.compile(r"(\d+) ([\w-]+)") 337 338 339class RunResult: 340 """The result of running a command. 341 342 Attributes: 343 344 :ret: The return value. 345 :outlines: List of lines captured from stdout. 346 :errlines: List of lines captures from stderr. 347 :stdout: :py:class:`LineMatcher` of stdout, use ``stdout.str()`` to 348 reconstruct stdout or the commonly used 349 ``stdout.fnmatch_lines()`` method. 350 :stderrr: :py:class:`LineMatcher` of stderr. 351 :duration: Duration in seconds. 352 353 """ 354 355 def __init__(self, ret, outlines, errlines, duration): 356 self.ret = ret 357 self.outlines = outlines 358 self.errlines = errlines 359 self.stdout = LineMatcher(outlines) 360 self.stderr = LineMatcher(errlines) 361 self.duration = duration 362 363 def parseoutcomes(self): 364 """ Return a dictionary of outcomestring->num from parsing 365 the terminal output that the test process produced.""" 366 for line in reversed(self.outlines): 367 if 'seconds' in line: 368 outcomes = rex_outcome.findall(line) 369 if outcomes: 370 d = {} 371 for num, cat in outcomes: 372 d[cat] = int(num) 373 return d 374 raise ValueError("Pytest terminal report not found") 375 376 def assert_outcomes(self, passed=0, skipped=0, failed=0, error=0): 377 """ assert that the specified outcomes appear with the respective 378 numbers (0 means it didn't occur) in the text output from a test run.""" 379 d = self.parseoutcomes() 380 obtained = { 381 'passed': d.get('passed', 0), 382 'skipped': d.get('skipped', 0), 383 'failed': d.get('failed', 0), 384 'error': d.get('error', 0), 385 } 386 assert obtained == dict(passed=passed, skipped=skipped, failed=failed, error=error) 387 388 389class Testdir: 390 """Temporary test directory with tools to test/run pytest itself. 391 392 This is based on the ``tmpdir`` fixture but provides a number of 393 methods which aid with testing pytest itself. Unless 394 :py:meth:`chdir` is used all methods will use :py:attr:`tmpdir` as 395 current working directory. 396 397 Attributes: 398 399 :tmpdir: The :py:class:`py.path.local` instance of the temporary 400 directory. 401 402 :plugins: A list of plugins to use with :py:meth:`parseconfig` and 403 :py:meth:`runpytest`. Initially this is an empty list but 404 plugins can be added to the list. The type of items to add to 405 the list depend on the method which uses them so refer to them 406 for details. 407 408 """ 409 410 def __init__(self, request, tmpdir_factory): 411 self.request = request 412 self._mod_collections = WeakKeyDictionary() 413 name = request.function.__name__ 414 self.tmpdir = tmpdir_factory.mktemp(name, numbered=True) 415 self.plugins = [] 416 self._savesyspath = (list(sys.path), list(sys.meta_path)) 417 self._savemodulekeys = set(sys.modules) 418 self.chdir() # always chdir 419 self.request.addfinalizer(self.finalize) 420 method = self.request.config.getoption("--runpytest") 421 if method == "inprocess": 422 self._runpytest_method = self.runpytest_inprocess 423 elif method == "subprocess": 424 self._runpytest_method = self.runpytest_subprocess 425 426 def __repr__(self): 427 return "<Testdir %r>" % (self.tmpdir,) 428 429 def finalize(self): 430 """Clean up global state artifacts. 431 432 Some methods modify the global interpreter state and this 433 tries to clean this up. It does not remove the temporary 434 directory however so it can be looked at after the test run 435 has finished. 436 437 """ 438 sys.path[:], sys.meta_path[:] = self._savesyspath 439 if hasattr(self, '_olddir'): 440 self._olddir.chdir() 441 self.delete_loaded_modules() 442 443 def delete_loaded_modules(self): 444 """Delete modules that have been loaded during a test. 445 446 This allows the interpreter to catch module changes in case 447 the module is re-imported. 448 """ 449 for name in set(sys.modules).difference(self._savemodulekeys): 450 # some zope modules used by twisted-related tests keeps internal 451 # state and can't be deleted; we had some trouble in the past 452 # with zope.interface for example 453 if not name.startswith("zope"): 454 del sys.modules[name] 455 456 def make_hook_recorder(self, pluginmanager): 457 """Create a new :py:class:`HookRecorder` for a PluginManager.""" 458 assert not hasattr(pluginmanager, "reprec") 459 pluginmanager.reprec = reprec = HookRecorder(pluginmanager) 460 self.request.addfinalizer(reprec.finish_recording) 461 return reprec 462 463 def chdir(self): 464 """Cd into the temporary directory. 465 466 This is done automatically upon instantiation. 467 468 """ 469 old = self.tmpdir.chdir() 470 if not hasattr(self, '_olddir'): 471 self._olddir = old 472 473 def _makefile(self, ext, args, kwargs, encoding='utf-8'): 474 items = list(kwargs.items()) 475 476 def to_text(s): 477 return s.decode(encoding) if isinstance(s, bytes) else six.text_type(s) 478 479 if args: 480 source = u"\n".join(to_text(x) for x in args) 481 basename = self.request.function.__name__ 482 items.insert(0, (basename, source)) 483 484 ret = None 485 for basename, value in items: 486 p = self.tmpdir.join(basename).new(ext=ext) 487 p.dirpath().ensure_dir() 488 source = Source(value) 489 source = u"\n".join(to_text(line) for line in source.lines) 490 p.write(source.strip().encode(encoding), "wb") 491 if ret is None: 492 ret = p 493 return ret 494 495 def makefile(self, ext, *args, **kwargs): 496 """Create a new file in the testdir. 497 498 ext: The extension the file should use, including the dot. 499 E.g. ".py". 500 501 args: All args will be treated as strings and joined using 502 newlines. The result will be written as contents to the 503 file. The name of the file will be based on the test 504 function requesting this fixture. 505 E.g. "testdir.makefile('.txt', 'line1', 'line2')" 506 507 kwargs: Each keyword is the name of a file, while the value of 508 it will be written as contents of the file. 509 E.g. "testdir.makefile('.ini', pytest='[pytest]\naddopts=-rs\n')" 510 511 """ 512 return self._makefile(ext, args, kwargs) 513 514 def makeconftest(self, source): 515 """Write a contest.py file with 'source' as contents.""" 516 return self.makepyfile(conftest=source) 517 518 def makeini(self, source): 519 """Write a tox.ini file with 'source' as contents.""" 520 return self.makefile('.ini', tox=source) 521 522 def getinicfg(self, source): 523 """Return the pytest section from the tox.ini config file.""" 524 p = self.makeini(source) 525 return py.iniconfig.IniConfig(p)['pytest'] 526 527 def makepyfile(self, *args, **kwargs): 528 """Shortcut for .makefile() with a .py extension.""" 529 return self._makefile('.py', args, kwargs) 530 531 def maketxtfile(self, *args, **kwargs): 532 """Shortcut for .makefile() with a .txt extension.""" 533 return self._makefile('.txt', args, kwargs) 534 535 def syspathinsert(self, path=None): 536 """Prepend a directory to sys.path, defaults to :py:attr:`tmpdir`. 537 538 This is undone automatically after the test. 539 """ 540 if path is None: 541 path = self.tmpdir 542 sys.path.insert(0, str(path)) 543 # a call to syspathinsert() usually means that the caller 544 # wants to import some dynamically created files. 545 # with python3 we thus invalidate import caches. 546 self._possibly_invalidate_import_caches() 547 548 def _possibly_invalidate_import_caches(self): 549 # invalidate caches if we can (py33 and above) 550 try: 551 import importlib 552 except ImportError: 553 pass 554 else: 555 if hasattr(importlib, "invalidate_caches"): 556 importlib.invalidate_caches() 557 558 def mkdir(self, name): 559 """Create a new (sub)directory.""" 560 return self.tmpdir.mkdir(name) 561 562 def mkpydir(self, name): 563 """Create a new python package. 564 565 This creates a (sub)directory with an empty ``__init__.py`` 566 file so that is recognised as a python package. 567 568 """ 569 p = self.mkdir(name) 570 p.ensure("__init__.py") 571 return p 572 573 Session = Session 574 575 def getnode(self, config, arg): 576 """Return the collection node of a file. 577 578 :param config: :py:class:`_pytest.config.Config` instance, see 579 :py:meth:`parseconfig` and :py:meth:`parseconfigure` to 580 create the configuration. 581 582 :param arg: A :py:class:`py.path.local` instance of the file. 583 584 """ 585 session = Session(config) 586 assert '::' not in str(arg) 587 p = py.path.local(arg) 588 config.hook.pytest_sessionstart(session=session) 589 res = session.perform_collect([str(p)], genitems=False)[0] 590 config.hook.pytest_sessionfinish(session=session, exitstatus=EXIT_OK) 591 return res 592 593 def getpathnode(self, path): 594 """Return the collection node of a file. 595 596 This is like :py:meth:`getnode` but uses 597 :py:meth:`parseconfigure` to create the (configured) pytest 598 Config instance. 599 600 :param path: A :py:class:`py.path.local` instance of the file. 601 602 """ 603 config = self.parseconfigure(path) 604 session = Session(config) 605 x = session.fspath.bestrelpath(path) 606 config.hook.pytest_sessionstart(session=session) 607 res = session.perform_collect([x], genitems=False)[0] 608 config.hook.pytest_sessionfinish(session=session, exitstatus=EXIT_OK) 609 return res 610 611 def genitems(self, colitems): 612 """Generate all test items from a collection node. 613 614 This recurses into the collection node and returns a list of 615 all the test items contained within. 616 617 """ 618 session = colitems[0].session 619 result = [] 620 for colitem in colitems: 621 result.extend(session.genitems(colitem)) 622 return result 623 624 def runitem(self, source): 625 """Run the "test_func" Item. 626 627 The calling test instance (the class which contains the test 628 method) must provide a ``.getrunner()`` method which should 629 return a runner which can run the test protocol for a single 630 item, like e.g. :py:func:`_pytest.runner.runtestprotocol`. 631 632 """ 633 # used from runner functional tests 634 item = self.getitem(source) 635 # the test class where we are called from wants to provide the runner 636 testclassinstance = self.request.instance 637 runner = testclassinstance.getrunner() 638 return runner(item) 639 640 def inline_runsource(self, source, *cmdlineargs): 641 """Run a test module in process using ``pytest.main()``. 642 643 This run writes "source" into a temporary file and runs 644 ``pytest.main()`` on it, returning a :py:class:`HookRecorder` 645 instance for the result. 646 647 :param source: The source code of the test module. 648 649 :param cmdlineargs: Any extra command line arguments to use. 650 651 :return: :py:class:`HookRecorder` instance of the result. 652 653 """ 654 p = self.makepyfile(source) 655 values = list(cmdlineargs) + [p] 656 return self.inline_run(*values) 657 658 def inline_genitems(self, *args): 659 """Run ``pytest.main(['--collectonly'])`` in-process. 660 661 Returns a tuple of the collected items and a 662 :py:class:`HookRecorder` instance. 663 664 This runs the :py:func:`pytest.main` function to run all of 665 pytest inside the test process itself like 666 :py:meth:`inline_run`. However the return value is a tuple of 667 the collection items and a :py:class:`HookRecorder` instance. 668 669 """ 670 rec = self.inline_run("--collect-only", *args) 671 items = [x.item for x in rec.getcalls("pytest_itemcollected")] 672 return items, rec 673 674 def inline_run(self, *args, **kwargs): 675 """Run ``pytest.main()`` in-process, returning a HookRecorder. 676 677 This runs the :py:func:`pytest.main` function to run all of 678 pytest inside the test process itself. This means it can 679 return a :py:class:`HookRecorder` instance which gives more 680 detailed results from then run then can be done by matching 681 stdout/stderr from :py:meth:`runpytest`. 682 683 :param args: Any command line arguments to pass to 684 :py:func:`pytest.main`. 685 686 :param plugin: (keyword-only) Extra plugin instances the 687 ``pytest.main()`` instance should use. 688 689 :return: A :py:class:`HookRecorder` instance. 690 """ 691 # When running py.test inline any plugins active in the main 692 # test process are already imported. So this disables the 693 # warning which will trigger to say they can no longer be 694 # rewritten, which is fine as they are already rewritten. 695 orig_warn = AssertionRewritingHook._warn_already_imported 696 697 def revert(): 698 AssertionRewritingHook._warn_already_imported = orig_warn 699 700 self.request.addfinalizer(revert) 701 AssertionRewritingHook._warn_already_imported = lambda *a: None 702 703 rec = [] 704 705 class Collect: 706 def pytest_configure(x, config): 707 rec.append(self.make_hook_recorder(config.pluginmanager)) 708 709 plugins = kwargs.get("plugins") or [] 710 plugins.append(Collect()) 711 ret = pytest.main(list(args), plugins=plugins) 712 self.delete_loaded_modules() 713 if len(rec) == 1: 714 reprec = rec.pop() 715 else: 716 class reprec: 717 pass 718 reprec.ret = ret 719 720 # typically we reraise keyboard interrupts from the child run 721 # because it's our user requesting interruption of the testing 722 if ret == 2 and not kwargs.get("no_reraise_ctrlc"): 723 calls = reprec.getcalls("pytest_keyboard_interrupt") 724 if calls and calls[-1].excinfo.type == KeyboardInterrupt: 725 raise KeyboardInterrupt() 726 return reprec 727 728 def runpytest_inprocess(self, *args, **kwargs): 729 """ Return result of running pytest in-process, providing a similar 730 interface to what self.runpytest() provides. """ 731 if kwargs.get("syspathinsert"): 732 self.syspathinsert() 733 now = time.time() 734 capture = MultiCapture(Capture=SysCapture) 735 capture.start_capturing() 736 try: 737 try: 738 reprec = self.inline_run(*args, **kwargs) 739 except SystemExit as e: 740 741 class reprec: 742 ret = e.args[0] 743 744 except Exception: 745 traceback.print_exc() 746 747 class reprec: 748 ret = 3 749 finally: 750 out, err = capture.readouterr() 751 capture.stop_capturing() 752 sys.stdout.write(out) 753 sys.stderr.write(err) 754 755 res = RunResult(reprec.ret, 756 out.split("\n"), err.split("\n"), 757 time.time() - now) 758 res.reprec = reprec 759 return res 760 761 def runpytest(self, *args, **kwargs): 762 """ Run pytest inline or in a subprocess, depending on the command line 763 option "--runpytest" and return a :py:class:`RunResult`. 764 765 """ 766 args = self._ensure_basetemp(args) 767 return self._runpytest_method(*args, **kwargs) 768 769 def _ensure_basetemp(self, args): 770 args = [str(x) for x in args] 771 for x in args: 772 if str(x).startswith('--basetemp'): 773 # print("basedtemp exists: %s" %(args,)) 774 break 775 else: 776 args.append("--basetemp=%s" % self.tmpdir.dirpath('basetemp')) 777 # print("added basetemp: %s" %(args,)) 778 return args 779 780 def parseconfig(self, *args): 781 """Return a new pytest Config instance from given commandline args. 782 783 This invokes the pytest bootstrapping code in _pytest.config 784 to create a new :py:class:`_pytest.core.PluginManager` and 785 call the pytest_cmdline_parse hook to create new 786 :py:class:`_pytest.config.Config` instance. 787 788 If :py:attr:`plugins` has been populated they should be plugin 789 modules which will be registered with the PluginManager. 790 791 """ 792 args = self._ensure_basetemp(args) 793 794 import _pytest.config 795 config = _pytest.config._prepareconfig(args, self.plugins) 796 # we don't know what the test will do with this half-setup config 797 # object and thus we make sure it gets unconfigured properly in any 798 # case (otherwise capturing could still be active, for example) 799 self.request.addfinalizer(config._ensure_unconfigure) 800 return config 801 802 def parseconfigure(self, *args): 803 """Return a new pytest configured Config instance. 804 805 This returns a new :py:class:`_pytest.config.Config` instance 806 like :py:meth:`parseconfig`, but also calls the 807 pytest_configure hook. 808 809 """ 810 config = self.parseconfig(*args) 811 config._do_configure() 812 self.request.addfinalizer(config._ensure_unconfigure) 813 return config 814 815 def getitem(self, source, funcname="test_func"): 816 """Return the test item for a test function. 817 818 This writes the source to a python file and runs pytest's 819 collection on the resulting module, returning the test item 820 for the requested function name. 821 822 :param source: The module source. 823 824 :param funcname: The name of the test function for which the 825 Item must be returned. 826 827 """ 828 items = self.getitems(source) 829 for item in items: 830 if item.name == funcname: 831 return item 832 assert 0, "%r item not found in module:\n%s\nitems: %s" % ( 833 funcname, source, items) 834 835 def getitems(self, source): 836 """Return all test items collected from the module. 837 838 This writes the source to a python file and runs pytest's 839 collection on the resulting module, returning all test items 840 contained within. 841 842 """ 843 modcol = self.getmodulecol(source) 844 return self.genitems([modcol]) 845 846 def getmodulecol(self, source, configargs=(), withinit=False): 847 """Return the module collection node for ``source``. 848 849 This writes ``source`` to a file using :py:meth:`makepyfile` 850 and then runs the pytest collection on it, returning the 851 collection node for the test module. 852 853 :param source: The source code of the module to collect. 854 855 :param configargs: Any extra arguments to pass to 856 :py:meth:`parseconfigure`. 857 858 :param withinit: Whether to also write a ``__init__.py`` file 859 to the temporary directory to ensure it is a package. 860 861 """ 862 kw = {self.request.function.__name__: Source(source).strip()} 863 path = self.makepyfile(**kw) 864 if withinit: 865 self.makepyfile(__init__="#") 866 self.config = config = self.parseconfigure(path, *configargs) 867 node = self.getnode(config, path) 868 869 return node 870 871 def collect_by_name(self, modcol, name): 872 """Return the collection node for name from the module collection. 873 874 This will search a module collection node for a collection 875 node matching the given name. 876 877 :param modcol: A module collection node, see 878 :py:meth:`getmodulecol`. 879 880 :param name: The name of the node to return. 881 882 """ 883 if modcol not in self._mod_collections: 884 self._mod_collections[modcol] = list(modcol.collect()) 885 for colitem in self._mod_collections[modcol]: 886 if colitem.name == name: 887 return colitem 888 889 def popen(self, cmdargs, stdout, stderr, **kw): 890 """Invoke subprocess.Popen. 891 892 This calls subprocess.Popen making sure the current working 893 directory is the PYTHONPATH. 894 895 You probably want to use :py:meth:`run` instead. 896 897 """ 898 env = os.environ.copy() 899 env['PYTHONPATH'] = os.pathsep.join(filter(None, [ 900 str(os.getcwd()), env.get('PYTHONPATH', '')])) 901 kw['env'] = env 902 903 popen = subprocess.Popen(cmdargs, stdin=subprocess.PIPE, stdout=stdout, stderr=stderr, **kw) 904 popen.stdin.close() 905 906 return popen 907 908 def run(self, *cmdargs): 909 """Run a command with arguments. 910 911 Run a process using subprocess.Popen saving the stdout and 912 stderr. 913 914 Returns a :py:class:`RunResult`. 915 916 """ 917 return self._run(*cmdargs) 918 919 def _run(self, *cmdargs): 920 cmdargs = [str(x) for x in cmdargs] 921 p1 = self.tmpdir.join("stdout") 922 p2 = self.tmpdir.join("stderr") 923 print("running:", ' '.join(cmdargs)) 924 print(" in:", str(py.path.local())) 925 f1 = codecs.open(str(p1), "w", encoding="utf8") 926 f2 = codecs.open(str(p2), "w", encoding="utf8") 927 try: 928 now = time.time() 929 popen = self.popen(cmdargs, stdout=f1, stderr=f2, 930 close_fds=(sys.platform != "win32")) 931 ret = popen.wait() 932 finally: 933 f1.close() 934 f2.close() 935 f1 = codecs.open(str(p1), "r", encoding="utf8") 936 f2 = codecs.open(str(p2), "r", encoding="utf8") 937 try: 938 out = f1.read().splitlines() 939 err = f2.read().splitlines() 940 finally: 941 f1.close() 942 f2.close() 943 self._dump_lines(out, sys.stdout) 944 self._dump_lines(err, sys.stderr) 945 return RunResult(ret, out, err, time.time() - now) 946 947 def _dump_lines(self, lines, fp): 948 try: 949 for line in lines: 950 print(line, file=fp) 951 except UnicodeEncodeError: 952 print("couldn't print to %s because of encoding" % (fp,)) 953 954 def _getpytestargs(self): 955 # we cannot use "(sys.executable,script)" 956 # because on windows the script is e.g. a pytest.exe 957 return (sys.executable, PYTEST_FULLPATH) # noqa 958 959 def runpython(self, script): 960 """Run a python script using sys.executable as interpreter. 961 962 Returns a :py:class:`RunResult`. 963 """ 964 return self.run(sys.executable, script) 965 966 def runpython_c(self, command): 967 """Run python -c "command", return a :py:class:`RunResult`.""" 968 return self.run(sys.executable, "-c", command) 969 970 def runpytest_subprocess(self, *args, **kwargs): 971 """Run pytest as a subprocess with given arguments. 972 973 Any plugins added to the :py:attr:`plugins` list will added 974 using the ``-p`` command line option. Addtionally 975 ``--basetemp`` is used put any temporary files and directories 976 in a numbered directory prefixed with "runpytest-" so they do 977 not conflict with the normal numberd pytest location for 978 temporary files and directories. 979 980 Returns a :py:class:`RunResult`. 981 982 """ 983 p = py.path.local.make_numbered_dir(prefix="runpytest-", 984 keep=None, rootdir=self.tmpdir) 985 args = ('--basetemp=%s' % p, ) + args 986 # for x in args: 987 # if '--confcutdir' in str(x): 988 # break 989 # else: 990 # pass 991 # args = ('--confcutdir=.',) + args 992 plugins = [x for x in self.plugins if isinstance(x, str)] 993 if plugins: 994 args = ('-p', plugins[0]) + args 995 args = self._getpytestargs() + args 996 return self.run(*args) 997 998 def spawn_pytest(self, string, expect_timeout=10.0): 999 """Run pytest using pexpect. 1000 1001 This makes sure to use the right pytest and sets up the 1002 temporary directory locations. 1003 1004 The pexpect child is returned. 1005 1006 """ 1007 basetemp = self.tmpdir.mkdir("temp-pexpect") 1008 invoke = " ".join(map(str, self._getpytestargs())) 1009 cmd = "%s --basetemp=%s %s" % (invoke, basetemp, string) 1010 return self.spawn(cmd, expect_timeout=expect_timeout) 1011 1012 def spawn(self, cmd, expect_timeout=10.0): 1013 """Run a command using pexpect. 1014 1015 The pexpect child is returned. 1016 """ 1017 pexpect = pytest.importorskip("pexpect", "3.0") 1018 if hasattr(sys, 'pypy_version_info') and '64' in platform.machine(): 1019 pytest.skip("pypy-64 bit not supported") 1020 if sys.platform.startswith("freebsd"): 1021 pytest.xfail("pexpect does not work reliably on freebsd") 1022 logfile = self.tmpdir.join("spawn.out").open("wb") 1023 child = pexpect.spawn(cmd, logfile=logfile) 1024 self.request.addfinalizer(logfile.close) 1025 child.timeout = expect_timeout 1026 return child 1027 1028 1029def getdecoded(out): 1030 try: 1031 return out.decode("utf-8") 1032 except UnicodeDecodeError: 1033 return "INTERNAL not-utf8-decodeable, truncated string:\n%s" % ( 1034 py.io.saferepr(out),) 1035 1036 1037class LineComp: 1038 def __init__(self): 1039 self.stringio = py.io.TextIO() 1040 1041 def assert_contains_lines(self, lines2): 1042 """ assert that lines2 are contained (linearly) in lines1. 1043 return a list of extralines found. 1044 """ 1045 __tracebackhide__ = True 1046 val = self.stringio.getvalue() 1047 self.stringio.truncate(0) 1048 self.stringio.seek(0) 1049 lines1 = val.split("\n") 1050 return LineMatcher(lines1).fnmatch_lines(lines2) 1051 1052 1053class LineMatcher: 1054 """Flexible matching of text. 1055 1056 This is a convenience class to test large texts like the output of 1057 commands. 1058 1059 The constructor takes a list of lines without their trailing 1060 newlines, i.e. ``text.splitlines()``. 1061 1062 """ 1063 1064 def __init__(self, lines): 1065 self.lines = lines 1066 self._log_output = [] 1067 1068 def str(self): 1069 """Return the entire original text.""" 1070 return "\n".join(self.lines) 1071 1072 def _getlines(self, lines2): 1073 if isinstance(lines2, str): 1074 lines2 = Source(lines2) 1075 if isinstance(lines2, Source): 1076 lines2 = lines2.strip().lines 1077 return lines2 1078 1079 def fnmatch_lines_random(self, lines2): 1080 """Check lines exist in the output using ``fnmatch.fnmatch``, in any order. 1081 1082 The argument is a list of lines which have to occur in the 1083 output, in any order. 1084 """ 1085 self._match_lines_random(lines2, fnmatch) 1086 1087 def re_match_lines_random(self, lines2): 1088 """Check lines exist in the output using ``re.match``, in any order. 1089 1090 The argument is a list of lines which have to occur in the 1091 output, in any order. 1092 1093 """ 1094 self._match_lines_random(lines2, lambda name, pat: re.match(pat, name)) 1095 1096 def _match_lines_random(self, lines2, match_func): 1097 """Check lines exist in the output. 1098 1099 The argument is a list of lines which have to occur in the 1100 output, in any order. Each line can contain glob whildcards. 1101 1102 """ 1103 lines2 = self._getlines(lines2) 1104 for line in lines2: 1105 for x in self.lines: 1106 if line == x or match_func(x, line): 1107 self._log("matched: ", repr(line)) 1108 break 1109 else: 1110 self._log("line %r not found in output" % line) 1111 raise ValueError(self._log_text) 1112 1113 def get_lines_after(self, fnline): 1114 """Return all lines following the given line in the text. 1115 1116 The given line can contain glob wildcards. 1117 """ 1118 for i, line in enumerate(self.lines): 1119 if fnline == line or fnmatch(line, fnline): 1120 return self.lines[i + 1:] 1121 raise ValueError("line %r not found in output" % fnline) 1122 1123 def _log(self, *args): 1124 self._log_output.append(' '.join((str(x) for x in args))) 1125 1126 @property 1127 def _log_text(self): 1128 return '\n'.join(self._log_output) 1129 1130 def fnmatch_lines(self, lines2): 1131 """Search captured text for matching lines using ``fnmatch.fnmatch``. 1132 1133 The argument is a list of lines which have to match and can 1134 use glob wildcards. If they do not match a pytest.fail() is 1135 called. The matches and non-matches are also printed on 1136 stdout. 1137 1138 """ 1139 self._match_lines(lines2, fnmatch, 'fnmatch') 1140 1141 def re_match_lines(self, lines2): 1142 """Search captured text for matching lines using ``re.match``. 1143 1144 The argument is a list of lines which have to match using ``re.match``. 1145 If they do not match a pytest.fail() is called. 1146 1147 The matches and non-matches are also printed on 1148 stdout. 1149 """ 1150 self._match_lines(lines2, lambda name, pat: re.match(pat, name), 're.match') 1151 1152 def _match_lines(self, lines2, match_func, match_nickname): 1153 """Underlying implementation of ``fnmatch_lines`` and ``re_match_lines``. 1154 1155 :param list[str] lines2: list of string patterns to match. The actual format depends on 1156 ``match_func``. 1157 :param match_func: a callable ``match_func(line, pattern)`` where line is the captured 1158 line from stdout/stderr and pattern is the matching pattern. 1159 1160 :param str match_nickname: the nickname for the match function that will be logged 1161 to stdout when a match occurs. 1162 """ 1163 lines2 = self._getlines(lines2) 1164 lines1 = self.lines[:] 1165 nextline = None 1166 extralines = [] 1167 __tracebackhide__ = True 1168 for line in lines2: 1169 nomatchprinted = False 1170 while lines1: 1171 nextline = lines1.pop(0) 1172 if line == nextline: 1173 self._log("exact match:", repr(line)) 1174 break 1175 elif match_func(nextline, line): 1176 self._log("%s:" % match_nickname, repr(line)) 1177 self._log(" with:", repr(nextline)) 1178 break 1179 else: 1180 if not nomatchprinted: 1181 self._log("nomatch:", repr(line)) 1182 nomatchprinted = True 1183 self._log(" and:", repr(nextline)) 1184 extralines.append(nextline) 1185 else: 1186 self._log("remains unmatched: %r" % (line,)) 1187 pytest.fail(self._log_text) 1188