1"""Wrappers for forwarding stdout/stderr over zmq"""
2
3# Copyright (c) IPython Development Team.
4# Distributed under the terms of the Modified BSD License.
5
6import atexit
7from binascii import b2a_hex
8from collections import deque
9from imp import lock_held as import_lock_held
10import os
11import sys
12import threading
13import warnings
14from weakref import WeakSet
15import traceback
16from io import StringIO, TextIOBase
17import io
18
19import zmq
20if zmq.pyzmq_version_info() >= (17, 0):
21    from tornado.ioloop import IOLoop
22else:
23    # deprecated since pyzmq 17
24    from zmq.eventloop.ioloop import IOLoop
25from zmq.eventloop.zmqstream import ZMQStream
26
27from jupyter_client.session import extract_header
28
29
30#-----------------------------------------------------------------------------
31# Globals
32#-----------------------------------------------------------------------------
33
34MASTER = 0
35CHILD = 1
36
37#-----------------------------------------------------------------------------
38# IO classes
39#-----------------------------------------------------------------------------
40
41
42class IOPubThread(object):
43    """An object for sending IOPub messages in a background thread
44
45    Prevents a blocking main thread from delaying output from threads.
46
47    IOPubThread(pub_socket).background_socket is a Socket-API-providing object
48    whose IO is always run in a thread.
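
    A rough usage sketch (``pub_socket`` stands for an already-bound ``zmq.PUB``
    socket; it is not created here)::

        thread = IOPubThread(pub_socket)
        thread.start()
        thread.background_socket.send(b'some bytes')  # actual send runs in the IO thread
        thread.stop()
        thread.close()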
    """

    def __init__(self, socket, pipe=False):
        """Create IOPub thread

        Parameters
        ----------
        socket : zmq.PUB Socket
            the socket on which messages will be sent.
        pipe : bool
            Whether this process should listen for IOPub messages
            piped from subprocesses.
        """
        self.socket = socket
        self.background_socket = BackgroundSocket(self)
        self._master_pid = os.getpid()
        self._pipe_flag = pipe
        self.io_loop = IOLoop(make_current=False)
        if pipe:
            self._setup_pipe_in()
        self._local = threading.local()
        self._events = deque()
        self._event_pipes = WeakSet()
        self._setup_event_pipe()
        self.thread = threading.Thread(target=self._thread_main)
        self.thread.daemon = True
        self.thread.pydev_do_not_trace = True
        self.thread.is_pydev_daemon_thread = True

    def _thread_main(self):
        """The inner loop that's actually run in a thread"""
        self.io_loop.make_current()
        self.io_loop.start()
        self.io_loop.close(all_fds=True)

    def _setup_event_pipe(self):
        """Create the PULL socket listening for events that should fire in this thread."""
        ctx = self.socket.context
        pipe_in = ctx.socket(zmq.PULL)
        pipe_in.linger = 0

        _uuid = b2a_hex(os.urandom(16)).decode('ascii')
        iface = self._event_interface = 'inproc://%s' % _uuid
        pipe_in.bind(iface)
        self._event_puller = ZMQStream(pipe_in, self.io_loop)
        self._event_puller.on_recv(self._handle_event)

    @property
    def _event_pipe(self):
        """thread-local event pipe for signaling events that should be processed in the thread"""
        try:
            event_pipe = self._local.event_pipe
        except AttributeError:
            # new thread, new event pipe
            ctx = self.socket.context
            event_pipe = ctx.socket(zmq.PUSH)
            event_pipe.linger = 0
            event_pipe.connect(self._event_interface)
            self._local.event_pipe = event_pipe
            # WeakSet so that event pipes will be closed by garbage collection
            # when their threads are terminated
            self._event_pipes.add(event_pipe)
        return event_pipe

    def _handle_event(self, msg):
        """Handle an event on the event pipe

        Content of the message is ignored.

        Whenever *an* event arrives on the event stream,
        *all* waiting events are processed in order.
        """
        # freeze event count so new writes don't extend the queue
        # while we are processing
        n_events = len(self._events)
        for i in range(n_events):
            event_f = self._events.popleft()
            event_f()

    def _setup_pipe_in(self):
        """setup listening pipe for IOPub from forked subprocesses"""
        ctx = self.socket.context

        # use UUID to authenticate pipe messages
        self._pipe_uuid = os.urandom(16)

        pipe_in = ctx.socket(zmq.PULL)
        pipe_in.linger = 0

        try:
            self._pipe_port = pipe_in.bind_to_random_port("tcp://127.0.0.1")
        except zmq.ZMQError as e:
            warnings.warn("Couldn't bind IOPub Pipe to 127.0.0.1: %s" % e +
                "\nsubprocess output will be unavailable."
            )
            self._pipe_flag = False
            pipe_in.close()
            return
        self._pipe_in = ZMQStream(pipe_in, self.io_loop)
        self._pipe_in.on_recv(self._handle_pipe_msg)

    def _handle_pipe_msg(self, msg):
        """handle a pipe message from a subprocess"""
        if not self._pipe_flag or not self._is_master_process():
            return
        if msg[0] != self._pipe_uuid:
            print("Bad pipe message: %s" % msg, file=sys.__stderr__)
            return
        self.send_multipart(msg[1:])

    def _setup_pipe_out(self):
        # must be new context after fork
        ctx = zmq.Context()
        pipe_out = ctx.socket(zmq.PUSH)
        pipe_out.linger = 3000  # 3s timeout for pipe_out sends before discarding the message
        pipe_out.connect("tcp://127.0.0.1:%i" % self._pipe_port)
        return ctx, pipe_out

    def _is_master_process(self):
        return os.getpid() == self._master_pid

    def _check_mp_mode(self):
        """check for forks, and switch to zmq pipeline if necessary"""
        if not self._pipe_flag or self._is_master_process():
            return MASTER
        else:
            return CHILD

    def start(self):
        """Start the IOPub thread"""
        self.thread.start()
        # make sure we don't prevent process exit
        # I'm not sure why setting daemon=True above isn't enough, but it doesn't appear to be.
        atexit.register(self.stop)

    def stop(self):
        """Stop the IOPub thread"""
        if not self.thread.is_alive():
            return
        self.io_loop.add_callback(self.io_loop.stop)
        self.thread.join()
        # close *all* event pipes, created in any thread
        # event pipes can only be used from other threads while self.thread.is_alive()
        # so after thread.join, this should be safe
        for event_pipe in self._event_pipes:
            event_pipe.close()

    def close(self):
        if self.closed:
            return
        self.socket.close()
        self.socket = None

    @property
    def closed(self):
        return self.socket is None

    def schedule(self, f):
        """Schedule a function to be called in our IO thread.

        If the thread is not running, call immediately.
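
        For example (illustrative; ``pub_thread`` stands for an IOPubThread
        whose thread is running)::

            pub_thread.schedule(lambda: print("runs in the IOPub thread"))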
        """
        if self.thread.is_alive():
            self._events.append(f)
            # wake event thread (message content is ignored)
            self._event_pipe.send(b'')
        else:
            f()

    def send_multipart(self, *args, **kwargs):
        """send_multipart schedules actual zmq send in my thread.

        If my thread isn't running (e.g. forked process), send immediately.
        """
        self.schedule(lambda: self._really_send(*args, **kwargs))

    def _really_send(self, msg, *args, **kwargs):
        """The callback that actually sends messages"""
        mp_mode = self._check_mp_mode()

        if mp_mode != CHILD:
            # we are master, do a regular send
            self.socket.send_multipart(msg, *args, **kwargs)
        else:
            # we are a child, pipe to master
            # new context/socket for every pipe-out
            # since forks don't teardown politely, use ctx.term to ensure send has completed
            ctx, pipe_out = self._setup_pipe_out()
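            # Frame layout on the pipe: [pipe_uuid, *original zmq frames];
            # the parent validates the uuid in _handle_pipe_msg before republishing.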
            pipe_out.send_multipart([self._pipe_uuid] + msg, *args, **kwargs)
            pipe_out.close()
            ctx.term()


class BackgroundSocket(object):
    """Wrapper around IOPub thread that provides zmq send[_multipart]"""
    io_thread = None
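
    # Rough usage sketch (illustrative only; `thread` stands for a started
    # IOPubThread, which exposes an instance of this class as
    # `thread.background_socket`):
    #
    #     thread.background_socket.send(b'frame')
    #     thread.background_socket.send_multipart([b'a', b'b'])
    #
    # Both calls are forwarded to IOPubThread.send_multipart, which performs
    # the real zmq send on the IOPub thread.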

    def __init__(self, io_thread):
        self.io_thread = io_thread

    def __getattr__(self, attr):
        """Wrap socket attr access for backward-compatibility"""
        if attr.startswith('__') and attr.endswith('__'):
            # don't wrap magic methods
            raise AttributeError(attr)
        if hasattr(self.io_thread.socket, attr):
            warnings.warn(
                "Accessing zmq Socket attribute {attr} on BackgroundSocket"
                " is deprecated since ipykernel 4.3.0;"
                " use .io_thread.socket.{attr} instead".format(attr=attr),
                DeprecationWarning,
                stacklevel=2,
            )
            return getattr(self.io_thread.socket, attr)
        raise AttributeError(attr)

    def __setattr__(self, attr, value):
        if attr == 'io_thread' or (attr.startswith('__') and attr.endswith('__')):
            super(BackgroundSocket, self).__setattr__(attr, value)
        else:
            warnings.warn(
                "Setting zmq Socket attribute {attr} on BackgroundSocket"
                " is deprecated since ipykernel 4.3.0;"
                " use .io_thread.socket.{attr} instead".format(attr=attr),
                DeprecationWarning,
                stacklevel=2,
            )
            setattr(self.io_thread.socket, attr, value)

    def send(self, msg, *args, **kwargs):
        return self.send_multipart([msg], *args, **kwargs)

    def send_multipart(self, *args, **kwargs):
        """Schedule send in IO thread"""
        return self.io_thread.send_multipart(*args, **kwargs)


class OutStream(TextIOBase):
    """A file-like object that publishes the stream to a 0MQ PUB socket.

    Output is handed off to an IO Thread.
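
    A rough usage sketch (``session`` and ``iopub_thread`` stand for an existing
    ``jupyter_client`` Session and a started IOPubThread; neither is created
    here)::

        sys.stdout = OutStream(session, iopub_thread, 'stdout')
        print('hello')  # published as a 'stream' message on the IOPub socket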
    """

    # timeout for flush to avoid infinite hang
    # in case of misbehavior
    flush_timeout = 10
    # The time interval between automatic flushes, in seconds.
    flush_interval = 0.2
    topic = None
    encoding = 'UTF-8'

    def fileno(self):
        """
        Things like subprocess will peek at and write to the fileno() of stderr/stdout.
        """
        if getattr(self, "_original_stdstream_copy", None) is not None:
            return self._original_stdstream_copy
        else:
            raise io.UnsupportedOperation("fileno")

    def _watch_pipe_fd(self):
        """
        We've redirected the standard streams (stdout/stderr) into a pipe.

        We need to watch in a thread and redirect them to the right places.

        1) the ZMQ channels, to show in notebook interfaces,
        2) the original stdout/stderr, to capture errors in terminals.

        We cannot schedule this on the ioloop thread, as this might be blocking.

        """

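        # Note: os.read blocks, so clearing self._should_watch only takes
        # effect once the next chunk of output arrives (or the pipe closes).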
        try:
            bts = os.read(self._fid, 1000)
            while bts and self._should_watch:
                self.write(bts.decode())
                os.write(self._original_stdstream_copy, bts)
                bts = os.read(self._fid, 1000)
        except Exception:
            self._exc = sys.exc_info()

    def __init__(
        self, session, pub_thread, name, pipe=None, echo=None, *, watchfd=True, isatty=False,
    ):
        """
        Parameters
        ----------
        name : str {'stderr', 'stdout'}
            the name of the standard stream to replace
        watchfd : bool (default: True)
            Watch the file descriptor corresponding to the replaced stream.
            This is useful if you know some underlying code will write directly
            to the file descriptor by its number. It will spawn a watching thread
            that swaps the given file descriptor for a pipe, reads from the
            pipe, and inserts the output into the current stream.
        isatty : bool (default: False)
            Indication of whether this stream has terminal capabilities (e.g. can handle colors)

        """
        if pipe is not None:
            warnings.warn(
                "pipe argument to OutStream is deprecated and ignored"
                " since ipykernel 4.2.3.",
                DeprecationWarning,
                stacklevel=2,
            )
        # This is necessary for compatibility with Python built-in streams
        self.session = session
        if not isinstance(pub_thread, IOPubThread):
            # Backward-compat: given socket, not thread. Wrap in a thread.
            warnings.warn(
                "Since IPykernel 4.3, OutStream should be created with "
                "IOPubThread, not %r" % pub_thread,
                DeprecationWarning,
                stacklevel=2,
            )
            pub_thread = IOPubThread(pub_thread)
            pub_thread.start()
        self.pub_thread = pub_thread
        self.name = name
        self.topic = b"stream." + name.encode()
        self.parent_header = {}
        self._master_pid = os.getpid()
        self._flush_pending = False
        self._subprocess_flush_pending = False
        self._io_loop = pub_thread.io_loop
        self._new_buffer()
        self.echo = None
        self._isatty = bool(isatty)

        if (
            watchfd
            and (sys.platform.startswith("linux") or sys.platform.startswith("darwin"))
            and ("PYTEST_CURRENT_TEST" not in os.environ)
        ):
            # Pytest sets its own capture. Don't redirect from within pytest.

            self._should_watch = True
            self._setup_stream_redirects(name)

        if echo:
            if hasattr(echo, 'read') and hasattr(echo, 'write'):
                self.echo = echo
            else:
                raise ValueError("echo argument must be a file-like object")

    def isatty(self):
        """Return a bool indicating whether this is an 'interactive' stream.

        Returns:
            Boolean
        """
        return self._isatty

    def _setup_stream_redirects(self, name):
        pr, pw = os.pipe()
        fno = getattr(sys, name).fileno()
        self._original_stdstream_copy = os.dup(fno)
        os.dup2(pw, fno)
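        # From here on, anything written to the original fd (including by
        # subprocesses or C extensions that inherit it) lands in the pipe and
        # is forwarded by _watch_pipe_fd, while _original_stdstream_copy still
        # refers to the real underlying stream.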

        self._fid = pr

        self._exc = None
        self.watch_fd_thread = threading.Thread(target=self._watch_pipe_fd)
        self.watch_fd_thread.daemon = True
        self.watch_fd_thread.start()

    def _is_master_process(self):
        return os.getpid() == self._master_pid

    def set_parent(self, parent):
        self.parent_header = extract_header(parent)

    def close(self):
        if getattr(self, "watch_fd_thread", None) is not None:
            self._should_watch = False
            self.watch_fd_thread.join()
        if self._exc:
            etype, value, tb = self._exc
            traceback.print_exception(etype, value, tb)
        self.pub_thread = None

    @property
    def closed(self):
        return self.pub_thread is None

    def _schedule_flush(self):
        """schedule a flush in the IO thread

        call this on write, to indicate that flush should be called soon.
        """
        if self._flush_pending:
            return
        self._flush_pending = True

        # add_timeout has to be handed to the io thread via event pipe
        def _schedule_in_thread():
            self._io_loop.call_later(self.flush_interval, self._flush)
        self.pub_thread.schedule(_schedule_in_thread)

    def flush(self):
        """trigger actual zmq send

        send will happen in the background thread
        """
        if self.pub_thread and self.pub_thread.thread is not None and self.pub_thread.thread.is_alive():
            # request flush on the background thread
            self.pub_thread.schedule(self._flush)
            # wait for flush to actually get through, if we can.
            # waiting across threads during import can cause deadlocks
            # so only wait if import lock is not held
            if not import_lock_held():
                evt = threading.Event()
                self.pub_thread.schedule(evt.set)
                # and give a timeout to avoid hanging forever
                if not evt.wait(self.flush_timeout):
                    # write directly to __stderr__ instead of warning because
                    # if this is happening sys.stderr may be the problem.
                    print("IOStream.flush timed out", file=sys.__stderr__)
        else:
            self._flush()

    def _flush(self):
        """This is where the actual send happens.

        _flush should generally be called in the IO thread,
        unless the thread has been destroyed (e.g. forked subprocess).
        """
        self._flush_pending = False
        self._subprocess_flush_pending = False

        if self.echo is not None:
            try:
                self.echo.flush()
            except OSError as e:
                if self.echo is not sys.__stderr__:
                    print("Flush failed: {}".format(e),
                          file=sys.__stderr__)

        data = self._flush_buffer()
        if data:
            # FIXME: this disables Session's fork-safe check,
            # since pub_thread is itself fork-safe.
            # There should be a better way to do this.
            self.session.pid = os.getpid()
            content = {'name': self.name, 'text': data}
            self.session.send(self.pub_thread, 'stream', content=content,
                parent=self.parent_header, ident=self.topic)

    def write(self, string: str) -> int:
        """Write to current stream after encoding if necessary

        Returns
        -------
        len : int
            number of characters from the input written to the stream.

        """

        if not isinstance(string, str):
            raise TypeError(
                f"write() argument must be str, not {type(string)}"
            )

        if self.echo is not None:
            try:
                self.echo.write(string)
            except OSError as e:
                if self.echo is not sys.__stderr__:
                    print("Write failed: {}".format(e),
                          file=sys.__stderr__)

        if self.pub_thread is None:
            raise ValueError('I/O operation on closed file')
        else:

            is_child = (not self._is_master_process())
            # only touch the buffer in the IO thread to avoid races
            self.pub_thread.schedule(lambda: self._buffer.write(string))
            if is_child:
                # mp.Pool cannot be trusted to flush promptly (or ever),
                # and this helps.
                if self._subprocess_flush_pending:
                    return len(string)
                self._subprocess_flush_pending = True
                # We cannot rely on self._io_loop.call_later from a subprocess
                self.pub_thread.schedule(self._flush)
            else:
                self._schedule_flush()

        return len(string)

    def writelines(self, sequence):
        if self.pub_thread is None:
            raise ValueError('I/O operation on closed file')
        else:
            for string in sequence:
                self.write(string)

    def writable(self):
        return True

    def _flush_buffer(self):
        """clear the current buffer and return the current buffer data.

        This should only be called in the IO thread.
        """
        data = ''
        if self._buffer is not None:
            buf = self._buffer
            self._new_buffer()
            data = buf.getvalue()
            buf.close()
        return data

    def _new_buffer(self):
        self._buffer = StringIO()
