1"""test the IPython Kernel"""
2
3# Copyright (c) IPython Development Team.
4# Distributed under the terms of the Modified BSD License.
5
6import ast
7import io
8import os.path
9import sys
10import time
11
12import nose.tools as nt
13from flaky import flaky
14import pytest
15from packaging import version
16
17from IPython.testing import decorators as dec, tools as tt
18import IPython
19from IPython.paths import locate_profile
20from ipython_genutils.tempdir import TemporaryDirectory
21
22from .utils import (
23    new_kernel, kernel, TIMEOUT, assemble_output, execute,
24    flush_channels, wait_for_idle, get_reply,
25)
26
27
28def _check_master(kc, expected=True, stream="stdout"):
29    execute(kc=kc, code="import sys")
30    flush_channels(kc)
31    msg_id, content = execute(kc=kc, code="print (sys.%s._is_master_process())" % stream)
32    stdout, stderr = assemble_output(kc.iopub_channel)
33    assert stdout.strip() == repr(expected)
34
35
36def _check_status(content):
37    """If status=error, show the traceback"""
38    if content['status'] == 'error':
39        assert False, ''.join(['\n'] + content['traceback'])
40
41
42# printing tests
43
44def test_simple_print():
45    """simple print statement in kernel"""
46    with kernel() as kc:
47        iopub = kc.iopub_channel
48        msg_id, content = execute(kc=kc, code="print ('hi')")
49        stdout, stderr = assemble_output(iopub)
50        assert stdout == 'hi\n'
51        assert stderr == ''
52        _check_master(kc, expected=True)
53
54
55def test_sys_path():
56    """test that sys.path doesn't get messed up by default"""
57    with kernel() as kc:
58        msg_id, content = execute(kc=kc, code="import sys; print(repr(sys.path))")
59        stdout, stderr = assemble_output(kc.iopub_channel)
60    # for error-output on failure
61    sys.stderr.write(stderr)
62
63    sys_path = ast.literal_eval(stdout.strip())
64    assert '' in sys_path
65
66
67def test_sys_path_profile_dir():
68    """test that sys.path doesn't get messed up when `--profile-dir` is specified"""
69
70    with new_kernel(['--profile-dir', locate_profile('default')]) as kc:
71        msg_id, content = execute(kc=kc, code="import sys; print(repr(sys.path))")
72        stdout, stderr = assemble_output(kc.iopub_channel)
73    # for error-output on failure
74    sys.stderr.write(stderr)
75
76    sys_path = ast.literal_eval(stdout.strip())
77    assert '' in sys_path
78
79
80@flaky(max_runs=3)
81@dec.skipif(
82    sys.platform == 'win32' or (sys.platform == "darwin" and sys.version_info >=(3, 8)),
83    "subprocess prints fail on Windows and MacOS Python 3.8+"
84)
85def test_subprocess_print():
86    """printing from forked mp.Process"""
87    with new_kernel() as kc:
88        iopub = kc.iopub_channel
89
90        _check_master(kc, expected=True)
91        flush_channels(kc)
92        np = 5
93        code = '\n'.join([
94            "import time",
95            "import multiprocessing as mp",
96            "pool = [mp.Process(target=print, args=('hello', i,)) for i in range(%i)]" % np,
97            "for p in pool: p.start()",
98            "for p in pool: p.join()",
99            "time.sleep(0.5),"
100        ])
101
102        msg_id, content = execute(kc=kc, code=code)
103        stdout, stderr = assemble_output(iopub)
104        nt.assert_equal(stdout.count("hello"), np, stdout)
105        for n in range(np):
106            nt.assert_equal(stdout.count(str(n)), 1, stdout)
107        assert stderr == ''
108        _check_master(kc, expected=True)
109        _check_master(kc, expected=True, stream="stderr")
110
111
112@flaky(max_runs=3)
113def test_subprocess_noprint():
114    """mp.Process without print doesn't trigger iostream mp_mode"""
115    with kernel() as kc:
116        iopub = kc.iopub_channel
117
118        np = 5
119        code = '\n'.join([
120            "import multiprocessing as mp",
121            "pool = [mp.Process(target=range, args=(i,)) for i in range(%i)]" % np,
122            "for p in pool: p.start()",
123            "for p in pool: p.join()"
124        ])
125
126        msg_id, content = execute(kc=kc, code=code)
127        stdout, stderr = assemble_output(iopub)
128        assert stdout == ''
129        assert stderr == ''
130
131        _check_master(kc, expected=True)
132        _check_master(kc, expected=True, stream="stderr")
133
134
135@flaky(max_runs=3)
136@dec.skipif(
137    sys.platform == 'win32' or (sys.platform == "darwin" and sys.version_info >=(3, 8)),
138    "subprocess prints fail on Windows and MacOS Python 3.8+"
139)
140def test_subprocess_error():
141    """error in mp.Process doesn't crash"""
142    with new_kernel() as kc:
143        iopub = kc.iopub_channel
144
145        code = '\n'.join([
146            "import multiprocessing as mp",
147            "p = mp.Process(target=int, args=('hi',))",
148            "p.start()",
149            "p.join()",
150        ])
151
152        msg_id, content = execute(kc=kc, code=code)
153        stdout, stderr = assemble_output(iopub)
154        assert stdout == ''
155        assert "ValueError" in stderr
156
157        _check_master(kc, expected=True)
158        _check_master(kc, expected=True, stream="stderr")
159
160# raw_input tests
161
162def test_raw_input():
163    """test input"""
164    with kernel() as kc:
165        iopub = kc.iopub_channel
166
167        input_f = "input"
168        theprompt = "prompt> "
169        code = 'print({input_f}("{theprompt}"))'.format(**locals())
170        msg_id = kc.execute(code, allow_stdin=True)
171        msg = kc.get_stdin_msg(block=True, timeout=TIMEOUT)
172        assert msg['header']['msg_type'] == 'input_request'
173        content = msg['content']
174        assert content['prompt'] == theprompt
175        text = "some text"
176        kc.input(text)
177        reply = kc.get_shell_msg(block=True, timeout=TIMEOUT)
178        assert reply['content']['status'] == 'ok'
179        stdout, stderr = assemble_output(iopub)
180        assert stdout == text + "\n"
181
182
183def test_save_history():
184    # Saving history from the kernel with %hist -f was failing because of
185    # unicode problems on Python 2.
186    with kernel() as kc, TemporaryDirectory() as td:
187        file = os.path.join(td, 'hist.out')
188        execute('a=1', kc=kc)
189        wait_for_idle(kc)
190        execute('b="abcþ"', kc=kc)
191        wait_for_idle(kc)
192        _, reply = execute("%hist -f " + file, kc=kc)
193        assert reply['status'] == 'ok'
194        with io.open(file, encoding='utf-8') as f:
195            content = f.read()
196        assert 'a=1' in content
197        assert 'b="abcþ"' in content
198
199
200@dec.skip_without('faulthandler')
201def test_smoke_faulthandler():
202    with kernel() as kc:
203        # Note: faulthandler.register is not available on windows.
204        code = '\n'.join([
205            'import sys',
206            'import faulthandler',
207            'import signal',
208            'faulthandler.enable()',
209            'if not sys.platform.startswith("win32"):',
210            '    faulthandler.register(signal.SIGTERM)'])
211        _, reply = execute(code, kc=kc)
212        nt.assert_equal(reply['status'], 'ok', reply.get('traceback', ''))
213
214
215def test_help_output():
216    """ipython kernel --help-all works"""
217    tt.help_all_output_test('kernel')
218
219
220def test_is_complete():
221    with kernel() as kc:
222        # There are more test cases for this in core - here we just check
223        # that the kernel exposes the interface correctly.
224        kc.is_complete('2+2')
225        reply = kc.get_shell_msg(block=True, timeout=TIMEOUT)
226        assert reply['content']['status'] == 'complete'
227
228        # SyntaxError
229        kc.is_complete('raise = 2')
230        reply = kc.get_shell_msg(block=True, timeout=TIMEOUT)
231        assert reply['content']['status'] == 'invalid'
232
233        kc.is_complete('a = [1,\n2,')
234        reply = kc.get_shell_msg(block=True, timeout=TIMEOUT)
235        assert reply['content']['status'] == 'incomplete'
236        assert reply['content']['indent'] == ''
237
238        # Cell magic ends on two blank lines for console UIs
239        kc.is_complete('%%timeit\na\n\n')
240        reply = kc.get_shell_msg(block=True, timeout=TIMEOUT)
241        assert reply['content']['status'] == 'complete'
242
243
244@dec.skipif(sys.platform != 'win32', "only run on Windows")
245def test_complete():
246    with kernel() as kc:
247        execute('a = 1', kc=kc)
248        wait_for_idle(kc)
249        cell = 'import IPython\nb = a.'
250        kc.complete(cell)
251        reply = kc.get_shell_msg(block=True, timeout=TIMEOUT)
252
253    c = reply['content']
254    assert c['status'] == 'ok'
255    start = cell.find('a.')
256    end = start + 2
257    assert c['cursor_end'] == cell.find('a.') + 2
258    assert c['cursor_start'] <= end
259
260    # there are many right answers for cursor_start,
261    # so verify application of the completion
262    # rather than the value of cursor_start
263
264    matches = c['matches']
265    assert matches
266    for m in matches:
267        completed = cell[:c['cursor_start']] + m
268        assert completed.startswith(cell)
269
270
271@dec.skip_without('matplotlib')
272def test_matplotlib_inline_on_import():
273    with kernel() as kc:
274        cell = '\n'.join([
275            'import matplotlib, matplotlib.pyplot as plt',
276            'backend = matplotlib.get_backend()'
277        ])
278        _, reply = execute(cell,
279            user_expressions={'backend': 'backend'},
280            kc=kc)
281        _check_status(reply)
282        backend_bundle = reply['user_expressions']['backend']
283        _check_status(backend_bundle)
284        assert 'backend_inline' in backend_bundle['data']['text/plain']
285
286
287def test_message_order():
288    N = 100  # number of messages to test
289    with kernel() as kc:
290        _, reply = execute("a = 1", kc=kc)
291        _check_status(reply)
292        offset = reply['execution_count'] + 1
293        cell = "a += 1\na"
294        msg_ids = []
295        # submit N executions as fast as we can
296        for i in range(N):
297            msg_ids.append(kc.execute(cell))
298        # check message-handling order
299        for i, msg_id in enumerate(msg_ids, offset):
300            reply = kc.get_shell_msg(timeout=TIMEOUT)
301            _check_status(reply['content'])
302            assert reply['content']['execution_count'] == i
303            assert reply['parent_header']['msg_id'] == msg_id
304
305
306@dec.skipif(sys.platform.startswith('linux') or sys.platform.startswith('darwin'))
307def test_unc_paths():
308    with kernel() as kc, TemporaryDirectory() as td:
309        drive_file_path = os.path.join(td, 'unc.txt')
310        with open(drive_file_path, 'w+') as f:
311            f.write('# UNC test')
312        unc_root = '\\\\localhost\\C$'
313        file_path = os.path.splitdrive(os.path.dirname(drive_file_path))[1]
314        unc_file_path = os.path.join(unc_root, file_path[1:])
315
316        iopub = kc.iopub_channel
317
318        kc.execute("cd {0:s}".format(unc_file_path))
319        reply = kc.get_shell_msg(block=True, timeout=TIMEOUT)
320        assert reply['content']['status'] == 'ok'
321        out, err = assemble_output(iopub)
322        assert unc_file_path in out
323
324        flush_channels(kc)
325        kc.execute(code="ls")
326        reply = kc.get_shell_msg(block=True, timeout=TIMEOUT)
327        assert reply['content']['status'] == 'ok'
328        out, err = assemble_output(iopub)
329        assert 'unc.txt' in out
330
331        kc.execute(code="cd")
332        reply = kc.get_shell_msg(block=True, timeout=TIMEOUT)
333        assert reply['content']['status'] == 'ok'
334
335
336def test_shutdown():
337    """Kernel exits after polite shutdown_request"""
338    with new_kernel() as kc:
339        km = kc.parent
340        execute('a = 1', kc=kc)
341        wait_for_idle(kc)
342        kc.shutdown()
343        for i in range(300): # 30s timeout
344            if km.is_alive():
345                time.sleep(.1)
346            else:
347                break
348        assert not km.is_alive()
349
350
351def test_interrupt_during_input():
352    """
353    The kernel exits after being interrupted while waiting in input().
354
355    input() appears to have issues other functions don't, and it needs to be
356    interruptible in order for pdb to be interruptible.
357    """
358    with new_kernel() as kc:
359        km = kc.parent
360        msg_id = kc.execute("input()")
361        time.sleep(1)  # Make sure it's actually waiting for input.
362        km.interrupt_kernel()
363        from .test_message_spec import validate_message
364        # If we failed to interrupt interrupt, this will timeout:
365        reply = get_reply(kc, msg_id, TIMEOUT)
366        validate_message(reply, 'execute_reply', msg_id)
367
368
369@pytest.mark.skipif(
370    version.parse(IPython.__version__) < version.parse("7.14.0"),
371    reason="Need new IPython"
372)
373def test_interrupt_during_pdb_set_trace():
374    """
375    The kernel exits after being interrupted while waiting in pdb.set_trace().
376
377    Merely testing input() isn't enough, pdb has its own issues that need
378    to be handled in addition.
379
380    This test will fail with versions of IPython < 7.14.0.
381    """
382    with new_kernel() as kc:
383        km = kc.parent
384        msg_id = kc.execute("import pdb; pdb.set_trace()")
385        msg_id2 = kc.execute("3 + 4")
386        time.sleep(1)  # Make sure it's actually waiting for input.
387        km.interrupt_kernel()
388        from .test_message_spec import validate_message
389        # If we failed to interrupt interrupt, this will timeout:
390        reply = get_reply(kc, msg_id, TIMEOUT)
391        validate_message(reply, 'execute_reply', msg_id)
392        # If we failed to interrupt interrupt, this will timeout:
393        reply = get_reply(kc, msg_id2, TIMEOUT)
394        validate_message(reply, 'execute_reply', msg_id2)
395