1# (C) Copyright 2007-2019 Enthought, Inc., Austin, TX
2# All rights reserved.
3#
4# This software is provided without warranty under the terms of the BSD
5# license included in LICENSE.txt and may be redistributed only
6# under the conditions described in the aforementioned license.  The license
7# is also available online at http://www.enthought.com/licenses/BSD.txt
8# Thanks for using Enthought open source!
9
10from __future__ import absolute_import, print_function, unicode_literals
11
12import atexit
13import gc
14try:
15    # Python 3: mock part of standard library.
16    from unittest import mock
17except ImportError:
18    # Python 2: use 3rd-party mock
19    import mock
20import os
21import shutil
22import sys
23import tempfile
24import threading
25import unittest
26import warnings
27
28try:
29    import ipykernel  # noqa: F401
30except ImportError:
31    ipykernel_available = False
32else:
33    ipykernel_available = True
34
35if ipykernel_available:
36    import ipykernel.iostream
37    import ipykernel.ipkernel
38    import ipykernel.kernelapp
39    import ipykernel.zmqshell
40    import IPython.utils.io
41    import tornado.ioloop
42    import zmq
43
44    from envisage.plugins.ipython_kernel.internal_ipkernel import (
45        InternalIPKernel)
46
47
48@unittest.skipUnless(ipykernel_available,
49                     "skipping tests that require the ipykernel package")
50class TestInternalIPKernel(unittest.TestCase):
51    def setUp(self):
52        # Make sure that IPython-related files are written to a temporary
53        # directory instead of the home directory.
54        tmpdir = tempfile.mkdtemp()
55        self.addCleanup(shutil.rmtree, tmpdir)
56
57        self._old_ipythondir = os.environ.get("IPYTHONDIR")
58        os.environ["IPYTHONDIR"] = tmpdir
59
60    def tearDown(self):
61        # Restore previous state of the IPYTHONDIR environment variable.
62        old_ipythondir = self._old_ipythondir
63        if old_ipythondir is None:
64            del os.environ["IPYTHONDIR"]
65        else:
66            os.environ["IPYTHONDIR"] = old_ipythondir
67
68    def test_lifecycle(self):
69        kernel = InternalIPKernel()
70        self.assertIsNone(kernel.ipkernel)
71
72        kernel.init_ipkernel(gui_backend=None)
73        self.assertIsNotNone(kernel.ipkernel)
74        self.assertIsInstance(kernel.ipkernel, ipykernel.kernelapp.IPKernelApp)
75
76        kernel.new_qt_console()
77        kernel.new_qt_console()
78        self.assertEqual(len(kernel.consoles), 2)
79
80        kernel.shutdown()
81        self.assertIsNone(kernel.ipkernel)
82        self.assertEqual(len(kernel.consoles), 0)
83
84    def test_initial_namespace(self):
85        kernel = InternalIPKernel(initial_namespace=[('x', 42.1)])
86        kernel.init_ipkernel(gui_backend=None)
87        self.assertIn('x', kernel.namespace)
88        self.assertEqual(kernel.namespace['x'], 42.1)
89        kernel.shutdown()
90
91    def test_shutdown_restores_output_streams(self):
92        original_stdin = sys.stdin
93        original_stdout = sys.stdout
94        original_stderr = sys.stderr
95
96        self.create_and_destroy_kernel()
97
98        self.assertIs(sys.stdin, original_stdin)
99        self.assertIs(sys.stdout, original_stdout)
100        self.assertIs(sys.stderr, original_stderr)
101
102    def test_shutdown_restores_sys_modules_main(self):
103        original_sys_modules_main = sys.modules["__main__"]
104        self.create_and_destroy_kernel()
105        self.assertIs(sys.modules["__main__"], original_sys_modules_main)
106
107    def test_shutdown_restores_displayhook_and_excepthook(self):
108        original_displayhook = sys.displayhook
109        original_excepthook = sys.excepthook
110
111        self.create_and_destroy_kernel()
112
113        self.assertIs(sys.displayhook, original_displayhook)
114        self.assertIs(sys.excepthook, original_excepthook)
115
116    def test_shutdown_restores_sys_path(self):
117        original_sys_path = sys.path[:]
118
119        self.create_and_destroy_kernel()
120
121        self.assertEqual(sys.path, original_sys_path)
122
123    def test_shutdown_closes_console_pipes(self):
124        kernel = InternalIPKernel()
125        kernel.init_ipkernel(gui_backend=None)
126        console = kernel.new_qt_console()
127        self.assertFalse(console.stdout.closed)
128        self.assertFalse(console.stderr.closed)
129        kernel.shutdown()
130        self.assertTrue(console.stdout.closed)
131        self.assertTrue(console.stderr.closed)
132
133    def test_ipython_util_io_globals_restored(self):
134        original_io_stdin = IPython.utils.io.stdin
135        original_io_stdout = IPython.utils.io.stdout
136        original_io_stderr = IPython.utils.io.stderr
137
138        self.create_and_destroy_kernel()
139
140        self.assertIs(IPython.utils.io.stdin, original_io_stdin)
141        self.assertIs(IPython.utils.io.stdout, original_io_stdout)
142        self.assertIs(IPython.utils.io.stderr, original_io_stderr)
143
144    def test_ipython_util_io_globals_restored_if_they_dont_exist(self):
145        # Regression test for enthought/envisage#218
146        original_io_stdin = IPython.utils.io.stdin
147        original_io_stdout = IPython.utils.io.stdout
148        original_io_stderr = IPython.utils.io.stderr
149
150        del IPython.utils.io.stdin
151        del IPython.utils.io.stdout
152        del IPython.utils.io.stderr
153
154        try:
155            self.create_and_destroy_kernel()
156            self.assertFalse(hasattr(IPython.utils.io, "stdin"))
157            self.assertFalse(hasattr(IPython.utils.io, "stdout"))
158            self.assertFalse(hasattr(IPython.utils.io, "stderr"))
159        finally:
160            IPython.utils.io.stdin = original_io_stdin
161            IPython.utils.io.stdout = original_io_stdout
162            IPython.utils.io.stderr = original_io_stderr
163
164    def test_io_pub_thread_stopped(self):
165        self.create_and_destroy_kernel()
166        io_pub_threads = self.objects_of_type(ipykernel.iostream.IOPubThread)
167        for thread in io_pub_threads:
168            self.assertFalse(thread.thread.is_alive())
169
170    def test_no_threads_leaked(self):
171        threads_before = threading.active_count()
172        self.create_and_destroy_kernel()
173        threads_after = threading.active_count()
174        self.assertEqual(threads_before, threads_after)
175
176    def test_no_new_atexit_handlers(self):
177        # Caution: this is a rather fragile and indirect test. We want
178        # to know that all cleanup has happened when shutting down the
179        # kernel, with none of that cleanup deferred to atexit handlers.
180        #
181        # Since we have no direct way to get hold of the atexit handlers on
182        # Python 3, we instead use the number of referents from the
183        # atexit module as a proxy.
184        #
185        # If this test starts failing, try adding a warmup cycle. If the
186        # first call to self.create_and_destroy_kernel adds new referents,
187        # that's not a big deal. But if every call consistently adds new
188        # referents, then there's something to be fixed.
189        atexit_handlers_before = len(gc.get_referents(atexit))
190        self.create_and_destroy_kernel()
191        atexit_handlers_after = len(gc.get_referents(atexit))
192
193        self.assertEqual(atexit_handlers_before, atexit_handlers_after)
194
195    def test_zmq_sockets_closed(self):
196        # Previously, tests were leaking file descriptors linked to
197        # zmq.Socket objects. Check that all extant sockets are closed.
198        self.create_and_destroy_kernel()
199        sockets = self.objects_of_type(zmq.Socket)
200        self.assertTrue(all(socket.closed for socket in sockets))
201
202    def test_ipykernel_live_objects(self):
203        # Check that all IPython-related objects have been cleaned up
204        # as expected.
205
206        self.create_and_destroy_kernel()
207
208        # Remove anything kept alive by cycles. (There are too many cycles
209        # to break them individually.)
210        gc.collect()
211
212        shells = self.objects_of_type(ipykernel.zmqshell.ZMQInteractiveShell)
213        self.assertEqual(shells, [])
214
215        kernels = self.objects_of_type(ipykernel.ipkernel.IPythonKernel)
216        self.assertEqual(kernels, [])
217
218        kernel_apps = self.objects_of_type(ipykernel.kernelapp.IPKernelApp)
219        self.assertEqual(kernel_apps, [])
220
221    def test_initialize_twice(self):
222        # Trying to re-initialize an already initialized IPKernelApp can
223        # happen right now as a result of refactoring, but eventually
224        # it should be an error. For now, it's a warning.
225        kernel = InternalIPKernel()
226        self.assertIsNone(kernel.ipkernel)
227        kernel.init_ipkernel(gui_backend=None)
228        try:
229            self.assertIsNotNone(kernel.ipkernel)
230            ipkernel = kernel.ipkernel
231
232            with warnings.catch_warnings(record=True) as warn_msgs:
233                warnings.simplefilter("always", category=DeprecationWarning)
234                kernel.init_ipkernel(gui_backend=None)
235
236            # Check that the existing kernel has not been replaced.
237            self.assertIs(ipkernel, kernel.ipkernel)
238        finally:
239            kernel.shutdown()
240
241        # Check that we got the expected warning message.
242        self.assertEqual(len(warn_msgs), 1)
243        message = str(warn_msgs[0].message)
244        self.assertIn("already been initialized", message)
245
246    def test_init_ipkernel_with_explicit_gui_backend(self):
247        loop = tornado.ioloop.IOLoop.current()
248
249        # Kernel initialization adds an "enter_eventloop" call to the
250        # ioloop event loop queue. Mock to avoid modifying the actual event
251        # loop.
252        with mock.patch.object(loop, "add_callback") as mock_add_callback:
253            with warnings.catch_warnings(record=True) as warn_msgs:
254                warnings.simplefilter("always", category=DeprecationWarning)
255
256                # Use of gui_backend is deprecated.
257                kernel = InternalIPKernel()
258                kernel.init_ipkernel(gui_backend="qt4")
259                kernel.shutdown()
260
261        mock_add_callback.reset_mock()
262
263        # Check that we got the expected warning message.
264        matching_messages = [
265            msg for msg in warn_msgs
266            if "gui_backend argument is deprecated" in str(msg.message)
267        ]
268        self.assertEqual(len(matching_messages), 1)
269
270    # Helper functions.
271
272    def objects_of_type(self, type):
273        """
274        Find and return a list of all currently tracked instances of the
275        given type.
276        """
277        return [
278            obj for obj in gc.get_objects()
279            if isinstance(obj, type)
280        ]
281
282    def create_and_destroy_kernel(self):
283        """
284        Set up a new kernel with two associated consoles, then shut everything
285        down.
286        """
287        kernel = InternalIPKernel()
288        kernel.init_ipkernel(gui_backend=None)
289        kernel.new_qt_console()
290        kernel.new_qt_console()
291        kernel.shutdown()
292