"""Wrappers for forwarding stdout/stderr over zmq"""

# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.

import atexit
from binascii import b2a_hex
import codecs
from collections import deque
import os
import sys
import threading
import warnings
from weakref import WeakSet
import traceback
from io import StringIO, TextIOBase
import io

try:
    # The `imp` module was removed in Python 3.12; `imp.lock_held` was itself
    # a re-export of `_imp.lock_held`, so import it from there directly.
    from _imp import lock_held as import_lock_held
except ImportError:  # fallback for interpreters without `_imp`
    from imp import lock_held as import_lock_held

import zmq
if zmq.pyzmq_version_info() >= (17, 0):
    from tornado.ioloop import IOLoop
else:
    # deprecated since pyzmq 17
    from zmq.eventloop.ioloop import IOLoop
from zmq.eventloop.zmqstream import ZMQStream

from jupyter_client.session import extract_header


#-----------------------------------------------------------------------------
# Globals
#-----------------------------------------------------------------------------

# Return values of IOPubThread._check_mp_mode()
MASTER = 0
CHILD = 1

#-----------------------------------------------------------------------------
# IO classes
#-----------------------------------------------------------------------------


class IOPubThread(object):
    """An object for sending IOPub messages in a background thread

    Prevents a blocking main thread from delaying output from threads.

    IOPubThread(pub_socket).background_socket is a Socket-API-providing object
    whose IO is always run in a thread.
    """

    def __init__(self, socket, pipe=False):
        """Create IOPub thread

        Parameters
        ----------
        socket : zmq.PUB Socket
            the socket on which messages will be sent.
        pipe : bool
            Whether this process should listen for IOPub messages
            piped from subprocesses.
        """
        self.socket = socket
        self.background_socket = BackgroundSocket(self)
        self._master_pid = os.getpid()
        self._pipe_flag = pipe
        self.io_loop = IOLoop(make_current=False)
        if pipe:
            self._setup_pipe_in()
        self._local = threading.local()
        # callables queued by schedule(), executed in order on the IO thread
        self._events = deque()
        self._event_pipes = WeakSet()
        self._setup_event_pipe()
        self.thread = threading.Thread(target=self._thread_main)
        self.thread.daemon = True
        # attributes read by the pydev debugger (presumably so it skips this
        # internal thread) -- NOTE(review): confirm against pydevd
        self.thread.pydev_do_not_trace = True
        self.thread.is_pydev_daemon_thread = True

    def _thread_main(self):
        """The inner loop that's actually run in a thread"""
        self.io_loop.make_current()
        self.io_loop.start()
        self.io_loop.close(all_fds=True)

    def _setup_event_pipe(self):
        """Create the PULL socket listening for events that should fire in this thread."""
        ctx = self.socket.context
        pipe_in = ctx.socket(zmq.PULL)
        pipe_in.linger = 0

        _uuid = b2a_hex(os.urandom(16)).decode('ascii')
        iface = self._event_interface = 'inproc://%s' % _uuid
        pipe_in.bind(iface)
        self._event_puller = ZMQStream(pipe_in, self.io_loop)
        self._event_puller.on_recv(self._handle_event)

    @property
    def _event_pipe(self):
        """thread-local event pipe for signaling events that should be processed in the thread"""
        try:
            event_pipe = self._local.event_pipe
        except AttributeError:
            # new thread, new event pipe
            ctx = self.socket.context
            event_pipe = ctx.socket(zmq.PUSH)
            event_pipe.linger = 0
            event_pipe.connect(self._event_interface)
            self._local.event_pipe = event_pipe
            # WeakSet so that event pipes will be closed by garbage collection
            # when their threads are terminated
            self._event_pipes.add(event_pipe)
        return event_pipe

    def _handle_event(self, msg):
        """Handle an event on the event pipe

        Content of the message is ignored.

        Whenever *an* event arrives on the event stream,
        *all* waiting events are processed in order.
        """
        # freeze event count so new writes don't extend the queue
        # while we are processing
        n_events = len(self._events)
        for _ in range(n_events):
            event_f = self._events.popleft()
            event_f()

    def _setup_pipe_in(self):
        """setup listening pipe for IOPub from forked subprocesses"""
        ctx = self.socket.context

        # use UUID to authenticate pipe messages
        self._pipe_uuid = os.urandom(16)

        pipe_in = ctx.socket(zmq.PULL)
        pipe_in.linger = 0

        try:
            self._pipe_port = pipe_in.bind_to_random_port("tcp://127.0.0.1")
        except zmq.ZMQError as e:
            warnings.warn("Couldn't bind IOPub Pipe to 127.0.0.1: %s" % e +
                "\nsubprocess output will be unavailable."
            )
            self._pipe_flag = False
            pipe_in.close()
            return
        self._pipe_in = ZMQStream(pipe_in, self.io_loop)
        self._pipe_in.on_recv(self._handle_pipe_msg)

    def _handle_pipe_msg(self, msg):
        """handle a pipe message from a subprocess

        The first frame must equal this process's pipe UUID; the remaining
        frames are re-sent on the real IOPub socket.
        """
        if not self._pipe_flag or not self._is_master_process():
            return
        if msg[0] != self._pipe_uuid:
            print("Bad pipe message: %s" % msg, file=sys.__stderr__)
            return
        self.send_multipart(msg[1:])

    def _setup_pipe_out(self):
        """Create a fresh context and PUSH socket connected to the master's pipe.

        Returns
        -------
        (zmq.Context, zmq.Socket)
        """
        # must be new context after fork
        ctx = zmq.Context()
        pipe_out = ctx.socket(zmq.PUSH)
        pipe_out.linger = 3000 # 3s timeout for pipe_out sends before discarding the message
        pipe_out.connect("tcp://127.0.0.1:%i" % self._pipe_port)
        return ctx, pipe_out

    def _is_master_process(self):
        """Return True if called from the process that created this object."""
        return os.getpid() == self._master_pid

    def _check_mp_mode(self):
        """check for forks, and switch to zmq pipeline if necessary"""
        if not self._pipe_flag or self._is_master_process():
            return MASTER
        else:
            return CHILD

    def start(self):
        """Start the IOPub thread"""
        self.thread.start()
        # make sure we don't prevent process exit
        # I'm not sure why setting daemon=True above isn't enough, but it doesn't appear to be.
        atexit.register(self.stop)

    def stop(self):
        """Stop the IOPub thread"""
        if not self.thread.is_alive():
            return
        self.io_loop.add_callback(self.io_loop.stop)
        self.thread.join()
        # close *all* event pipes, created in any thread
        # event pipes can only be used from other threads while self.thread.is_alive()
        # so after thread.join, this should be safe
        for event_pipe in self._event_pipes:
            event_pipe.close()

    def close(self):
        """Close the underlying socket; calling it again is a no-op."""
        if self.closed:
            return
        self.socket.close()
        self.socket = None

    @property
    def closed(self):
        """True once close() has released the socket."""
        return self.socket is None

    def schedule(self, f):
        """Schedule a function to be called in our IO thread.

        If the thread is not running, call immediately.
        """
        if self.thread.is_alive():
            self._events.append(f)
            # wake event thread (message content is ignored)
            self._event_pipe.send(b'')
        else:
            f()

    def send_multipart(self, *args, **kwargs):
        """send_multipart schedules actual zmq send in my thread.

        If my thread isn't running (e.g. forked process), send immediately.
        """
        self.schedule(lambda : self._really_send(*args, **kwargs))

    def _really_send(self, msg, *args, **kwargs):
        """The callback that actually sends messages"""
        mp_mode = self._check_mp_mode()

        if mp_mode != CHILD:
            # we are master, do a regular send
            self.socket.send_multipart(msg, *args, **kwargs)
        else:
            # we are a child, pipe to master
            # new context/socket for every pipe-out
            # since forks don't teardown politely, use ctx.term to ensure send has completed
            ctx, pipe_out = self._setup_pipe_out()
            try:
                pipe_out.send_multipart([self._pipe_uuid] + msg, *args, **kwargs)
            finally:
                # always release the per-send socket/context, even if send raised
                pipe_out.close()
                ctx.term()


class BackgroundSocket(object):
    """Wrapper around IOPub thread that provides zmq send[_multipart]"""
    io_thread = None

    def __init__(self, io_thread):
        self.io_thread = io_thread

    def __getattr__(self, attr):
        """Wrap socket attr access for backward-compatibility

        Raises
        ------
        AttributeError
            for dunder names, and for names the wrapped socket lacks.
        """
        if attr.startswith('__') and attr.endswith('__'):
            # don't wrap magic methods
            raise AttributeError(
                "%r object has no attribute %r" % (type(self).__name__, attr)
            )
        socket = self.io_thread.socket
        if hasattr(socket, attr):
            warnings.warn(
                "Accessing zmq Socket attribute {attr} on BackgroundSocket"
                " is deprecated since ipykernel 4.3.0"
                " use .io_thread.socket.{attr}".format(attr=attr),
                DeprecationWarning,
                stacklevel=2,
            )
            return getattr(socket, attr)
        raise AttributeError(
            "%r object has no attribute %r" % (type(self).__name__, attr)
        )

    def __setattr__(self, attr, value):
        """Set io_thread/dunder names locally; forward anything else to the socket."""
        if attr == 'io_thread' or (attr.startswith('__') and attr.endswith('__')):
            super(BackgroundSocket, self).__setattr__(attr, value)
        else:
            warnings.warn(
                "Setting zmq Socket attribute {attr} on BackgroundSocket"
                " is deprecated since ipykernel 4.3.0"
                " use .io_thread.socket.{attr}".format(attr=attr),
                DeprecationWarning,
                stacklevel=2,
            )
            setattr(self.io_thread.socket, attr, value)

    def send(self, msg, *args, **kwargs):
        """Send a single frame by wrapping it in a one-element multipart message."""
        return self.send_multipart([msg], *args, **kwargs)

    def send_multipart(self, *args, **kwargs):
        """Schedule send in IO thread"""
        return self.io_thread.send_multipart(*args, **kwargs)


class OutStream(TextIOBase):
    """A file like object that publishes the stream to a 0MQ PUB socket.

    Output is handed off to an IO Thread
    """

    # timeout for flush to avoid infinite hang
    # in case of misbehavior
    flush_timeout = 10
    # The time interval between automatic flushes, in seconds.
    flush_interval = 0.2
    topic = None
    encoding = 'UTF-8'


    def fileno(self):
        """
        Things like subprocess will peek and write to the fileno() of stderr/stdout.

        Returns the duplicate of the original OS-level stream made when fd
        watching was set up.

        Raises
        ------
        io.UnsupportedOperation
            if no such duplicate exists (fd watching disabled or closed).
        """
        if getattr(self, "_original_stdstream_copy", None) is not None:
            return self._original_stdstream_copy
        else:
            raise io.UnsupportedOperation("fileno")

    def _watch_pipe_fd(self):
        """
        We've redirected the standard stream's file descriptor (1 or 2) into a pipe.

        We need to watch in a thread and redirect them to the right places.

        1) the ZMQ channels to show in notebook interfaces,
        2) the original stdout/err, to capture errors in terminals.

        We cannot schedule this on the ioloop thread, as this might be blocking.

        Any exception is stored in ``self._exc`` and reported by close().
        """
        # Reads are cut at arbitrary byte offsets, so a multi-byte character
        # can be split across two chunks; an incremental decoder keeps the
        # partial sequence for the next chunk instead of raising.
        decoder = codecs.getincrementaldecoder(self.encoding)(errors="replace")
        try:
            bts = os.read(self._fid, 1000)
            while bts and self._should_watch:
                text = decoder.decode(bts)
                if text:
                    self.write(text)
                os.write(self._original_stdstream_copy, bts)
                bts = os.read(self._fid, 1000)
        except Exception:
            self._exc = sys.exc_info()

    def __init__(
        self, session, pub_thread, name, pipe=None, echo=None, *, watchfd=True, isatty=False,
    ):
        """
        Parameters
        ----------
        session : object
            object whose ``send`` method publishes the 'stream' messages
            (presumably a jupyter_client Session -- confirm with callers).
        pub_thread : IOPubThread
            thread that performs the actual sends. Passing anything else
            (e.g. a bare socket) is deprecated; it is wrapped in a new,
            started IOPubThread.
        name : str {'stderr', 'stdout'}
            the name of the standard stream to replace
        pipe : None
            deprecated and ignored; a DeprecationWarning is emitted if given.
        echo : file-like, optional
            if truthy, every write and flush is also forwarded to it.
        watchfd : bool (default, True)
            Watch the file descriptor corresponding to the replaced stream.
            This is useful if you know some underlying code will write directly
            the file descriptor by its number. It will spawn a watching thread,
            that will swap the given file descriptor for a pipe, read from the
            pipe, and insert this into the current Stream.
            Only done on Linux/macOS, and never under pytest.
        isatty : bool (default, False)
            Indication of whether this stream has terminal capabilities (e.g. can handle colors)

        Raises
        ------
        ValueError
            if echo is given but lacks ``read`` or ``write``.
        """
        if pipe is not None:
            warnings.warn(
                "pipe argument to OutStream is deprecated and ignored"
                " since ipykernel 4.2.3.",
                DeprecationWarning,
                stacklevel=2,
            )
        # Validate echo before any side effect (thread start, fd redirection)
        # so a bad argument does not leave those behind.
        if echo and not (hasattr(echo, 'read') and hasattr(echo, 'write')):
            raise ValueError("echo argument must be a file like object")

        self.session = session
        if not isinstance(pub_thread, IOPubThread):
            # Backward-compat: given socket, not thread. Wrap in a thread.
            warnings.warn(
                "Since IPykernel 4.3, OutStream should be created with "
                "IOPubThread, not %r" % pub_thread,
                DeprecationWarning,
                stacklevel=2,
            )
            pub_thread = IOPubThread(pub_thread)
            pub_thread.start()
        self.pub_thread = pub_thread
        self.name = name
        self.topic = b"stream." + name.encode()
        self.parent_header = {}
        self._master_pid = os.getpid()
        self._flush_pending = False
        self._subprocess_flush_pending = False
        self._io_loop = pub_thread.io_loop
        self._new_buffer()
        self.echo = echo if echo else None
        self._isatty = bool(isatty)

        # fd-watching state; populated by _setup_stream_redirects when enabled
        self._should_watch = False
        self.watch_fd_thread = None
        self._exc = None
        self._original_stdstream_copy = None

        if (
            watchfd
            and (sys.platform.startswith("linux") or sys.platform.startswith("darwin"))
            # Pytest sets up its own capture. Don't redirect from within pytest.
            and ("PYTEST_CURRENT_TEST" not in os.environ)
        ):
            self._should_watch = True
            self._setup_stream_redirects(name)

    def isatty(self):
        """Return a bool indicating whether this is an 'interactive' stream.

        Returns:
            Boolean
        """
        return self._isatty

    def _setup_stream_redirects(self, name):
        """Point the OS-level fd of ``sys.<name>`` at a pipe and start the watcher.

        A duplicate of the original fd is kept so the watcher can still echo
        output to it and close() can restore it.
        """
        pr, pw = os.pipe()
        fno = getattr(sys, name).fileno()
        self._original_stdstream_fd = fno
        self._original_stdstream_copy = os.dup(fno)
        os.dup2(pw, fno)
        # fno now refers to the pipe's write end; the spare descriptor is unused
        os.close(pw)

        self._fid = pr

        self._exc = None
        self.watch_fd_thread = threading.Thread(target=self._watch_pipe_fd)
        self.watch_fd_thread.daemon = True
        self.watch_fd_thread.start()

    def _is_master_process(self):
        """Return True if called from the process that created this stream."""
        return os.getpid() == self._master_pid

    def set_parent(self, parent):
        """Set the parent header attached to subsequently sent messages."""
        self.parent_header = extract_header(parent)

    def close(self):
        """Close the stream.

        If fd watching was set up, stop the watcher thread, restore the
        original file descriptor, release the descriptors created for the
        redirect, and print any exception the watcher recorded.
        """
        watch_thread = getattr(self, "watch_fd_thread", None)
        if watch_thread is not None:
            self.watch_fd_thread = None
            self._should_watch = False
            if watch_thread.is_alive():
                # The watcher is blocked in os.read(); push one byte through
                # the pipe so it wakes, sees _should_watch is False and exits
                # without echoing that byte.
                os.write(self._original_stdstream_fd, b"\0")
            watch_thread.join()
            # Restore the real stdout/stderr. Otherwise later writes to this fd
            # would go into a pipe nobody reads and eventually block.
            os.dup2(self._original_stdstream_copy, self._original_stdstream_fd)
            os.close(self._original_stdstream_copy)
            os.close(self._fid)
            self._original_stdstream_copy = None
            if self._exc:
                etype, value, tb = self._exc
                traceback.print_exception(etype, value, tb)
        self.pub_thread = None

    @property
    def closed(self):
        """True once close() has been called."""
        return self.pub_thread is None

    def _schedule_flush(self):
        """schedule a flush in the IO thread

        call this on write, to indicate that flush should be called soon.
        """
        if self._flush_pending:
            return
        self._flush_pending = True

        # add_timeout has to be handed to the io thread via event pipe
        def _schedule_in_thread():
            self._io_loop.call_later(self.flush_interval, self._flush)
        self.pub_thread.schedule(_schedule_in_thread)

    def flush(self):
        """trigger actual zmq send

        send will happen in the background thread
        """
        if (
            self.pub_thread
            and self.pub_thread.thread is not None
            and self.pub_thread.thread.is_alive()
            # From inside the IO thread a scheduled callback cannot run until
            # we return, so waiting on it would only burn flush_timeout.
            and threading.current_thread() is not self.pub_thread.thread
        ):
            # request flush on the background thread
            self.pub_thread.schedule(self._flush)
            # wait for flush to actually get through, if we can.
            # waiting across threads during import can cause deadlocks
            # so only wait if import lock is not held
            if not import_lock_held():
                evt = threading.Event()
                self.pub_thread.schedule(evt.set)
                # and give a timeout to avoid
                if not evt.wait(self.flush_timeout):
                    # write directly to __stderr__ instead of warning because
                    # if this is happening sys.stderr may be the problem.
                    print("IOStream.flush timed out", file=sys.__stderr__)
        else:
            self._flush()

    def _flush(self):
        """This is where the actual send happens.

        _flush should generally be called in the IO thread,
        unless the thread has been destroyed (e.g. forked subprocess).
        """
        self._flush_pending = False
        self._subprocess_flush_pending = False

        if self.echo is not None:
            try:
                self.echo.flush()
            except OSError as e:
                if self.echo is not sys.__stderr__:
                    print("Flush failed: {}".format(e),
                          file=sys.__stderr__)

        data = self._flush_buffer()
        if data:
            # FIXME: this disables Session's fork-safe check,
            # since pub_thread is itself fork-safe.
            # There should be a better way to do this.
            self.session.pid = os.getpid()
            content = {'name':self.name, 'text':data}
            self.session.send(self.pub_thread, 'stream', content=content,
                parent=self.parent_header, ident=self.topic)

    def write(self, string: str) -> int:
        """Write to current stream after encoding if necessary

        Returns
        -------
        len : int
            number of items from input parameter written to stream.

        Raises
        ------
        TypeError
            if ``string`` is not a str.
        ValueError
            if the stream has been closed.
        """

        if not isinstance(string, str):
            raise TypeError(
                f"write() argument must be str, not {type(string)}"
            )

        # check before echoing, so a closed stream produces no output at all
        if self.pub_thread is None:
            raise ValueError('I/O operation on closed file')

        if self.echo is not None:
            try:
                self.echo.write(string)
            except OSError as e:
                if self.echo is not sys.__stderr__:
                    print("Write failed: {}".format(e),
                          file=sys.__stderr__)

        is_child = not self._is_master_process()
        # only touch the buffer in the IO thread to avoid races
        self.pub_thread.schedule(lambda: self._buffer.write(string))
        if is_child:
            # mp.Pool cannot be trusted to flush promptly (or ever),
            # and this helps.
            if not self._subprocess_flush_pending:
                self._subprocess_flush_pending = True
                # We can not rely on self._io_loop.call_later from a subprocess
                self.pub_thread.schedule(self._flush)
        else:
            self._schedule_flush()

        return len(string)

    def writelines(self, sequence):
        """Write each string in ``sequence``; raises ValueError if closed."""
        if self.pub_thread is None:
            raise ValueError('I/O operation on closed file')
        else:
            for string in sequence:
                self.write(string)

    def writable(self):
        """This stream always accepts writes."""
        return True

    def _flush_buffer(self):
        """clear the current buffer and return the current buffer data.

        This should only be called in the IO thread.
        """
        data = ''
        if self._buffer is not None:
            buf = self._buffer
            self._new_buffer()
            data = buf.getvalue()
            buf.close()
        return data

    def _new_buffer(self):
        """Replace the pending-output buffer with an empty StringIO."""
        self._buffer = StringIO()