1"""Wrappers for forwarding stdout/stderr over zmq"""
2
3# Copyright (c) IPython Development Team.
4# Distributed under the terms of the Modified BSD License.
5
6import atexit
7from binascii import b2a_hex
8from collections import deque
9from imp import lock_held as import_lock_held
10import os
11import sys
12import threading
13import warnings
14from io import StringIO, TextIOBase
15
16import zmq
17if zmq.pyzmq_version_info() >= (17, 0):
18    from tornado.ioloop import IOLoop
19else:
20    # deprecated since pyzmq 17
21    from zmq.eventloop.ioloop import IOLoop
22from zmq.eventloop.zmqstream import ZMQStream
23
24from jupyter_client.session import extract_header
25
26from ipython_genutils import py3compat
27
28#-----------------------------------------------------------------------------
29# Globals
30#-----------------------------------------------------------------------------
31
32MASTER = 0
33CHILD = 1
34
35#-----------------------------------------------------------------------------
36# IO classes
37#-----------------------------------------------------------------------------
38
39class IOPubThread(object):
40    """An object for sending IOPub messages in a background thread
41
42    Prevents a blocking main thread from delaying output from threads.
43
44    IOPubThread(pub_socket).background_socket is a Socket-API-providing object
45    whose IO is always run in a thread.
46    """
47
48    def __init__(self, socket, pipe=False):
49        """Create IOPub thread
50
51        Parameters
52        ----------
53
54        socket: zmq.PUB Socket
55            the socket on which messages will be sent.
56        pipe: bool
57            Whether this process should listen for IOPub messages
58            piped from subprocesses.
59        """
60        self.socket = socket
61        self.background_socket = BackgroundSocket(self)
62        self._master_pid = os.getpid()
63        self._pipe_flag = pipe
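        # create the loop here but don't make it current in this (main) thread;
        # the IO thread adopts it in _thread_main via make_current()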
        self.io_loop = IOLoop(make_current=False)
        if pipe:
            self._setup_pipe_in()
        self._local = threading.local()
        self._events = deque()
        self._setup_event_pipe()
        self.thread = threading.Thread(target=self._thread_main)
        self.thread.daemon = True

    def _thread_main(self):
        """The inner loop that's actually run in a thread"""
        self.io_loop.make_current()
        self.io_loop.start()
        self.io_loop.close(all_fds=True)

    def _setup_event_pipe(self):
        """Create the PULL socket listening for events that should fire in this thread."""
        ctx = self.socket.context
        pipe_in = ctx.socket(zmq.PULL)
        pipe_in.linger = 0

        _uuid = b2a_hex(os.urandom(16)).decode('ascii')
        iface = self._event_interface = 'inproc://%s' % _uuid
        pipe_in.bind(iface)
        self._event_puller = ZMQStream(pipe_in, self.io_loop)
        self._event_puller.on_recv(self._handle_event)

    @property
    def _event_pipe(self):
        """thread-local event pipe for signaling events that should be processed in the thread"""
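        # zmq sockets must not be shared across threads, so each calling thread
        # lazily gets its own PUSH socket connected to the inproc event interface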
        try:
            event_pipe = self._local.event_pipe
        except AttributeError:
            # new thread, new event pipe
            ctx = self.socket.context
            event_pipe = ctx.socket(zmq.PUSH)
            event_pipe.linger = 0
            event_pipe.connect(self._event_interface)
            self._local.event_pipe = event_pipe
        return event_pipe

    def _handle_event(self, msg):
        """Handle an event on the event pipe

        Content of the message is ignored.

        Whenever *an* event arrives on the event stream,
        *all* waiting events are processed in order.
        """
        # freeze event count so new writes don't extend the queue
        # while we are processing
        n_events = len(self._events)
        for i in range(n_events):
            event_f = self._events.popleft()
            event_f()

    def _setup_pipe_in(self):
        """setup listening pipe for IOPub from forked subprocesses"""
        ctx = self.socket.context

        # use UUID to authenticate pipe messages
        self._pipe_uuid = os.urandom(16)

        pipe_in = ctx.socket(zmq.PULL)
        pipe_in.linger = 0

        try:
            self._pipe_port = pipe_in.bind_to_random_port("tcp://127.0.0.1")
        except zmq.ZMQError as e:
            warnings.warn("Couldn't bind IOPub Pipe to 127.0.0.1: %s" % e +
                "\nsubprocess output will be unavailable."
            )
            self._pipe_flag = False
            pipe_in.close()
            return
        self._pipe_in = ZMQStream(pipe_in, self.io_loop)
        self._pipe_in.on_recv(self._handle_pipe_msg)
    def _handle_pipe_msg(self, msg):
        """handle a pipe message from a subprocess"""
        if not self._pipe_flag or not self._is_master_process():
            return
        if msg[0] != self._pipe_uuid:
            print("Bad pipe message: %s" % (msg,), file=sys.__stderr__)
            return
        self.send_multipart(msg[1:])

    def _setup_pipe_out(self):
        # must be new context after fork
        ctx = zmq.Context()
        pipe_out = ctx.socket(zmq.PUSH)
        pipe_out.linger = 3000 # 3s timeout for pipe_out sends before discarding the message
        pipe_out.connect("tcp://127.0.0.1:%i" % self._pipe_port)
        return ctx, pipe_out

    def _is_master_process(self):
        return os.getpid() == self._master_pid

    def _check_mp_mode(self):
        """check for forks, and switch to zmq pipeline if necessary"""
        if not self._pipe_flag or self._is_master_process():
            return MASTER
        else:
            return CHILD

    def start(self):
        """Start the IOPub thread"""
        self.thread.start()
        # make sure we don't prevent process exit
        # I'm not sure why setting daemon=True above isn't enough, but it doesn't appear to be.
        atexit.register(self.stop)

    def stop(self):
        """Stop the IOPub thread"""
        if not self.thread.is_alive():
            return
        self.io_loop.add_callback(self.io_loop.stop)
        self.thread.join()
        if hasattr(self._local, 'event_pipe'):
            self._local.event_pipe.close()

    def close(self):
        if self.closed:
            return
        self.socket.close()
        self.socket = None

    @property
    def closed(self):
        return self.socket is None

    def schedule(self, f):
        """Schedule a function to be called in our IO thread.

        If the thread is not running, call immediately.
        """
        if self.thread.is_alive():
            self._events.append(f)
            # wake event thread (message content is ignored)
            self._event_pipe.send(b'')
        else:
            f()
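    # Illustrative note: any thread may call schedule(); for example
    #     thread.schedule(lambda: thread.socket.send_multipart([b'topic', b'data']))
    # appends the callable to _events and wakes the IO loop via the inproc
    # event pipe, so the actual work always runs on the IO thread.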

    def send_multipart(self, *args, **kwargs):
        """send_multipart schedules actual zmq send in my thread.

        If my thread isn't running (e.g. forked process), send immediately.
        """
        self.schedule(lambda: self._really_send(*args, **kwargs))

    def _really_send(self, msg, *args, **kwargs):
        """The callback that actually sends messages"""
        mp_mode = self._check_mp_mode()

        if mp_mode != CHILD:
            # we are master, do a regular send
            self.socket.send_multipart(msg, *args, **kwargs)
        else:
            # we are a child, pipe to master
            # new context/socket for every pipe-out
            # since forks don't tear down politely, use ctx.term to ensure the send has completed
            ctx, pipe_out = self._setup_pipe_out()
            pipe_out.send_multipart([self._pipe_uuid] + msg, *args, **kwargs)
            pipe_out.close()
            ctx.term()

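# A minimal illustrative sketch (not used by the kernel itself): how an
# IOPubThread is typically wired to a PUB socket.  The context and port
# handling below are assumptions made for the example only.
def _example_iopub_thread_usage():
    ctx = zmq.Context.instance()
    pub = ctx.socket(zmq.PUB)
    pub.bind_to_random_port("tcp://127.0.0.1")
    thread = IOPubThread(pub)
    thread.start()
    # send_multipart is safe to call from any thread; the actual zmq send
    # is scheduled onto the IO thread
    thread.send_multipart([b'stream', b'hello from the IO thread'])
    thread.stop()
    thread.close()
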

class BackgroundSocket(object):
    """Wrapper around IOPub thread that provides zmq send[_multipart]"""
    io_thread = None

    def __init__(self, io_thread):
        self.io_thread = io_thread

    def __getattr__(self, attr):
        """Wrap socket attr access for backward-compatibility"""
        if attr.startswith('__') and attr.endswith('__'):
            # don't wrap magic methods
            raise AttributeError(
                "%s has no attribute %r" % (self.__class__.__name__, attr))
        if hasattr(self.io_thread.socket, attr):
            warnings.warn("Accessing zmq Socket attribute %s on BackgroundSocket" % attr,
                DeprecationWarning, stacklevel=2)
            return getattr(self.io_thread.socket, attr)
        raise AttributeError(
            "%s has no attribute %r" % (self.__class__.__name__, attr))

    def __setattr__(self, attr, value):
        if attr == 'io_thread' or (attr.startswith('__') and attr.endswith('__')):
            super(BackgroundSocket, self).__setattr__(attr, value)
        else:
            warnings.warn("Setting zmq Socket attribute %s on BackgroundSocket" % attr,
                DeprecationWarning, stacklevel=2)
            setattr(self.io_thread.socket, attr, value)

    def send(self, msg, *args, **kwargs):
        return self.send_multipart([msg], *args, **kwargs)

    def send_multipart(self, *args, **kwargs):
        """Schedule send in IO thread"""
        return self.io_thread.send_multipart(*args, **kwargs)

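# Illustrative sketch only: BackgroundSocket lets code written against a raw
# zmq socket keep calling send()/send_multipart() while the IO still happens
# on the IOPubThread.  The `thread` argument here is an assumed, already
# started IOPubThread.
def _example_background_socket_usage(thread):
    bg = thread.background_socket       # the BackgroundSocket wrapping the thread
    bg.send(b'single-frame payload')    # wrapped as a one-element multipart send
    bg.send_multipart([b'frame0', b'frame1'])  # scheduled on the IO thread
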

class OutStream(TextIOBase):
    """A file-like object that publishes the stream to a 0MQ PUB socket.

    Output is handed off to an IO Thread.
    """

    # timeout for flush to avoid infinite hang
    # in case of misbehavior
    flush_timeout = 10
    # The time interval between automatic flushes, in seconds.
    flush_interval = 0.2
    topic = None
    encoding = 'UTF-8'

    def __init__(self, session, pub_thread, name, pipe=None, echo=None):
        if pipe is not None:
            warnings.warn("pipe argument to OutStream is deprecated and ignored",
                DeprecationWarning)
        # This is necessary for compatibility with Python built-in streams
        self.session = session
        if not isinstance(pub_thread, IOPubThread):
            # Backward-compat: given socket, not thread. Wrap in a thread.
            warnings.warn("OutStream should be created with IOPubThread, not %r" % pub_thread,
                DeprecationWarning, stacklevel=2)
            pub_thread = IOPubThread(pub_thread)
            pub_thread.start()
        self.pub_thread = pub_thread
        self.name = name
        self.topic = b'stream.' + py3compat.cast_bytes(name)
        self.parent_header = {}
        self._master_pid = os.getpid()
        self._flush_pending = False
        self._subprocess_flush_pending = False
        self._io_loop = pub_thread.io_loop
        self._new_buffer()
        self.echo = None

        if echo:
            if hasattr(echo, 'read') and hasattr(echo, 'write'):
                self.echo = echo
            else:
                raise ValueError("echo argument must be a file-like object")

    def _is_master_process(self):
        return os.getpid() == self._master_pid

    def set_parent(self, parent):
        self.parent_header = extract_header(parent)

    def close(self):
        self.pub_thread = None

    @property
    def closed(self):
        return self.pub_thread is None

    def _schedule_flush(self):
        """schedule a flush in the IO thread

        call this on write, to indicate that flush should be called soon.
        """
        if self._flush_pending:
            return
        self._flush_pending = True

        # add_timeout has to be handed to the io thread via event pipe
        def _schedule_in_thread():
            self._io_loop.call_later(self.flush_interval, self._flush)
        self.pub_thread.schedule(_schedule_in_thread)

    def flush(self):
        """trigger actual zmq send

        send will happen in the background thread
        """
        if self.pub_thread and self.pub_thread.thread is not None and self.pub_thread.thread.is_alive():
            # request flush on the background thread
            self.pub_thread.schedule(self._flush)
            # wait for flush to actually get through, if we can.
            # waiting across threads during import can cause deadlocks
            # so only wait if import lock is not held
            if not import_lock_held():
                evt = threading.Event()
                self.pub_thread.schedule(evt.set)
                # and give a timeout to avoid hanging forever
                if not evt.wait(self.flush_timeout):
                    # write directly to __stderr__ instead of warning because
                    # if this is happening sys.stderr may be the problem.
                    print("IOStream.flush timed out", file=sys.__stderr__)
        else:
            self._flush()

    def _flush(self):
        """This is where the actual send happens.

        _flush should generally be called in the IO thread,
        unless the thread has been destroyed (e.g. forked subprocess).
        """
        self._flush_pending = False
        self._subprocess_flush_pending = False

        if self.echo is not None:
            try:
                self.echo.flush()
            except OSError as e:
                if self.echo is not sys.__stderr__:
                    print("Flush failed: {}".format(e),
                          file=sys.__stderr__)

        data = self._flush_buffer()
        if data:
            # FIXME: this disables Session's fork-safe check,
            # since pub_thread is itself fork-safe.
            # There should be a better way to do this.
            self.session.pid = os.getpid()
            content = {'name': self.name, 'text': data}
            self.session.send(self.pub_thread, 'stream', content=content,
                parent=self.parent_header, ident=self.topic)

    def write(self, string):
        if self.echo is not None:
            try:
                self.echo.write(string)
            except OSError as e:
                if self.echo is not sys.__stderr__:
                    print("Write failed: {}".format(e),
                          file=sys.__stderr__)

        if self.pub_thread is None:
            raise ValueError('I/O operation on closed file')
        else:
            # Make sure that we're handling unicode
            if not isinstance(string, str):
                string = string.decode(self.encoding, 'replace')

            is_child = (not self._is_master_process())
            # only touch the buffer in the IO thread to avoid races
            self.pub_thread.schedule(lambda: self._buffer.write(string))
            if is_child:
                # mp.Pool cannot be trusted to flush promptly (or ever),
                # and this helps.
                if self._subprocess_flush_pending:
                    return
                self._subprocess_flush_pending = True
                # We cannot rely on self._io_loop.call_later from a subprocess
                self.pub_thread.schedule(self._flush)
            else:
                self._schedule_flush()

    def writelines(self, sequence):
        if self.pub_thread is None:
            raise ValueError('I/O operation on closed file')
        else:
            for string in sequence:
                self.write(string)

    def writable(self):
        return True

    def _flush_buffer(self):
        """clear the current buffer and return the current buffer data.

        This should only be called in the IO thread.
        """
        data = ''
        if self._buffer is not None:
            buf = self._buffer
            self._new_buffer()
            data = buf.getvalue()
            buf.close()
        return data

    def _new_buffer(self):
        self._buffer = StringIO()

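
# Illustrative sketch only: how a kernel might install OutStream as a
# replacement for sys.stdout.  The bare Session() below is an assumption; in a
# real kernel the session (and its signing key) comes from the connection file.
def _example_outstream_usage():
    from jupyter_client.session import Session
    ctx = zmq.Context.instance()
    pub = ctx.socket(zmq.PUB)
    pub.bind_to_random_port("tcp://127.0.0.1")
    thread = IOPubThread(pub)
    thread.start()
    session = Session()
    sys.stdout = OutStream(session, thread, 'stdout')
    try:
        # written text is buffered in the IO thread and published as a
        # 'stream' message on flush
        print("forwarded to IOPub as a 'stream' message")
        sys.stdout.flush()
    finally:
        sys.stdout = sys.__stdout__
        thread.stop()
        thread.close()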