"""Base class for a kernel that talks to frontends over 0MQ."""

# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.

import asyncio
import concurrent.futures
from datetime import datetime
from functools import partial
import itertools
import logging
import inspect
import os
from signal import signal, default_int_handler, SIGINT
import sys
import time
import traceback
import uuid
import warnings

try:
    # jupyter_client >= 5, use tz-aware now
    from jupyter_client.session import utcnow as now
except ImportError:
    # jupyter_client < 5, use local now()
    now = datetime.now

from tornado import ioloop
from tornado.queues import Queue, QueueEmpty
import zmq
from zmq.eventloop.zmqstream import ZMQStream

from traitlets.config.configurable import SingletonConfigurable
from IPython.core.error import StdinNotImplementedError
from ipykernel.jsonutil import json_clean
from traitlets import (
    Any, Instance, Float, Dict, List, Set, Integer, Unicode, Bool,
    observe, default
)

from jupyter_client.session import Session

from ._version import kernel_protocol_version


class Kernel(SingletonConfigurable):
    """Base kernel: receives requests on the shell and control streams,
    dispatches them to ``<msg_type>`` handler methods and sends replies
    and IOPub side effects through ``self.session``.

    Subclasses implement the ``do_*`` methods (``do_execute``,
    ``do_complete``, ...) to provide actual language behavior.
    """

    #---------------------------------------------------------------------------
    # Kernel interface
    #---------------------------------------------------------------------------

    # attribute to override with a GUI
    eventloop = Any(None)

    @observe('eventloop')
    def _update_eventloop(self, change):
        """schedule call to eventloop from IOLoop"""
        loop = ioloop.IOLoop.current()
        if change.new is not None:
            loop.add_callback(self.enter_eventloop)

    session = Instance(Session, allow_none=True)
    profile_dir = Instance('IPython.core.profiledir.ProfileDir', allow_none=True)
    shell_stream = Instance(ZMQStream, allow_none=True)

    shell_streams = List(
        help="""Deprecated shell_streams alias. Use shell_stream

        .. versionchanged:: 6.0
            shell_streams is deprecated. Use shell_stream.
        """
    )

    @default("shell_streams")
    def _shell_streams_default(self):
        """Build the deprecated list alias from ``shell_stream`` (warns)."""
        warnings.warn(
            "Kernel.shell_streams is deprecated in ipykernel 6.0. Use Kernel.shell_stream",
            DeprecationWarning,
            stacklevel=2,
        )
        if self.shell_stream is not None:
            return [self.shell_stream]
        else:
            return []

    @observe("shell_streams")
    def _shell_streams_changed(self, change):
        """Mirror assignments to the deprecated alias onto ``shell_stream``.

        Only the first stream is used; extra streams trigger a RuntimeWarning.
        """
        warnings.warn(
            "Kernel.shell_streams is deprecated in ipykernel 6.0. Use Kernel.shell_stream",
            DeprecationWarning,
            stacklevel=2,
        )
        if len(change.new) > 1:
            warnings.warn(
                "Kernel only supports one shell stream. Additional streams will be ignored.",
                RuntimeWarning,
                stacklevel=2,
            )
        if change.new:
            self.shell_stream = change.new[0]

    control_stream = Instance(ZMQStream, allow_none=True)

    debug_shell_socket = Any()

    control_thread = Any()
    iopub_socket = Any()
    iopub_thread = Any()
    stdin_socket = Any()
    log = Instance(logging.Logger, allow_none=True)

    # identities:
    int_id = Integer(-1)
    ident = Unicode()

    @default('ident')
    def _default_ident(self):
        """Generate a random UUID4 string as the kernel identity."""
        return str(uuid.uuid4())

    # This should be overridden by wrapper kernels that implement any real
    # language.
    language_info = {}

    # any links that should go in the help menu
    help_links = List()

    # Private interface

    _darwin_app_nap = Bool(True,
        help="""Whether to use appnope for compatibility with OS X App Nap.

        Only affects OS X >= 10.9.
        """
    ).tag(config=True)

    # track associations with current request
    _allow_stdin = Bool(False)
    _parents = Dict({"shell": {}, "control": {}})
    _parent_ident = Dict({'shell': b'', 'control': b''})

    @property
    def _parent_header(self):
        """Deprecated accessor for the shell-channel parent message."""
        warnings.warn(
            "Kernel._parent_header is deprecated in ipykernel 6. Use .get_parent()",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.get_parent(channel="shell")

    # Time to sleep after flushing the stdout/err buffers in each execute
    # cycle.  While this introduces a hard limit on the minimal latency of the
    # execute cycle, it helps prevent output synchronization problems for
    # clients.
    # Units are in seconds.  The minimum zmq latency on local host is probably
    # ~150 microseconds, set this to 500us for now.  We may need to increase it
    # a little if it's not enough after more interactive testing.
    _execute_sleep = Float(0.0005).tag(config=True)

    # Frequency of the kernel's event loop.
    # Units are in seconds, kernel subclasses for GUI toolkits may need to
    # adapt to milliseconds.
    _poll_interval = Float(0.01).tag(config=True)

    stop_on_error_timeout = Float(
        0.0,
        config=True,
        help="""time (in seconds) to wait for messages to arrive
        when aborting queued requests after an error.

        Requests that arrive within this window after an error
        will be cancelled.

        Increase in the event of unusually slow network
        causing significant delays,
        which can manifest as e.g. "Run all" in a notebook
        aborting some, but not all, messages after an error.
        """
    )

    # If the shutdown was requested over the network, we leave here the
    # necessary reply message so it can be sent by our registered atexit
    # handler.  This ensures that the reply is only sent to clients truly at
    # the end of our shutdown process (which happens after the underlying
    # IPython shell's own shutdown).
    _shutdown_message = None

    # This is a dict of port number that the kernel is listening on. It is set
    # by record_ports and used by connect_request.
    _recorded_ports = Dict()

    # set of aborted msg_ids
    aborted = Set()

    # Track execution count here. For IPython, we override this to use the
    # execution count we store in the shell.
    execution_count = 0

    msg_types = [
        'execute_request', 'complete_request',
        'inspect_request', 'history_request',
        'comm_info_request', 'kernel_info_request',
        'connect_request', 'shutdown_request',
        'is_complete_request', 'interrupt_request',
        # deprecated:
        'apply_request',
    ]
    # add deprecated ipyparallel control messages
    control_msg_types = msg_types + ['clear_request', 'abort_request', 'debug_request']

    def __init__(self, **kwargs):
        """Build the msg_type -> handler tables and the control queue."""
        super().__init__(**kwargs)
        # Build dict of handlers for message types
        self.shell_handlers = {
            msg_type: getattr(self, msg_type) for msg_type in self.msg_types
        }
        self.control_handlers = {
            msg_type: getattr(self, msg_type) for msg_type in self.control_msg_types
        }

        self.control_queue = Queue()

    def dispatch_control(self, msg):
        """Enqueue a raw control message for ``poll_control_queue``."""
        self.control_queue.put_nowait(msg)

    async def poll_control_queue(self):
        """Process queued control messages forever, one at a time."""
        while True:
            msg = await self.control_queue.get()
            # handle tracers from _flush_control_queue
            if isinstance(msg, (concurrent.futures.Future, asyncio.Future)):
                msg.set_result(None)
                continue
            await self.process_control(msg)

    async def _flush_control_queue(self):
        """Flush the control queue, wait for processing of any pending messages"""
        if self.control_thread:
            control_loop = self.control_thread.io_loop
            # concurrent.futures.Futures are threadsafe
            # and can be used to await across threads
            tracer_future = concurrent.futures.Future()
            awaitable_future = asyncio.wrap_future(tracer_future)
        else:
            control_loop = self.io_loop
            tracer_future = awaitable_future = asyncio.Future()

        def _flush():
            # control_stream.flush puts messages on the queue
            self.control_stream.flush()
            # put Future on the queue after all of those,
            # so we can wait for all queued messages to be processed.
            # The queue is unbounded, so put_nowait cannot fail and we do
            # not leave an un-awaited Future behind as `put()` would.
            self.control_queue.put_nowait(tracer_future)

        control_loop.add_callback(_flush)
        return awaitable_future

    async def process_control(self, msg):
        """dispatch control requests"""
        idents, msg = self.session.feed_identities(msg, copy=False)
        try:
            msg = self.session.deserialize(msg, content=True, copy=False)
        except Exception:
            self.log.error("Invalid Control Message", exc_info=True)
            return

        self.log.debug("Control received: %s", msg)

        # Set the parent message for side effects.
        self.set_parent(idents, msg, channel='control')
        self._publish_status('busy', 'control')

        header = msg['header']
        msg_type = header['msg_type']

        handler = self.control_handlers.get(msg_type, None)
        if handler is None:
            self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
        else:
            try:
                result = handler(self.control_stream, idents, msg)
                if inspect.isawaitable(result):
                    await result
            except Exception:
                self.log.error("Exception in control handler:", exc_info=True)

        sys.stdout.flush()
        sys.stderr.flush()
        self._publish_status('idle', 'control')
        # flush to ensure reply is sent
        self.control_stream.flush(zmq.POLLOUT)

    def should_handle(self, stream, msg, idents):
        """Check whether a shell-channel message should be handled

        Allows subclasses to prevent handling of certain messages (e.g. aborted requests).
        """
        msg_id = msg['header']['msg_id']
        if msg_id in self.aborted:
            # is it safe to assume a msg_id will not be resubmitted?
            self.aborted.remove(msg_id)
            self._send_abort_reply(stream, msg, idents)
            return False
        return True

    async def dispatch_shell(self, msg):
        """dispatch shell requests"""

        # flush control queue before handling shell requests
        await self._flush_control_queue()

        idents, msg = self.session.feed_identities(msg, copy=False)
        try:
            msg = self.session.deserialize(msg, content=True, copy=False)
        except Exception:
            self.log.error("Invalid Message", exc_info=True)
            return

        # Set the parent message for side effects.
        self.set_parent(idents, msg, channel='shell')
        self._publish_status('busy', 'shell')

        msg_type = msg['header']['msg_type']

        # Only abort execute requests
        if self._aborting and msg_type == 'execute_request':
            self._send_abort_reply(self.shell_stream, msg, idents)
            self._publish_status('idle', 'shell')
            # flush to ensure reply is sent before
            # handling the next request
            self.shell_stream.flush(zmq.POLLOUT)
            return

        # Print some info about this message and leave a '--->' marker, so it's
        # easier to trace visually the message chain when debugging.  Each
        # handler prints its message at the end.
        self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
        self.log.debug(' Content: %s\n --->\n ', msg['content'])

        if not self.should_handle(self.shell_stream, msg, idents):
            # 'busy' was already published for this request, so close the
            # cycle with 'idle' (as the _aborting branch above does);
            # otherwise clients never see the request finish.
            self._publish_status('idle', 'shell')
            self.shell_stream.flush(zmq.POLLOUT)
            return

        handler = self.shell_handlers.get(msg_type, None)
        if handler is None:
            self.log.warning("Unknown message type: %r", msg_type)
        else:
            self.log.debug("%s: %s", msg_type, msg)
            try:
                self.pre_handler_hook()
            except Exception:
                self.log.debug("Unable to signal in pre_handler_hook:", exc_info=True)
            try:
                result = handler(self.shell_stream, idents, msg)
                if inspect.isawaitable(result):
                    await result
            except Exception:
                self.log.error("Exception in message handler:", exc_info=True)
            except KeyboardInterrupt:
                # Ctrl-c shouldn't crash the kernel here.
                self.log.error("KeyboardInterrupt caught in kernel.")
            finally:
                try:
                    self.post_handler_hook()
                except Exception:
                    self.log.debug("Unable to signal in post_handler_hook:", exc_info=True)

        sys.stdout.flush()
        sys.stderr.flush()
        self._publish_status('idle', 'shell')
        # flush to ensure reply is sent before
        # handling the next request
        self.shell_stream.flush(zmq.POLLOUT)

    def pre_handler_hook(self):
        """Hook to execute before calling message handler"""
        # ensure default_int_handler during handler call
        self.saved_sigint_handler = signal(SIGINT, default_int_handler)

    def post_handler_hook(self):
        """Hook to execute after calling message handler"""
        signal(SIGINT, self.saved_sigint_handler)

    def enter_eventloop(self):
        """enter eventloop"""
        self.log.info("Entering eventloop %s", self.eventloop)
        # record handle, so we can check when this changes
        eventloop = self.eventloop
        if eventloop is None:
            self.log.info("Exiting as there is no eventloop")
            return

        def advance_eventloop():
            # check if eventloop changed:
            if self.eventloop is not eventloop:
                self.log.info("exiting eventloop %s", eventloop)
                return
            if self.msg_queue.qsize():
                self.log.debug("Delaying eventloop due to waiting messages")
                # still messages to process, make the eventloop wait
                schedule_next()
                return
            self.log.debug("Advancing eventloop %s", eventloop)
            try:
                eventloop(self)
            except KeyboardInterrupt:
                # Ctrl-C shouldn't crash the kernel
                self.log.error("KeyboardInterrupt caught in kernel")
            if self.eventloop is eventloop:
                # schedule advance again
                schedule_next()

        def schedule_next():
            """Schedule the next advance of the eventloop"""
            # flush the eventloop every so often,
            # giving us a chance to handle messages in the meantime
            self.log.debug("Scheduling eventloop advance")
            self.io_loop.call_later(0.001, advance_eventloop)

        # begin polling the eventloop
        schedule_next()

    async def do_one_iteration(self):
        """Process a single shell message

        Any pending control messages will be flushed as well

        .. versionchanged:: 5
            This is now a coroutine
        """
        # flush messages off of shell stream into the message queue
        self.shell_stream.flush()
        # process at most one shell message per iteration
        await self.process_one(wait=False)

    async def process_one(self, wait=True):
        """Process one request

        Returns None if no message was handled.
        """
        if wait:
            t, dispatch, args = await self.msg_queue.get()
        else:
            try:
                t, dispatch, args = self.msg_queue.get_nowait()
            except (asyncio.QueueEmpty, QueueEmpty):
                # msg_queue is a tornado Queue, which raises its own
                # QueueEmpty (not asyncio's); catch both so an empty queue
                # is a no-op rather than an error.
                return None
        await dispatch(*args)

    async def dispatch_queue(self):
        """Coroutine to preserve order of message handling

        Ensures that only one message is processing at a time,
        even when the handler is async
        """

        while True:
            try:
                await self.process_one()
            except Exception:
                self.log.exception("Error in message handler")

    _message_counter = Any(
        help="""Monotonic counter of messages
        """,
    )

    @default('_message_counter')
    def _message_counter_default(self):
        """Start the message counter at zero."""
        return itertools.count()

    def schedule_dispatch(self, dispatch, *args):
        """schedule a message for dispatch"""
        idx = next(self._message_counter)

        self.msg_queue.put_nowait(
            (
                idx,
                dispatch,
                args,
            )
        )
        # ensure the eventloop wakes up
        self.io_loop.add_callback(lambda: None)

    def start(self):
        """register dispatchers for streams"""
        self.io_loop = ioloop.IOLoop.current()
        self.msg_queue = Queue()
        self.io_loop.add_callback(self.dispatch_queue)

        self.control_stream.on_recv(self.dispatch_control, copy=False)

        # control messages are polled on the control thread's loop when
        # there is one, otherwise on the main loop
        if self.control_thread:
            control_loop = self.control_thread.io_loop
        else:
            control_loop = self.io_loop

        asyncio.run_coroutine_threadsafe(self.poll_control_queue(), control_loop.asyncio_loop)

        self.shell_stream.on_recv(
            partial(
                self.schedule_dispatch,
                self.dispatch_shell,
            ),
            copy=False,
        )

        # publish starting status
        self._publish_status('starting', 'shell')

    def record_ports(self, ports):
        """Record the ports that this kernel is using.

        The creator of the Kernel instance must call this methods if they
        want the :meth:`connect_request` method to return the port numbers.
        """
        self._recorded_ports = ports

    #---------------------------------------------------------------------------
    # Kernel request handlers
    #---------------------------------------------------------------------------

    def _publish_execute_input(self, code, parent, execution_count):
        """Publish the code request on the iopub stream."""

        self.session.send(self.iopub_socket, 'execute_input',
                          {'code': code, 'execution_count': execution_count},
                          parent=parent, ident=self._topic('execute_input')
        )

    def _publish_status(self, status, channel, parent=None):
        """send status (busy/idle) on IOPub"""
        self.session.send(
            self.iopub_socket,
            "status",
            {"execution_state": status},
            parent=parent or self.get_parent(channel),
            ident=self._topic("status"),
        )

    def _publish_debug_event(self, event):
        """Publish a debug_event on IOPub, parented to the control request."""
        self.session.send(
            self.iopub_socket,
            "debug_event",
            event,
            parent=self.get_parent("control"),
            ident=self._topic("debug_event"),
        )

    def set_parent(self, ident, parent, channel='shell'):
        """Set the current parent request

        Side effects (IOPub messages) and replies are associated with
        the request that caused them via the parent_header.

        The parent identity is used to route input_request messages
        on the stdin channel.
        """
        self._parent_ident[channel] = ident
        self._parents[channel] = parent

    def get_parent(self, channel="shell"):
        """Get the parent request associated with a channel.

        .. versionadded:: 6

        Parameters
        ----------
        channel : str
            the name of the channel ('shell' or 'control')

        Returns
        -------
        message : dict
            the parent message for the most recent request on the channel.
        """
        return self._parents.get(channel, {})

    def send_response(self, stream, msg_or_type, content=None, ident=None,
             buffers=None, track=False, header=None, metadata=None, channel='shell'):
        """Send a response to the message we're currently processing.

        This accepts all the parameters of :meth:`jupyter_client.session.Session.send`
        except ``parent``.

        This relies on :meth:`set_parent` having been called for the current
        message.
        """
        return self.session.send(
            stream,
            msg_or_type,
            content,
            self.get_parent(channel),
            ident,
            buffers,
            track,
            header,
            metadata,
        )

    def init_metadata(self, parent):
        """Initialize metadata.

        Run at the beginning of execution requests.
        """
        # FIXME: `started` is part of ipyparallel
        # Remove for ipykernel 5.0
        return {
            'started': now(),
        }

    def finish_metadata(self, parent, metadata, reply_content):
        """Finish populating metadata.

        Run after completing an execution request.
        """
        return metadata

    async def execute_request(self, stream, ident, parent):
        """handle an execute_request"""

        try:
            content = parent['content']
            code = content['code']
            silent = content['silent']
            store_history = content.get('store_history', not silent)
            user_expressions = content.get('user_expressions', {})
            allow_stdin = content.get('allow_stdin', False)
        except Exception:
            self.log.error("Got bad msg: ")
            self.log.error("%s", parent)
            return

        stop_on_error = content.get('stop_on_error', True)

        metadata = self.init_metadata(parent)

        # Re-broadcast our input for the benefit of listening clients, and
        # start computing output
        if not silent:
            self.execution_count += 1
            self._publish_execute_input(code, parent, self.execution_count)

        reply_content = self.do_execute(
            code, silent, store_history,
            user_expressions, allow_stdin,
        )
        if inspect.isawaitable(reply_content):
            reply_content = await reply_content

        # Flush output before sending the reply.
        sys.stdout.flush()
        sys.stderr.flush()
        # FIXME: on rare occasions, the flush doesn't seem to make it to the
        # clients... This seems to mitigate the problem, but we definitely need
        # to better understand what's going on.
        if self._execute_sleep:
            time.sleep(self._execute_sleep)

        # Send the reply.
        reply_content = json_clean(reply_content)
        metadata = self.finish_metadata(parent, metadata, reply_content)

        reply_msg = self.session.send(stream, 'execute_reply',
                                      reply_content, parent, metadata=metadata,
                                      ident=ident)

        self.log.debug("%s", reply_msg)

        if not silent and reply_msg['content']['status'] == 'error' and stop_on_error:
            await self._abort_queues()

    def do_execute(self, code, silent, store_history=True,
                   user_expressions=None, allow_stdin=False):
        """Execute user code. Must be overridden by subclasses.
        """
        raise NotImplementedError

    async def complete_request(self, stream, ident, parent):
        """Handle a complete_request: reply with ``do_complete``'s result."""
        content = parent['content']
        code = content['code']
        cursor_pos = content['cursor_pos']

        matches = self.do_complete(code, cursor_pos)
        if inspect.isawaitable(matches):
            matches = await matches

        matches = json_clean(matches)
        self.session.send(stream, "complete_reply", matches, parent, ident)

    def do_complete(self, code, cursor_pos):
        """Override in subclasses to find completions.
        """
        return {'matches' : [],
                'cursor_end' : cursor_pos,
                'cursor_start' : cursor_pos,
                'metadata' : {},
                'status' : 'ok'}

    async def inspect_request(self, stream, ident, parent):
        """Handle an inspect_request: reply with ``do_inspect``'s result."""
        content = parent['content']

        reply_content = self.do_inspect(
            content['code'], content['cursor_pos'],
            content.get('detail_level', 0),
            set(content.get('omit_sections', [])),
        )
        if inspect.isawaitable(reply_content):
            reply_content = await reply_content

        # Before we send this object over, we scrub it for JSON usage
        reply_content = json_clean(reply_content)
        msg = self.session.send(stream, 'inspect_reply',
                                reply_content, parent, ident)
        self.log.debug("%s", msg)

    def do_inspect(self, code, cursor_pos, detail_level=0, omit_sections=()):
        """Override in subclasses to allow introspection.
        """
        return {'status': 'ok', 'data': {}, 'metadata': {}, 'found': False}

    async def history_request(self, stream, ident, parent):
        """Handle a history_request.

        The request content is passed to ``do_history`` as keyword arguments.
        """
        content = parent['content']

        reply_content = self.do_history(**content)
        if inspect.isawaitable(reply_content):
            reply_content = await reply_content

        reply_content = json_clean(reply_content)
        msg = self.session.send(stream, 'history_reply',
                                reply_content, parent, ident)
        self.log.debug("%s", msg)

    def do_history(self, hist_access_type, output, raw, session=None, start=None,
                   stop=None, n=None, pattern=None, unique=False):
        """Override in subclasses to access history.
        """
        return {'status': 'ok', 'history': []}

    async def connect_request(self, stream, ident, parent):
        """Handle a connect_request: reply with the recorded ports."""
        if self._recorded_ports is not None:
            content = self._recorded_ports.copy()
        else:
            content = {}
        content['status'] = 'ok'
        msg = self.session.send(stream, 'connect_reply',
                                content, parent, ident)
        self.log.debug("%s", msg)

    @property
    def kernel_info(self):
        """Content of the kernel_info_reply (without the ``status`` key).

        Reads ``implementation``, ``implementation_version`` and ``banner``,
        which are not defined in this class and must come from a subclass.
        """
        return {
            'protocol_version': kernel_protocol_version,
            'implementation': self.implementation,
            'implementation_version': self.implementation_version,
            'language_info': self.language_info,
            'banner': self.banner,
            'help_links': self.help_links,
        }

    async def kernel_info_request(self, stream, ident, parent):
        """Handle a kernel_info_request: reply with ``kernel_info``."""
        content = {'status': 'ok'}
        content.update(self.kernel_info)
        msg = self.session.send(stream, 'kernel_info_reply',
                                content, parent, ident)
        self.log.debug("%s", msg)

    async def comm_info_request(self, stream, ident, parent):
        """Handle a comm_info_request.

        Replies with the open comms, optionally filtered by ``target_name``;
        the reply is empty when the kernel has no ``comm_manager``.
        """
        content = parent['content']
        target_name = content.get('target_name', None)

        # Should this be moved to ipkernel?
        if hasattr(self, 'comm_manager'):
            comms = {
                k: dict(target_name=v.target_name)
                for (k, v) in self.comm_manager.comms.items()
                if v.target_name == target_name or target_name is None
            }
        else:
            comms = {}
        reply_content = dict(comms=comms, status='ok')
        msg = self.session.send(stream, 'comm_info_reply',
                                reply_content, parent, ident)
        self.log.debug("%s", msg)

    def _send_sigint(self):
        """Send SIGINT to this process, preferring its whole process group.

        Raises
        ------
        OSError
            if neither the process group nor the process could be signalled.
        """
        pid = os.getpid()
        # Prefer process-group over process
        if hasattr(os, "killpg"):
            pgid = os.getpgid(pid)
            if pgid:
                try:
                    os.killpg(pgid, SIGINT)
                    return
                except OSError:
                    # fall back to signalling only ourselves
                    pass
        os.kill(pid, SIGINT)

    async def interrupt_request(self, stream, ident, parent):
        """Handle an interrupt_request by sending SIGINT to the kernel.

        An ``interrupt_reply`` is always sent: ``status: ok`` once the signal
        was delivered (or on Windows, where the request is only logged as
        unsupported), and an error reply if signalling failed.
        """
        content = {'status': 'ok'}
        if os.name == "nt":
            # os.getpgid / os.killpg do not exist on Windows, so do not
            # touch them before this check.
            self.log.error("Interrupt message not supported on Windows")
        else:
            try:
                self._send_sigint()
            except OSError as err:
                self.log.error("Failed to interrupt kernel", exc_info=True)
                content = {
                    'status': 'error',
                    'ename': type(err).__name__,
                    'evalue': str(err),
                    'traceback': traceback.format_exception(
                        type(err), err, err.__traceback__
                    ),
                }

        self.session.send(stream, 'interrupt_reply', content, parent, ident=ident)

    async def shutdown_request(self, stream, ident, parent):
        """Handle a shutdown_request.

        Sends the reply, stores a copy for IOPub broadcast, runs
        ``_at_shutdown`` and then stops the control and shell IO loops.
        """
        content = self.do_shutdown(parent['content']['restart'])
        if inspect.isawaitable(content):
            content = await content
        self.session.send(stream, 'shutdown_reply', content, parent, ident=ident)
        # same content, but different msg_id for broadcasting on IOPub
        self._shutdown_message = self.session.msg('shutdown_reply',
                                                  content, parent
        )

        self._at_shutdown()

        self.log.debug('Stopping control ioloop')
        control_io_loop = self.control_stream.io_loop
        control_io_loop.add_callback(control_io_loop.stop)

        self.log.debug('Stopping shell ioloop')
        shell_io_loop = self.shell_stream.io_loop
        shell_io_loop.add_callback(shell_io_loop.stop)

    def do_shutdown(self, restart):
        """Override in subclasses to do things when the frontend shuts down the
        kernel.
        """
        return {'status': 'ok', 'restart': restart}

    async def is_complete_request(self, stream, ident, parent):
        """Handle an is_complete_request: reply with ``do_is_complete``'s result."""
        content = parent['content']
        code = content['code']

        reply_content = self.do_is_complete(code)
        if inspect.isawaitable(reply_content):
            reply_content = await reply_content
        reply_content = json_clean(reply_content)
        reply_msg = self.session.send(stream, 'is_complete_reply',
                                      reply_content, parent, ident)
        self.log.debug("%s", reply_msg)

    def do_is_complete(self, code):
        """Override in subclasses to report whether code is complete.

        The base implementation always answers ``'unknown'``.
        """
        return { 'status' : 'unknown'}

    async def debug_request(self, stream, ident, parent):
        """Handle a debug_request: reply with ``do_debug_request``'s result."""
        content = parent['content']

        reply_content = self.do_debug_request(content)
        if inspect.isawaitable(reply_content):
            reply_content = await reply_content
        reply_content = json_clean(reply_content)
        reply_msg = self.session.send(stream, 'debug_reply', reply_content,
                                      parent, ident)
        self.log.debug("%s", reply_msg)

    async def do_debug_request(self, msg):
        """Override in subclasses to support debug requests."""
        raise NotImplementedError

    #---------------------------------------------------------------------------
    # Engine methods (DEPRECATED)
    #---------------------------------------------------------------------------

    async def apply_request(self, stream, ident, parent):
        """DEPRECATED: handle an ipyparallel apply_request via ``do_apply``."""
        self.log.warning("apply_request is deprecated in kernel_base, moving to ipyparallel.")
        try:
            content = parent['content']
            bufs = parent['buffers']
            msg_id = parent['header']['msg_id']
        except Exception:
            self.log.error("Got bad msg: %s", parent, exc_info=True)
            return

        md = self.init_metadata(parent)

        reply_content, result_buf = self.do_apply(content, bufs, msg_id, md)

        # flush i/o
        sys.stdout.flush()
        sys.stderr.flush()

        md = self.finish_metadata(parent, md, reply_content)

        self.session.send(stream, 'apply_reply', reply_content,
                    parent=parent, ident=ident,buffers=result_buf, metadata=md)

    def do_apply(self, content, bufs, msg_id, reply_metadata):
        """DEPRECATED"""
        raise NotImplementedError

    #---------------------------------------------------------------------------
    # Control messages (DEPRECATED)
    #---------------------------------------------------------------------------

    async def abort_request(self, stream, ident, parent):
        """abort a specific msg by id

        With no ``msg_ids`` in the request, the queued requests are aborted
        via ``_abort_queues`` instead.
        """
        self.log.warning("abort_request is deprecated in kernel_base. It is only part of IPython parallel")
        msg_ids = parent['content'].get('msg_ids', None)
        if isinstance(msg_ids, str):
            msg_ids = [msg_ids]
        if not msg_ids:
            # _abort_queues is a coroutine function: it must be awaited,
            # otherwise it never runs.
            await self._abort_queues()
            # msg_ids may be None here; nothing specific to mark as aborted
            msg_ids = []
        for mid in msg_ids:
            self.aborted.add(str(mid))

        content = dict(status='ok')
        reply_msg = self.session.send(stream, 'abort_reply', content=content,
                parent=parent, ident=ident)
        self.log.debug("%s", reply_msg)

    async def clear_request(self, stream, idents, parent):
        """Clear our namespace."""
        self.log.warning("clear_request is deprecated in kernel_base. It is only part of IPython parallel")
        content = self.do_clear()
        self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
                content = content)

    def do_clear(self):
        """DEPRECATED since 4.0.3"""
        raise NotImplementedError

    #---------------------------------------------------------------------------
    # Protected interface
    #---------------------------------------------------------------------------

    def _topic(self, topic):
        """prefixed topic for IOPub messages"""
        base = "kernel.%s" % self.ident

        return ("%s.%s" % (base, topic)).encode()

    _aborting = Bool(False)

    async def _abort_queues(self):
        """Start aborting execute requests.

        Flushes the shell stream, sets ``_aborting`` and schedules it to be
        cleared after ``stop_on_error_timeout`` seconds.
        """
        self.shell_stream.flush()
        self._aborting = True

        def stop_aborting():
            self.log.info("Finishing abort")
            self._aborting = False

        asyncio.get_event_loop().call_later(self.stop_on_error_timeout, stop_aborting)

    def _send_abort_reply(self, stream, msg, idents):
        """Send a reply to an aborted request"""
        self.log.info(
            f"Aborting {msg['header']['msg_id']}: {msg['header']['msg_type']}"
        )
        # e.g. 'execute_request' -> 'execute_reply'
        reply_type = msg["header"]["msg_type"].rsplit("_", 1)[0] + "_reply"
        status = {"status": "aborted"}
        md = self.init_metadata(msg)
        md = self.finish_metadata(msg, md, status)
        md.update(status)

        self.session.send(
            stream, reply_type, metadata=md,
            content=status, parent=msg, ident=idents,
        )

    def _no_raw_input(self):
        """Raise StdinNotImplementedError if active frontend doesn't support
        stdin."""
        raise StdinNotImplementedError("raw_input was called, but this "
                                       "frontend does not support stdin.")

    def getpass(self, prompt='', stream=None):
        """Forward getpass to frontends

        Raises
        ------
        StdinNotImplementedError if active frontend doesn't support stdin.
        """
        if not self._allow_stdin:
            raise StdinNotImplementedError(
                "getpass was called, but this frontend does not support input requests."
            )
        if stream is not None:
            warnings.warn(
                "The `stream` parameter of `getpass.getpass` will have no effect when using ipykernel",
                UserWarning,
                stacklevel=2,
            )
        return self._input_request(
            prompt,
            self._parent_ident["shell"],
            self.get_parent("shell"),
            password=True,
        )

    def raw_input(self, prompt=''):
        """Forward raw_input to frontends

        Raises
        ------
        StdinNotImplementedError if active frontend doesn't support stdin.
        """
        if not self._allow_stdin:
            raise StdinNotImplementedError(
                "raw_input was called, but this frontend does not support input requests."
            )
        return self._input_request(
            str(prompt),
            self._parent_ident["shell"],
            self.get_parent("shell"),
            password=False,
        )

    def _input_request(self, prompt, ident, parent, password=False):
        """Send an input_request on the stdin socket and block for the reply.

        Returns the ``value`` of the reply content ('' if the reply is
        malformed); raises EOFError if the value is ``'\\x04'``.
        """
        # Flush output before making the request.
        sys.stderr.flush()
        sys.stdout.flush()

        # flush the stdin socket, to purge stale replies
        while True:
            try:
                self.stdin_socket.recv_multipart(zmq.NOBLOCK)
            except zmq.ZMQError as e:
                if e.errno == zmq.EAGAIN:
                    break
                else:
                    raise

        # Send the input request.
        content = json_clean(dict(prompt=prompt, password=password))
        self.session.send(self.stdin_socket, 'input_request', content, parent,
                          ident=ident)

        # Await a response.
        while True:
            try:
                # Use polling with select() so KeyboardInterrupts can get
                # through; doing a blocking recv() means stdin reads are
                # uninterruptible on Windows. We need a timeout because
                # zmq.select() is also uninterruptible, but at least this
                # way reads get noticed immediately and KeyboardInterrupts
                # get noticed fairly quickly by human response time standards.
                rlist, _, xlist = zmq.select(
                    [self.stdin_socket], [], [self.stdin_socket], 0.01
                )
                if rlist or xlist:
                    ident, reply = self.session.recv(self.stdin_socket)
                    if (ident, reply) != (None, None):
                        break
            except KeyboardInterrupt:
                # re-raise KeyboardInterrupt, to truncate traceback
                raise KeyboardInterrupt("Interrupted by user") from None
            except Exception:
                self.log.warning("Invalid Message:", exc_info=True)

        try:
            value = reply["content"]["value"]
        except Exception:
            self.log.error("Bad input_reply: %s", parent)
            value = ''
        if value == '\x04':
            # EOF
            raise EOFError
        return value

    def _at_shutdown(self):
        """Actions taken at shutdown by the kernel, called by python's atexit.
        """
        if self._shutdown_message is not None:
            self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
            self.log.debug("%s", self._shutdown_message)
        self.control_stream.flush(zmq.POLLOUT)