1"""Base class for a kernel that talks to frontends over 0MQ."""
2
3# Copyright (c) IPython Development Team.
4# Distributed under the terms of the Modified BSD License.
5
6import asyncio
7import concurrent.futures
8from datetime import datetime
9from functools import partial
10import itertools
11import logging
12import inspect
13import os
14from signal import signal, default_int_handler, SIGINT
15import sys
16import time
17import uuid
18import warnings
19
20try:
21    # jupyter_client >= 5, use tz-aware now
22    from jupyter_client.session import utcnow as now
23except ImportError:
24    # jupyter_client < 5, use local now()
25    now = datetime.now
26
27from tornado import ioloop
from tornado.queues import Queue, QueueEmpty
import zmq
from zmq.eventloop.zmqstream import ZMQStream

from traitlets.config.configurable import SingletonConfigurable
from IPython.core.error import StdinNotImplementedError
from ipykernel.jsonutil import json_clean
from traitlets import (
    Any, Instance, Float, Dict, List, Set, Integer, Unicode, Bool,
    observe, default
)

from jupyter_client.session import Session

from ._version import kernel_protocol_version


class Kernel(SingletonConfigurable):

    #---------------------------------------------------------------------------
    # Kernel interface
    #---------------------------------------------------------------------------

    # attribute to override with a GUI
    eventloop = Any(None)

    @observe('eventloop')
    def _update_eventloop(self, change):
        """schedule call to eventloop from IOLoop"""
        loop = ioloop.IOLoop.current()
        if change.new is not None:
            loop.add_callback(self.enter_eventloop)

    session = Instance(Session, allow_none=True)
    profile_dir = Instance('IPython.core.profiledir.ProfileDir', allow_none=True)
    shell_stream = Instance(ZMQStream, allow_none=True)

    shell_streams = List(
        help="""Deprecated shell_streams alias. Use shell_stream

        .. versionchanged:: 6.0
            shell_streams is deprecated. Use shell_stream.
        """
    )

    @default("shell_streams")
    def _shell_streams_default(self):
        warnings.warn(
            "Kernel.shell_streams is deprecated in ipykernel 6.0. Use Kernel.shell_stream",
            DeprecationWarning,
            stacklevel=2,
        )
        if self.shell_stream is not None:
            return [self.shell_stream]
        else:
            return []

    @observe("shell_streams")
    def _shell_streams_changed(self, change):
        warnings.warn(
            "Kernel.shell_streams is deprecated in ipykernel 6.0. Use Kernel.shell_stream",
            DeprecationWarning,
            stacklevel=2,
        )
        if len(change.new) > 1:
            warnings.warn(
                "Kernel only supports one shell stream. Additional streams will be ignored.",
                RuntimeWarning,
                stacklevel=2,
            )
        if change.new:
            self.shell_stream = change.new[0]

    control_stream = Instance(ZMQStream, allow_none=True)

    debug_shell_socket = Any()

    control_thread = Any()
    iopub_socket = Any()
    iopub_thread = Any()
    stdin_socket = Any()
    log = Instance(logging.Logger, allow_none=True)

    # identities:
    int_id = Integer(-1)
    ident = Unicode()

    @default('ident')
    def _default_ident(self):
        return str(uuid.uuid4())

    # This should be overridden by wrapper kernels that implement any real
    # language.
    language_info = {}
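
    # A wrapper kernel would typically fill this in with the fields defined by
    # the messaging protocol, along these lines (a sketch; the values are
    # purely illustrative):
    #
    #     language_info = {
    #         'name': 'echo',
    #         'version': '1.0',
    #         'mimetype': 'text/plain',
    #         'file_extension': '.txt',
    #     }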

    # any links that should go in the help menu
    help_links = List()

    # Private interface

    _darwin_app_nap = Bool(True,
        help="""Whether to use appnope for compatibility with OS X App Nap.

        Only affects OS X >= 10.9.
        """
    ).tag(config=True)

    # track associations with current request
    _allow_stdin = Bool(False)
    _parents = Dict({"shell": {}, "control": {}})
    _parent_ident = Dict({'shell': b'', 'control': b''})

    @property
    def _parent_header(self):
        warnings.warn(
            "Kernel._parent_header is deprecated in ipykernel 6. Use .get_parent()",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.get_parent(channel="shell")

    # Time to sleep after flushing the stdout/err buffers in each execute
    # cycle.  While this introduces a hard limit on the minimal latency of the
    # execute cycle, it helps prevent output synchronization problems for
    # clients.
    # Units are in seconds.  The minimum zmq latency on localhost is probably
    # ~150 microseconds; set this to 500us for now.  We may need to increase it
    # a little if it's not enough after more interactive testing.
    _execute_sleep = Float(0.0005).tag(config=True)

    # Frequency of the kernel's event loop.
    # Units are in seconds; kernel subclasses for GUI toolkits may need to
    # adapt to milliseconds.
    _poll_interval = Float(0.01).tag(config=True)

    stop_on_error_timeout = Float(
        0.0,
        config=True,
        help="""time (in seconds) to wait for messages to arrive
        when aborting queued requests after an error.

        Requests that arrive within this window after an error
        will be cancelled.

        Increase in the event of unusually slow network
        causing significant delays,
        which can manifest as e.g. "Run all" in a notebook
        aborting some, but not all, messages after an error.
        """
    )

    # If the shutdown was requested over the network, we leave here the
    # necessary reply message so it can be sent by our registered atexit
    # handler.  This ensures that the reply is only sent to clients truly at
    # the end of our shutdown process (which happens after the underlying
    # IPython shell's own shutdown).
    _shutdown_message = None

    # This is a dict of the ports that the kernel is listening on. It is set
    # by record_ports and used by connect_request.
    _recorded_ports = Dict()

    # set of aborted msg_ids
    aborted = Set()

    # Track execution count here. For IPython, we override this to use the
    # execution count we store in the shell.
    execution_count = 0

    msg_types = [
        'execute_request', 'complete_request',
        'inspect_request', 'history_request',
        'comm_info_request', 'kernel_info_request',
        'connect_request', 'shutdown_request',
        'is_complete_request', 'interrupt_request',
        # deprecated:
        'apply_request',
    ]
    # add deprecated ipyparallel control messages
    control_msg_types = msg_types + ['clear_request', 'abort_request', 'debug_request']

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Build dict of handlers for message types
        self.shell_handlers = {}
        for msg_type in self.msg_types:
            self.shell_handlers[msg_type] = getattr(self, msg_type)

        self.control_handlers = {}
        for msg_type in self.control_msg_types:
            self.control_handlers[msg_type] = getattr(self, msg_type)

        self.control_queue = Queue()

    def dispatch_control(self, msg):
        self.control_queue.put_nowait(msg)

    async def poll_control_queue(self):
        while True:
            msg = await self.control_queue.get()
            # handle tracers from _flush_control_queue
            if isinstance(msg, (concurrent.futures.Future, asyncio.Future)):
                msg.set_result(None)
                continue
            await self.process_control(msg)

    async def _flush_control_queue(self):
        """Flush the control queue, wait for processing of any pending messages"""
        if self.control_thread:
            control_loop = self.control_thread.io_loop
            # concurrent.futures.Futures are threadsafe
            # and can be used to await across threads
            tracer_future = concurrent.futures.Future()
            awaitable_future = asyncio.wrap_future(tracer_future)
        else:
            control_loop = self.io_loop
            tracer_future = awaitable_future = asyncio.Future()

        def _flush():
            # control_stream.flush puts messages on the queue
            self.control_stream.flush()
            # put Future on the queue after all of those,
            # so we can wait for all queued messages to be processed
            self.control_queue.put(tracer_future)

        control_loop.add_callback(_flush)
        return awaitable_future
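
    # The cross-thread handshake used above can be illustrated in isolation
    # (a minimal sketch, independent of the kernel; the names are made up):
    #
    #     import asyncio
    #     import concurrent.futures
    #     import threading
    #
    #     def control_thread_work(tracer):
    #         # ... drain the queue, then resolve the tracer ...
    #         tracer.set_result(None)
    #
    #     async def flush_and_wait():
    #         tracer = concurrent.futures.Future()
    #         awaitable = asyncio.wrap_future(tracer)  # awaitable on this loop
    #         threading.Thread(target=control_thread_work, args=(tracer,)).start()
    #         await awaitable  # resumes once the other thread calls set_result
    #
    #     asyncio.run(flush_and_wait())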

    async def process_control(self, msg):
        """dispatch control requests"""
        idents, msg = self.session.feed_identities(msg, copy=False)
        try:
            msg = self.session.deserialize(msg, content=True, copy=False)
        except Exception:
            self.log.error("Invalid Control Message", exc_info=True)
            return

        self.log.debug("Control received: %s", msg)

        # Set the parent message for side effects.
        self.set_parent(idents, msg, channel='control')
        self._publish_status('busy', 'control')

        header = msg['header']
        msg_type = header['msg_type']

        handler = self.control_handlers.get(msg_type, None)
        if handler is None:
            self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
        else:
            try:
                result = handler(self.control_stream, idents, msg)
                if inspect.isawaitable(result):
                    await result
            except Exception:
                self.log.error("Exception in control handler:", exc_info=True)

        sys.stdout.flush()
        sys.stderr.flush()
        self._publish_status('idle', 'control')
        # flush to ensure reply is sent
        self.control_stream.flush(zmq.POLLOUT)

    def should_handle(self, stream, msg, idents):
        """Check whether a shell-channel message should be handled

        Allows subclasses to prevent handling of certain messages (e.g. aborted requests).
        """
        msg_id = msg['header']['msg_id']
        if msg_id in self.aborted:
            # is it safe to assume a msg_id will not be resubmitted?
            self.aborted.remove(msg_id)
            self._send_abort_reply(stream, msg, idents)
            return False
        return True

    async def dispatch_shell(self, msg):
        """dispatch shell requests"""

        # flush control queue before handling shell requests
        await self._flush_control_queue()

        idents, msg = self.session.feed_identities(msg, copy=False)
        try:
            msg = self.session.deserialize(msg, content=True, copy=False)
        except Exception:
            self.log.error("Invalid Message", exc_info=True)
            return

        # Set the parent message for side effects.
        self.set_parent(idents, msg, channel='shell')
        self._publish_status('busy', 'shell')

        msg_type = msg['header']['msg_type']

        # Only abort execute requests
        if self._aborting and msg_type == 'execute_request':
            self._send_abort_reply(self.shell_stream, msg, idents)
            self._publish_status('idle', 'shell')
            # flush to ensure reply is sent before
            # handling the next request
            self.shell_stream.flush(zmq.POLLOUT)
            return

        # Print some info about this message and leave a '--->' marker, so it's
        # easier to visually trace the message chain when debugging.  Each
        # handler prints its message at the end.
        self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
        self.log.debug('   Content: %s\n   --->\n   ', msg['content'])

        if not self.should_handle(self.shell_stream, msg, idents):
            return

        handler = self.shell_handlers.get(msg_type, None)
        if handler is None:
            self.log.warning("Unknown message type: %r", msg_type)
        else:
            self.log.debug("%s: %s", msg_type, msg)
            try:
                self.pre_handler_hook()
            except Exception:
                self.log.debug("Unable to signal in pre_handler_hook:", exc_info=True)
            try:
                result = handler(self.shell_stream, idents, msg)
                if inspect.isawaitable(result):
                    await result
            except Exception:
                self.log.error("Exception in message handler:", exc_info=True)
            except KeyboardInterrupt:
                # Ctrl-c shouldn't crash the kernel here.
                self.log.error("KeyboardInterrupt caught in kernel.")
            finally:
                try:
                    self.post_handler_hook()
                except Exception:
                    self.log.debug("Unable to signal in post_handler_hook:", exc_info=True)

        sys.stdout.flush()
        sys.stderr.flush()
        self._publish_status('idle', 'shell')
        # flush to ensure reply is sent before
        # handling the next request
        self.shell_stream.flush(zmq.POLLOUT)

    def pre_handler_hook(self):
        """Hook to execute before calling message handler"""
        # ensure default_int_handler during handler call
        self.saved_sigint_handler = signal(SIGINT, default_int_handler)

    def post_handler_hook(self):
        """Hook to execute after calling message handler"""
        signal(SIGINT, self.saved_sigint_handler)

    def enter_eventloop(self):
        """enter eventloop"""
        self.log.info("Entering eventloop %s", self.eventloop)
        # record handle, so we can check when this changes
        eventloop = self.eventloop
        if eventloop is None:
            self.log.info("Exiting as there is no eventloop")
            return

        def advance_eventloop():
            # check if eventloop changed:
            if self.eventloop is not eventloop:
                self.log.info("exiting eventloop %s", eventloop)
                return
            if self.msg_queue.qsize():
                self.log.debug("Delaying eventloop due to waiting messages")
                # still messages to process, make the eventloop wait
                schedule_next()
                return
            self.log.debug("Advancing eventloop %s", eventloop)
            try:
                eventloop(self)
            except KeyboardInterrupt:
                # Ctrl-C shouldn't crash the kernel
                self.log.error("KeyboardInterrupt caught in kernel")
            if self.eventloop is eventloop:
                # schedule advance again
                schedule_next()

        def schedule_next():
            """Schedule the next advance of the eventloop"""
            # flush the eventloop every so often,
            # giving us a chance to handle messages in the meantime
            self.log.debug("Scheduling eventloop advance")
            self.io_loop.call_later(0.001, advance_eventloop)

        # begin polling the eventloop
        schedule_next()
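
    # The ``eventloop`` hook is expected to be a callable that takes the
    # kernel, gives the GUI toolkit a chance to run, and then returns so
    # advance_eventloop() can check for pending kernel messages.  A rough
    # sketch (``my_gui`` and its API are made up for illustration):
    #
    #     def loop_my_gui(kernel):
    #         # pump GUI events for roughly one poll interval, then yield back
    #         my_gui.process_events(timeout=kernel._poll_interval)
    #
    #     kernel.eventloop = loop_my_gui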

    async def do_one_iteration(self):
        """Process a single shell message

        Any pending control messages will be flushed as well

        .. versionchanged:: 5
            This is now a coroutine
        """
        # flush messages off of shell stream into the message queue
        self.shell_stream.flush()
        # process at most one shell message per iteration
        await self.process_one(wait=False)
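
    # GUI integrations typically drive this from a timer on the kernel's
    # io_loop, for example (a sketch; the interval is illustrative):
    #
    #     from tornado.ioloop import PeriodicCallback
    #
    #     pc = PeriodicCallback(
    #         lambda: asyncio.ensure_future(kernel.do_one_iteration()),
    #         1000 * kernel._poll_interval,  # PeriodicCallback uses milliseconds
    #     )
    #     pc.start()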

    async def process_one(self, wait=True):
        """Process one request

        Returns None if no message was handled.
        """
        if wait:
            t, dispatch, args = await self.msg_queue.get()
        else:
            try:
                t, dispatch, args = self.msg_queue.get_nowait()
            except (asyncio.QueueEmpty, QueueEmpty):
                # tornado's Queue raises its own QueueEmpty, not asyncio's
                return None
        await dispatch(*args)

    async def dispatch_queue(self):
        """Coroutine to preserve order of message handling

        Ensures that only one message is processing at a time,
        even when the handler is async
        """

        while True:
            try:
                await self.process_one()
            except Exception:
                self.log.exception("Error in message handler")

    _message_counter = Any(
        help="""Monotonic counter of messages
        """,
    )
    @default('_message_counter')
    def _message_counter_default(self):
        return itertools.count()

    def schedule_dispatch(self, dispatch, *args):
        """schedule a message for dispatch"""
        idx = next(self._message_counter)

        self.msg_queue.put_nowait(
            (
                idx,
                dispatch,
                args,
            )
        )
        # ensure the eventloop wakes up
        self.io_loop.add_callback(lambda: None)

    def start(self):
        """register dispatchers for streams"""
        self.io_loop = ioloop.IOLoop.current()
        self.msg_queue = Queue()
        self.io_loop.add_callback(self.dispatch_queue)

        self.control_stream.on_recv(self.dispatch_control, copy=False)

        if self.control_thread:
            control_loop = self.control_thread.io_loop
        else:
            control_loop = self.io_loop

        asyncio.run_coroutine_threadsafe(self.poll_control_queue(), control_loop.asyncio_loop)

        self.shell_stream.on_recv(
            partial(
                self.schedule_dispatch,
                self.dispatch_shell,
            ),
            copy=False,
        )

        # publish starting status
        self._publish_status('starting', 'shell')

    def record_ports(self, ports):
        """Record the ports that this kernel is using.

        The creator of the Kernel instance must call this method if they
        want the :meth:`connect_request` method to return the port numbers.
        """
        self._recorded_ports = ports
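
    # For example, the launcher that created the kernel might record the
    # connection-file ports so ``connect_request`` can report them back
    # (a sketch; the key names follow the connection-file convention and the
    # port numbers are made up):
    #
    #     kernel.record_ports({
    #         'shell_port': 54321,
    #         'iopub_port': 54322,
    #         'stdin_port': 54323,
    #         'control_port': 54324,
    #         'hb_port': 54325,
    #     })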

    #---------------------------------------------------------------------------
    # Kernel request handlers
    #---------------------------------------------------------------------------

    def _publish_execute_input(self, code, parent, execution_count):
        """Publish the code request on the iopub stream."""

        self.session.send(self.iopub_socket, 'execute_input',
                          {'code': code, 'execution_count': execution_count},
                          parent=parent, ident=self._topic('execute_input')
        )

    def _publish_status(self, status, channel, parent=None):
        """send status (busy/idle) on IOPub"""
        self.session.send(
            self.iopub_socket,
            "status",
            {"execution_state": status},
            parent=parent or self.get_parent(channel),
            ident=self._topic("status"),
        )

    def _publish_debug_event(self, event):
        self.session.send(
            self.iopub_socket,
            "debug_event",
            event,
            parent=self.get_parent("control"),
            ident=self._topic("debug_event"),
        )

    def set_parent(self, ident, parent, channel='shell'):
        """Set the current parent request

        Side effects (IOPub messages) and replies are associated with
        the request that caused them via the parent_header.

        The parent identity is used to route input_request messages
        on the stdin channel.
        """
        self._parent_ident[channel] = ident
        self._parents[channel] = parent

    def get_parent(self, channel="shell"):
        """Get the parent request associated with a channel.

        .. versionadded:: 6

        Parameters
        ----------
        channel : str
            the name of the channel ('shell' or 'control')

        Returns
        -------
        message : dict
            the parent message for the most recent request on the channel.
        """
        return self._parents.get(channel, {})

    def send_response(self, stream, msg_or_type, content=None, ident=None,
             buffers=None, track=False, header=None, metadata=None, channel='shell'):
        """Send a response to the message we're currently processing.

        This accepts all the parameters of :meth:`jupyter_client.session.Session.send`
        except ``parent``.

        This relies on :meth:`set_parent` having been called for the current
        message.
        """
        return self.session.send(
            stream,
            msg_or_type,
            content,
            self.get_parent(channel),
            ident,
            buffers,
            track,
            header,
            metadata,
        )
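
    # A handler can use this to emit side-effect messages tied to the request
    # currently being processed, for instance (an illustrative sketch):
    #
    #     self.send_response(self.iopub_socket, 'stream',
    #                        {'name': 'stdout', 'text': 'hello\n'})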

    def init_metadata(self, parent):
        """Initialize metadata.

        Run at the beginning of execution requests.
        """
        # FIXME: `started` is part of ipyparallel
        # Remove for ipykernel 5.0
        return {
            'started': now(),
        }

    def finish_metadata(self, parent, metadata, reply_content):
        """Finish populating metadata.

        Run after completing an execution request.
        """
        return metadata

    async def execute_request(self, stream, ident, parent):
        """handle an execute_request"""

        try:
            content = parent['content']
            code = content['code']
            silent = content['silent']
            store_history = content.get('store_history', not silent)
            user_expressions = content.get('user_expressions', {})
            allow_stdin = content.get('allow_stdin', False)
        except Exception:
            self.log.error("Got bad msg: ")
            self.log.error("%s", parent)
            return

        stop_on_error = content.get('stop_on_error', True)

        metadata = self.init_metadata(parent)

        # Re-broadcast our input for the benefit of listening clients, and
        # start computing output
        if not silent:
            self.execution_count += 1
            self._publish_execute_input(code, parent, self.execution_count)

        reply_content = self.do_execute(
            code, silent, store_history,
            user_expressions, allow_stdin,
        )
        if inspect.isawaitable(reply_content):
            reply_content = await reply_content

        # Flush output before sending the reply.
        sys.stdout.flush()
        sys.stderr.flush()
        # FIXME: on rare occasions, the flush doesn't seem to make it to the
        # clients... This seems to mitigate the problem, but we definitely need
        # to better understand what's going on.
        if self._execute_sleep:
            time.sleep(self._execute_sleep)

        # Send the reply.
        reply_content = json_clean(reply_content)
        metadata = self.finish_metadata(parent, metadata, reply_content)

        reply_msg = self.session.send(stream, 'execute_reply',
                                      reply_content, parent, metadata=metadata,
                                      ident=ident)

        self.log.debug("%s", reply_msg)

        if not silent and reply_msg['content']['status'] == 'error' and stop_on_error:
            await self._abort_queues()

    def do_execute(self, code, silent, store_history=True,
                   user_expressions=None, allow_stdin=False):
        """Execute user code. Must be overridden by subclasses.
        """
        raise NotImplementedError
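
    # A minimal override might look like the following (a sketch in the spirit
    # of the "echo kernel" from the wrapper-kernel docs; error handling and
    # history are omitted):
    #
    #     def do_execute(self, code, silent, store_history=True,
    #                    user_expressions=None, allow_stdin=False):
    #         if not silent:
    #             self.send_response(self.iopub_socket, 'stream',
    #                                {'name': 'stdout', 'text': code})
    #         return {'status': 'ok',
    #                 'execution_count': self.execution_count,
    #                 'payload': [],
    #                 'user_expressions': {}}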

    async def complete_request(self, stream, ident, parent):
        content = parent['content']
        code = content['code']
        cursor_pos = content['cursor_pos']

        matches = self.do_complete(code, cursor_pos)
        if inspect.isawaitable(matches):
            matches = await matches

        matches = json_clean(matches)
        self.session.send(stream, "complete_reply", matches, parent, ident)

    def do_complete(self, code, cursor_pos):
        """Override in subclasses to find completions.
        """
        return {'matches': [],
                'cursor_end': cursor_pos,
                'cursor_start': cursor_pos,
                'metadata': {},
                'status': 'ok'}

    async def inspect_request(self, stream, ident, parent):
        content = parent['content']

        reply_content = self.do_inspect(
            content['code'], content['cursor_pos'],
            content.get('detail_level', 0),
            set(content.get('omit_sections', [])),
        )
        if inspect.isawaitable(reply_content):
            reply_content = await reply_content

        # Before we send this object over, we scrub it for JSON usage
        reply_content = json_clean(reply_content)
        msg = self.session.send(stream, 'inspect_reply',
                                reply_content, parent, ident)
        self.log.debug("%s", msg)

    def do_inspect(self, code, cursor_pos, detail_level=0, omit_sections=()):
        """Override in subclasses to allow introspection.
        """
        return {'status': 'ok', 'data': {}, 'metadata': {}, 'found': False}

    async def history_request(self, stream, ident, parent):
        content = parent['content']

        reply_content = self.do_history(**content)
        if inspect.isawaitable(reply_content):
            reply_content = await reply_content

        reply_content = json_clean(reply_content)
        msg = self.session.send(stream, 'history_reply',
                                reply_content, parent, ident)
        self.log.debug("%s", msg)

    def do_history(self, hist_access_type, output, raw, session=None, start=None,
                   stop=None, n=None, pattern=None, unique=False):
        """Override in subclasses to access history.
        """
        return {'status': 'ok', 'history': []}

    async def connect_request(self, stream, ident, parent):
        if self._recorded_ports is not None:
            content = self._recorded_ports.copy()
        else:
            content = {}
        content['status'] = 'ok'
        msg = self.session.send(stream, 'connect_reply',
                                content, parent, ident)
        self.log.debug("%s", msg)

    @property
    def kernel_info(self):
        return {
            'protocol_version': kernel_protocol_version,
            'implementation': self.implementation,
            'implementation_version': self.implementation_version,
            'language_info': self.language_info,
            'banner': self.banner,
            'help_links': self.help_links,
        }

    async def kernel_info_request(self, stream, ident, parent):
        content = {'status': 'ok'}
        content.update(self.kernel_info)
        msg = self.session.send(stream, 'kernel_info_reply',
                                content, parent, ident)
        self.log.debug("%s", msg)

    async def comm_info_request(self, stream, ident, parent):
        content = parent['content']
        target_name = content.get('target_name', None)

        # Should this be moved to ipkernel?
        if hasattr(self, 'comm_manager'):
            comms = {
                k: dict(target_name=v.target_name)
                for (k, v) in self.comm_manager.comms.items()
                if v.target_name == target_name or target_name is None
            }
        else:
            comms = {}
        reply_content = dict(comms=comms, status='ok')
        msg = self.session.send(stream, 'comm_info_reply',
                                reply_content, parent, ident)
        self.log.debug("%s", msg)

    async def interrupt_request(self, stream, ident, parent):
        if os.name == "nt":
            self.log.error("Interrupt message not supported on Windows")
        else:
            # os.getpgid is only available on POSIX, so look it up here
            pid = os.getpid()
            pgid = os.getpgid(pid)
            # Prefer process-group over process
            if pgid and hasattr(os, "killpg"):
                try:
                    os.killpg(pgid, SIGINT)
                    return
                except OSError:
                    pass
            try:
                os.kill(pid, SIGINT)
            except OSError:
                pass

        content = parent['content']
        self.session.send(stream, 'interrupt_reply', content, parent, ident=ident)
        return

    async def shutdown_request(self, stream, ident, parent):
        content = self.do_shutdown(parent['content']['restart'])
        if inspect.isawaitable(content):
            content = await content
        self.session.send(stream, 'shutdown_reply', content, parent, ident=ident)
        # same content, but different msg_id for broadcasting on IOPub
        self._shutdown_message = self.session.msg('shutdown_reply',
                                                  content, parent
        )

        self._at_shutdown()

        self.log.debug('Stopping control ioloop')
        control_io_loop = self.control_stream.io_loop
        control_io_loop.add_callback(control_io_loop.stop)

        self.log.debug('Stopping shell ioloop')
        shell_io_loop = self.shell_stream.io_loop
        shell_io_loop.add_callback(shell_io_loop.stop)

    def do_shutdown(self, restart):
        """Override in subclasses to do things when the frontend shuts down the
        kernel.
        """
        return {'status': 'ok', 'restart': restart}

    async def is_complete_request(self, stream, ident, parent):
        content = parent['content']
        code = content['code']

        reply_content = self.do_is_complete(code)
        if inspect.isawaitable(reply_content):
            reply_content = await reply_content
        reply_content = json_clean(reply_content)
        reply_msg = self.session.send(stream, 'is_complete_reply',
                                      reply_content, parent, ident)
        self.log.debug("%s", reply_msg)

    def do_is_complete(self, code):
        """Override in subclasses to determine whether the given code is complete.
        """
        return {'status': 'unknown'}
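
    # Possible reply shapes, following the messaging spec (illustrative):
    #
    #     return {'status': 'complete'}
    #     return {'status': 'incomplete', 'indent': '    '}
    #     return {'status': 'invalid'}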

    async def debug_request(self, stream, ident, parent):
        content = parent['content']

        reply_content = self.do_debug_request(content)
        if inspect.isawaitable(reply_content):
            reply_content = await reply_content
        reply_content = json_clean(reply_content)
        reply_msg = self.session.send(stream, 'debug_reply', reply_content,
                                      parent, ident)
        self.log.debug("%s", reply_msg)

    async def do_debug_request(self, msg):
        raise NotImplementedError

    #---------------------------------------------------------------------------
    # Engine methods (DEPRECATED)
    #---------------------------------------------------------------------------

    async def apply_request(self, stream, ident, parent):
        self.log.warning("apply_request is deprecated in kernel_base, moving to ipyparallel.")
        try:
            content = parent['content']
            bufs = parent['buffers']
            msg_id = parent['header']['msg_id']
        except Exception:
            self.log.error("Got bad msg: %s", parent, exc_info=True)
            return

        md = self.init_metadata(parent)

        reply_content, result_buf = self.do_apply(content, bufs, msg_id, md)

        # flush i/o
        sys.stdout.flush()
        sys.stderr.flush()

        md = self.finish_metadata(parent, md, reply_content)

        self.session.send(stream, 'apply_reply', reply_content,
                          parent=parent, ident=ident, buffers=result_buf, metadata=md)

    def do_apply(self, content, bufs, msg_id, reply_metadata):
        """DEPRECATED"""
        raise NotImplementedError

    #---------------------------------------------------------------------------
    # Control messages (DEPRECATED)
    #---------------------------------------------------------------------------

    async def abort_request(self, stream, ident, parent):
        """abort a specific msg by id"""
        self.log.warning("abort_request is deprecated in kernel_base. It is only part of IPython parallel")
        msg_ids = parent['content'].get('msg_ids', None)
        if isinstance(msg_ids, str):
            msg_ids = [msg_ids]
        if not msg_ids:
            # no specific ids given: abort everything currently queued
            # (_abort_queues is a coroutine, so it must be awaited)
            await self._abort_queues()
        for mid in msg_ids or []:
            self.aborted.add(str(mid))

        content = dict(status='ok')
        reply_msg = self.session.send(stream, 'abort_reply', content=content,
                                      parent=parent, ident=ident)
        self.log.debug("%s", reply_msg)

    async def clear_request(self, stream, idents, parent):
        """Clear our namespace."""
        self.log.warning("clear_request is deprecated in kernel_base. It is only part of IPython parallel")
        content = self.do_clear()
        self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
                          content=content)

    def do_clear(self):
        """DEPRECATED since 4.0.3"""
        raise NotImplementedError

    #---------------------------------------------------------------------------
    # Protected interface
    #---------------------------------------------------------------------------

    def _topic(self, topic):
        """prefixed topic for IOPub messages"""
        base = "kernel.%s" % self.ident

        return ("%s.%s" % (base, topic)).encode()

    _aborting = Bool(False)

    async def _abort_queues(self):
        self.shell_stream.flush()
        self._aborting = True

        def stop_aborting():
            self.log.info("Finishing abort")
            self._aborting = False

        asyncio.get_event_loop().call_later(self.stop_on_error_timeout, stop_aborting)

    def _send_abort_reply(self, stream, msg, idents):
        """Send a reply to an aborted request"""
        self.log.info(
            f"Aborting {msg['header']['msg_id']}: {msg['header']['msg_type']}"
        )
        reply_type = msg["header"]["msg_type"].rsplit("_", 1)[0] + "_reply"
        status = {"status": "aborted"}
        md = self.init_metadata(msg)
        md = self.finish_metadata(msg, md, status)
        md.update(status)

        self.session.send(
            stream, reply_type, metadata=md,
            content=status, parent=msg, ident=idents,
        )

    def _no_raw_input(self):
        """Raise StdinNotImplementedError if active frontend doesn't support
        stdin."""
        raise StdinNotImplementedError("raw_input was called, but this "
                                       "frontend does not support stdin.")

    def getpass(self, prompt='', stream=None):
        """Forward getpass to frontends

        Raises
        ------
        StdinNotImplementedError if active frontend doesn't support stdin.
        """
        if not self._allow_stdin:
            raise StdinNotImplementedError(
                "getpass was called, but this frontend does not support input requests."
            )
        if stream is not None:
            warnings.warn(
                "The `stream` parameter of `getpass.getpass` will have no effect when using ipykernel",
                UserWarning,
                stacklevel=2,
            )
        return self._input_request(
            prompt,
            self._parent_ident["shell"],
            self.get_parent("shell"),
            password=True,
        )

    def raw_input(self, prompt=''):
        """Forward raw_input to frontends

        Raises
        ------
        StdinNotImplementedError if active frontend doesn't support stdin.
        """
        if not self._allow_stdin:
            raise StdinNotImplementedError(
                "raw_input was called, but this frontend does not support input requests."
            )
        return self._input_request(
            str(prompt),
            self._parent_ident["shell"],
            self.get_parent("shell"),
            password=False,
        )

    def _input_request(self, prompt, ident, parent, password=False):
        # Flush output before making the request.
        sys.stderr.flush()
        sys.stdout.flush()

        # flush the stdin socket, to purge stale replies
        while True:
            try:
                self.stdin_socket.recv_multipart(zmq.NOBLOCK)
            except zmq.ZMQError as e:
                if e.errno == zmq.EAGAIN:
                    break
                else:
                    raise

        # Send the input request.
        content = json_clean(dict(prompt=prompt, password=password))
        self.session.send(self.stdin_socket, 'input_request', content, parent,
                          ident=ident)

        # Await a response.
        while True:
            try:
                # Use polling with select() so KeyboardInterrupts can get
                # through; doing a blocking recv() means stdin reads are
                # uninterruptible on Windows. We need a timeout because
                # zmq.select() is also uninterruptible, but at least this
                # way reads get noticed immediately and KeyboardInterrupts
                # get noticed fairly quickly by human response time standards.
                rlist, _, xlist = zmq.select(
                    [self.stdin_socket], [], [self.stdin_socket], 0.01
                )
                if rlist or xlist:
                    ident, reply = self.session.recv(self.stdin_socket)
                    if (ident, reply) != (None, None):
                        break
            except KeyboardInterrupt:
                # re-raise KeyboardInterrupt, to truncate traceback
                raise KeyboardInterrupt("Interrupted by user") from None
            except Exception:
                self.log.warning("Invalid Message:", exc_info=True)

        try:
            value = reply["content"]["value"]
        except Exception:
            self.log.error("Bad input_reply: %s", parent)
            value = ''
        if value == '\x04':
            # EOF
            raise EOFError
        return value

    def _at_shutdown(self):
        """Actions taken at shutdown by the kernel, called by python's atexit.
        """
        if self._shutdown_message is not None:
            self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
            self.log.debug("%s", self._shutdown_message)
        self.control_stream.flush(zmq.POLLOUT)