1""" (disabled by default) support for testing pytest and pytest plugins. """
2from __future__ import absolute_import, division, print_function
3
4import codecs
5import gc
6import os
7import platform
8import re
9import subprocess
10import sys
11import time
12import traceback
13from fnmatch import fnmatch
14
15from weakref import WeakKeyDictionary
16
17from _pytest.capture import MultiCapture, SysCapture
18from _pytest._code import Source
19import py
20import pytest
21from _pytest.main import Session, EXIT_OK
22from _pytest.assertion.rewrite import AssertionRewritingHook
23
24
def pytest_addoption(parser):
    """Register the command line options provided by this plugin.

    ``--lsof`` enables the file descriptor leak checks and
    ``--runpytest`` selects how :py:meth:`Testdir.runpytest` executes
    nested pytest runs.
    """
    lsof_help = "run FD checks if lsof is available"
    parser.addoption(
        '--lsof',
        action="store_true",
        dest="lsof",
        default=False,
        help=lsof_help,
    )

    runpytest_help = (
        "run pytest sub runs in tests using an 'inprocess' "
        "or 'subprocess' (python -m main) method"
    )
    parser.addoption(
        '--runpytest',
        default="inprocess",
        dest="runpytest",
        choices=("inprocess", "subprocess"),
        help=runpytest_help,
    )
35
36
def pytest_configure(config):
    """Remember the location of the pytest module and set up FD checking.

    The hook may run several times; the module-level
    ``_pytest_fullpath`` is only computed on the first invocation.
    When ``--lsof`` was given and ``lsof`` is usable on this platform
    an :py:class:`LsofFdLeakChecker` is registered as a plugin.
    """
    global _pytest_fullpath
    if "_pytest_fullpath" not in globals():
        # strip a trailing "c"/"o" so that .pyc/.pyo map to the .py file
        source_path = os.path.abspath(pytest.__file__.rstrip("oc"))
        _pytest_fullpath = source_path.replace("$py.class", ".py")

    if not config.getvalue("lsof"):
        return
    checker = LsofFdLeakChecker()
    if checker.matching_platform():
        config.pluginmanager.register(checker)
50
51
class LsofFdLeakChecker(object):
    """Plugin that uses ``lsof`` to warn about file descriptors a test left open."""

    def get_open_files(self):
        """Return a list of ``(fd, filename)`` tuples for this process."""
        return self._parse_lsof_output(self._exec_lsof())

    def _exec_lsof(self):
        """Run ``lsof`` for the current process id and return its output."""
        return py.process.cmdexec("lsof -Ffn0 -p %d" % os.getpid())

    def _parse_lsof_output(self, out):
        """Extract ``(fd, filename)`` tuples from ``lsof -Ffn0`` output.

        Only lines starting with ``f`` are considered; lines mentioning
        one of the excluded markers are dropped, and so are entries whose
        name is not an absolute path.
        """
        excluded_markers = ("deleted", 'mem', "txt", 'cwd')
        open_files = []
        for line in out.split("\n"):
            if not line.startswith('f'):
                continue
            if any(marker in line for marker in excluded_markers):
                continue
            # fields are NUL separated, each prefixed by a one-letter tag
            fields = line.split('\0')
            fd = fields[0][1:]
            filename = fields[1][1:]
            if filename.startswith('/'):
                open_files.append((fd, filename))
        return open_files

    def matching_platform(self):
        """Return True if ``lsof -v`` can be executed successfully."""
        try:
            py.process.cmdexec("lsof -v")
        except (py.process.cmdexec.Error, UnicodeDecodeError):
            # cmdexec may raise UnicodeDecodeError on Windows systems
            # with locale other than english:
            # https://bitbucket.org/pytest-dev/py/issues/66
            return False
        return True

    @pytest.hookimpl(hookwrapper=True, tryfirst=True)
    def pytest_runtest_protocol(self, item):
        """Compare open files before and after the test and warn on new FDs."""
        before = self.get_open_files()
        yield
        if hasattr(sys, "pypy_version_info"):
            gc.collect()
        after = self.get_open_files()

        # an entry is leaked when its descriptor was not open beforehand
        known_fds = set(entry[0] for entry in before)
        leaked_files = [entry for entry in after if entry[0] not in known_fds]
        if not leaked_files:
            return

        headline = "***** %s FD leakage detected" % len(leaked_files)
        error = [headline]
        error.extend(str(entry) for entry in leaked_files)
        error.append("*** Before:")
        error.extend(str(entry) for entry in before)
        error.append("*** After:")
        error.extend(str(entry) for entry in after)
        error.append(headline)
        error.append("*** function %s:%s: %s " % item.location)
        error.append("See issue #2366")
        item.warn('', "\n".join(error))
112
113
# XXX copied from execnet's conftest.py - needs to be merged
# Maps an interpreter name such as "python2.7" to a python.exe path under
# C:\PythonXY; consulted by the ``anypython`` fixture on win32.
winpymap = dict(
    ('python%s.%s' % (major, minor),
     r'C:\Python%s%s\python.exe' % (major, minor))
    for major, minor in (
        (2, 7), (2, 6), (3, 1), (3, 2), (3, 3), (3, 4), (3, 5),
    )
)
124
def getexecutable(name, cache={}):
    """Return the path of the interpreter called ``name`` or None.

    The interpreter is looked up with ``py.path.local.sysfind`` and then
    started with ``--version``.  A non-zero exit status disqualifies it;
    for "jython" the stderr output must mention "2.5" but not "2.5.2".
    Results (including None) are memoized in ``cache``, whose default
    dict is intentionally shared between calls.
    """
    if name in cache:
        return cache[name]

    executable = py.path.local.sysfind(name)
    if executable:
        proc = subprocess.Popen(
            [str(executable), "--version"],
            universal_newlines=True,
            stderr=subprocess.PIPE,
        )
        _, err = proc.communicate()
        if name == "jython":
            if not err or "2.5" not in err:
                executable = None
            if "2.5.2" in err:
                # http://bugs.jython.org/issue1790
                executable = None
        elif proc.returncode != 0:
            # Handle pyenv's 127.
            executable = None

    cache[name] = executable
    return executable
145
@pytest.fixture(params=['python2.6', 'python2.7', 'python3.3', "python3.4",
                        'pypy', 'pypy3'])
def anypython(request):
    """Return the executable for the parametrized interpreter name.

    Falls back to the ``winpymap`` locations on win32 and skips the
    test when no usable interpreter can be found.
    """
    name = request.param
    executable = getexecutable(name)
    if executable is not None:
        return executable

    if sys.platform == "win32":
        executable = winpymap.get(name)
        if executable:
            executable = py.path.local(executable)
            if executable.check():
                return executable
    pytest.skip("no suitable %s found" % (name,))
    return executable
160
161# used at least by pytest-xdist plugin
@pytest.fixture
def _pytest(request):
    """Provide a :py:class:`PytestArg` helper bound to the current request.

    Its ``gethookrecorder(hook)`` method returns a HookRecorder
    instance which helps to make assertions about called hooks.
    """
    helper = PytestArg(request)
    return helper
169
class PytestArg:
    """Helper handed out by the ``_pytest`` fixture."""

    def __init__(self, request):
        self.request = request

    def gethookrecorder(self, hook):
        """Return a new HookRecorder for the plugin manager of ``hook``.

        Recording is stopped by a finalizer registered on the request.
        """
        recorder = HookRecorder(hook._pm)
        self.request.addfinalizer(recorder.finish_recording)
        return recorder
178
179
def get_public_names(l):
    """Only return names from iterator l without a leading underscore.

    Empty names have no leading underscore and are therefore kept
    (indexing the first character would raise IndexError for them).
    """
    return [x for x in l if not x.startswith("_")]
183
184
class ParsedCall:
    """A recorded hook call: the hook name plus its keyword arguments.

    The keyword arguments become instance attributes; the hook name is
    kept in ``_name``.
    """

    def __init__(self, name, kwargs):
        vars(self).update(kwargs)
        self._name = name

    def __repr__(self):
        # show everything except the bookkeeping ``_name`` attribute
        arguments = dict(
            (key, value) for key, value in vars(self).items()
            if key != '_name')
        return "<ParsedCall %r(**%r)>" % (self._name, arguments)
194
195
class HookRecorder:
    """Record all hooks called in a plugin manager.

    Monitoring callbacks are installed on the plugin manager at
    construction time; each hook call is stored as a
    :py:class:`ParsedCall` in ``calls`` before the call proceeds.
    """

    def __init__(self, pluginmanager):
        self._pluginmanager = pluginmanager
        self.calls = []

        def record_call(hook_name, hook_impls, kwargs):
            self.calls.append(ParsedCall(hook_name, kwargs))

        def ignore_outcome(outcome, hook_name, hook_impls, kwargs):
            pass

        self._undo_wrapping = pluginmanager.add_hookcall_monitoring(
            record_call, ignore_outcome)

    def finish_recording(self):
        """Remove the monitoring installed by the constructor."""
        self._undo_wrapping()

    def getcalls(self, names):
        """Return the recorded calls whose hook name is in ``names``.

        ``names`` may be a whitespace separated string or a container.
        """
        wanted = names.split() if isinstance(names, str) else names
        return [call for call in self.calls if call._name in wanted]

    def assert_contains(self, entries):
        """Check that ``(name, check)`` entries match recorded calls in order.

        ``check`` is an expression evaluated with the caller's locals as
        globals and the attributes of the candidate call as locals.
        """
        __tracebackhide__ = True
        position = 0
        # must be taken here: frame 1 is the direct caller of this method
        caller_locals = sys._getframe(1).f_locals
        for name, check in list(entries):
            for offset, call in enumerate(self.calls[position:]):
                if call._name != name:
                    print("NONAMEMATCH", name, "with", call)
                    continue
                print("NAMEMATCH", name, call)
                if not eval(check, caller_locals, call.__dict__):
                    print("NOCHECKERMATCH", repr(check), "-", call)
                    continue
                print("CHECKERMATCH", repr(check), "->", call)
                position += offset + 1
                break
            else:
                pytest.fail("could not find %r check %r" % (name, check))

    def popcall(self, name):
        """Remove and return the first recorded call named ``name``."""
        __tracebackhide__ = True
        for index, call in enumerate(self.calls):
            if call._name == name:
                del self.calls[index]
                return call
        message = ["could not find call %r, in:" % (name,)]
        message.extend("  %s" % str(call) for call in self.calls)
        pytest.fail("\n".join(message))

    def getcall(self, name):
        """Return the single recorded call named ``name``."""
        found = self.getcalls(name)
        assert len(found) == 1, (name, found)
        return found[0]

    # functionality for test reports

    def getreports(self,
                   names="pytest_runtest_logreport pytest_collectreport"):
        """Return the ``report`` attribute of every matching call."""
        return [call.report for call in self.getcalls(names)]

    def matchreport(self, inamepart="",
        names="pytest_runtest_logreport pytest_collectreport", when=None):
        """ return a testreport whose dotted import path matches """
        matches = []
        for rep in self.getreports(names=names):
            if when:
                if getattr(rep, 'when', None) != when:
                    continue
            else:
                try:
                    if rep.when != "call" and rep.passed:
                        # setup/teardown passing reports - let's ignore those
                        continue
                except AttributeError:
                    pass
            if not inamepart or inamepart in rep.nodeid.split("::"):
                matches.append(rep)
        if not matches:
            raise ValueError("could not find test report matching %r: "
                             "no test reports at all!" % (inamepart,))
        if len(matches) > 1:
            raise ValueError(
                "found 2 or more testreports matching %r: %s" % (
                    inamepart, matches))
        return matches[0]

    def getfailures(self,
                    names='pytest_runtest_logreport pytest_collectreport'):
        """Return the matching reports that failed."""
        return [rep for rep in self.getreports(names) if rep.failed]

    def getfailedcollections(self):
        """Return the failed collection reports."""
        return self.getfailures('pytest_collectreport')

    def listoutcomes(self):
        """Return ``(passed, skipped, failed)`` lists of reports.

        Passed reports only count when their ``when`` is "call".
        """
        passed, skipped, failed = [], [], []
        reports = self.getreports(
            "pytest_collectreport pytest_runtest_logreport")
        for rep in reports:
            if rep.passed:
                if getattr(rep, "when", None) == "call":
                    passed.append(rep)
            elif rep.skipped:
                skipped.append(rep)
            elif rep.failed:
                failed.append(rep)
        return passed, skipped, failed

    def countoutcomes(self):
        """Return the number of passed, skipped and failed reports."""
        return [len(group) for group in self.listoutcomes()]

    def assertoutcome(self, passed=0, skipped=0, failed=0):
        """Assert the number of passed, skipped and failed reports."""
        outcomes = self.listoutcomes()
        realpassed, realskipped, realfailed = outcomes
        assert passed == len(realpassed)
        assert skipped == len(realskipped)
        assert failed == len(realfailed)

    def clear(self):
        """Forget all recorded calls (the list object is kept)."""
        del self.calls[:]
322
323
@pytest.fixture
def linecomp(request):
    """Return a fresh ``LineComp`` instance."""
    comparer = LineComp()
    return comparer
327
328
@pytest.fixture(name='LineMatcher')
def LineMatcher_fixture(request):
    """Expose the ``LineMatcher`` class itself under the fixture name ``LineMatcher``."""
    matcher_class = LineMatcher
    return matcher_class
332
333
@pytest.fixture
def testdir(request, tmpdir_factory):
    """Return a :py:class:`Testdir` built from the request and ``tmpdir_factory``."""
    instance = Testdir(request, tmpdir_factory)
    return instance
337
338
# matches "<number> <word>" pairs such as "2 passed" in the summary line
rex_outcome = re.compile(r"(\d+) ([\w-]+)")
class RunResult:
    """The result of running a command.

    Attributes:

    :ret: The return value.
    :outlines: List of lines captured from stdout.
    :errlines: List of lines captured from stderr.
    :stdout: :py:class:`LineMatcher` of stdout, use ``stdout.str()`` to
       reconstruct stdout or the commonly used
       ``stdout.fnmatch_lines()`` method.
    :stderr: :py:class:`LineMatcher` of stderr.
    :duration: Duration in seconds.

    """
    def __init__(self, ret, outlines, errlines, duration):
        self.ret = ret
        self.duration = duration
        self.outlines = outlines
        self.errlines = errlines
        self.stdout = LineMatcher(outlines)
        self.stderr = LineMatcher(errlines)

    def parseoutcomes(self):
        """ Return a dictionary of outcomestring->num from parsing
        the terminal output that the test process produced.

        The output is searched from the end for a line containing
        'seconds' with at least one "<number> <word>" pair; ValueError
        is raised when there is none."""
        for line in reversed(self.outlines):
            if 'seconds' not in line:
                continue
            outcomes = rex_outcome.findall(line)
            if outcomes:
                return dict((cat, int(num)) for num, cat in outcomes)
        raise ValueError("Pytest terminal report not found")

    def assert_outcomes(self, passed=0, skipped=0, failed=0):
        """ assert that the specified outcomes appear with the respective
        numbers (0 means it didn't occur) in the text output from a test run."""
        counts = self.parseoutcomes()
        assert passed == counts.get("passed", 0)
        assert skipped == counts.get("skipped", 0)
        assert failed == counts.get("failed", 0)
383
384
385
386class Testdir:
387    """Temporary test directory with tools to test/run pytest itself.
388
389    This is based on the ``tmpdir`` fixture but provides a number of
390    methods which aid with testing pytest itself.  Unless
391    :py:meth:`chdir` is used all methods will use :py:attr:`tmpdir` as
392    current working directory.
393
394    Attributes:
395
396    :tmpdir: The :py:class:`py.path.local` instance of the temporary
397       directory.
398
399    :plugins: A list of plugins to use with :py:meth:`parseconfig` and
400       :py:meth:`runpytest`.  Initially this is an empty list but
       plugins can be added to the list.  The type of items to add to
       the list depends on the method which uses them, so refer to
       those methods for details.
404
405    """
406
407    def __init__(self, request, tmpdir_factory):
408        self.request = request
409        self._mod_collections  = WeakKeyDictionary()
410        # XXX remove duplication with tmpdir plugin
411        basetmp = tmpdir_factory.ensuretemp("testdir")
412        name = request.function.__name__
413        for i in range(100):
414            try:
415                tmpdir = basetmp.mkdir(name + str(i))
416            except py.error.EEXIST:
417                continue
418            break
419        self.tmpdir = tmpdir
420        self.plugins = []
421        self._savesyspath = (list(sys.path), list(sys.meta_path))
422        self._savemodulekeys = set(sys.modules)
423        self.chdir() # always chdir
424        self.request.addfinalizer(self.finalize)
425        method = self.request.config.getoption("--runpytest")
426        if method == "inprocess":
427            self._runpytest_method = self.runpytest_inprocess
428        elif method == "subprocess":
429            self._runpytest_method = self.runpytest_subprocess
430
431    def __repr__(self):
432        return "<Testdir %r>" % (self.tmpdir,)
433
434    def finalize(self):
435        """Clean up global state artifacts.
436
437        Some methods modify the global interpreter state and this
438        tries to clean this up.  It does not remove the temporary
439        directory however so it can be looked at after the test run
440        has finished.
441
442        """
443        sys.path[:], sys.meta_path[:] = self._savesyspath
444        if hasattr(self, '_olddir'):
445            self._olddir.chdir()
446        self.delete_loaded_modules()
447
448    def delete_loaded_modules(self):
449        """Delete modules that have been loaded during a test.
450
451        This allows the interpreter to catch module changes in case
452        the module is re-imported.
453        """
454        for name in set(sys.modules).difference(self._savemodulekeys):
455            # some zope modules used by twisted-related tests keeps internal
456            # state and can't be deleted; we had some trouble in the past
457            # with zope.interface for example
458            if not name.startswith("zope"):
459                del sys.modules[name]
460
461    def make_hook_recorder(self, pluginmanager):
462        """Create a new :py:class:`HookRecorder` for a PluginManager."""
463        assert not hasattr(pluginmanager, "reprec")
464        pluginmanager.reprec = reprec = HookRecorder(pluginmanager)
465        self.request.addfinalizer(reprec.finish_recording)
466        return reprec
467
468    def chdir(self):
469        """Cd into the temporary directory.
470
471        This is done automatically upon instantiation.
472
473        """
474        old = self.tmpdir.chdir()
475        if not hasattr(self, '_olddir'):
476            self._olddir = old
477
478    def _makefile(self, ext, args, kwargs, encoding="utf-8"):
479        items = list(kwargs.items())
480        if args:
481            source = py.builtin._totext("\n").join(
482                map(py.builtin._totext, args)) + py.builtin._totext("\n")
483            basename = self.request.function.__name__
484            items.insert(0, (basename, source))
485        ret = None
486        for name, value in items:
487            p = self.tmpdir.join(name).new(ext=ext)
488            p.dirpath().ensure_dir()
489            source = Source(value)
490
491            def my_totext(s, encoding="utf-8"):
492                if py.builtin._isbytes(s):
493                    s = py.builtin._totext(s, encoding=encoding)
494                return s
495
496            source_unicode = "\n".join([my_totext(line) for line in source.lines])
497            source = py.builtin._totext(source_unicode)
498            content = source.strip().encode(encoding) # + "\n"
499            #content = content.rstrip() + "\n"
500            p.write(content, "wb")
501            if ret is None:
502                ret = p
503        return ret
504
505    def makefile(self, ext, *args, **kwargs):
506        """Create a new file in the testdir.
507
508        ext: The extension the file should use, including the dot.
509           E.g. ".py".
510
511        args: All args will be treated as strings and joined using
512           newlines.  The result will be written as contents to the
513           file.  The name of the file will be based on the test
514           function requesting this fixture.
515           E.g. "testdir.makefile('.txt', 'line1', 'line2')"
516
517        kwargs: Each keyword is the name of a file, while the value of
518           it will be written as contents of the file.
519           E.g. "testdir.makefile('.ini', pytest='[pytest]\naddopts=-rs\n')"
520
521        """
522        return self._makefile(ext, args, kwargs)
523
524    def makeconftest(self, source):
525        """Write a contest.py file with 'source' as contents."""
526        return self.makepyfile(conftest=source)
527
528    def makeini(self, source):
529        """Write a tox.ini file with 'source' as contents."""
530        return self.makefile('.ini', tox=source)
531
532    def getinicfg(self, source):
533        """Return the pytest section from the tox.ini config file."""
534        p = self.makeini(source)
535        return py.iniconfig.IniConfig(p)['pytest']
536
537    def makepyfile(self, *args, **kwargs):
538        """Shortcut for .makefile() with a .py extension."""
539        return self._makefile('.py', args, kwargs)
540
541    def maketxtfile(self, *args, **kwargs):
542        """Shortcut for .makefile() with a .txt extension."""
543        return self._makefile('.txt', args, kwargs)
544
545    def syspathinsert(self, path=None):
546        """Prepend a directory to sys.path, defaults to :py:attr:`tmpdir`.
547
548        This is undone automatically after the test.
549        """
550        if path is None:
551            path = self.tmpdir
552        sys.path.insert(0, str(path))
553        # a call to syspathinsert() usually means that the caller
554        # wants to import some dynamically created files.
555        # with python3 we thus invalidate import caches.
556        self._possibly_invalidate_import_caches()
557
558    def _possibly_invalidate_import_caches(self):
559        # invalidate caches if we can (py33 and above)
560        try:
561            import importlib
562        except ImportError:
563            pass
564        else:
565            if hasattr(importlib, "invalidate_caches"):
566                importlib.invalidate_caches()
567
568    def mkdir(self, name):
569        """Create a new (sub)directory."""
570        return self.tmpdir.mkdir(name)
571
572    def mkpydir(self, name):
573        """Create a new python package.
574
575        This creates a (sub)directory with an empty ``__init__.py``
576        file so that is recognised as a python package.
577
578        """
579        p = self.mkdir(name)
580        p.ensure("__init__.py")
581        return p
582
    # Bind the module-level ``Session`` class as a class attribute.
    # NOTE(review): presumably so tests can reach it as ``testdir.Session``
    # - confirm against callers.
    Session = Session
584    def getnode(self, config, arg):
585        """Return the collection node of a file.
586
587        :param config: :py:class:`_pytest.config.Config` instance, see
588           :py:meth:`parseconfig` and :py:meth:`parseconfigure` to
589           create the configuration.
590
591        :param arg: A :py:class:`py.path.local` instance of the file.
592
593        """
594        session = Session(config)
595        assert '::' not in str(arg)
596        p = py.path.local(arg)
597        config.hook.pytest_sessionstart(session=session)
598        res = session.perform_collect([str(p)], genitems=False)[0]
599        config.hook.pytest_sessionfinish(session=session, exitstatus=EXIT_OK)
600        return res
601
602    def getpathnode(self, path):
603        """Return the collection node of a file.
604
605        This is like :py:meth:`getnode` but uses
606        :py:meth:`parseconfigure` to create the (configured) pytest
607        Config instance.
608
609        :param path: A :py:class:`py.path.local` instance of the file.
610
611        """
612        config = self.parseconfigure(path)
613        session = Session(config)
614        x = session.fspath.bestrelpath(path)
615        config.hook.pytest_sessionstart(session=session)
616        res = session.perform_collect([x], genitems=False)[0]
617        config.hook.pytest_sessionfinish(session=session, exitstatus=EXIT_OK)
618        return res
619
620    def genitems(self, colitems):
621        """Generate all test items from a collection node.
622
623        This recurses into the collection node and returns a list of
624        all the test items contained within.
625
626        """
627        session = colitems[0].session
628        result = []
629        for colitem in colitems:
630            result.extend(session.genitems(colitem))
631        return result
632
633    def runitem(self, source):
634        """Run the "test_func" Item.
635
636        The calling test instance (the class which contains the test
637        method) must provide a ``.getrunner()`` method which should
638        return a runner which can run the test protocol for a single
639        item, like e.g. :py:func:`_pytest.runner.runtestprotocol`.
640
641        """
642        # used from runner functional tests
643        item = self.getitem(source)
644        # the test class where we are called from wants to provide the runner
645        testclassinstance = self.request.instance
646        runner = testclassinstance.getrunner()
647        return runner(item)
648
649    def inline_runsource(self, source, *cmdlineargs):
650        """Run a test module in process using ``pytest.main()``.
651
652        This run writes "source" into a temporary file and runs
653        ``pytest.main()`` on it, returning a :py:class:`HookRecorder`
654        instance for the result.
655
656        :param source: The source code of the test module.
657
658        :param cmdlineargs: Any extra command line arguments to use.
659
660        :return: :py:class:`HookRecorder` instance of the result.
661
662        """
663        p = self.makepyfile(source)
664        l = list(cmdlineargs) + [p]
665        return self.inline_run(*l)
666
667    def inline_genitems(self, *args):
668        """Run ``pytest.main(['--collectonly'])`` in-process.
669
670        Returns a tuple of the collected items and a
671        :py:class:`HookRecorder` instance.
672
673        This runs the :py:func:`pytest.main` function to run all of
674        pytest inside the test process itself like
675        :py:meth:`inline_run`.  However the return value is a tuple of
676        the collection items and a :py:class:`HookRecorder` instance.
677
678        """
679        rec = self.inline_run("--collect-only", *args)
680        items = [x.item for x in rec.getcalls("pytest_itemcollected")]
681        return items, rec
682
683    def inline_run(self, *args, **kwargs):
684        """Run ``pytest.main()`` in-process, returning a HookRecorder.
685
686        This runs the :py:func:`pytest.main` function to run all of
687        pytest inside the test process itself.  This means it can
688        return a :py:class:`HookRecorder` instance which gives more
689        detailed results from then run then can be done by matching
690        stdout/stderr from :py:meth:`runpytest`.
691
692        :param args: Any command line arguments to pass to
693           :py:func:`pytest.main`.
694
695        :param plugin: (keyword-only) Extra plugin instances the
696           ``pytest.main()`` instance should use.
697
698        :return: A :py:class:`HookRecorder` instance.
699        """
700        # When running py.test inline any plugins active in the main
701        # test process are already imported.  So this disables the
702        # warning which will trigger to say they can no longer be
703        # re-written, which is fine as they are already re-written.
704        orig_warn = AssertionRewritingHook._warn_already_imported
705
706        def revert():
707            AssertionRewritingHook._warn_already_imported = orig_warn
708
709        self.request.addfinalizer(revert)
710        AssertionRewritingHook._warn_already_imported = lambda *a: None
711
712        rec = []
713
714        class Collect:
715            def pytest_configure(x, config):
716                rec.append(self.make_hook_recorder(config.pluginmanager))
717
718        plugins = kwargs.get("plugins") or []
719        plugins.append(Collect())
720        ret = pytest.main(list(args), plugins=plugins)
721        self.delete_loaded_modules()
722        if len(rec) == 1:
723            reprec = rec.pop()
724        else:
725            class reprec:
726                pass
727        reprec.ret = ret
728
729        # typically we reraise keyboard interrupts from the child run
730        # because it's our user requesting interruption of the testing
731        if ret == 2 and not kwargs.get("no_reraise_ctrlc"):
732            calls = reprec.getcalls("pytest_keyboard_interrupt")
733            if calls and calls[-1].excinfo.type == KeyboardInterrupt:
734                raise KeyboardInterrupt()
735        return reprec
736
737    def runpytest_inprocess(self, *args, **kwargs):
738        """ Return result of running pytest in-process, providing a similar
739        interface to what self.runpytest() provides. """
740        if kwargs.get("syspathinsert"):
741            self.syspathinsert()
742        now = time.time()
743        capture = MultiCapture(Capture=SysCapture)
744        capture.start_capturing()
745        try:
746            try:
747                reprec = self.inline_run(*args, **kwargs)
748            except SystemExit as e:
749
750                class reprec:
751                    ret = e.args[0]
752
753            except Exception:
754                traceback.print_exc()
755
756                class reprec:
757                    ret = 3
758        finally:
759            out, err = capture.readouterr()
760            capture.stop_capturing()
761            sys.stdout.write(out)
762            sys.stderr.write(err)
763
764        res = RunResult(reprec.ret,
765                        out.split("\n"), err.split("\n"),
766                        time.time()-now)
767        res.reprec = reprec
768        return res
769
770    def runpytest(self, *args, **kwargs):
771        """ Run pytest inline or in a subprocess, depending on the command line
772        option "--runpytest" and return a :py:class:`RunResult`.
773
774        """
775        args = self._ensure_basetemp(args)
776        return self._runpytest_method(*args, **kwargs)
777
778    def _ensure_basetemp(self, args):
779        args = [str(x) for x in args]
780        for x in args:
781            if str(x).startswith('--basetemp'):
782                #print ("basedtemp exists: %s" %(args,))
783                break
784        else:
785            args.append("--basetemp=%s" % self.tmpdir.dirpath('basetemp'))
786            #print ("added basetemp: %s" %(args,))
787        return args
788
789    def parseconfig(self, *args):
790        """Return a new pytest Config instance from given commandline args.
791
792        This invokes the pytest bootstrapping code in _pytest.config
793        to create a new :py:class:`_pytest.core.PluginManager` and
794        call the pytest_cmdline_parse hook to create new
795        :py:class:`_pytest.config.Config` instance.
796
797        If :py:attr:`plugins` has been populated they should be plugin
798        modules which will be registered with the PluginManager.
799
800        """
801        args = self._ensure_basetemp(args)
802
803        import _pytest.config
804        config = _pytest.config._prepareconfig(args, self.plugins)
805        # we don't know what the test will do with this half-setup config
806        # object and thus we make sure it gets unconfigured properly in any
807        # case (otherwise capturing could still be active, for example)
808        self.request.addfinalizer(config._ensure_unconfigure)
809        return config
810
811    def parseconfigure(self, *args):
812        """Return a new pytest configured Config instance.
813
814        This returns a new :py:class:`_pytest.config.Config` instance
815        like :py:meth:`parseconfig`, but also calls the
816        pytest_configure hook.
817
818        """
819        config = self.parseconfig(*args)
820        config._do_configure()
821        self.request.addfinalizer(config._ensure_unconfigure)
822        return config
823
824    def getitem(self,  source, funcname="test_func"):
825        """Return the test item for a test function.
826
827        This writes the source to a python file and runs pytest's
828        collection on the resulting module, returning the test item
829        for the requested function name.
830
831        :param source: The module source.
832
833        :param funcname: The name of the test function for which the
834           Item must be returned.
835
836        """
837        items = self.getitems(source)
838        for item in items:
839            if item.name == funcname:
840                return item
841        assert 0, "%r item not found in module:\n%s\nitems: %s" %(
842                  funcname, source, items)
843
844    def getitems(self,  source):
845        """Return all test items collected from the module.
846
847        This writes the source to a python file and runs pytest's
848        collection on the resulting module, returning all test items
849        contained within.
850
851        """
852        modcol = self.getmodulecol(source)
853        return self.genitems([modcol])
854
855    def getmodulecol(self,  source, configargs=(), withinit=False):
856        """Return the module collection node for ``source``.
857
858        This writes ``source`` to a file using :py:meth:`makepyfile`
859        and then runs the pytest collection on it, returning the
860        collection node for the test module.
861
862        :param source: The source code of the module to collect.
863
864        :param configargs: Any extra arguments to pass to
865           :py:meth:`parseconfigure`.
866
867        :param withinit: Whether to also write a ``__init__.py`` file
868           to the temporary directory to ensure it is a package.
869
870        """
871        kw = {self.request.function.__name__: Source(source).strip()}
872        path = self.makepyfile(**kw)
873        if withinit:
874            self.makepyfile(__init__ = "#")
875        self.config = config = self.parseconfigure(path, *configargs)
876        node = self.getnode(config, path)
877
878        return node
879
880    def collect_by_name(self, modcol, name):
881        """Return the collection node for name from the module collection.
882
883        This will search a module collection node for a collection
884        node matching the given name.
885
886        :param modcol: A module collection node, see
887           :py:meth:`getmodulecol`.
888
889        :param name: The name of the node to return.
890
891        """
892        if modcol not in self._mod_collections:
893            self._mod_collections[modcol] = list(modcol.collect())
894        for colitem in self._mod_collections[modcol]:
895            if colitem.name == name:
896                return colitem
897
898    def popen(self, cmdargs, stdout, stderr, **kw):
899        """Invoke subprocess.Popen.
900
901        This calls subprocess.Popen making sure the current working
902        directory is the PYTHONPATH.
903
904        You probably want to use :py:meth:`run` instead.
905
906        """
907        env = os.environ.copy()
908        env['PYTHONPATH'] = os.pathsep.join(filter(None, [
909            str(os.getcwd()), env.get('PYTHONPATH', '')]))
910        kw['env'] = env
911        return subprocess.Popen(cmdargs,
912                                stdout=stdout, stderr=stderr, **kw)
913
914    def run(self, *cmdargs):
915        """Run a command with arguments.
916
917        Run a process using subprocess.Popen saving the stdout and
918        stderr.
919
920        Returns a :py:class:`RunResult`.
921
922        """
923        return self._run(*cmdargs)
924
925    def _run(self, *cmdargs):
926        cmdargs = [str(x) for x in cmdargs]
927        p1 = self.tmpdir.join("stdout")
928        p2 = self.tmpdir.join("stderr")
929        print("running:", ' '.join(cmdargs))
930        print("     in:", str(py.path.local()))
931        f1 = codecs.open(str(p1), "w", encoding="utf8")
932        f2 = codecs.open(str(p2), "w", encoding="utf8")
933        try:
934            now = time.time()
935            popen = self.popen(cmdargs, stdout=f1, stderr=f2,
936                close_fds=(sys.platform != "win32"))
937            ret = popen.wait()
938        finally:
939            f1.close()
940            f2.close()
941        f1 = codecs.open(str(p1), "r", encoding="utf8")
942        f2 = codecs.open(str(p2), "r", encoding="utf8")
943        try:
944            out = f1.read().splitlines()
945            err = f2.read().splitlines()
946        finally:
947            f1.close()
948            f2.close()
949        self._dump_lines(out, sys.stdout)
950        self._dump_lines(err, sys.stderr)
951        return RunResult(ret, out, err, time.time()-now)
952
953    def _dump_lines(self, lines, fp):
954        try:
955            for line in lines:
956                print(line, file=fp)
957        except UnicodeEncodeError:
958            print("couldn't print to %s because of encoding" % (fp,))
959
960    def _getpytestargs(self):
961        # we cannot use "(sys.executable,script)"
962        # because on windows the script is e.g. a pytest.exe
963        return (sys.executable, _pytest_fullpath,) # noqa
964
965    def runpython(self, script):
966        """Run a python script using sys.executable as interpreter.
967
968        Returns a :py:class:`RunResult`.
969        """
970        return self.run(sys.executable, script)
971
972    def runpython_c(self, command):
973        """Run python -c "command", return a :py:class:`RunResult`."""
974        return self.run(sys.executable, "-c", command)
975
976    def runpytest_subprocess(self, *args, **kwargs):
977        """Run pytest as a subprocess with given arguments.
978
979        Any plugins added to the :py:attr:`plugins` list will added
980        using the ``-p`` command line option.  Addtionally
981        ``--basetemp`` is used put any temporary files and directories
982        in a numbered directory prefixed with "runpytest-" so they do
983        not conflict with the normal numberd pytest location for
984        temporary files and directories.
985
986        Returns a :py:class:`RunResult`.
987
988        """
989        p = py.path.local.make_numbered_dir(prefix="runpytest-",
990            keep=None, rootdir=self.tmpdir)
991        args = ('--basetemp=%s' % p, ) + args
992        #for x in args:
993        #    if '--confcutdir' in str(x):
994        #        break
995        #else:
996        #    pass
997        #    args = ('--confcutdir=.',) + args
998        plugins = [x for x in self.plugins if isinstance(x, str)]
999        if plugins:
1000            args = ('-p', plugins[0]) + args
1001        args = self._getpytestargs() + args
1002        return self.run(*args)
1003
1004    def spawn_pytest(self, string, expect_timeout=10.0):
1005        """Run pytest using pexpect.
1006
1007        This makes sure to use the right pytest and sets up the
1008        temporary directory locations.
1009
1010        The pexpect child is returned.
1011
1012        """
1013        basetemp = self.tmpdir.mkdir("temp-pexpect")
1014        invoke = " ".join(map(str, self._getpytestargs()))
1015        cmd = "%s --basetemp=%s %s" % (invoke, basetemp, string)
1016        return self.spawn(cmd, expect_timeout=expect_timeout)
1017
1018    def spawn(self, cmd, expect_timeout=10.0):
1019        """Run a command using pexpect.
1020
1021        The pexpect child is returned.
1022        """
1023        pexpect = pytest.importorskip("pexpect", "3.0")
1024        if hasattr(sys, 'pypy_version_info') and '64' in platform.machine():
1025            pytest.skip("pypy-64 bit not supported")
1026        if sys.platform.startswith("freebsd"):
1027            pytest.xfail("pexpect does not work reliably on freebsd")
1028        logfile = self.tmpdir.join("spawn.out").open("wb")
1029        child = pexpect.spawn(cmd, logfile=logfile)
1030        self.request.addfinalizer(logfile.close)
1031        child.timeout = expect_timeout
1032        return child
1033
def getdecoded(out):
    """Return ``out`` decoded as UTF-8.

    If decoding raises ``UnicodeDecodeError``, a message containing
    ``py.io.saferepr(out)`` is returned instead.
    """
    try:
        decoded = out.decode("utf-8")
    except UnicodeDecodeError:
        decoded = "INTERNAL not-utf8-decodeable, truncated string:\n%s" % (
            py.io.saferepr(out),)
    return decoded
1040
1041
class LineComp:
    """Collect text in ``self.stringio`` and match it against line patterns."""

    def __init__(self):
        self.stringio = py.io.TextIO()

    def assert_contains_lines(self, lines2):
        """Assert that ``lines2`` are contained (linearly) in the buffer.

        The current content of ``self.stringio`` is taken and the
        buffer is emptied.  The text is split on newlines and checked
        with :py:meth:`LineMatcher.fnmatch_lines`, whose result is
        returned.
        """
        # pytest leaves frames that set this local out of failure tracebacks
        __tracebackhide__ = True
        buf = self.stringio
        captured = buf.getvalue()
        buf.truncate(0)
        buf.seek(0)
        return LineMatcher(captured.split("\n")).fnmatch_lines(lines2)
1056
1057
class LineMatcher:
    """Flexible matching of text.

    This is a convenience class to test large texts like the output of
    commands.

    The constructor takes a list of lines without their trailing
    newlines, i.e. ``text.splitlines()``.

    Every match attempt is recorded in an internal log which is kept
    for the lifetime of the instance and used as the failure message.

    """

    def __init__(self, lines):
        self.lines = lines
        self._log_output = []

    def str(self):
        """Return the entire original text."""
        return "\n".join(self.lines)

    def _getlines(self, lines2):
        """Normalize ``lines2`` to a sequence of pattern lines.

        A ``str`` is wrapped in ``Source``; a ``Source`` is stripped and
        its ``lines`` are used.  Anything else is returned unchanged.
        """
        if isinstance(lines2, str):
            lines2 = Source(lines2)
        if isinstance(lines2, Source):
            lines2 = lines2.strip().lines
        return lines2

    def fnmatch_lines_random(self, lines2):
        """Check lines exist in the output.

        The argument is a list of lines which have to occur in the
        output, in any order.  Each line can contain glob wildcards.

        :raises ValueError: If a line is not found; the message is the
           accumulated match log.

        """
        lines2 = self._getlines(lines2)
        for line in lines2:
            if any(line == x or fnmatch(x, line) for x in self.lines):
                self._log("matched: ", repr(line))
            else:
                self._log("line %r not found in output" % line)
                raise ValueError(self._log_text)

    def get_lines_after(self, fnline):
        """Return all lines following the given line in the text.

        The given line can contain glob wildcards.

        :raises ValueError: If no line of the text matches ``fnline``.
        """
        for i, line in enumerate(self.lines):
            if fnline == line or fnmatch(line, fnline):
                return self.lines[i+1:]
        raise ValueError("line %r not found in output" % fnline)

    def _log(self, *args):
        """Append one entry, the space-joined ``args``, to the match log."""
        self._log_output.append(' '.join(str(x) for x in args))

    @property
    def _log_text(self):
        """The whole match log as one newline-joined string."""
        return '\n'.join(self._log_output)

    def fnmatch_lines(self, lines2):
        """Search the text for matching lines.

        The argument is a list of lines which have to match, in the
        given order, and can use glob wildcards.  Text lines that do
        not match the current pattern are skipped.  If a pattern
        matches none of the remaining lines, pytest.fail() is called
        with the accumulated log of matches and non-matches.

        """
        # pytest leaves frames that set this local out of failure tracebacks
        __tracebackhide__ = True
        lines2 = self._getlines(lines2)
        # One shared iterator: each pattern resumes where the previous one
        # matched, without repeatedly popping from the front of a list copy.
        remaining = iter(self.lines)
        for line in lines2:
            nomatchprinted = False
            for nextline in remaining:
                if line == nextline:
                    self._log("exact match:", repr(line))
                    break
                elif fnmatch(nextline, line):
                    self._log("fnmatch:", repr(line))
                    self._log("   with:", repr(nextline))
                    break
                else:
                    if not nomatchprinted:
                        self._log("nomatch:", repr(line))
                        nomatchprinted = True
                    self._log("    and:", repr(nextline))
            else:
                self._log("remains unmatched: %r" % (line,))
                pytest.fail(self._log_text)
1152