1# (C) Copyright 2007-2019 Enthought, Inc., Austin, TX 2# All rights reserved. 3# 4# This software is provided without warranty under the terms of the BSD 5# license included in LICENSE.txt and may be redistributed only 6# under the conditions described in the aforementioned license. The license 7# is also available online at http://www.enthought.com/licenses/BSD.txt 8# Thanks for using Enthought open source! 9 10from __future__ import absolute_import, print_function, unicode_literals 11 12import atexit 13import gc 14try: 15 # Python 3: mock part of standard library. 16 from unittest import mock 17except ImportError: 18 # Python 2: use 3rd-party mock 19 import mock 20import os 21import shutil 22import sys 23import tempfile 24import threading 25import unittest 26import warnings 27 28try: 29 import ipykernel # noqa: F401 30except ImportError: 31 ipykernel_available = False 32else: 33 ipykernel_available = True 34 35if ipykernel_available: 36 import ipykernel.iostream 37 import ipykernel.ipkernel 38 import ipykernel.kernelapp 39 import ipykernel.zmqshell 40 import IPython.utils.io 41 import tornado.ioloop 42 import zmq 43 44 from envisage.plugins.ipython_kernel.internal_ipkernel import ( 45 InternalIPKernel) 46 47 48@unittest.skipUnless(ipykernel_available, 49 "skipping tests that require the ipykernel package") 50class TestInternalIPKernel(unittest.TestCase): 51 def setUp(self): 52 # Make sure that IPython-related files are written to a temporary 53 # directory instead of the home directory. 54 tmpdir = tempfile.mkdtemp() 55 self.addCleanup(shutil.rmtree, tmpdir) 56 57 self._old_ipythondir = os.environ.get("IPYTHONDIR") 58 os.environ["IPYTHONDIR"] = tmpdir 59 60 def tearDown(self): 61 # Restore previous state of the IPYTHONDIR environment variable. 62 old_ipythondir = self._old_ipythondir 63 if old_ipythondir is None: 64 del os.environ["IPYTHONDIR"] 65 else: 66 os.environ["IPYTHONDIR"] = old_ipythondir 67 68 def test_lifecycle(self): 69 kernel = InternalIPKernel() 70 self.assertIsNone(kernel.ipkernel) 71 72 kernel.init_ipkernel(gui_backend=None) 73 self.assertIsNotNone(kernel.ipkernel) 74 self.assertIsInstance(kernel.ipkernel, ipykernel.kernelapp.IPKernelApp) 75 76 kernel.new_qt_console() 77 kernel.new_qt_console() 78 self.assertEqual(len(kernel.consoles), 2) 79 80 kernel.shutdown() 81 self.assertIsNone(kernel.ipkernel) 82 self.assertEqual(len(kernel.consoles), 0) 83 84 def test_initial_namespace(self): 85 kernel = InternalIPKernel(initial_namespace=[('x', 42.1)]) 86 kernel.init_ipkernel(gui_backend=None) 87 self.assertIn('x', kernel.namespace) 88 self.assertEqual(kernel.namespace['x'], 42.1) 89 kernel.shutdown() 90 91 def test_shutdown_restores_output_streams(self): 92 original_stdin = sys.stdin 93 original_stdout = sys.stdout 94 original_stderr = sys.stderr 95 96 self.create_and_destroy_kernel() 97 98 self.assertIs(sys.stdin, original_stdin) 99 self.assertIs(sys.stdout, original_stdout) 100 self.assertIs(sys.stderr, original_stderr) 101 102 def test_shutdown_restores_sys_modules_main(self): 103 original_sys_modules_main = sys.modules["__main__"] 104 self.create_and_destroy_kernel() 105 self.assertIs(sys.modules["__main__"], original_sys_modules_main) 106 107 def test_shutdown_restores_displayhook_and_excepthook(self): 108 original_displayhook = sys.displayhook 109 original_excepthook = sys.excepthook 110 111 self.create_and_destroy_kernel() 112 113 self.assertIs(sys.displayhook, original_displayhook) 114 self.assertIs(sys.excepthook, original_excepthook) 115 116 def test_shutdown_restores_sys_path(self): 117 original_sys_path = sys.path[:] 118 119 self.create_and_destroy_kernel() 120 121 self.assertEqual(sys.path, original_sys_path) 122 123 def test_shutdown_closes_console_pipes(self): 124 kernel = InternalIPKernel() 125 kernel.init_ipkernel(gui_backend=None) 126 console = kernel.new_qt_console() 127 self.assertFalse(console.stdout.closed) 128 self.assertFalse(console.stderr.closed) 129 kernel.shutdown() 130 self.assertTrue(console.stdout.closed) 131 self.assertTrue(console.stderr.closed) 132 133 def test_ipython_util_io_globals_restored(self): 134 original_io_stdin = IPython.utils.io.stdin 135 original_io_stdout = IPython.utils.io.stdout 136 original_io_stderr = IPython.utils.io.stderr 137 138 self.create_and_destroy_kernel() 139 140 self.assertIs(IPython.utils.io.stdin, original_io_stdin) 141 self.assertIs(IPython.utils.io.stdout, original_io_stdout) 142 self.assertIs(IPython.utils.io.stderr, original_io_stderr) 143 144 def test_ipython_util_io_globals_restored_if_they_dont_exist(self): 145 # Regression test for enthought/envisage#218 146 original_io_stdin = IPython.utils.io.stdin 147 original_io_stdout = IPython.utils.io.stdout 148 original_io_stderr = IPython.utils.io.stderr 149 150 del IPython.utils.io.stdin 151 del IPython.utils.io.stdout 152 del IPython.utils.io.stderr 153 154 try: 155 self.create_and_destroy_kernel() 156 self.assertFalse(hasattr(IPython.utils.io, "stdin")) 157 self.assertFalse(hasattr(IPython.utils.io, "stdout")) 158 self.assertFalse(hasattr(IPython.utils.io, "stderr")) 159 finally: 160 IPython.utils.io.stdin = original_io_stdin 161 IPython.utils.io.stdout = original_io_stdout 162 IPython.utils.io.stderr = original_io_stderr 163 164 def test_io_pub_thread_stopped(self): 165 self.create_and_destroy_kernel() 166 io_pub_threads = self.objects_of_type(ipykernel.iostream.IOPubThread) 167 for thread in io_pub_threads: 168 self.assertFalse(thread.thread.is_alive()) 169 170 def test_no_threads_leaked(self): 171 threads_before = threading.active_count() 172 self.create_and_destroy_kernel() 173 threads_after = threading.active_count() 174 self.assertEqual(threads_before, threads_after) 175 176 def test_no_new_atexit_handlers(self): 177 # Caution: this is a rather fragile and indirect test. We want 178 # to know that all cleanup has happened when shutting down the 179 # kernel, with none of that cleanup deferred to atexit handlers. 180 # 181 # Since we have no direct way to get hold of the atexit handlers on 182 # Python 3, we instead use the number of referents from the 183 # atexit module as a proxy. 184 # 185 # If this test starts failing, try adding a warmup cycle. If the 186 # first call to self.create_and_destroy_kernel adds new referents, 187 # that's not a big deal. But if every call consistently adds new 188 # referents, then there's something to be fixed. 189 atexit_handlers_before = len(gc.get_referents(atexit)) 190 self.create_and_destroy_kernel() 191 atexit_handlers_after = len(gc.get_referents(atexit)) 192 193 self.assertEqual(atexit_handlers_before, atexit_handlers_after) 194 195 def test_zmq_sockets_closed(self): 196 # Previously, tests were leaking file descriptors linked to 197 # zmq.Socket objects. Check that all extant sockets are closed. 198 self.create_and_destroy_kernel() 199 sockets = self.objects_of_type(zmq.Socket) 200 self.assertTrue(all(socket.closed for socket in sockets)) 201 202 def test_ipykernel_live_objects(self): 203 # Check that all IPython-related objects have been cleaned up 204 # as expected. 205 206 self.create_and_destroy_kernel() 207 208 # Remove anything kept alive by cycles. (There are too many cycles 209 # to break them individually.) 210 gc.collect() 211 212 shells = self.objects_of_type(ipykernel.zmqshell.ZMQInteractiveShell) 213 self.assertEqual(shells, []) 214 215 kernels = self.objects_of_type(ipykernel.ipkernel.IPythonKernel) 216 self.assertEqual(kernels, []) 217 218 kernel_apps = self.objects_of_type(ipykernel.kernelapp.IPKernelApp) 219 self.assertEqual(kernel_apps, []) 220 221 def test_initialize_twice(self): 222 # Trying to re-initialize an already initialized IPKernelApp can 223 # happen right now as a result of refactoring, but eventually 224 # it should be an error. For now, it's a warning. 225 kernel = InternalIPKernel() 226 self.assertIsNone(kernel.ipkernel) 227 kernel.init_ipkernel(gui_backend=None) 228 try: 229 self.assertIsNotNone(kernel.ipkernel) 230 ipkernel = kernel.ipkernel 231 232 with warnings.catch_warnings(record=True) as warn_msgs: 233 warnings.simplefilter("always", category=DeprecationWarning) 234 kernel.init_ipkernel(gui_backend=None) 235 236 # Check that the existing kernel has not been replaced. 237 self.assertIs(ipkernel, kernel.ipkernel) 238 finally: 239 kernel.shutdown() 240 241 # Check that we got the expected warning message. 242 self.assertEqual(len(warn_msgs), 1) 243 message = str(warn_msgs[0].message) 244 self.assertIn("already been initialized", message) 245 246 def test_init_ipkernel_with_explicit_gui_backend(self): 247 loop = tornado.ioloop.IOLoop.current() 248 249 # Kernel initialization adds an "enter_eventloop" call to the 250 # ioloop event loop queue. Mock to avoid modifying the actual event 251 # loop. 252 with mock.patch.object(loop, "add_callback") as mock_add_callback: 253 with warnings.catch_warnings(record=True) as warn_msgs: 254 warnings.simplefilter("always", category=DeprecationWarning) 255 256 # Use of gui_backend is deprecated. 257 kernel = InternalIPKernel() 258 kernel.init_ipkernel(gui_backend="qt4") 259 kernel.shutdown() 260 261 mock_add_callback.reset_mock() 262 263 # Check that we got the expected warning message. 264 matching_messages = [ 265 msg for msg in warn_msgs 266 if "gui_backend argument is deprecated" in str(msg.message) 267 ] 268 self.assertEqual(len(matching_messages), 1) 269 270 # Helper functions. 271 272 def objects_of_type(self, type): 273 """ 274 Find and return a list of all currently tracked instances of the 275 given type. 276 """ 277 return [ 278 obj for obj in gc.get_objects() 279 if isinstance(obj, type) 280 ] 281 282 def create_and_destroy_kernel(self): 283 """ 284 Set up a new kernel with two associated consoles, then shut everything 285 down. 286 """ 287 kernel = InternalIPKernel() 288 kernel.init_ipkernel(gui_backend=None) 289 kernel.new_qt_console() 290 kernel.new_qt_console() 291 kernel.shutdown() 292