1""" 2Based on 3https://github.com/jupyter/notebook/blob/master/notebook/static/services/kernels/comm.js 4https://github.com/ipython/ipykernel/blob/master/ipykernel/comm/manager.py 5https://github.com/ipython/ipykernel/blob/master/ipykernel/comm/comm.py 6 7 8Which are distributed under the terms of the Modified BSD License. 9""" 10import logging 11 12from traitlets.config import LoggingConfigurable 13 14from ipython_genutils.importstring import import_item 15 16import uuid 17 18from qtpy import QtCore 19from qtconsole.util import MetaQObjectHasTraits, SuperQObject 20 21 22class CommManager(MetaQObjectHasTraits( 23 'NewBase', (LoggingConfigurable, SuperQObject), {})): 24 """ 25 Manager for Comms in the Frontend 26 """ 27 28 def __init__(self, kernel_client, *args, **kwargs): 29 super().__init__(*args, **kwargs) 30 self.comms = {} 31 self.targets = {} 32 if kernel_client: 33 self.init_kernel_client(kernel_client) 34 35 def init_kernel_client(self, kernel_client): 36 """ 37 connect the kernel, and register message handlers 38 """ 39 self.kernel_client = kernel_client 40 kernel_client.iopub_channel.message_received.connect(self._dispatch) 41 42 @QtCore.Slot(object) 43 def _dispatch(self, msg): 44 """Dispatch messages""" 45 msg_type = msg['header']['msg_type'] 46 handled_msg_types = ['comm_open', 'comm_msg', 'comm_close'] 47 if msg_type in handled_msg_types: 48 getattr(self, msg_type)(msg) 49 50 def new_comm(self, target_name, data=None, metadata=None, 51 comm_id=None, buffers=None): 52 """ 53 Create a new Comm, register it, and open its Kernel-side counterpart 54 Mimics the auto-registration in `Comm.__init__` in the Jupyter Comm. 55 56 argument comm_id is optional 57 """ 58 comm = Comm(target_name, self.kernel_client, comm_id) 59 self.register_comm(comm) 60 try: 61 comm.open(data, metadata, buffers) 62 except Exception: 63 self.unregister_comm(comm) 64 raise 65 return comm 66 67 def register_target(self, target_name, f): 68 """Register a callable f for a given target name 69 70 f will be called with two arguments when a comm_open message is 71 received with `target`: 72 73 - the Comm instance 74 - the `comm_open` message itself. 75 76 f can be a Python callable or an import string for one. 77 """ 78 if isinstance(f, str): 79 f = import_item(f) 80 81 self.targets[target_name] = f 82 83 def unregister_target(self, target_name, f): 84 """Unregister a callable registered with register_target""" 85 return self.targets.pop(target_name) 86 87 def register_comm(self, comm): 88 """Register a new comm""" 89 comm_id = comm.comm_id 90 comm.kernel_client = self.kernel_client 91 self.comms[comm_id] = comm 92 comm.sig_is_closing.connect(self.unregister_comm) 93 return comm_id 94 95 @QtCore.Slot(object) 96 def unregister_comm(self, comm): 97 """Unregister a comm, and close its counterpart.""" 98 # unlike get_comm, this should raise a KeyError 99 comm.sig_is_closing.disconnect(self.unregister_comm) 100 self.comms.pop(comm.comm_id) 101 102 def get_comm(self, comm_id, closing=False): 103 """Get a comm with a particular id 104 105 Returns the comm if found, otherwise None. 106 107 This will not raise an error, 108 it will log messages if the comm cannot be found. 109 If the comm is closing, it might already have closed, 110 so this is ignored. 111 """ 112 try: 113 return self.comms[comm_id] 114 except KeyError: 115 if closing: 116 return 117 self.log.warning("No such comm: %s", comm_id) 118 # don't create the list of keys if debug messages aren't enabled 119 if self.log.isEnabledFor(logging.DEBUG): 120 self.log.debug("Current comms: %s", list(self.comms.keys())) 121 122 # comm message handlers 123 def comm_open(self, msg): 124 """Handler for comm_open messages""" 125 content = msg['content'] 126 comm_id = content['comm_id'] 127 target_name = content['target_name'] 128 f = self.targets.get(target_name, None) 129 130 comm = Comm(target_name, self.kernel_client, comm_id) 131 self.register_comm(comm) 132 133 if f is None: 134 self.log.error("No such comm target registered: %s", target_name) 135 else: 136 try: 137 f(comm, msg) 138 return 139 except Exception: 140 self.log.error("Exception opening comm with target: %s", 141 target_name, exc_info=True) 142 143 # Failure. 144 try: 145 comm.close() 146 except Exception: 147 self.log.error( 148 "Could not close comm during `comm_open` failure " 149 "clean-up. The comm may not have been opened yet.""", 150 exc_info=True) 151 152 def comm_close(self, msg): 153 """Handler for comm_close messages""" 154 content = msg['content'] 155 comm_id = content['comm_id'] 156 comm = self.get_comm(comm_id, closing=True) 157 if comm is None: 158 return 159 160 self.unregister_comm(comm) 161 162 try: 163 comm.handle_close(msg) 164 except Exception: 165 self.log.error('Exception in comm_close for %s', comm_id, 166 exc_info=True) 167 168 def comm_msg(self, msg): 169 """Handler for comm_msg messages""" 170 content = msg['content'] 171 comm_id = content['comm_id'] 172 comm = self.get_comm(comm_id) 173 if comm is None: 174 return 175 try: 176 comm.handle_msg(msg) 177 except Exception: 178 self.log.error('Exception in comm_msg for %s', comm_id, 179 exc_info=True) 180 181 182class Comm(MetaQObjectHasTraits( 183 'NewBase', (LoggingConfigurable, SuperQObject), {})): 184 """ 185 Comm base class 186 """ 187 sig_is_closing = QtCore.Signal(object) 188 189 def __init__(self, target_name, kernel_client, comm_id=None, 190 msg_callback=None, close_callback=None): 191 """ 192 Create a new comm. Must call open to use. 193 """ 194 super().__init__(target_name=target_name) 195 self.target_name = target_name 196 self.kernel_client = kernel_client 197 if comm_id is None: 198 comm_id = uuid.uuid1().hex 199 self.comm_id = comm_id 200 self._msg_callback = msg_callback 201 self._close_callback = close_callback 202 self._send_channel = self.kernel_client.shell_channel 203 204 def _send_msg(self, msg_type, content, data, metadata, buffers): 205 """ 206 Send a message on the shell channel. 207 """ 208 if data is None: 209 data = {} 210 if content is None: 211 content = {} 212 content['comm_id'] = self.comm_id 213 content['data'] = data 214 215 msg = self.kernel_client.session.msg( 216 msg_type, content, metadata=metadata) 217 if buffers: 218 msg['buffers'] = buffers 219 return self._send_channel.send(msg) 220 221 # methods for sending messages 222 def open(self, data=None, metadata=None, buffers=None): 223 """Open the kernel-side version of this comm""" 224 return self._send_msg( 225 'comm_open', {'target_name': self.target_name}, 226 data, metadata, buffers) 227 228 def send(self, data=None, metadata=None, buffers=None): 229 """Send a message to the kernel-side version of this comm""" 230 return self._send_msg( 231 'comm_msg', {}, data, metadata, buffers) 232 233 def close(self, data=None, metadata=None, buffers=None): 234 """Close the kernel-side version of this comm""" 235 self.sig_is_closing.emit(self) 236 return self._send_msg( 237 'comm_close', {}, data, metadata, buffers) 238 239 # methods for registering callbacks for incoming messages 240 241 def on_msg(self, callback): 242 """Register a callback for comm_msg 243 244 Will be called with the `data` of any comm_msg messages. 245 246 Call `on_msg(None)` to disable an existing callback. 247 """ 248 self._msg_callback = callback 249 250 def on_close(self, callback): 251 """Register a callback for comm_close 252 253 Will be called with the `data` of the close message. 254 255 Call `on_close(None)` to disable an existing callback. 256 """ 257 self._close_callback = callback 258 259 # methods for handling incoming messages 260 def handle_msg(self, msg): 261 """Handle a comm_msg message""" 262 self.log.debug("handle_msg[%s](%s)", self.comm_id, msg) 263 if self._msg_callback: 264 return self._msg_callback(msg) 265 266 def handle_close(self, msg): 267 """Handle a comm_close message""" 268 self.log.debug("handle_close[%s](%s)", self.comm_id, msg) 269 if self._close_callback: 270 return self._close_callback(msg) 271 272 273__all__ = ['CommManager'] 274