1"""test the IPython Kernel""" 2 3# Copyright (c) IPython Development Team. 4# Distributed under the terms of the Modified BSD License. 5 6import ast 7import io 8import os.path 9import sys 10import time 11 12import nose.tools as nt 13from flaky import flaky 14import pytest 15from packaging import version 16 17from IPython.testing import decorators as dec, tools as tt 18import IPython 19from IPython.paths import locate_profile 20from ipython_genutils.tempdir import TemporaryDirectory 21 22from .utils import ( 23 new_kernel, kernel, TIMEOUT, assemble_output, execute, 24 flush_channels, wait_for_idle, get_reply, 25) 26 27 28def _check_master(kc, expected=True, stream="stdout"): 29 execute(kc=kc, code="import sys") 30 flush_channels(kc) 31 msg_id, content = execute(kc=kc, code="print (sys.%s._is_master_process())" % stream) 32 stdout, stderr = assemble_output(kc.iopub_channel) 33 assert stdout.strip() == repr(expected) 34 35 36def _check_status(content): 37 """If status=error, show the traceback""" 38 if content['status'] == 'error': 39 assert False, ''.join(['\n'] + content['traceback']) 40 41 42# printing tests 43 44def test_simple_print(): 45 """simple print statement in kernel""" 46 with kernel() as kc: 47 iopub = kc.iopub_channel 48 msg_id, content = execute(kc=kc, code="print ('hi')") 49 stdout, stderr = assemble_output(iopub) 50 assert stdout == 'hi\n' 51 assert stderr == '' 52 _check_master(kc, expected=True) 53 54 55def test_sys_path(): 56 """test that sys.path doesn't get messed up by default""" 57 with kernel() as kc: 58 msg_id, content = execute(kc=kc, code="import sys; print(repr(sys.path))") 59 stdout, stderr = assemble_output(kc.iopub_channel) 60 # for error-output on failure 61 sys.stderr.write(stderr) 62 63 sys_path = ast.literal_eval(stdout.strip()) 64 assert '' in sys_path 65 66 67def test_sys_path_profile_dir(): 68 """test that sys.path doesn't get messed up when `--profile-dir` is specified""" 69 70 with new_kernel(['--profile-dir', locate_profile('default')]) as kc: 71 msg_id, content = execute(kc=kc, code="import sys; print(repr(sys.path))") 72 stdout, stderr = assemble_output(kc.iopub_channel) 73 # for error-output on failure 74 sys.stderr.write(stderr) 75 76 sys_path = ast.literal_eval(stdout.strip()) 77 assert '' in sys_path 78 79 80@flaky(max_runs=3) 81@dec.skipif( 82 sys.platform == 'win32' or (sys.platform == "darwin" and sys.version_info >=(3, 8)), 83 "subprocess prints fail on Windows and MacOS Python 3.8+" 84) 85def test_subprocess_print(): 86 """printing from forked mp.Process""" 87 with new_kernel() as kc: 88 iopub = kc.iopub_channel 89 90 _check_master(kc, expected=True) 91 flush_channels(kc) 92 np = 5 93 code = '\n'.join([ 94 "import time", 95 "import multiprocessing as mp", 96 "pool = [mp.Process(target=print, args=('hello', i,)) for i in range(%i)]" % np, 97 "for p in pool: p.start()", 98 "for p in pool: p.join()", 99 "time.sleep(0.5)," 100 ]) 101 102 msg_id, content = execute(kc=kc, code=code) 103 stdout, stderr = assemble_output(iopub) 104 nt.assert_equal(stdout.count("hello"), np, stdout) 105 for n in range(np): 106 nt.assert_equal(stdout.count(str(n)), 1, stdout) 107 assert stderr == '' 108 _check_master(kc, expected=True) 109 _check_master(kc, expected=True, stream="stderr") 110 111 112@flaky(max_runs=3) 113def test_subprocess_noprint(): 114 """mp.Process without print doesn't trigger iostream mp_mode""" 115 with kernel() as kc: 116 iopub = kc.iopub_channel 117 118 np = 5 119 code = '\n'.join([ 120 "import multiprocessing as mp", 121 "pool = [mp.Process(target=range, args=(i,)) for i in range(%i)]" % np, 122 "for p in pool: p.start()", 123 "for p in pool: p.join()" 124 ]) 125 126 msg_id, content = execute(kc=kc, code=code) 127 stdout, stderr = assemble_output(iopub) 128 assert stdout == '' 129 assert stderr == '' 130 131 _check_master(kc, expected=True) 132 _check_master(kc, expected=True, stream="stderr") 133 134 135@flaky(max_runs=3) 136@dec.skipif( 137 sys.platform == 'win32' or (sys.platform == "darwin" and sys.version_info >=(3, 8)), 138 "subprocess prints fail on Windows and MacOS Python 3.8+" 139) 140def test_subprocess_error(): 141 """error in mp.Process doesn't crash""" 142 with new_kernel() as kc: 143 iopub = kc.iopub_channel 144 145 code = '\n'.join([ 146 "import multiprocessing as mp", 147 "p = mp.Process(target=int, args=('hi',))", 148 "p.start()", 149 "p.join()", 150 ]) 151 152 msg_id, content = execute(kc=kc, code=code) 153 stdout, stderr = assemble_output(iopub) 154 assert stdout == '' 155 assert "ValueError" in stderr 156 157 _check_master(kc, expected=True) 158 _check_master(kc, expected=True, stream="stderr") 159 160# raw_input tests 161 162def test_raw_input(): 163 """test input""" 164 with kernel() as kc: 165 iopub = kc.iopub_channel 166 167 input_f = "input" 168 theprompt = "prompt> " 169 code = 'print({input_f}("{theprompt}"))'.format(**locals()) 170 msg_id = kc.execute(code, allow_stdin=True) 171 msg = kc.get_stdin_msg(block=True, timeout=TIMEOUT) 172 assert msg['header']['msg_type'] == 'input_request' 173 content = msg['content'] 174 assert content['prompt'] == theprompt 175 text = "some text" 176 kc.input(text) 177 reply = kc.get_shell_msg(block=True, timeout=TIMEOUT) 178 assert reply['content']['status'] == 'ok' 179 stdout, stderr = assemble_output(iopub) 180 assert stdout == text + "\n" 181 182 183def test_save_history(): 184 # Saving history from the kernel with %hist -f was failing because of 185 # unicode problems on Python 2. 186 with kernel() as kc, TemporaryDirectory() as td: 187 file = os.path.join(td, 'hist.out') 188 execute('a=1', kc=kc) 189 wait_for_idle(kc) 190 execute('b="abcþ"', kc=kc) 191 wait_for_idle(kc) 192 _, reply = execute("%hist -f " + file, kc=kc) 193 assert reply['status'] == 'ok' 194 with io.open(file, encoding='utf-8') as f: 195 content = f.read() 196 assert 'a=1' in content 197 assert 'b="abcþ"' in content 198 199 200@dec.skip_without('faulthandler') 201def test_smoke_faulthandler(): 202 with kernel() as kc: 203 # Note: faulthandler.register is not available on windows. 204 code = '\n'.join([ 205 'import sys', 206 'import faulthandler', 207 'import signal', 208 'faulthandler.enable()', 209 'if not sys.platform.startswith("win32"):', 210 ' faulthandler.register(signal.SIGTERM)']) 211 _, reply = execute(code, kc=kc) 212 nt.assert_equal(reply['status'], 'ok', reply.get('traceback', '')) 213 214 215def test_help_output(): 216 """ipython kernel --help-all works""" 217 tt.help_all_output_test('kernel') 218 219 220def test_is_complete(): 221 with kernel() as kc: 222 # There are more test cases for this in core - here we just check 223 # that the kernel exposes the interface correctly. 224 kc.is_complete('2+2') 225 reply = kc.get_shell_msg(block=True, timeout=TIMEOUT) 226 assert reply['content']['status'] == 'complete' 227 228 # SyntaxError 229 kc.is_complete('raise = 2') 230 reply = kc.get_shell_msg(block=True, timeout=TIMEOUT) 231 assert reply['content']['status'] == 'invalid' 232 233 kc.is_complete('a = [1,\n2,') 234 reply = kc.get_shell_msg(block=True, timeout=TIMEOUT) 235 assert reply['content']['status'] == 'incomplete' 236 assert reply['content']['indent'] == '' 237 238 # Cell magic ends on two blank lines for console UIs 239 kc.is_complete('%%timeit\na\n\n') 240 reply = kc.get_shell_msg(block=True, timeout=TIMEOUT) 241 assert reply['content']['status'] == 'complete' 242 243 244@dec.skipif(sys.platform != 'win32', "only run on Windows") 245def test_complete(): 246 with kernel() as kc: 247 execute('a = 1', kc=kc) 248 wait_for_idle(kc) 249 cell = 'import IPython\nb = a.' 250 kc.complete(cell) 251 reply = kc.get_shell_msg(block=True, timeout=TIMEOUT) 252 253 c = reply['content'] 254 assert c['status'] == 'ok' 255 start = cell.find('a.') 256 end = start + 2 257 assert c['cursor_end'] == cell.find('a.') + 2 258 assert c['cursor_start'] <= end 259 260 # there are many right answers for cursor_start, 261 # so verify application of the completion 262 # rather than the value of cursor_start 263 264 matches = c['matches'] 265 assert matches 266 for m in matches: 267 completed = cell[:c['cursor_start']] + m 268 assert completed.startswith(cell) 269 270 271@dec.skip_without('matplotlib') 272def test_matplotlib_inline_on_import(): 273 with kernel() as kc: 274 cell = '\n'.join([ 275 'import matplotlib, matplotlib.pyplot as plt', 276 'backend = matplotlib.get_backend()' 277 ]) 278 _, reply = execute(cell, 279 user_expressions={'backend': 'backend'}, 280 kc=kc) 281 _check_status(reply) 282 backend_bundle = reply['user_expressions']['backend'] 283 _check_status(backend_bundle) 284 assert 'backend_inline' in backend_bundle['data']['text/plain'] 285 286 287def test_message_order(): 288 N = 100 # number of messages to test 289 with kernel() as kc: 290 _, reply = execute("a = 1", kc=kc) 291 _check_status(reply) 292 offset = reply['execution_count'] + 1 293 cell = "a += 1\na" 294 msg_ids = [] 295 # submit N executions as fast as we can 296 for i in range(N): 297 msg_ids.append(kc.execute(cell)) 298 # check message-handling order 299 for i, msg_id in enumerate(msg_ids, offset): 300 reply = kc.get_shell_msg(timeout=TIMEOUT) 301 _check_status(reply['content']) 302 assert reply['content']['execution_count'] == i 303 assert reply['parent_header']['msg_id'] == msg_id 304 305 306@dec.skipif(sys.platform.startswith('linux') or sys.platform.startswith('darwin')) 307def test_unc_paths(): 308 with kernel() as kc, TemporaryDirectory() as td: 309 drive_file_path = os.path.join(td, 'unc.txt') 310 with open(drive_file_path, 'w+') as f: 311 f.write('# UNC test') 312 unc_root = '\\\\localhost\\C$' 313 file_path = os.path.splitdrive(os.path.dirname(drive_file_path))[1] 314 unc_file_path = os.path.join(unc_root, file_path[1:]) 315 316 iopub = kc.iopub_channel 317 318 kc.execute("cd {0:s}".format(unc_file_path)) 319 reply = kc.get_shell_msg(block=True, timeout=TIMEOUT) 320 assert reply['content']['status'] == 'ok' 321 out, err = assemble_output(iopub) 322 assert unc_file_path in out 323 324 flush_channels(kc) 325 kc.execute(code="ls") 326 reply = kc.get_shell_msg(block=True, timeout=TIMEOUT) 327 assert reply['content']['status'] == 'ok' 328 out, err = assemble_output(iopub) 329 assert 'unc.txt' in out 330 331 kc.execute(code="cd") 332 reply = kc.get_shell_msg(block=True, timeout=TIMEOUT) 333 assert reply['content']['status'] == 'ok' 334 335 336def test_shutdown(): 337 """Kernel exits after polite shutdown_request""" 338 with new_kernel() as kc: 339 km = kc.parent 340 execute('a = 1', kc=kc) 341 wait_for_idle(kc) 342 kc.shutdown() 343 for i in range(300): # 30s timeout 344 if km.is_alive(): 345 time.sleep(.1) 346 else: 347 break 348 assert not km.is_alive() 349 350 351def test_interrupt_during_input(): 352 """ 353 The kernel exits after being interrupted while waiting in input(). 354 355 input() appears to have issues other functions don't, and it needs to be 356 interruptible in order for pdb to be interruptible. 357 """ 358 with new_kernel() as kc: 359 km = kc.parent 360 msg_id = kc.execute("input()") 361 time.sleep(1) # Make sure it's actually waiting for input. 362 km.interrupt_kernel() 363 from .test_message_spec import validate_message 364 # If we failed to interrupt interrupt, this will timeout: 365 reply = get_reply(kc, msg_id, TIMEOUT) 366 validate_message(reply, 'execute_reply', msg_id) 367 368 369@pytest.mark.skipif( 370 version.parse(IPython.__version__) < version.parse("7.14.0"), 371 reason="Need new IPython" 372) 373def test_interrupt_during_pdb_set_trace(): 374 """ 375 The kernel exits after being interrupted while waiting in pdb.set_trace(). 376 377 Merely testing input() isn't enough, pdb has its own issues that need 378 to be handled in addition. 379 380 This test will fail with versions of IPython < 7.14.0. 381 """ 382 with new_kernel() as kc: 383 km = kc.parent 384 msg_id = kc.execute("import pdb; pdb.set_trace()") 385 msg_id2 = kc.execute("3 + 4") 386 time.sleep(1) # Make sure it's actually waiting for input. 387 km.interrupt_kernel() 388 from .test_message_spec import validate_message 389 # If we failed to interrupt interrupt, this will timeout: 390 reply = get_reply(kc, msg_id, TIMEOUT) 391 validate_message(reply, 'execute_reply', msg_id) 392 # If we failed to interrupt interrupt, this will timeout: 393 reply = get_reply(kc, msg_id2, TIMEOUT) 394 validate_message(reply, 'execute_reply', msg_id2) 395