1""" (disabled by default) support for testing pytest and pytest plugins. """
2import codecs
3import gc
4import os
5import platform
6import re
7import subprocess
8import sys
9import time
10import traceback
11from fnmatch import fnmatch
12
13from py.builtin import print_
14
15from _pytest._code import Source
16import py
17import pytest
18from _pytest.main import Session, EXIT_OK
19
20
21def pytest_addoption(parser):
22    # group = parser.getgroup("pytester", "pytester (self-tests) options")
23    parser.addoption('--lsof',
24           action="store_true", dest="lsof", default=False,
25           help=("run FD checks if lsof is available"))
26
27    parser.addoption('--runpytest', default="inprocess", dest="runpytest",
28           choices=("inprocess", "subprocess", ),
29           help=("run pytest sub runs in tests using an 'inprocess' "
30                 "or 'subprocess' (python -m main) method"))
31
32
33def pytest_configure(config):
34    # This might be called multiple times. Only take the first.
35    global _pytest_fullpath
36    try:
37        _pytest_fullpath
38    except NameError:
39        _pytest_fullpath = os.path.abspath(pytest.__file__.rstrip("oc"))
40        _pytest_fullpath = _pytest_fullpath.replace("$py.class", ".py")
41
42    if config.getvalue("lsof"):
43        checker = LsofFdLeakChecker()
44        if checker.matching_platform():
45            config.pluginmanager.register(checker)
46
47
48class LsofFdLeakChecker(object):
49    def get_open_files(self):
50        out = self._exec_lsof()
51        open_files = self._parse_lsof_output(out)
52        return open_files
53
54    def _exec_lsof(self):
55        pid = os.getpid()
56        return py.process.cmdexec("lsof -Ffn0 -p %d" % pid)
57
58    def _parse_lsof_output(self, out):
59        def isopen(line):
60            return line.startswith('f') and ("deleted" not in line and
61                'mem' not in line and "txt" not in line and 'cwd' not in line)
62
63        open_files = []
64
65        for line in out.split("\n"):
66            if isopen(line):
67                fields = line.split('\0')
68                fd = fields[0][1:]
69                filename = fields[1][1:]
70                if filename.startswith('/'):
71                    open_files.append((fd, filename))
72
73        return open_files
74
75    def matching_platform(self):
76        try:
77            py.process.cmdexec("lsof -v")
78        except (py.process.cmdexec.Error, UnicodeDecodeError):
79            # cmdexec may raise UnicodeDecodeError on Windows systems
80            # with locale other than english:
81            # https://bitbucket.org/pytest-dev/py/issues/66
82            return False
83        else:
84            return True
85
86    @pytest.hookimpl(hookwrapper=True, tryfirst=True)
87    def pytest_runtest_item(self, item):
88        lines1 = self.get_open_files()
89        yield
90        if hasattr(sys, "pypy_version_info"):
91            gc.collect()
92        lines2 = self.get_open_files()
93
94        new_fds = set([t[0] for t in lines2]) - set([t[0] for t in lines1])
95        leaked_files = [t for t in lines2 if t[0] in new_fds]
96        if leaked_files:
97            error = []
98            error.append("***** %s FD leakage detected" % len(leaked_files))
99            error.extend([str(f) for f in leaked_files])
100            error.append("*** Before:")
101            error.extend([str(f) for f in lines1])
102            error.append("*** After:")
103            error.extend([str(f) for f in lines2])
104            error.append(error[0])
105            error.append("*** function %s:%s: %s " % item.location)
106            pytest.fail("\n".join(error), pytrace=False)
107
108
109# XXX copied from execnet's conftest.py - needs to be merged
110winpymap = {
111    'python2.7': r'C:\Python27\python.exe',
112    'python2.6': r'C:\Python26\python.exe',
113    'python3.1': r'C:\Python31\python.exe',
114    'python3.2': r'C:\Python32\python.exe',
115    'python3.3': r'C:\Python33\python.exe',
116    'python3.4': r'C:\Python34\python.exe',
117    'python3.5': r'C:\Python35\python.exe',
118}
119
120def getexecutable(name, cache={}):
121    try:
122        return cache[name]
123    except KeyError:
124        executable = py.path.local.sysfind(name)
125        if executable:
126            if name == "jython":
127                import subprocess
128                popen = subprocess.Popen([str(executable), "--version"],
129                    universal_newlines=True, stderr=subprocess.PIPE)
130                out, err = popen.communicate()
131                if not err or "2.5" not in err:
132                    executable = None
133                if "2.5.2" in err:
134                    executable = None # http://bugs.jython.org/issue1790
135        cache[name] = executable
136        return executable
137
138@pytest.fixture(params=['python2.6', 'python2.7', 'python3.3', "python3.4",
139                        'pypy', 'pypy3'])
140def anypython(request):
141    name = request.param
142    executable = getexecutable(name)
143    if executable is None:
144        if sys.platform == "win32":
145            executable = winpymap.get(name, None)
146            if executable:
147                executable = py.path.local(executable)
148                if executable.check():
149                    return executable
150        pytest.skip("no suitable %s found" % (name,))
151    return executable
152
153# used at least by pytest-xdist plugin
154@pytest.fixture
155def _pytest(request):
156    """ Return a helper which offers a gethookrecorder(hook)
157    method which returns a HookRecorder instance which helps
158    to make assertions about called hooks.
159    """
160    return PytestArg(request)
161
162class PytestArg:
163    def __init__(self, request):
164        self.request = request
165
166    def gethookrecorder(self, hook):
167        hookrecorder = HookRecorder(hook._pm)
168        self.request.addfinalizer(hookrecorder.finish_recording)
169        return hookrecorder
170
171
172def get_public_names(l):
173    """Only return names from iterator l without a leading underscore."""
174    return [x for x in l if x[0] != "_"]
175
176
177class ParsedCall:
178    def __init__(self, name, kwargs):
179        self.__dict__.update(kwargs)
180        self._name = name
181
182    def __repr__(self):
183        d = self.__dict__.copy()
184        del d['_name']
185        return "<ParsedCall %r(**%r)>" %(self._name, d)
186
187
188class HookRecorder:
189    """Record all hooks called in a plugin manager.
190
191    This wraps all the hook calls in the plugin manager, recording
192    each call before propagating the normal calls.
193
194    """
195
196    def __init__(self, pluginmanager):
197        self._pluginmanager = pluginmanager
198        self.calls = []
199
200        def before(hook_name, hook_impls, kwargs):
201            self.calls.append(ParsedCall(hook_name, kwargs))
202
203        def after(outcome, hook_name, hook_impls, kwargs):
204            pass
205
206        self._undo_wrapping = pluginmanager.add_hookcall_monitoring(before, after)
207
208    def finish_recording(self):
209        self._undo_wrapping()
210
211    def getcalls(self, names):
212        if isinstance(names, str):
213            names = names.split()
214        return [call for call in self.calls if call._name in names]
215
216    def assert_contains(self, entries):
217        __tracebackhide__ = True
218        i = 0
219        entries = list(entries)
220        backlocals = sys._getframe(1).f_locals
221        while entries:
222            name, check = entries.pop(0)
223            for ind, call in enumerate(self.calls[i:]):
224                if call._name == name:
225                    print_("NAMEMATCH", name, call)
226                    if eval(check, backlocals, call.__dict__):
227                        print_("CHECKERMATCH", repr(check), "->", call)
228                    else:
229                        print_("NOCHECKERMATCH", repr(check), "-", call)
230                        continue
231                    i += ind + 1
232                    break
233                print_("NONAMEMATCH", name, "with", call)
234            else:
235                pytest.fail("could not find %r check %r" % (name, check))
236
237    def popcall(self, name):
238        __tracebackhide__ = True
239        for i, call in enumerate(self.calls):
240            if call._name == name:
241                del self.calls[i]
242                return call
243        lines = ["could not find call %r, in:" % (name,)]
244        lines.extend(["  %s" % str(x) for x in self.calls])
245        pytest.fail("\n".join(lines))
246
247    def getcall(self, name):
248        l = self.getcalls(name)
249        assert len(l) == 1, (name, l)
250        return l[0]
251
252    # functionality for test reports
253
254    def getreports(self,
255                   names="pytest_runtest_logreport pytest_collectreport"):
256        return [x.report for x in self.getcalls(names)]
257
258    def matchreport(self, inamepart="",
259        names="pytest_runtest_logreport pytest_collectreport", when=None):
260        """ return a testreport whose dotted import path matches """
261        l = []
262        for rep in self.getreports(names=names):
263            try:
264                if not when and rep.when != "call" and rep.passed:
265                    # setup/teardown passing reports - let's ignore those
266                    continue
267            except AttributeError:
268                pass
269            if when and getattr(rep, 'when', None) != when:
270                continue
271            if not inamepart or inamepart in rep.nodeid.split("::"):
272                l.append(rep)
273        if not l:
274            raise ValueError("could not find test report matching %r: "
275                             "no test reports at all!" % (inamepart,))
276        if len(l) > 1:
277            raise ValueError(
278                "found 2 or more testreports matching %r: %s" %(inamepart, l))
279        return l[0]
280
281    def getfailures(self,
282                    names='pytest_runtest_logreport pytest_collectreport'):
283        return [rep for rep in self.getreports(names) if rep.failed]
284
285    def getfailedcollections(self):
286        return self.getfailures('pytest_collectreport')
287
288    def listoutcomes(self):
289        passed = []
290        skipped = []
291        failed = []
292        for rep in self.getreports(
293            "pytest_collectreport pytest_runtest_logreport"):
294            if rep.passed:
295                if getattr(rep, "when", None) == "call":
296                    passed.append(rep)
297            elif rep.skipped:
298                skipped.append(rep)
299            elif rep.failed:
300                failed.append(rep)
301        return passed, skipped, failed
302
303    def countoutcomes(self):
304        return [len(x) for x in self.listoutcomes()]
305
306    def assertoutcome(self, passed=0, skipped=0, failed=0):
307        realpassed, realskipped, realfailed = self.listoutcomes()
308        assert passed == len(realpassed)
309        assert skipped == len(realskipped)
310        assert failed == len(realfailed)
311
312    def clear(self):
313        self.calls[:] = []
314
315
316@pytest.fixture
317def linecomp(request):
318    return LineComp()
319
320
321def pytest_funcarg__LineMatcher(request):
322    return LineMatcher
323
324
325@pytest.fixture
326def testdir(request, tmpdir_factory):
327    return Testdir(request, tmpdir_factory)
328
329
330rex_outcome = re.compile("(\d+) ([\w-]+)")
331class RunResult:
332    """The result of running a command.
333
334    Attributes:
335
336    :ret: The return value.
337    :outlines: List of lines captured from stdout.
338    :errlines: List of lines captures from stderr.
339    :stdout: :py:class:`LineMatcher` of stdout, use ``stdout.str()`` to
340       reconstruct stdout or the commonly used
341       ``stdout.fnmatch_lines()`` method.
342    :stderrr: :py:class:`LineMatcher` of stderr.
343    :duration: Duration in seconds.
344
345    """
346    def __init__(self, ret, outlines, errlines, duration):
347        self.ret = ret
348        self.outlines = outlines
349        self.errlines = errlines
350        self.stdout = LineMatcher(outlines)
351        self.stderr = LineMatcher(errlines)
352        self.duration = duration
353
354    def parseoutcomes(self):
355        """ Return a dictionary of outcomestring->num from parsing
356        the terminal output that the test process produced."""
357        for line in reversed(self.outlines):
358            if 'seconds' in line:
359                outcomes = rex_outcome.findall(line)
360                if outcomes:
361                    d = {}
362                    for num, cat in outcomes:
363                        d[cat] = int(num)
364                    return d
365
366    def assert_outcomes(self, passed=0, skipped=0, failed=0):
367        """ assert that the specified outcomes appear with the respective
368        numbers (0 means it didn't occur) in the text output from a test run."""
369        d = self.parseoutcomes()
370        assert passed == d.get("passed", 0)
371        assert skipped == d.get("skipped", 0)
372        assert failed == d.get("failed", 0)
373
374
375
376class Testdir:
377    """Temporary test directory with tools to test/run py.test itself.
378
379    This is based on the ``tmpdir`` fixture but provides a number of
380    methods which aid with testing py.test itself.  Unless
381    :py:meth:`chdir` is used all methods will use :py:attr:`tmpdir` as
382    current working directory.
383
384    Attributes:
385
386    :tmpdir: The :py:class:`py.path.local` instance of the temporary
387       directory.
388
389    :plugins: A list of plugins to use with :py:meth:`parseconfig` and
390       :py:meth:`runpytest`.  Initially this is an empty list but
391       plugins can be added to the list.  The type of items to add to
392       the list depend on the method which uses them so refer to them
393       for details.
394
395    """
396
397    def __init__(self, request, tmpdir_factory):
398        self.request = request
399        # XXX remove duplication with tmpdir plugin
400        basetmp = tmpdir_factory.ensuretemp("testdir")
401        name = request.function.__name__
402        for i in range(100):
403            try:
404                tmpdir = basetmp.mkdir(name + str(i))
405            except py.error.EEXIST:
406                continue
407            break
408        self.tmpdir = tmpdir
409        self.plugins = []
410        self._savesyspath = (list(sys.path), list(sys.meta_path))
411        self._savemodulekeys = set(sys.modules)
412        self.chdir() # always chdir
413        self.request.addfinalizer(self.finalize)
414        method = self.request.config.getoption("--runpytest")
415        if method == "inprocess":
416            self._runpytest_method = self.runpytest_inprocess
417        elif method == "subprocess":
418            self._runpytest_method = self.runpytest_subprocess
419
420    def __repr__(self):
421        return "<Testdir %r>" % (self.tmpdir,)
422
423    def finalize(self):
424        """Clean up global state artifacts.
425
426        Some methods modify the global interpreter state and this
427        tries to clean this up.  It does not remove the temporary
428        directory however so it can be looked at after the test run
429        has finished.
430
431        """
432        sys.path[:], sys.meta_path[:] = self._savesyspath
433        if hasattr(self, '_olddir'):
434            self._olddir.chdir()
435        self.delete_loaded_modules()
436
437    def delete_loaded_modules(self):
438        """Delete modules that have been loaded during a test.
439
440        This allows the interpreter to catch module changes in case
441        the module is re-imported.
442        """
443        for name in set(sys.modules).difference(self._savemodulekeys):
444            # it seems zope.interfaces is keeping some state
445            # (used by twisted related tests)
446            if name != "zope.interface":
447                del sys.modules[name]
448
449    def make_hook_recorder(self, pluginmanager):
450        """Create a new :py:class:`HookRecorder` for a PluginManager."""
451        assert not hasattr(pluginmanager, "reprec")
452        pluginmanager.reprec = reprec = HookRecorder(pluginmanager)
453        self.request.addfinalizer(reprec.finish_recording)
454        return reprec
455
456    def chdir(self):
457        """Cd into the temporary directory.
458
459        This is done automatically upon instantiation.
460
461        """
462        old = self.tmpdir.chdir()
463        if not hasattr(self, '_olddir'):
464            self._olddir = old
465
466    def _makefile(self, ext, args, kwargs):
467        items = list(kwargs.items())
468        if args:
469            source = py.builtin._totext("\n").join(
470                map(py.builtin._totext, args)) + py.builtin._totext("\n")
471            basename = self.request.function.__name__
472            items.insert(0, (basename, source))
473        ret = None
474        for name, value in items:
475            p = self.tmpdir.join(name).new(ext=ext)
476            source = Source(value)
477            def my_totext(s, encoding="utf-8"):
478                if py.builtin._isbytes(s):
479                    s = py.builtin._totext(s, encoding=encoding)
480                return s
481            source_unicode = "\n".join([my_totext(line) for line in source.lines])
482            source = py.builtin._totext(source_unicode)
483            content = source.strip().encode("utf-8") # + "\n"
484            #content = content.rstrip() + "\n"
485            p.write(content, "wb")
486            if ret is None:
487                ret = p
488        return ret
489
490    def makefile(self, ext, *args, **kwargs):
491        """Create a new file in the testdir.
492
493        ext: The extension the file should use, including the dot.
494           E.g. ".py".
495
496        args: All args will be treated as strings and joined using
497           newlines.  The result will be written as contents to the
498           file.  The name of the file will be based on the test
499           function requesting this fixture.
500           E.g. "testdir.makefile('.txt', 'line1', 'line2')"
501
502        kwargs: Each keyword is the name of a file, while the value of
503           it will be written as contents of the file.
504           E.g. "testdir.makefile('.ini', pytest='[pytest]\naddopts=-rs\n')"
505
506        """
507        return self._makefile(ext, args, kwargs)
508
509    def makeconftest(self, source):
510        """Write a contest.py file with 'source' as contents."""
511        return self.makepyfile(conftest=source)
512
513    def makeini(self, source):
514        """Write a tox.ini file with 'source' as contents."""
515        return self.makefile('.ini', tox=source)
516
517    def getinicfg(self, source):
518        """Return the pytest section from the tox.ini config file."""
519        p = self.makeini(source)
520        return py.iniconfig.IniConfig(p)['pytest']
521
522    def makepyfile(self, *args, **kwargs):
523        """Shortcut for .makefile() with a .py extension."""
524        return self._makefile('.py', args, kwargs)
525
526    def maketxtfile(self, *args, **kwargs):
527        """Shortcut for .makefile() with a .txt extension."""
528        return self._makefile('.txt', args, kwargs)
529
530    def syspathinsert(self, path=None):
531        """Prepend a directory to sys.path, defaults to :py:attr:`tmpdir`.
532
533        This is undone automatically after the test.
534        """
535        if path is None:
536            path = self.tmpdir
537        sys.path.insert(0, str(path))
538        # a call to syspathinsert() usually means that the caller
539        # wants to import some dynamically created files.
540        # with python3 we thus invalidate import caches.
541        self._possibly_invalidate_import_caches()
542
543    def _possibly_invalidate_import_caches(self):
544        # invalidate caches if we can (py33 and above)
545        try:
546            import importlib
547        except ImportError:
548            pass
549        else:
550            if hasattr(importlib, "invalidate_caches"):
551                importlib.invalidate_caches()
552
553    def mkdir(self, name):
554        """Create a new (sub)directory."""
555        return self.tmpdir.mkdir(name)
556
557    def mkpydir(self, name):
558        """Create a new python package.
559
560        This creates a (sub)direcotry with an empty ``__init__.py``
561        file so that is recognised as a python package.
562
563        """
564        p = self.mkdir(name)
565        p.ensure("__init__.py")
566        return p
567
568    Session = Session
569    def getnode(self, config, arg):
570        """Return the collection node of a file.
571
572        :param config: :py:class:`_pytest.config.Config` instance, see
573           :py:meth:`parseconfig` and :py:meth:`parseconfigure` to
574           create the configuration.
575
576        :param arg: A :py:class:`py.path.local` instance of the file.
577
578        """
579        session = Session(config)
580        assert '::' not in str(arg)
581        p = py.path.local(arg)
582        config.hook.pytest_sessionstart(session=session)
583        res = session.perform_collect([str(p)], genitems=False)[0]
584        config.hook.pytest_sessionfinish(session=session, exitstatus=EXIT_OK)
585        return res
586
587    def getpathnode(self, path):
588        """Return the collection node of a file.
589
590        This is like :py:meth:`getnode` but uses
591        :py:meth:`parseconfigure` to create the (configured) py.test
592        Config instance.
593
594        :param path: A :py:class:`py.path.local` instance of the file.
595
596        """
597        config = self.parseconfigure(path)
598        session = Session(config)
599        x = session.fspath.bestrelpath(path)
600        config.hook.pytest_sessionstart(session=session)
601        res = session.perform_collect([x], genitems=False)[0]
602        config.hook.pytest_sessionfinish(session=session, exitstatus=EXIT_OK)
603        return res
604
605    def genitems(self, colitems):
606        """Generate all test items from a collection node.
607
608        This recurses into the collection node and returns a list of
609        all the test items contained within.
610
611        """
612        session = colitems[0].session
613        result = []
614        for colitem in colitems:
615            result.extend(session.genitems(colitem))
616        return result
617
618    def runitem(self, source):
619        """Run the "test_func" Item.
620
621        The calling test instance (the class which contains the test
622        method) must provide a ``.getrunner()`` method which should
623        return a runner which can run the test protocol for a single
624        item, like e.g. :py:func:`_pytest.runner.runtestprotocol`.
625
626        """
627        # used from runner functional tests
628        item = self.getitem(source)
629        # the test class where we are called from wants to provide the runner
630        testclassinstance = self.request.instance
631        runner = testclassinstance.getrunner()
632        return runner(item)
633
634    def inline_runsource(self, source, *cmdlineargs):
635        """Run a test module in process using ``pytest.main()``.
636
637        This run writes "source" into a temporary file and runs
638        ``pytest.main()`` on it, returning a :py:class:`HookRecorder`
639        instance for the result.
640
641        :param source: The source code of the test module.
642
643        :param cmdlineargs: Any extra command line arguments to use.
644
645        :return: :py:class:`HookRecorder` instance of the result.
646
647        """
648        p = self.makepyfile(source)
649        l = list(cmdlineargs) + [p]
650        return self.inline_run(*l)
651
652    def inline_genitems(self, *args):
653        """Run ``pytest.main(['--collectonly'])`` in-process.
654
655        Retuns a tuple of the collected items and a
656        :py:class:`HookRecorder` instance.
657
658        This runs the :py:func:`pytest.main` function to run all of
659        py.test inside the test process itself like
660        :py:meth:`inline_run`.  However the return value is a tuple of
661        the collection items and a :py:class:`HookRecorder` instance.
662
663        """
664        rec = self.inline_run("--collect-only", *args)
665        items = [x.item for x in rec.getcalls("pytest_itemcollected")]
666        return items, rec
667
668    def inline_run(self, *args, **kwargs):
669        """Run ``pytest.main()`` in-process, returning a HookRecorder.
670
671        This runs the :py:func:`pytest.main` function to run all of
672        py.test inside the test process itself.  This means it can
673        return a :py:class:`HookRecorder` instance which gives more
674        detailed results from then run then can be done by matching
675        stdout/stderr from :py:meth:`runpytest`.
676
677        :param args: Any command line arguments to pass to
678           :py:func:`pytest.main`.
679
680        :param plugin: (keyword-only) Extra plugin instances the
681           ``pytest.main()`` instance should use.
682
683        :return: A :py:class:`HookRecorder` instance.
684
685        """
686        rec = []
687        class Collect:
688            def pytest_configure(x, config):
689                rec.append(self.make_hook_recorder(config.pluginmanager))
690
691        plugins = kwargs.get("plugins") or []
692        plugins.append(Collect())
693        ret = pytest.main(list(args), plugins=plugins)
694        self.delete_loaded_modules()
695        if len(rec) == 1:
696            reprec = rec.pop()
697        else:
698            class reprec:
699                pass
700        reprec.ret = ret
701
702        # typically we reraise keyboard interrupts from the child run
703        # because it's our user requesting interruption of the testing
704        if ret == 2 and not kwargs.get("no_reraise_ctrlc"):
705            calls = reprec.getcalls("pytest_keyboard_interrupt")
706            if calls and calls[-1].excinfo.type == KeyboardInterrupt:
707                raise KeyboardInterrupt()
708        return reprec
709
710    def runpytest_inprocess(self, *args, **kwargs):
711        """ Return result of running pytest in-process, providing a similar
712        interface to what self.runpytest() provides. """
713        if kwargs.get("syspathinsert"):
714            self.syspathinsert()
715        now = time.time()
716        capture = py.io.StdCapture()
717        try:
718            try:
719                reprec = self.inline_run(*args, **kwargs)
720            except SystemExit as e:
721                class reprec:
722                    ret = e.args[0]
723            except Exception:
724                traceback.print_exc()
725                class reprec:
726                    ret = 3
727        finally:
728            out, err = capture.reset()
729            sys.stdout.write(out)
730            sys.stderr.write(err)
731
732        res = RunResult(reprec.ret,
733                        out.split("\n"), err.split("\n"),
734                        time.time()-now)
735        res.reprec = reprec
736        return res
737
738    def runpytest(self, *args, **kwargs):
739        """ Run pytest inline or in a subprocess, depending on the command line
740        option "--runpytest" and return a :py:class:`RunResult`.
741
742        """
743        args = self._ensure_basetemp(args)
744        return self._runpytest_method(*args, **kwargs)
745
746    def _ensure_basetemp(self, args):
747        args = [str(x) for x in args]
748        for x in args:
749            if str(x).startswith('--basetemp'):
750                #print ("basedtemp exists: %s" %(args,))
751                break
752        else:
753            args.append("--basetemp=%s" % self.tmpdir.dirpath('basetemp'))
754            #print ("added basetemp: %s" %(args,))
755        return args
756
757    def parseconfig(self, *args):
758        """Return a new py.test Config instance from given commandline args.
759
760        This invokes the py.test bootstrapping code in _pytest.config
761        to create a new :py:class:`_pytest.core.PluginManager` and
762        call the pytest_cmdline_parse hook to create new
763        :py:class:`_pytest.config.Config` instance.
764
765        If :py:attr:`plugins` has been populated they should be plugin
766        modules which will be registered with the PluginManager.
767
768        """
769        args = self._ensure_basetemp(args)
770
771        import _pytest.config
772        config = _pytest.config._prepareconfig(args, self.plugins)
773        # we don't know what the test will do with this half-setup config
774        # object and thus we make sure it gets unconfigured properly in any
775        # case (otherwise capturing could still be active, for example)
776        self.request.addfinalizer(config._ensure_unconfigure)
777        return config
778
779    def parseconfigure(self, *args):
780        """Return a new py.test configured Config instance.
781
782        This returns a new :py:class:`_pytest.config.Config` instance
783        like :py:meth:`parseconfig`, but also calls the
784        pytest_configure hook.
785
786        """
787        config = self.parseconfig(*args)
788        config._do_configure()
789        self.request.addfinalizer(config._ensure_unconfigure)
790        return config
791
792    def getitem(self,  source, funcname="test_func"):
793        """Return the test item for a test function.
794
795        This writes the source to a python file and runs py.test's
796        collection on the resulting module, returning the test item
797        for the requested function name.
798
799        :param source: The module source.
800
801        :param funcname: The name of the test function for which the
802           Item must be returned.
803
804        """
805        items = self.getitems(source)
806        for item in items:
807            if item.name == funcname:
808                return item
809        assert 0, "%r item not found in module:\n%s\nitems: %s" %(
810                  funcname, source, items)
811
812    def getitems(self,  source):
813        """Return all test items collected from the module.
814
815        This writes the source to a python file and runs py.test's
816        collection on the resulting module, returning all test items
817        contained within.
818
819        """
820        modcol = self.getmodulecol(source)
821        return self.genitems([modcol])
822
823    def getmodulecol(self,  source, configargs=(), withinit=False):
824        """Return the module collection node for ``source``.
825
826        This writes ``source`` to a file using :py:meth:`makepyfile`
827        and then runs the py.test collection on it, returning the
828        collection node for the test module.
829
830        :param source: The source code of the module to collect.
831
832        :param configargs: Any extra arguments to pass to
833           :py:meth:`parseconfigure`.
834
835        :param withinit: Whether to also write a ``__init__.py`` file
836           to the temporarly directory to ensure it is a package.
837
838        """
839        kw = {self.request.function.__name__: Source(source).strip()}
840        path = self.makepyfile(**kw)
841        if withinit:
842            self.makepyfile(__init__ = "#")
843        self.config = config = self.parseconfigure(path, *configargs)
844        node = self.getnode(config, path)
845        return node
846
847    def collect_by_name(self, modcol, name):
848        """Return the collection node for name from the module collection.
849
850        This will search a module collection node for a collection
851        node matching the given name.
852
853        :param modcol: A module collection node, see
854           :py:meth:`getmodulecol`.
855
856        :param name: The name of the node to return.
857
858        """
859        for colitem in modcol._memocollect():
860            if colitem.name == name:
861                return colitem
862
863    def popen(self, cmdargs, stdout, stderr, **kw):
864        """Invoke subprocess.Popen.
865
866        This calls subprocess.Popen making sure the current working
867        directory is the PYTHONPATH.
868
869        You probably want to use :py:meth:`run` instead.
870
871        """
872        env = os.environ.copy()
873        env['PYTHONPATH'] = os.pathsep.join(filter(None, [
874            str(os.getcwd()), env.get('PYTHONPATH', '')]))
875        kw['env'] = env
876        return subprocess.Popen(cmdargs,
877                                stdout=stdout, stderr=stderr, **kw)
878
879    def run(self, *cmdargs):
880        """Run a command with arguments.
881
882        Run a process using subprocess.Popen saving the stdout and
883        stderr.
884
885        Returns a :py:class:`RunResult`.
886
887        """
888        return self._run(*cmdargs)
889
890    def _run(self, *cmdargs):
891        cmdargs = [str(x) for x in cmdargs]
892        p1 = self.tmpdir.join("stdout")
893        p2 = self.tmpdir.join("stderr")
894        print_("running:", ' '.join(cmdargs))
895        print_("     in:", str(py.path.local()))
896        f1 = codecs.open(str(p1), "w", encoding="utf8")
897        f2 = codecs.open(str(p2), "w", encoding="utf8")
898        try:
899            now = time.time()
900            popen = self.popen(cmdargs, stdout=f1, stderr=f2,
901                close_fds=(sys.platform != "win32"))
902            ret = popen.wait()
903        finally:
904            f1.close()
905            f2.close()
906        f1 = codecs.open(str(p1), "r", encoding="utf8")
907        f2 = codecs.open(str(p2), "r", encoding="utf8")
908        try:
909            out = f1.read().splitlines()
910            err = f2.read().splitlines()
911        finally:
912            f1.close()
913            f2.close()
914        self._dump_lines(out, sys.stdout)
915        self._dump_lines(err, sys.stderr)
916        return RunResult(ret, out, err, time.time()-now)
917
918    def _dump_lines(self, lines, fp):
919        try:
920            for line in lines:
921                py.builtin.print_(line, file=fp)
922        except UnicodeEncodeError:
923            print("couldn't print to %s because of encoding" % (fp,))
924
925    def _getpytestargs(self):
926        # we cannot use "(sys.executable,script)"
927        # because on windows the script is e.g. a py.test.exe
928        return (sys.executable, _pytest_fullpath,) # noqa
929
930    def runpython(self, script):
931        """Run a python script using sys.executable as interpreter.
932
933        Returns a :py:class:`RunResult`.
934        """
935        return self.run(sys.executable, script)
936
937    def runpython_c(self, command):
938        """Run python -c "command", return a :py:class:`RunResult`."""
939        return self.run(sys.executable, "-c", command)
940
941    def runpytest_subprocess(self, *args, **kwargs):
942        """Run py.test as a subprocess with given arguments.
943
944        Any plugins added to the :py:attr:`plugins` list will added
945        using the ``-p`` command line option.  Addtionally
946        ``--basetemp`` is used put any temporary files and directories
947        in a numbered directory prefixed with "runpytest-" so they do
948        not conflict with the normal numberd pytest location for
949        temporary files and directories.
950
951        Returns a :py:class:`RunResult`.
952
953        """
954        p = py.path.local.make_numbered_dir(prefix="runpytest-",
955            keep=None, rootdir=self.tmpdir)
956        args = ('--basetemp=%s' % p, ) + args
957        #for x in args:
958        #    if '--confcutdir' in str(x):
959        #        break
960        #else:
961        #    pass
962        #    args = ('--confcutdir=.',) + args
963        plugins = [x for x in self.plugins if isinstance(x, str)]
964        if plugins:
965            args = ('-p', plugins[0]) + args
966        args = self._getpytestargs() + args
967        return self.run(*args)
968
969    def spawn_pytest(self, string, expect_timeout=10.0):
970        """Run py.test using pexpect.
971
972        This makes sure to use the right py.test and sets up the
973        temporary directory locations.
974
975        The pexpect child is returned.
976
977        """
978        basetemp = self.tmpdir.mkdir("pexpect")
979        invoke = " ".join(map(str, self._getpytestargs()))
980        cmd = "%s --basetemp=%s %s" % (invoke, basetemp, string)
981        return self.spawn(cmd, expect_timeout=expect_timeout)
982
983    def spawn(self, cmd, expect_timeout=10.0):
984        """Run a command using pexpect.
985
986        The pexpect child is returned.
987        """
988        pexpect = pytest.importorskip("pexpect", "3.0")
989        if hasattr(sys, 'pypy_version_info') and '64' in platform.machine():
990            pytest.skip("pypy-64 bit not supported")
991        if sys.platform == "darwin":
992            pytest.xfail("pexpect does not work reliably on darwin?!")
993        if sys.platform.startswith("freebsd"):
994            pytest.xfail("pexpect does not work reliably on freebsd")
995        logfile = self.tmpdir.join("spawn.out").open("wb")
996        child = pexpect.spawn(cmd, logfile=logfile)
997        self.request.addfinalizer(logfile.close)
998        child.timeout = expect_timeout
999        return child
1000
1001def getdecoded(out):
1002        try:
1003            return out.decode("utf-8")
1004        except UnicodeDecodeError:
1005            return "INTERNAL not-utf8-decodeable, truncated string:\n%s" % (
1006                    py.io.saferepr(out),)
1007
1008
1009class LineComp:
1010    def __init__(self):
1011        self.stringio = py.io.TextIO()
1012
1013    def assert_contains_lines(self, lines2):
1014        """ assert that lines2 are contained (linearly) in lines1.
1015            return a list of extralines found.
1016        """
1017        __tracebackhide__ = True
1018        val = self.stringio.getvalue()
1019        self.stringio.truncate(0)
1020        self.stringio.seek(0)
1021        lines1 = val.split("\n")
1022        return LineMatcher(lines1).fnmatch_lines(lines2)
1023
1024
1025class LineMatcher:
1026    """Flexible matching of text.
1027
1028    This is a convenience class to test large texts like the output of
1029    commands.
1030
1031    The constructor takes a list of lines without their trailing
1032    newlines, i.e. ``text.splitlines()``.
1033
1034    """
1035
1036    def __init__(self,  lines):
1037        self.lines = lines
1038
1039    def str(self):
1040        """Return the entire original text."""
1041        return "\n".join(self.lines)
1042
1043    def _getlines(self, lines2):
1044        if isinstance(lines2, str):
1045            lines2 = Source(lines2)
1046        if isinstance(lines2, Source):
1047            lines2 = lines2.strip().lines
1048        return lines2
1049
1050    def fnmatch_lines_random(self, lines2):
1051        """Check lines exist in the output.
1052
1053        The argument is a list of lines which have to occur in the
1054        output, in any order.  Each line can contain glob whildcards.
1055
1056        """
1057        lines2 = self._getlines(lines2)
1058        for line in lines2:
1059            for x in self.lines:
1060                if line == x or fnmatch(x, line):
1061                    print_("matched: ", repr(line))
1062                    break
1063            else:
1064                raise ValueError("line %r not found in output" % line)
1065
1066    def get_lines_after(self, fnline):
1067        """Return all lines following the given line in the text.
1068
1069        The given line can contain glob wildcards.
1070        """
1071        for i, line in enumerate(self.lines):
1072            if fnline == line or fnmatch(line, fnline):
1073                return self.lines[i+1:]
1074        raise ValueError("line %r not found in output" % fnline)
1075
1076    def fnmatch_lines(self, lines2):
1077        """Search the text for matching lines.
1078
1079        The argument is a list of lines which have to match and can
1080        use glob wildcards.  If they do not match an pytest.fail() is
1081        called.  The matches and non-matches are also printed on
1082        stdout.
1083
1084        """
1085        def show(arg1, arg2):
1086            py.builtin.print_(arg1, arg2, file=sys.stderr)
1087        lines2 = self._getlines(lines2)
1088        lines1 = self.lines[:]
1089        nextline = None
1090        extralines = []
1091        __tracebackhide__ = True
1092        for line in lines2:
1093            nomatchprinted = False
1094            while lines1:
1095                nextline = lines1.pop(0)
1096                if line == nextline:
1097                    show("exact match:", repr(line))
1098                    break
1099                elif fnmatch(nextline, line):
1100                    show("fnmatch:", repr(line))
1101                    show("   with:", repr(nextline))
1102                    break
1103                else:
1104                    if not nomatchprinted:
1105                        show("nomatch:", repr(line))
1106                        nomatchprinted = True
1107                    show("    and:", repr(nextline))
1108                extralines.append(nextline)
1109            else:
1110                pytest.fail("remains unmatched: %r, see stderr" % (line,))
1111