1"""
2Based on
3https://github.com/jupyter/notebook/blob/master/notebook/static/services/kernels/comm.js
4https://github.com/ipython/ipykernel/blob/master/ipykernel/comm/manager.py
5https://github.com/ipython/ipykernel/blob/master/ipykernel/comm/comm.py
6
7
8Which are distributed under the terms of the Modified BSD License.
9"""
10import logging
11
12from traitlets.config import LoggingConfigurable
13
14from ipython_genutils.importstring import import_item
15
16import uuid
17
18from qtpy import QtCore
19from qtconsole.util import MetaQObjectHasTraits, SuperQObject
20
21
22class CommManager(MetaQObjectHasTraits(
23        'NewBase', (LoggingConfigurable, SuperQObject), {})):
24    """
25    Manager for Comms in the Frontend
26    """
27
28    def __init__(self, kernel_client, *args, **kwargs):
29        super().__init__(*args, **kwargs)
30        self.comms = {}
31        self.targets = {}
32        if kernel_client:
33            self.init_kernel_client(kernel_client)
34
35    def init_kernel_client(self, kernel_client):
36        """
37        connect the kernel, and register message handlers
38        """
39        self.kernel_client = kernel_client
40        kernel_client.iopub_channel.message_received.connect(self._dispatch)
41
42    @QtCore.Slot(object)
43    def _dispatch(self, msg):
44        """Dispatch messages"""
45        msg_type = msg['header']['msg_type']
46        handled_msg_types = ['comm_open', 'comm_msg', 'comm_close']
47        if msg_type in handled_msg_types:
48            getattr(self, msg_type)(msg)
49
50    def new_comm(self, target_name, data=None, metadata=None,
51                 comm_id=None, buffers=None):
52        """
53        Create a new Comm, register it, and open its Kernel-side counterpart
54        Mimics the auto-registration in `Comm.__init__` in the Jupyter Comm.
55
56        argument comm_id is optional
57        """
58        comm = Comm(target_name, self.kernel_client, comm_id)
59        self.register_comm(comm)
60        try:
61            comm.open(data, metadata, buffers)
62        except Exception:
63            self.unregister_comm(comm)
64            raise
65        return comm
66
67    def register_target(self, target_name, f):
68        """Register a callable f for a given target name
69
70        f will be called with two arguments when a comm_open message is
71        received with `target`:
72
73        - the Comm instance
74        - the `comm_open` message itself.
75
76        f can be a Python callable or an import string for one.
77        """
78        if isinstance(f, str):
79            f = import_item(f)
80
81        self.targets[target_name] = f
82
83    def unregister_target(self, target_name, f):
84        """Unregister a callable registered with register_target"""
85        return self.targets.pop(target_name)
86
87    def register_comm(self, comm):
88        """Register a new comm"""
89        comm_id = comm.comm_id
90        comm.kernel_client = self.kernel_client
91        self.comms[comm_id] = comm
92        comm.sig_is_closing.connect(self.unregister_comm)
93        return comm_id
94
95    @QtCore.Slot(object)
96    def unregister_comm(self, comm):
97        """Unregister a comm, and close its counterpart."""
98        # unlike get_comm, this should raise a KeyError
99        comm.sig_is_closing.disconnect(self.unregister_comm)
100        self.comms.pop(comm.comm_id)
101
102    def get_comm(self, comm_id, closing=False):
103        """Get a comm with a particular id
104
105        Returns the comm if found, otherwise None.
106
107        This will not raise an error,
108        it will log messages if the comm cannot be found.
109        If the comm is closing, it might already have closed,
110        so this is ignored.
111        """
112        try:
113            return self.comms[comm_id]
114        except KeyError:
115            if closing:
116                return
117            self.log.warning("No such comm: %s", comm_id)
118            # don't create the list of keys if debug messages aren't enabled
119            if self.log.isEnabledFor(logging.DEBUG):
120                self.log.debug("Current comms: %s", list(self.comms.keys()))
121
122    # comm message handlers
123    def comm_open(self, msg):
124        """Handler for comm_open messages"""
125        content = msg['content']
126        comm_id = content['comm_id']
127        target_name = content['target_name']
128        f = self.targets.get(target_name, None)
129
130        comm = Comm(target_name, self.kernel_client, comm_id)
131        self.register_comm(comm)
132
133        if f is None:
134            self.log.error("No such comm target registered: %s", target_name)
135        else:
136            try:
137                f(comm, msg)
138                return
139            except Exception:
140                self.log.error("Exception opening comm with target: %s",
141                               target_name, exc_info=True)
142
143        # Failure.
144        try:
145            comm.close()
146        except Exception:
147            self.log.error(
148                "Could not close comm during `comm_open` failure "
149                "clean-up.  The comm may not have been opened yet.""",
150                exc_info=True)
151
152    def comm_close(self, msg):
153        """Handler for comm_close messages"""
154        content = msg['content']
155        comm_id = content['comm_id']
156        comm = self.get_comm(comm_id, closing=True)
157        if comm is None:
158            return
159
160        self.unregister_comm(comm)
161
162        try:
163            comm.handle_close(msg)
164        except Exception:
165            self.log.error('Exception in comm_close for %s', comm_id,
166                           exc_info=True)
167
168    def comm_msg(self, msg):
169        """Handler for comm_msg messages"""
170        content = msg['content']
171        comm_id = content['comm_id']
172        comm = self.get_comm(comm_id)
173        if comm is None:
174            return
175        try:
176            comm.handle_msg(msg)
177        except Exception:
178            self.log.error('Exception in comm_msg for %s', comm_id,
179                           exc_info=True)
180
181
182class Comm(MetaQObjectHasTraits(
183        'NewBase', (LoggingConfigurable, SuperQObject), {})):
184    """
185    Comm base class
186    """
187    sig_is_closing = QtCore.Signal(object)
188
189    def __init__(self, target_name, kernel_client, comm_id=None,
190                 msg_callback=None, close_callback=None):
191        """
192        Create a new comm. Must call open to use.
193        """
194        super().__init__(target_name=target_name)
195        self.target_name = target_name
196        self.kernel_client = kernel_client
197        if comm_id is None:
198            comm_id = uuid.uuid1().hex
199        self.comm_id = comm_id
200        self._msg_callback = msg_callback
201        self._close_callback = close_callback
202        self._send_channel = self.kernel_client.shell_channel
203
204    def _send_msg(self, msg_type, content, data, metadata, buffers):
205        """
206        Send a message on the shell channel.
207        """
208        if data is None:
209            data = {}
210        if content is None:
211            content = {}
212        content['comm_id'] = self.comm_id
213        content['data'] = data
214
215        msg = self.kernel_client.session.msg(
216            msg_type, content, metadata=metadata)
217        if buffers:
218            msg['buffers'] = buffers
219        return self._send_channel.send(msg)
220
221    # methods for sending messages
222    def open(self, data=None, metadata=None, buffers=None):
223        """Open the kernel-side version of this comm"""
224        return self._send_msg(
225            'comm_open', {'target_name': self.target_name},
226            data, metadata, buffers)
227
228    def send(self, data=None, metadata=None, buffers=None):
229        """Send a message to the kernel-side version of this comm"""
230        return self._send_msg(
231            'comm_msg', {}, data, metadata, buffers)
232
233    def close(self, data=None, metadata=None, buffers=None):
234        """Close the kernel-side version of this comm"""
235        self.sig_is_closing.emit(self)
236        return self._send_msg(
237            'comm_close', {}, data, metadata, buffers)
238
239    # methods for registering callbacks for incoming messages
240
241    def on_msg(self, callback):
242        """Register a callback for comm_msg
243
244        Will be called with the `data` of any comm_msg messages.
245
246        Call `on_msg(None)` to disable an existing callback.
247        """
248        self._msg_callback = callback
249
250    def on_close(self, callback):
251        """Register a callback for comm_close
252
253        Will be called with the `data` of the close message.
254
255        Call `on_close(None)` to disable an existing callback.
256        """
257        self._close_callback = callback
258
259    # methods for handling incoming messages
260    def handle_msg(self, msg):
261        """Handle a comm_msg message"""
262        self.log.debug("handle_msg[%s](%s)", self.comm_id, msg)
263        if self._msg_callback:
264            return self._msg_callback(msg)
265
266    def handle_close(self, msg):
267        """Handle a comm_close message"""
268        self.log.debug("handle_close[%s](%s)", self.comm_id, msg)
269        if self._close_callback:
270            return self._close_callback(msg)
271
272
273__all__ = ['CommManager']
274