1""" (disabled by default) support for testing pytest and pytest plugins. """
2from __future__ import absolute_import, division, print_function
3
4import codecs
5import gc
6import os
7import platform
8import re
9import subprocess
10import six
11import sys
12import time
13import traceback
14from fnmatch import fnmatch
15
16from weakref import WeakKeyDictionary
17
18from _pytest.capture import MultiCapture, SysCapture
19from _pytest._code import Source
20import py
21import pytest
22from _pytest.main import Session, EXIT_OK
23from _pytest.assertion.rewrite import AssertionRewritingHook
24
25
# Absolute path of the pytest module's source file.  Trailing "o"/"c"
# characters are stripped from ``pytest.__file__`` (so ".pyc"/".pyo" become
# ".py") and a "$py.class" suffix is mapped to ".py"
# (presumably Jython's compiled-class suffix -- TODO confirm).
PYTEST_FULLPATH = os.path.abspath(pytest.__file__.rstrip("oc")).replace("$py.class", ".py")
27
28
def pytest_addoption(parser):
    """Register the command line options of this plugin.

    ``--lsof`` is a boolean flag (default ``False``) that enables the file
    descriptor checks; ``--runpytest`` selects between the ``"inprocess"``
    (default) and ``"subprocess"`` run methods.

    :param parser: Object exposing ``addoption()``.
    """
    # group = parser.getgroup("pytester", "pytester (self-tests) options")
    lsof_help = "run FD checks if lsof is available"
    parser.addoption(
        '--lsof',
        dest="lsof",
        action="store_true",
        default=False,
        help=lsof_help)

    runpytest_help = ("run pytest sub runs in tests using an 'inprocess' "
                      "or 'subprocess' (python -m main) method")
    parser.addoption(
        '--runpytest',
        dest="runpytest",
        default="inprocess",
        choices=("inprocess", "subprocess"),
        help=runpytest_help)
39
40
def pytest_configure(config):
    """Register the lsof-based FD leak checker when ``--lsof`` was given.

    The checker is only registered with the plugin manager if its
    ``matching_platform()`` probe succeeds.
    """
    if not config.getvalue("lsof"):
        return
    leak_checker = LsofFdLeakChecker()
    if leak_checker.matching_platform():
        config.pluginmanager.register(leak_checker)
46
47
class LsofFdLeakChecker(object):
    """Plugin that warns about file descriptors opened during a test.

    The open files of the current process are listed with ``lsof`` before
    and after each test item; descriptors only present afterwards are
    reported through ``item.warn()``.
    """

    def get_open_files(self):
        """Return a list of ``(fd, filename)`` tuples for this process."""
        return self._parse_lsof_output(self._exec_lsof())

    def _exec_lsof(self):
        """Run ``lsof -Ffn0`` for the current pid and return its output."""
        return py.process.cmdexec("lsof -Ffn0 -p %d" % os.getpid())

    def _parse_lsof_output(self, out):
        """Extract ``(fd, filename)`` tuples from ``lsof -Ffn0`` output.

        Only lines starting with ``f`` that contain none of the markers
        "deleted", "mem", "txt" or "cwd" are considered.  Each such line is
        split on NUL characters; the leading type character of the first two
        fields is dropped, and only filenames starting with ``/`` are kept.
        """
        skipped_markers = ("deleted", 'mem', "txt", 'cwd')

        def isopen(line):
            if not line.startswith('f'):
                return False
            return not any(marker in line for marker in skipped_markers)

        open_files = []
        for line in out.split("\n"):
            if not isopen(line):
                continue
            fields = line.split('\0')
            fd = fields[0][1:]
            filename = fields[1][1:]
            if filename.startswith('/'):
                open_files.append((fd, filename))
        return open_files

    def matching_platform(self):
        """Return True if ``lsof -v`` can be executed, False otherwise."""
        try:
            py.process.cmdexec("lsof -v")
        except (py.process.cmdexec.Error, UnicodeDecodeError):
            # cmdexec may raise UnicodeDecodeError on Windows systems
            # with locale other than english:
            # https://bitbucket.org/pytest-dev/py/issues/66
            return False
        return True

    @pytest.hookimpl(hookwrapper=True, tryfirst=True)
    def pytest_runtest_protocol(self, item):
        """Compare the open files before and after running ``item``."""
        before = self.get_open_files()
        yield
        # On PyPy, run a collection first so unreferenced files get closed.
        if hasattr(sys, "pypy_version_info"):
            gc.collect()
        after = self.get_open_files()

        known_fds = set(entry[0] for entry in before)
        leaked_files = [entry for entry in after if entry[0] not in known_fds]
        if leaked_files:
            headline = "***** %s FD leakage detected" % len(leaked_files)
            error = [headline]
            error.extend(str(f) for f in leaked_files)
            error.append("*** Before:")
            error.extend(str(f) for f in before)
            error.append("*** After:")
            error.extend(str(f) for f in after)
            # the headline is repeated below the (possibly long) listings
            error.append(headline)
            error.append("*** function %s:%s: %s " % item.location)
            error.append("See issue #2366")
            item.warn('', "\n".join(error))
108
109
# XXX copied from execnet's conftest.py - needs to be merged
# Maps interpreter names to fixed Windows install locations.  The
# ``anypython`` fixture consults this table on win32 when
# ``getexecutable()`` did not find the requested interpreter.
winpymap = {
    'python2.7': r'C:\Python27\python.exe',
    'python3.4': r'C:\Python34\python.exe',
    'python3.5': r'C:\Python35\python.exe',
    'python3.6': r'C:\Python36\python.exe',
}
117
118
def getexecutable(name, cache={}):
    """Locate the interpreter ``name`` and check that it can be started.

    The executable is looked up with ``py.path.local.sysfind`` and invoked
    with ``--version``.  For "jython" the stderr output must mention "2.5"
    but not "2.5.2"; for any other name a non-zero return code rejects the
    executable.

    :param name: Name of the executable to look for.
    :param cache: Mapping used to memoize results.  The shared default
       dictionary is intentional: it keeps results across calls.
    :return: The found executable, or ``None`` if unusable / not found.
    """
    try:
        return cache[name]
    except KeyError:
        pass

    executable = py.path.local.sysfind(name)
    if executable:
        proc = subprocess.Popen(
            [str(executable), "--version"],
            universal_newlines=True,
            stderr=subprocess.PIPE)
        _, err = proc.communicate()
        if name == "jython":
            if not err or "2.5" not in err:
                executable = None
            if "2.5.2" in err:
                executable = None  # http://bugs.jython.org/issue1790
        elif proc.returncode != 0:
            # Handle pyenv's 127.
            executable = None
    cache[name] = executable
    return executable
139
140
@pytest.fixture(params=['python2.7', 'python3.4', 'pypy', 'pypy3'])
def anypython(request):
    """Return an executable for the parametrized interpreter name.

    ``getexecutable()`` is tried first.  On win32 the ``winpymap`` table
    serves as a fallback if the mapped path passes ``check()``.  The test
    is skipped when no suitable executable is available.
    """
    name = request.param
    executable = getexecutable(name)
    if executable is not None:
        return executable
    if sys.platform == "win32":
        executable = winpymap.get(name, None)
        if executable:
            executable = py.path.local(executable)
            if executable.check():
                return executable
    pytest.skip("no suitable %s found" % (name,))
    return executable
154
155# used at least by pytest-xdist plugin
156
157
@pytest.fixture
def _pytest(request):
    """Return a :py:class:`PytestArg` helper bound to ``request``.

    The helper offers a ``gethookrecorder(hook)`` method which returns a
    HookRecorder instance for making assertions about called hooks.
    """
    helper = PytestArg(request)
    return helper
165
166
class PytestArg:
    """Helper handed out by the ``_pytest`` fixture."""

    def __init__(self, request):
        # the fixture request, needed later to register finalizers
        self.request = request

    def gethookrecorder(self, hook):
        """Create a HookRecorder for the plugin manager behind ``hook``.

        The recorder's ``finish_recording`` is registered as a finalizer
        on the stored request.
        """
        recorder = HookRecorder(hook._pm)
        self.request.addfinalizer(recorder.finish_recording)
        return recorder
175
176
def get_public_names(values):
    """Only return names from iterator values without a leading underscore.

    :param values: Iterable of name strings.
    :return: List of the names that do not start with ``"_"``, in their
       original order.  An empty name has no leading underscore and is
       therefore kept (indexing ``x[0]`` would raise ``IndexError`` on it).
    """
    return [x for x in values if not x.startswith("_")]
180
181
class ParsedCall:
    """Record of one hook call: the hook name plus its keyword arguments.

    The keyword arguments become instance attributes; the hook name is
    stored as ``_name``.
    """

    def __init__(self, name, kwargs):
        self.__dict__.update(kwargs)
        self._name = name

    def __repr__(self):
        # show only the call arguments, not the bookkeeping ``_name`` entry
        arguments = dict(self.__dict__)
        arguments.pop('_name')
        return "<ParsedCall %r(**%r)>" % (self._name, arguments)
191
192
class HookRecorder:
    """Record all hooks called in a plugin manager.

    Hook call monitoring is installed on the plugin manager so that
    every call is appended to :py:attr:`calls` (as a ``ParsedCall``)
    before the normal call proceeds.

    """

    def __init__(self, pluginmanager):
        self._pluginmanager = pluginmanager
        self.calls = []

        def before(hook_name, hook_impls, kwargs):
            self.calls.append(ParsedCall(hook_name, kwargs))

        def after(outcome, hook_name, hook_impls, kwargs):
            pass

        # keep the undo callable so recording can be stopped later
        self._undo_wrapping = pluginmanager.add_hookcall_monitoring(before, after)

    def finish_recording(self):
        """Stop recording by undoing the hook call monitoring."""
        self._undo_wrapping()

    def getcalls(self, names):
        """Return the recorded calls whose hook name is in ``names``.

        ``names`` may be a container of names or a single whitespace
        separated string.
        """
        wanted = names.split() if isinstance(names, str) else names
        return [call for call in self.calls if call._name in wanted]

    def assert_contains(self, entries):
        """Assert that the given ``(name, check)`` entries occur in order.

        ``check`` is an expression evaluated with the caller's locals as
        globals and the call's attributes as locals.  Fails the test if
        an entry cannot be matched.
        """
        __tracebackhide__ = True
        start = 0
        pending = list(entries)
        # must be looked up here: frame 1 is the direct caller of this method
        caller_locals = sys._getframe(1).f_locals
        while pending:
            name, check = pending.pop(0)
            for offset, call in enumerate(self.calls[start:]):
                if call._name != name:
                    print("NONAMEMATCH", name, "with", call)
                    continue
                print("NAMEMATCH", name, call)
                if not eval(check, caller_locals, call.__dict__):
                    print("NOCHECKERMATCH", repr(check), "-", call)
                    continue
                print("CHECKERMATCH", repr(check), "->", call)
                # the next entry is searched after the call just matched
                start += offset + 1
                break
            else:
                pytest.fail("could not find %r check %r" % (name, check))

    def popcall(self, name):
        """Remove and return the first recorded call named ``name``."""
        __tracebackhide__ = True
        for index, call in enumerate(self.calls):
            if call._name == name:
                return self.calls.pop(index)
        lines = ["could not find call %r, in:" % (name,)]
        lines.extend("  %s" % str(x) for x in self.calls)
        pytest.fail("\n".join(lines))

    def getcall(self, name):
        """Return the single recorded call named ``name``."""
        matches = self.getcalls(name)
        assert len(matches) == 1, (name, matches)
        return matches[0]

    # functionality for test reports

    def getreports(self,
                   names="pytest_runtest_logreport pytest_collectreport"):
        """Return the ``report`` attribute of all calls matching ``names``."""
        return [call.report for call in self.getcalls(names)]

    def matchreport(self, inamepart="",
                    names="pytest_runtest_logreport pytest_collectreport", when=None):
        """Return the single report whose node id contains ``inamepart``.

        Without ``when``, passing reports of phases other than "call" are
        ignored; with ``when``, only reports of that phase are considered.
        Raises ``ValueError`` unless exactly one report matches.
        """
        matches = []
        for rep in self.getreports(names=names):
            try:
                if not when and rep.when != "call" and rep.passed:
                    # setup/teardown passing reports - let's ignore those
                    continue
            except AttributeError:
                # reports without a ``when`` attribute are not filtered here
                pass
            if when and getattr(rep, 'when', None) != when:
                continue
            if not inamepart or inamepart in rep.nodeid.split("::"):
                matches.append(rep)
        if not matches:
            raise ValueError("could not find test report matching %r: "
                             "no test reports at all!" % (inamepart,))
        if len(matches) > 1:
            raise ValueError(
                "found 2 or more testreports matching %r: %s" % (inamepart, matches))
        return matches[0]

    def getfailures(self,
                    names='pytest_runtest_logreport pytest_collectreport'):
        """Return the reports matching ``names`` that have ``failed`` set."""
        return [rep for rep in self.getreports(names) if rep.failed]

    def getfailedcollections(self):
        """Return the failed ``pytest_collectreport`` reports."""
        return self.getfailures('pytest_collectreport')

    def listoutcomes(self):
        """Return a ``(passed, skipped, failed)`` tuple of report lists.

        Passed reports are only counted for the "call" phase.
        """
        passed, skipped, failed = [], [], []
        reports = self.getreports(
            "pytest_collectreport pytest_runtest_logreport")
        for rep in reports:
            if rep.passed:
                if getattr(rep, "when", None) == "call":
                    passed.append(rep)
            elif rep.skipped:
                skipped.append(rep)
            elif rep.failed:
                failed.append(rep)
        return passed, skipped, failed

    def countoutcomes(self):
        """Return ``[n_passed, n_skipped, n_failed]``."""
        return [len(reports) for reports in self.listoutcomes()]

    def assertoutcome(self, passed=0, skipped=0, failed=0):
        """Assert the number of passed, skipped and failed reports."""
        realpassed, realskipped, realfailed = self.listoutcomes()
        assert passed == len(realpassed)
        assert skipped == len(realskipped)
        assert failed == len(realfailed)

    def clear(self):
        """Forget all recorded calls (the list object itself is kept)."""
        del self.calls[:]
319
320
@pytest.fixture
def linecomp(request):
    """Provide a fresh ``LineComp`` instance."""
    comparer = LineComp()
    return comparer
324
325
@pytest.fixture(name='LineMatcher')
def LineMatcher_fixture(request):
    """Expose the ``LineMatcher`` class itself under the fixture name ``LineMatcher``."""
    matcher_class = LineMatcher
    return matcher_class
329
330
@pytest.fixture
def testdir(request, tmpdir_factory):
    """Provide a :py:class:`Testdir` built from the request and ``tmpdir_factory``."""
    instance = Testdir(request, tmpdir_factory)
    return instance
334
335
# Matches "<count> <category>" pairs such as "2 passed"; the category may
# contain word characters and dashes.  Used by RunResult.parseoutcomes().
rex_outcome = re.compile(r"(\d+) ([\w-]+)")
337
338
class RunResult:
    """The result of running a command.

    Attributes:

    :ret: The return value.
    :outlines: List of lines captured from stdout.
    :errlines: List of lines captured from stderr.
    :stdout: :py:class:`LineMatcher` of stdout, use ``stdout.str()`` to
       reconstruct stdout or the commonly used
       ``stdout.fnmatch_lines()`` method.
    :stderr: :py:class:`LineMatcher` of stderr.
    :duration: Duration in seconds.

    """

    def __init__(self, ret, outlines, errlines, duration):
        self.ret = ret
        self.duration = duration
        self.outlines = outlines
        self.errlines = errlines
        self.stdout = LineMatcher(outlines)
        self.stderr = LineMatcher(errlines)

    def parseoutcomes(self):
        """Return a dictionary of outcomestring->num parsed from stdout.

        The captured lines are scanned from the end; the first line that
        contains "seconds" and yields at least one "<num> <outcome>" pair
        is used.  Raises ``ValueError`` if no such line exists.
        """
        for line in reversed(self.outlines):
            if 'seconds' not in line:
                continue
            outcomes = rex_outcome.findall(line)
            if outcomes:
                return dict((cat, int(num)) for num, cat in outcomes)
        raise ValueError("Pytest terminal report not found")

    def assert_outcomes(self, passed=0, skipped=0, failed=0, error=0):
        """Assert that the specified outcomes appear with the respective
        numbers (0 means it didn't occur) in the text output from a test run.

        Only the "passed", "skipped", "failed" and "error" categories are
        compared; other parsed categories are ignored.
        """
        parsed = self.parseoutcomes()
        expected = dict(passed=passed, skipped=skipped, failed=failed, error=error)
        obtained = dict(
            (category, parsed.get(category, 0))
            for category in ('passed', 'skipped', 'failed', 'error'))
        assert obtained == expected
387
388
389class Testdir:
390    """Temporary test directory with tools to test/run pytest itself.
391
392    This is based on the ``tmpdir`` fixture but provides a number of
393    methods which aid with testing pytest itself.  Unless
394    :py:meth:`chdir` is used all methods will use :py:attr:`tmpdir` as
395    current working directory.
396
397    Attributes:
398
399    :tmpdir: The :py:class:`py.path.local` instance of the temporary
400       directory.
401
402    :plugins: A list of plugins to use with :py:meth:`parseconfig` and
403       :py:meth:`runpytest`.  Initially this is an empty list but
404       plugins can be added to the list.  The type of items to add to
405       the list depend on the method which uses them so refer to them
406       for details.
407
408    """
409
410    def __init__(self, request, tmpdir_factory):
411        self.request = request
412        self._mod_collections = WeakKeyDictionary()
413        name = request.function.__name__
414        self.tmpdir = tmpdir_factory.mktemp(name, numbered=True)
415        self.plugins = []
416        self._savesyspath = (list(sys.path), list(sys.meta_path))
417        self._savemodulekeys = set(sys.modules)
418        self.chdir()  # always chdir
419        self.request.addfinalizer(self.finalize)
420        method = self.request.config.getoption("--runpytest")
421        if method == "inprocess":
422            self._runpytest_method = self.runpytest_inprocess
423        elif method == "subprocess":
424            self._runpytest_method = self.runpytest_subprocess
425
426    def __repr__(self):
427        return "<Testdir %r>" % (self.tmpdir,)
428
429    def finalize(self):
430        """Clean up global state artifacts.
431
432        Some methods modify the global interpreter state and this
433        tries to clean this up.  It does not remove the temporary
434        directory however so it can be looked at after the test run
435        has finished.
436
437        """
438        sys.path[:], sys.meta_path[:] = self._savesyspath
439        if hasattr(self, '_olddir'):
440            self._olddir.chdir()
441        self.delete_loaded_modules()
442
443    def delete_loaded_modules(self):
444        """Delete modules that have been loaded during a test.
445
446        This allows the interpreter to catch module changes in case
447        the module is re-imported.
448        """
449        for name in set(sys.modules).difference(self._savemodulekeys):
450            # some zope modules used by twisted-related tests keeps internal
451            # state and can't be deleted; we had some trouble in the past
452            # with zope.interface for example
453            if not name.startswith("zope"):
454                del sys.modules[name]
455
456    def make_hook_recorder(self, pluginmanager):
457        """Create a new :py:class:`HookRecorder` for a PluginManager."""
458        assert not hasattr(pluginmanager, "reprec")
459        pluginmanager.reprec = reprec = HookRecorder(pluginmanager)
460        self.request.addfinalizer(reprec.finish_recording)
461        return reprec
462
463    def chdir(self):
464        """Cd into the temporary directory.
465
466        This is done automatically upon instantiation.
467
468        """
469        old = self.tmpdir.chdir()
470        if not hasattr(self, '_olddir'):
471            self._olddir = old
472
473    def _makefile(self, ext, args, kwargs, encoding='utf-8'):
474        items = list(kwargs.items())
475
476        def to_text(s):
477            return s.decode(encoding) if isinstance(s, bytes) else six.text_type(s)
478
479        if args:
480            source = u"\n".join(to_text(x) for x in args)
481            basename = self.request.function.__name__
482            items.insert(0, (basename, source))
483
484        ret = None
485        for basename, value in items:
486            p = self.tmpdir.join(basename).new(ext=ext)
487            p.dirpath().ensure_dir()
488            source = Source(value)
489            source = u"\n".join(to_text(line) for line in source.lines)
490            p.write(source.strip().encode(encoding), "wb")
491            if ret is None:
492                ret = p
493        return ret
494
495    def makefile(self, ext, *args, **kwargs):
496        """Create a new file in the testdir.
497
498        ext: The extension the file should use, including the dot.
499           E.g. ".py".
500
501        args: All args will be treated as strings and joined using
502           newlines.  The result will be written as contents to the
503           file.  The name of the file will be based on the test
504           function requesting this fixture.
505           E.g. "testdir.makefile('.txt', 'line1', 'line2')"
506
507        kwargs: Each keyword is the name of a file, while the value of
508           it will be written as contents of the file.
509           E.g. "testdir.makefile('.ini', pytest='[pytest]\naddopts=-rs\n')"
510
511        """
512        return self._makefile(ext, args, kwargs)
513
514    def makeconftest(self, source):
515        """Write a contest.py file with 'source' as contents."""
516        return self.makepyfile(conftest=source)
517
518    def makeini(self, source):
519        """Write a tox.ini file with 'source' as contents."""
520        return self.makefile('.ini', tox=source)
521
522    def getinicfg(self, source):
523        """Return the pytest section from the tox.ini config file."""
524        p = self.makeini(source)
525        return py.iniconfig.IniConfig(p)['pytest']
526
527    def makepyfile(self, *args, **kwargs):
528        """Shortcut for .makefile() with a .py extension."""
529        return self._makefile('.py', args, kwargs)
530
531    def maketxtfile(self, *args, **kwargs):
532        """Shortcut for .makefile() with a .txt extension."""
533        return self._makefile('.txt', args, kwargs)
534
535    def syspathinsert(self, path=None):
536        """Prepend a directory to sys.path, defaults to :py:attr:`tmpdir`.
537
538        This is undone automatically after the test.
539        """
540        if path is None:
541            path = self.tmpdir
542        sys.path.insert(0, str(path))
543        # a call to syspathinsert() usually means that the caller
544        # wants to import some dynamically created files.
545        # with python3 we thus invalidate import caches.
546        self._possibly_invalidate_import_caches()
547
548    def _possibly_invalidate_import_caches(self):
549        # invalidate caches if we can (py33 and above)
550        try:
551            import importlib
552        except ImportError:
553            pass
554        else:
555            if hasattr(importlib, "invalidate_caches"):
556                importlib.invalidate_caches()
557
558    def mkdir(self, name):
559        """Create a new (sub)directory."""
560        return self.tmpdir.mkdir(name)
561
562    def mkpydir(self, name):
563        """Create a new python package.
564
565        This creates a (sub)directory with an empty ``__init__.py``
566        file so that is recognised as a python package.
567
568        """
569        p = self.mkdir(name)
570        p.ensure("__init__.py")
571        return p
572
573    Session = Session
574
575    def getnode(self, config, arg):
576        """Return the collection node of a file.
577
578        :param config: :py:class:`_pytest.config.Config` instance, see
579           :py:meth:`parseconfig` and :py:meth:`parseconfigure` to
580           create the configuration.
581
582        :param arg: A :py:class:`py.path.local` instance of the file.
583
584        """
585        session = Session(config)
586        assert '::' not in str(arg)
587        p = py.path.local(arg)
588        config.hook.pytest_sessionstart(session=session)
589        res = session.perform_collect([str(p)], genitems=False)[0]
590        config.hook.pytest_sessionfinish(session=session, exitstatus=EXIT_OK)
591        return res
592
593    def getpathnode(self, path):
594        """Return the collection node of a file.
595
596        This is like :py:meth:`getnode` but uses
597        :py:meth:`parseconfigure` to create the (configured) pytest
598        Config instance.
599
600        :param path: A :py:class:`py.path.local` instance of the file.
601
602        """
603        config = self.parseconfigure(path)
604        session = Session(config)
605        x = session.fspath.bestrelpath(path)
606        config.hook.pytest_sessionstart(session=session)
607        res = session.perform_collect([x], genitems=False)[0]
608        config.hook.pytest_sessionfinish(session=session, exitstatus=EXIT_OK)
609        return res
610
611    def genitems(self, colitems):
612        """Generate all test items from a collection node.
613
614        This recurses into the collection node and returns a list of
615        all the test items contained within.
616
617        """
618        session = colitems[0].session
619        result = []
620        for colitem in colitems:
621            result.extend(session.genitems(colitem))
622        return result
623
624    def runitem(self, source):
625        """Run the "test_func" Item.
626
627        The calling test instance (the class which contains the test
628        method) must provide a ``.getrunner()`` method which should
629        return a runner which can run the test protocol for a single
630        item, like e.g. :py:func:`_pytest.runner.runtestprotocol`.
631
632        """
633        # used from runner functional tests
634        item = self.getitem(source)
635        # the test class where we are called from wants to provide the runner
636        testclassinstance = self.request.instance
637        runner = testclassinstance.getrunner()
638        return runner(item)
639
640    def inline_runsource(self, source, *cmdlineargs):
641        """Run a test module in process using ``pytest.main()``.
642
643        This run writes "source" into a temporary file and runs
644        ``pytest.main()`` on it, returning a :py:class:`HookRecorder`
645        instance for the result.
646
647        :param source: The source code of the test module.
648
649        :param cmdlineargs: Any extra command line arguments to use.
650
651        :return: :py:class:`HookRecorder` instance of the result.
652
653        """
654        p = self.makepyfile(source)
655        values = list(cmdlineargs) + [p]
656        return self.inline_run(*values)
657
658    def inline_genitems(self, *args):
659        """Run ``pytest.main(['--collectonly'])`` in-process.
660
661        Returns a tuple of the collected items and a
662        :py:class:`HookRecorder` instance.
663
664        This runs the :py:func:`pytest.main` function to run all of
665        pytest inside the test process itself like
666        :py:meth:`inline_run`.  However the return value is a tuple of
667        the collection items and a :py:class:`HookRecorder` instance.
668
669        """
670        rec = self.inline_run("--collect-only", *args)
671        items = [x.item for x in rec.getcalls("pytest_itemcollected")]
672        return items, rec
673
674    def inline_run(self, *args, **kwargs):
675        """Run ``pytest.main()`` in-process, returning a HookRecorder.
676
677        This runs the :py:func:`pytest.main` function to run all of
678        pytest inside the test process itself.  This means it can
679        return a :py:class:`HookRecorder` instance which gives more
680        detailed results from then run then can be done by matching
681        stdout/stderr from :py:meth:`runpytest`.
682
683        :param args: Any command line arguments to pass to
684           :py:func:`pytest.main`.
685
686        :param plugin: (keyword-only) Extra plugin instances the
687           ``pytest.main()`` instance should use.
688
689        :return: A :py:class:`HookRecorder` instance.
690        """
691        # When running py.test inline any plugins active in the main
692        # test process are already imported.  So this disables the
693        # warning which will trigger to say they can no longer be
694        # rewritten, which is fine as they are already rewritten.
695        orig_warn = AssertionRewritingHook._warn_already_imported
696
697        def revert():
698            AssertionRewritingHook._warn_already_imported = orig_warn
699
700        self.request.addfinalizer(revert)
701        AssertionRewritingHook._warn_already_imported = lambda *a: None
702
703        rec = []
704
705        class Collect:
706            def pytest_configure(x, config):
707                rec.append(self.make_hook_recorder(config.pluginmanager))
708
709        plugins = kwargs.get("plugins") or []
710        plugins.append(Collect())
711        ret = pytest.main(list(args), plugins=plugins)
712        self.delete_loaded_modules()
713        if len(rec) == 1:
714            reprec = rec.pop()
715        else:
716            class reprec:
717                pass
718        reprec.ret = ret
719
720        # typically we reraise keyboard interrupts from the child run
721        # because it's our user requesting interruption of the testing
722        if ret == 2 and not kwargs.get("no_reraise_ctrlc"):
723            calls = reprec.getcalls("pytest_keyboard_interrupt")
724            if calls and calls[-1].excinfo.type == KeyboardInterrupt:
725                raise KeyboardInterrupt()
726        return reprec
727
728    def runpytest_inprocess(self, *args, **kwargs):
729        """ Return result of running pytest in-process, providing a similar
730        interface to what self.runpytest() provides. """
731        if kwargs.get("syspathinsert"):
732            self.syspathinsert()
733        now = time.time()
734        capture = MultiCapture(Capture=SysCapture)
735        capture.start_capturing()
736        try:
737            try:
738                reprec = self.inline_run(*args, **kwargs)
739            except SystemExit as e:
740
741                class reprec:
742                    ret = e.args[0]
743
744            except Exception:
745                traceback.print_exc()
746
747                class reprec:
748                    ret = 3
749        finally:
750            out, err = capture.readouterr()
751            capture.stop_capturing()
752            sys.stdout.write(out)
753            sys.stderr.write(err)
754
755        res = RunResult(reprec.ret,
756                        out.split("\n"), err.split("\n"),
757                        time.time() - now)
758        res.reprec = reprec
759        return res
760
761    def runpytest(self, *args, **kwargs):
762        """ Run pytest inline or in a subprocess, depending on the command line
763        option "--runpytest" and return a :py:class:`RunResult`.
764
765        """
766        args = self._ensure_basetemp(args)
767        return self._runpytest_method(*args, **kwargs)
768
769    def _ensure_basetemp(self, args):
770        args = [str(x) for x in args]
771        for x in args:
772            if str(x).startswith('--basetemp'):
773                # print("basedtemp exists: %s" %(args,))
774                break
775        else:
776            args.append("--basetemp=%s" % self.tmpdir.dirpath('basetemp'))
777            # print("added basetemp: %s" %(args,))
778        return args
779
780    def parseconfig(self, *args):
781        """Return a new pytest Config instance from given commandline args.
782
783        This invokes the pytest bootstrapping code in _pytest.config
784        to create a new :py:class:`_pytest.core.PluginManager` and
785        call the pytest_cmdline_parse hook to create new
786        :py:class:`_pytest.config.Config` instance.
787
788        If :py:attr:`plugins` has been populated they should be plugin
789        modules which will be registered with the PluginManager.
790
791        """
792        args = self._ensure_basetemp(args)
793
794        import _pytest.config
795        config = _pytest.config._prepareconfig(args, self.plugins)
796        # we don't know what the test will do with this half-setup config
797        # object and thus we make sure it gets unconfigured properly in any
798        # case (otherwise capturing could still be active, for example)
799        self.request.addfinalizer(config._ensure_unconfigure)
800        return config
801
802    def parseconfigure(self, *args):
803        """Return a new pytest configured Config instance.
804
805        This returns a new :py:class:`_pytest.config.Config` instance
806        like :py:meth:`parseconfig`, but also calls the
807        pytest_configure hook.
808
809        """
810        config = self.parseconfig(*args)
811        config._do_configure()
812        self.request.addfinalizer(config._ensure_unconfigure)
813        return config
814
815    def getitem(self, source, funcname="test_func"):
816        """Return the test item for a test function.
817
818        This writes the source to a python file and runs pytest's
819        collection on the resulting module, returning the test item
820        for the requested function name.
821
822        :param source: The module source.
823
824        :param funcname: The name of the test function for which the
825           Item must be returned.
826
827        """
828        items = self.getitems(source)
829        for item in items:
830            if item.name == funcname:
831                return item
832        assert 0, "%r item not found in module:\n%s\nitems: %s" % (
833                  funcname, source, items)
834
835    def getitems(self, source):
836        """Return all test items collected from the module.
837
838        This writes the source to a python file and runs pytest's
839        collection on the resulting module, returning all test items
840        contained within.
841
842        """
843        modcol = self.getmodulecol(source)
844        return self.genitems([modcol])
845
846    def getmodulecol(self, source, configargs=(), withinit=False):
847        """Return the module collection node for ``source``.
848
849        This writes ``source`` to a file using :py:meth:`makepyfile`
850        and then runs the pytest collection on it, returning the
851        collection node for the test module.
852
853        :param source: The source code of the module to collect.
854
855        :param configargs: Any extra arguments to pass to
856           :py:meth:`parseconfigure`.
857
858        :param withinit: Whether to also write a ``__init__.py`` file
859           to the temporary directory to ensure it is a package.
860
861        """
862        kw = {self.request.function.__name__: Source(source).strip()}
863        path = self.makepyfile(**kw)
864        if withinit:
865            self.makepyfile(__init__="#")
866        self.config = config = self.parseconfigure(path, *configargs)
867        node = self.getnode(config, path)
868
869        return node
870
871    def collect_by_name(self, modcol, name):
872        """Return the collection node for name from the module collection.
873
874        This will search a module collection node for a collection
875        node matching the given name.
876
877        :param modcol: A module collection node, see
878           :py:meth:`getmodulecol`.
879
880        :param name: The name of the node to return.
881
882        """
883        if modcol not in self._mod_collections:
884            self._mod_collections[modcol] = list(modcol.collect())
885        for colitem in self._mod_collections[modcol]:
886            if colitem.name == name:
887                return colitem
888
889    def popen(self, cmdargs, stdout, stderr, **kw):
890        """Invoke subprocess.Popen.
891
892        This calls subprocess.Popen making sure the current working
893        directory is the PYTHONPATH.
894
895        You probably want to use :py:meth:`run` instead.
896
897        """
898        env = os.environ.copy()
899        env['PYTHONPATH'] = os.pathsep.join(filter(None, [
900            str(os.getcwd()), env.get('PYTHONPATH', '')]))
901        kw['env'] = env
902
903        popen = subprocess.Popen(cmdargs, stdin=subprocess.PIPE, stdout=stdout, stderr=stderr, **kw)
904        popen.stdin.close()
905
906        return popen
907
908    def run(self, *cmdargs):
909        """Run a command with arguments.
910
911        Run a process using subprocess.Popen saving the stdout and
912        stderr.
913
914        Returns a :py:class:`RunResult`.
915
916        """
917        return self._run(*cmdargs)
918
919    def _run(self, *cmdargs):
920        cmdargs = [str(x) for x in cmdargs]
921        p1 = self.tmpdir.join("stdout")
922        p2 = self.tmpdir.join("stderr")
923        print("running:", ' '.join(cmdargs))
924        print("     in:", str(py.path.local()))
925        f1 = codecs.open(str(p1), "w", encoding="utf8")
926        f2 = codecs.open(str(p2), "w", encoding="utf8")
927        try:
928            now = time.time()
929            popen = self.popen(cmdargs, stdout=f1, stderr=f2,
930                               close_fds=(sys.platform != "win32"))
931            ret = popen.wait()
932        finally:
933            f1.close()
934            f2.close()
935        f1 = codecs.open(str(p1), "r", encoding="utf8")
936        f2 = codecs.open(str(p2), "r", encoding="utf8")
937        try:
938            out = f1.read().splitlines()
939            err = f2.read().splitlines()
940        finally:
941            f1.close()
942            f2.close()
943        self._dump_lines(out, sys.stdout)
944        self._dump_lines(err, sys.stderr)
945        return RunResult(ret, out, err, time.time() - now)
946
947    def _dump_lines(self, lines, fp):
948        try:
949            for line in lines:
950                print(line, file=fp)
951        except UnicodeEncodeError:
952            print("couldn't print to %s because of encoding" % (fp,))
953
954    def _getpytestargs(self):
955        # we cannot use "(sys.executable,script)"
956        # because on windows the script is e.g. a pytest.exe
957        return (sys.executable, PYTEST_FULLPATH) # noqa
958
959    def runpython(self, script):
960        """Run a python script using sys.executable as interpreter.
961
962        Returns a :py:class:`RunResult`.
963        """
964        return self.run(sys.executable, script)
965
966    def runpython_c(self, command):
967        """Run python -c "command", return a :py:class:`RunResult`."""
968        return self.run(sys.executable, "-c", command)
969
970    def runpytest_subprocess(self, *args, **kwargs):
971        """Run pytest as a subprocess with given arguments.
972
973        Any plugins added to the :py:attr:`plugins` list will added
974        using the ``-p`` command line option.  Addtionally
975        ``--basetemp`` is used put any temporary files and directories
976        in a numbered directory prefixed with "runpytest-" so they do
977        not conflict with the normal numberd pytest location for
978        temporary files and directories.
979
980        Returns a :py:class:`RunResult`.
981
982        """
983        p = py.path.local.make_numbered_dir(prefix="runpytest-",
984                                            keep=None, rootdir=self.tmpdir)
985        args = ('--basetemp=%s' % p, ) + args
986        # for x in args:
987        #    if '--confcutdir' in str(x):
988        #        break
989        # else:
990        #    pass
991        #    args = ('--confcutdir=.',) + args
992        plugins = [x for x in self.plugins if isinstance(x, str)]
993        if plugins:
994            args = ('-p', plugins[0]) + args
995        args = self._getpytestargs() + args
996        return self.run(*args)
997
998    def spawn_pytest(self, string, expect_timeout=10.0):
999        """Run pytest using pexpect.
1000
1001        This makes sure to use the right pytest and sets up the
1002        temporary directory locations.
1003
1004        The pexpect child is returned.
1005
1006        """
1007        basetemp = self.tmpdir.mkdir("temp-pexpect")
1008        invoke = " ".join(map(str, self._getpytestargs()))
1009        cmd = "%s --basetemp=%s %s" % (invoke, basetemp, string)
1010        return self.spawn(cmd, expect_timeout=expect_timeout)
1011
1012    def spawn(self, cmd, expect_timeout=10.0):
1013        """Run a command using pexpect.
1014
1015        The pexpect child is returned.
1016        """
1017        pexpect = pytest.importorskip("pexpect", "3.0")
1018        if hasattr(sys, 'pypy_version_info') and '64' in platform.machine():
1019            pytest.skip("pypy-64 bit not supported")
1020        if sys.platform.startswith("freebsd"):
1021            pytest.xfail("pexpect does not work reliably on freebsd")
1022        logfile = self.tmpdir.join("spawn.out").open("wb")
1023        child = pexpect.spawn(cmd, logfile=logfile)
1024        self.request.addfinalizer(logfile.close)
1025        child.timeout = expect_timeout
1026        return child
1027
1028
def getdecoded(out):
    """Decode ``out`` as UTF-8, falling back to a diagnostic string.

    When ``out`` is not valid UTF-8 the returned text is a fixed notice
    followed by ``py.io.saferepr(out)``.
    """
    try:
        decoded = out.decode("utf-8")
    except UnicodeDecodeError:
        fallback = py.io.saferepr(out)
        return "INTERNAL not-utf8-decodeable, truncated string:\n%s" % (
            fallback,)
    return decoded
1035
1036
class LineComp:
    """Text buffer whose content can be checked against expected lines."""

    def __init__(self):
        # in-memory text stream that callers write the output to
        self.stringio = py.io.TextIO()

    def assert_contains_lines(self, lines2):
        """ check the buffered text for ``lines2`` and reset the buffer.

            The current content of ``self.stringio`` is taken out (the
            stream is emptied and rewound), split on newlines and
            matched with ``LineMatcher.fnmatch_lines``, whose result is
            returned.
        """
        __tracebackhide__ = True
        buffered = self.stringio.getvalue()
        # empty the stream so the next check only sees new output
        self.stringio.truncate(0)
        self.stringio.seek(0)
        matcher = LineMatcher(buffered.split("\n"))
        return matcher.fnmatch_lines(lines2)
1051
1052
1053class LineMatcher:
1054    """Flexible matching of text.
1055
1056    This is a convenience class to test large texts like the output of
1057    commands.
1058
1059    The constructor takes a list of lines without their trailing
1060    newlines, i.e. ``text.splitlines()``.
1061
1062    """
1063
1064    def __init__(self, lines):
1065        self.lines = lines
1066        self._log_output = []
1067
1068    def str(self):
1069        """Return the entire original text."""
1070        return "\n".join(self.lines)
1071
1072    def _getlines(self, lines2):
1073        if isinstance(lines2, str):
1074            lines2 = Source(lines2)
1075        if isinstance(lines2, Source):
1076            lines2 = lines2.strip().lines
1077        return lines2
1078
1079    def fnmatch_lines_random(self, lines2):
1080        """Check lines exist in the output using ``fnmatch.fnmatch``, in any order.
1081
1082        The argument is a list of lines which have to occur in the
1083        output, in any order.
1084        """
1085        self._match_lines_random(lines2, fnmatch)
1086
1087    def re_match_lines_random(self, lines2):
1088        """Check lines exist in the output using ``re.match``, in any order.
1089
1090        The argument is a list of lines which have to occur in the
1091        output, in any order.
1092
1093        """
1094        self._match_lines_random(lines2, lambda name, pat: re.match(pat, name))
1095
1096    def _match_lines_random(self, lines2, match_func):
1097        """Check lines exist in the output.
1098
1099        The argument is a list of lines which have to occur in the
1100        output, in any order.  Each line can contain glob whildcards.
1101
1102        """
1103        lines2 = self._getlines(lines2)
1104        for line in lines2:
1105            for x in self.lines:
1106                if line == x or match_func(x, line):
1107                    self._log("matched: ", repr(line))
1108                    break
1109            else:
1110                self._log("line %r not found in output" % line)
1111                raise ValueError(self._log_text)
1112
1113    def get_lines_after(self, fnline):
1114        """Return all lines following the given line in the text.
1115
1116        The given line can contain glob wildcards.
1117        """
1118        for i, line in enumerate(self.lines):
1119            if fnline == line or fnmatch(line, fnline):
1120                return self.lines[i + 1:]
1121        raise ValueError("line %r not found in output" % fnline)
1122
1123    def _log(self, *args):
1124        self._log_output.append(' '.join((str(x) for x in args)))
1125
1126    @property
1127    def _log_text(self):
1128        return '\n'.join(self._log_output)
1129
1130    def fnmatch_lines(self, lines2):
1131        """Search captured text for matching lines using ``fnmatch.fnmatch``.
1132
1133        The argument is a list of lines which have to match and can
1134        use glob wildcards.  If they do not match a pytest.fail() is
1135        called.  The matches and non-matches are also printed on
1136        stdout.
1137
1138        """
1139        self._match_lines(lines2, fnmatch, 'fnmatch')
1140
1141    def re_match_lines(self, lines2):
1142        """Search captured text for matching lines using ``re.match``.
1143
1144        The argument is a list of lines which have to match using ``re.match``.
1145        If they do not match a pytest.fail() is called.
1146
1147        The matches and non-matches are also printed on
1148        stdout.
1149        """
1150        self._match_lines(lines2, lambda name, pat: re.match(pat, name), 're.match')
1151
1152    def _match_lines(self, lines2, match_func, match_nickname):
1153        """Underlying implementation of ``fnmatch_lines`` and ``re_match_lines``.
1154
1155        :param list[str] lines2: list of string patterns to match. The actual format depends on
1156            ``match_func``.
1157        :param match_func: a callable ``match_func(line, pattern)`` where line is the captured
1158            line from stdout/stderr and pattern is the matching pattern.
1159
1160        :param str match_nickname: the nickname for the match function that will be logged
1161            to stdout when a match occurs.
1162        """
1163        lines2 = self._getlines(lines2)
1164        lines1 = self.lines[:]
1165        nextline = None
1166        extralines = []
1167        __tracebackhide__ = True
1168        for line in lines2:
1169            nomatchprinted = False
1170            while lines1:
1171                nextline = lines1.pop(0)
1172                if line == nextline:
1173                    self._log("exact match:", repr(line))
1174                    break
1175                elif match_func(nextline, line):
1176                    self._log("%s:" % match_nickname, repr(line))
1177                    self._log("   with:", repr(nextline))
1178                    break
1179                else:
1180                    if not nomatchprinted:
1181                        self._log("nomatch:", repr(line))
1182                        nomatchprinted = True
1183                    self._log("    and:", repr(nextline))
1184                extralines.append(nextline)
1185            else:
1186                self._log("remains unmatched: %r" % (line,))
1187                pytest.fail(self._log_text)
1188