1"""Generic testing tools.
2
3Authors
4-------
5- Fernando Perez <Fernando.Perez@berkeley.edu>
6"""
7
8
9# Copyright (c) IPython Development Team.
10# Distributed under the terms of the Modified BSD License.
11
12import os
13import re
14import sys
15import tempfile
16import unittest
17
18from contextlib import contextmanager
19from io import StringIO
20from subprocess import Popen, PIPE
21from unittest.mock import patch
22
23try:
24    # These tools are used by parts of the runtime, so we make the nose
25    # dependency optional at this point.  Nose is a hard dependency to run the
26    # test suite, but NOT to use ipython itself.
27    import nose.tools as nt
28    has_nose = True
29except ImportError:
30    has_nose = False
31
32from traitlets.config.loader import Config
33from IPython.utils.process import get_output_error_code
34from IPython.utils.text import list_strings
35from IPython.utils.io import temp_pyfile, Tee
36from IPython.utils import py3compat
37
38from . import decorators as dec
39from . import skipdoctest
40
41
42# The docstring for full_path doctests differently on win32 (different path
43# separator) so just skip the doctest there.  The example remains informative.
44doctest_deco = skipdoctest.skip_doctest if sys.platform == 'win32' else dec.null_deco
45
46@doctest_deco
47def full_path(startPath,files):
48    """Make full paths for all the listed files, based on startPath.
49
50    Only the base part of startPath is kept, since this routine is typically
51    used with a script's ``__file__`` variable as startPath. The base of startPath
52    is then prepended to all the listed files, forming the output list.
53
54    Parameters
55    ----------
56    startPath : string
57      Initial path to use as the base for the results.  This path is split
58      using os.path.split() and only its first component is kept.
59
60    files : string or list
61      One or more files.
62
63    Examples
64    --------
65
66    >>> full_path('/foo/bar.py',['a.txt','b.txt'])
67    ['/foo/a.txt', '/foo/b.txt']
68
69    >>> full_path('/foo',['a.txt','b.txt'])
70    ['/a.txt', '/b.txt']
71
72    If a single file is given, the output is still a list::
73
74        >>> full_path('/foo','a.txt')
75        ['/a.txt']
76    """
77
78    files = list_strings(files)
79    base = os.path.split(startPath)[0]
80    return [ os.path.join(base,f) for f in files ]
81
82
83def parse_test_output(txt):
84    """Parse the output of a test run and return errors, failures.
85
86    Parameters
87    ----------
88    txt : str
89      Text output of a test run, assumed to contain a line of one of the
90      following forms::
91
92        'FAILED (errors=1)'
93        'FAILED (failures=1)'
94        'FAILED (errors=1, failures=1)'
95
96    Returns
97    -------
98    nerr, nfail
99      number of errors and failures.
100    """
101
102    err_m = re.search(r'^FAILED \(errors=(\d+)\)', txt, re.MULTILINE)
103    if err_m:
104        nerr = int(err_m.group(1))
105        nfail = 0
106        return  nerr, nfail
107
108    fail_m = re.search(r'^FAILED \(failures=(\d+)\)', txt, re.MULTILINE)
109    if fail_m:
110        nerr = 0
111        nfail = int(fail_m.group(1))
112        return  nerr, nfail
113
114    both_m = re.search(r'^FAILED \(errors=(\d+), failures=(\d+)\)', txt,
115                       re.MULTILINE)
116    if both_m:
117        nerr = int(both_m.group(1))
118        nfail = int(both_m.group(2))
119        return  nerr, nfail
120
121    # If the input didn't match any of these forms, assume no error/failures
122    return 0, 0
123
124
125# So nose doesn't think this is a test
126parse_test_output.__test__ = False
127
128
129def default_argv():
130    """Return a valid default argv for creating testing instances of ipython"""
131
132    return ['--quick', # so no config file is loaded
133            # Other defaults to minimize side effects on stdout
134            '--colors=NoColor', '--no-term-title','--no-banner',
135            '--autocall=0']
136
137
138def default_config():
139    """Return a config object with good defaults for testing."""
140    config = Config()
141    config.TerminalInteractiveShell.colors = 'NoColor'
142    config.TerminalTerminalInteractiveShell.term_title = False,
143    config.TerminalInteractiveShell.autocall = 0
144    f = tempfile.NamedTemporaryFile(suffix=u'test_hist.sqlite', delete=False)
145    config.HistoryManager.hist_file = f.name
146    f.close()
147    config.HistoryManager.db_cache_size = 10000
148    return config
149
150
151def get_ipython_cmd(as_string=False):
152    """
153    Return appropriate IPython command line name. By default, this will return
154    a list that can be used with subprocess.Popen, for example, but passing
155    `as_string=True` allows for returning the IPython command as a string.
156
157    Parameters
158    ----------
159    as_string: bool
160        Flag to allow to return the command as a string.
161    """
162    ipython_cmd = [sys.executable, "-m", "IPython"]
163
164    if as_string:
165        ipython_cmd = " ".join(ipython_cmd)
166
167    return ipython_cmd
168
169def ipexec(fname, options=None, commands=()):
170    """Utility to call 'ipython filename'.
171
172    Starts IPython with a minimal and safe configuration to make startup as fast
173    as possible.
174
175    Note that this starts IPython in a subprocess!
176
177    Parameters
178    ----------
179    fname : str
180      Name of file to be executed (should have .py or .ipy extension).
181
182    options : optional, list
183      Extra command-line flags to be passed to IPython.
184
185    commands : optional, list
186      Commands to send in on stdin
187
188    Returns
189    -------
190    ``(stdout, stderr)`` of ipython subprocess.
191    """
192    if options is None: options = []
193
194    cmdargs = default_argv() + options
195
196    test_dir = os.path.dirname(__file__)
197
198    ipython_cmd = get_ipython_cmd()
199    # Absolute path for filename
200    full_fname = os.path.join(test_dir, fname)
201    full_cmd = ipython_cmd + cmdargs + ['--', full_fname]
202    env = os.environ.copy()
203    # FIXME: ignore all warnings in ipexec while we have shims
204    # should we keep suppressing warnings here, even after removing shims?
205    env['PYTHONWARNINGS'] = 'ignore'
206    # env.pop('PYTHONWARNINGS', None)  # Avoid extraneous warnings appearing on stderr
207    for k, v in env.items():
208        # Debug a bizarre failure we've seen on Windows:
209        # TypeError: environment can only contain strings
210        if not isinstance(v, str):
211            print(k, v)
212    p = Popen(full_cmd, stdout=PIPE, stderr=PIPE, stdin=PIPE, env=env)
213    out, err = p.communicate(input=py3compat.encode('\n'.join(commands)) or None)
214    out, err = py3compat.decode(out), py3compat.decode(err)
215    # `import readline` causes 'ESC[?1034h' to be output sometimes,
216    # so strip that out before doing comparisons
217    if out:
218        out = re.sub(r'\x1b\[[^h]+h', '', out)
219    return out, err
220
221
222def ipexec_validate(fname, expected_out, expected_err='',
223                    options=None, commands=()):
224    """Utility to call 'ipython filename' and validate output/error.
225
226    This function raises an AssertionError if the validation fails.
227
228    Note that this starts IPython in a subprocess!
229
230    Parameters
231    ----------
232    fname : str
233      Name of the file to be executed (should have .py or .ipy extension).
234
235    expected_out : str
236      Expected stdout of the process.
237
238    expected_err : optional, str
239      Expected stderr of the process.
240
241    options : optional, list
242      Extra command-line flags to be passed to IPython.
243
244    Returns
245    -------
246    None
247    """
248
249    import nose.tools as nt
250
251    out, err = ipexec(fname, options, commands)
252    #print 'OUT', out  # dbg
253    #print 'ERR', err  # dbg
254    # If there are any errors, we must check those before stdout, as they may be
255    # more informative than simply having an empty stdout.
256    if err:
257        if expected_err:
258            nt.assert_equal("\n".join(err.strip().splitlines()), "\n".join(expected_err.strip().splitlines()))
259        else:
260            raise ValueError('Running file %r produced error: %r' %
261                             (fname, err))
262    # If no errors or output on stderr was expected, match stdout
263    nt.assert_equal("\n".join(out.strip().splitlines()), "\n".join(expected_out.strip().splitlines()))
264
265
266class TempFileMixin(unittest.TestCase):
267    """Utility class to create temporary Python/IPython files.
268
269    Meant as a mixin class for test cases."""
270
271    def mktmp(self, src, ext='.py'):
272        """Make a valid python temp file."""
273        fname = temp_pyfile(src, ext)
274        if not hasattr(self, 'tmps'):
275            self.tmps=[]
276        self.tmps.append(fname)
277        self.fname = fname
278
279    def tearDown(self):
280        # If the tmpfile wasn't made because of skipped tests, like in
281        # win32, there's nothing to cleanup.
282        if hasattr(self, 'tmps'):
283            for fname in self.tmps:
284                # If the tmpfile wasn't made because of skipped tests, like in
285                # win32, there's nothing to cleanup.
286                try:
287                    os.unlink(fname)
288                except:
289                    # On Windows, even though we close the file, we still can't
290                    # delete it.  I have no clue why
291                    pass
292
293    def __enter__(self):
294        return self
295
296    def __exit__(self, exc_type, exc_value, traceback):
297        self.tearDown()
298
299
300pair_fail_msg = ("Testing {0}\n\n"
301                "In:\n"
302                "  {1!r}\n"
303                "Expected:\n"
304                "  {2!r}\n"
305                "Got:\n"
306                "  {3!r}\n")
307def check_pairs(func, pairs):
308    """Utility function for the common case of checking a function with a
309    sequence of input/output pairs.
310
311    Parameters
312    ----------
313    func : callable
314      The function to be tested. Should accept a single argument.
315    pairs : iterable
316      A list of (input, expected_output) tuples.
317
318    Returns
319    -------
320    None. Raises an AssertionError if any output does not match the expected
321    value.
322    """
323    name = getattr(func, "func_name", getattr(func, "__name__", "<unknown>"))
324    for inp, expected in pairs:
325        out = func(inp)
326        assert out == expected, pair_fail_msg.format(name, inp, expected, out)
327
328
329MyStringIO = StringIO
330
331_re_type = type(re.compile(r''))
332
333notprinted_msg = """Did not find {0!r} in printed output (on {1}):
334-------
335{2!s}
336-------
337"""
338
339class AssertPrints(object):
340    """Context manager for testing that code prints certain text.
341
342    Examples
343    --------
344    >>> with AssertPrints("abc", suppress=False):
345    ...     print("abcd")
346    ...     print("def")
347    ...
348    abcd
349    def
350    """
351    def __init__(self, s, channel='stdout', suppress=True):
352        self.s = s
353        if isinstance(self.s, (str, _re_type)):
354            self.s = [self.s]
355        self.channel = channel
356        self.suppress = suppress
357
358    def __enter__(self):
359        self.orig_stream = getattr(sys, self.channel)
360        self.buffer = MyStringIO()
361        self.tee = Tee(self.buffer, channel=self.channel)
362        setattr(sys, self.channel, self.buffer if self.suppress else self.tee)
363
364    def __exit__(self, etype, value, traceback):
365        try:
366            if value is not None:
367                # If an error was raised, don't check anything else
368                return False
369            self.tee.flush()
370            setattr(sys, self.channel, self.orig_stream)
371            printed = self.buffer.getvalue()
372            for s in self.s:
373                if isinstance(s, _re_type):
374                    assert s.search(printed), notprinted_msg.format(s.pattern, self.channel, printed)
375                else:
376                    assert s in printed, notprinted_msg.format(s, self.channel, printed)
377            return False
378        finally:
379            self.tee.close()
380
381printed_msg = """Found {0!r} in printed output (on {1}):
382-------
383{2!s}
384-------
385"""
386
387class AssertNotPrints(AssertPrints):
388    """Context manager for checking that certain output *isn't* produced.
389
390    Counterpart of AssertPrints"""
391    def __exit__(self, etype, value, traceback):
392        try:
393            if value is not None:
394                # If an error was raised, don't check anything else
395                self.tee.close()
396                return False
397            self.tee.flush()
398            setattr(sys, self.channel, self.orig_stream)
399            printed = self.buffer.getvalue()
400            for s in self.s:
401                if isinstance(s, _re_type):
402                    assert not s.search(printed),printed_msg.format(
403                        s.pattern, self.channel, printed)
404                else:
405                    assert s not in printed, printed_msg.format(
406                        s, self.channel, printed)
407            return False
408        finally:
409            self.tee.close()
410
411@contextmanager
412def mute_warn():
413    from IPython.utils import warn
414    save_warn = warn.warn
415    warn.warn = lambda *a, **kw: None
416    try:
417        yield
418    finally:
419        warn.warn = save_warn
420
421@contextmanager
422def make_tempfile(name):
423    """ Create an empty, named, temporary file for the duration of the context.
424    """
425    open(name, 'w').close()
426    try:
427        yield
428    finally:
429        os.unlink(name)
430
431def fake_input(inputs):
432    """Temporarily replace the input() function to return the given values
433
434    Use as a context manager:
435
436    with fake_input(['result1', 'result2']):
437        ...
438
439    Values are returned in order. If input() is called again after the last value
440    was used, EOFError is raised.
441    """
442    it = iter(inputs)
443    def mock_input(prompt=''):
444        try:
445            return next(it)
446        except StopIteration:
447            raise EOFError('No more inputs given')
448
449    return patch('builtins.input', mock_input)
450
451def help_output_test(subcommand=''):
452    """test that `ipython [subcommand] -h` works"""
453    cmd = get_ipython_cmd() + [subcommand, '-h']
454    out, err, rc = get_output_error_code(cmd)
455    nt.assert_equal(rc, 0, err)
456    nt.assert_not_in("Traceback", err)
457    nt.assert_in("Options", out)
458    nt.assert_in("--help-all", out)
459    return out, err
460
461
462def help_all_output_test(subcommand=''):
463    """test that `ipython [subcommand] --help-all` works"""
464    cmd = get_ipython_cmd() + [subcommand, '--help-all']
465    out, err, rc = get_output_error_code(cmd)
466    nt.assert_equal(rc, 0, err)
467    nt.assert_not_in("Traceback", err)
468    nt.assert_in("Options", out)
469    nt.assert_in("Class", out)
470    return out, err
471
472