"""Wrappers for forwarding stdout/stderr over zmq"""

# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.

import atexit
from binascii import b2a_hex
from collections import deque
import os
import sys
import threading
import warnings
from io import StringIO, TextIOBase

try:
    from imp import lock_held as import_lock_held
except ImportError:
    # The `imp` module was removed in Python 3.12; fall back to the
    # private C module that provides the same function.
    from _imp import lock_held as import_lock_held

import zmq
if zmq.pyzmq_version_info() >= (17, 0):
    from tornado.ioloop import IOLoop
else:
    # deprecated since pyzmq 17
    from zmq.eventloop.ioloop import IOLoop
from zmq.eventloop.zmqstream import ZMQStream

from jupyter_client.session import extract_header

from ipython_genutils import py3compat

#-----------------------------------------------------------------------------
# Globals
#-----------------------------------------------------------------------------

# Values returned by IOPubThread._check_mp_mode()
MASTER = 0
CHILD = 1

#-----------------------------------------------------------------------------
# IO classes
#-----------------------------------------------------------------------------


class IOPubThread(object):
    """An object for sending IOPub messages in a background thread

    Prevents a blocking main thread from delaying output from threads.

    IOPubThread(pub_socket).background_socket is a Socket-API-providing object
    whose IO is always run in a thread.
    """

    def __init__(self, socket, pipe=False):
        """Create IOPub thread

        Parameters
        ----------

        socket: zmq.PUB Socket
            the socket on which messages will be sent.
        pipe: bool
            Whether this process should listen for IOPub messages
            piped from subprocesses.
        """
        self.socket = socket
        self.background_socket = BackgroundSocket(self)
        # pid of the creating process, used to detect forked children
        self._master_pid = os.getpid()
        self._pipe_flag = pipe
        self.io_loop = IOLoop(make_current=False)
        if pipe:
            self._setup_pipe_in()
        # per-thread storage for the PUSH socket created by `_event_pipe`
        self._local = threading.local()
        # callables waiting to be run by `_handle_event`
        self._events = deque()
        self._setup_event_pipe()
        self.thread = threading.Thread(target=self._thread_main)
        self.thread.daemon = True

    def _thread_main(self):
        """The inner loop that's actually run in a thread"""
        self.io_loop.make_current()
        self.io_loop.start()
        # start() returned, so the loop was stopped: release its resources
        self.io_loop.close(all_fds=True)

    def _setup_event_pipe(self):
        """Create the PULL socket listening for events that should fire in this thread."""
        ctx = self.socket.context
        pipe_in = ctx.socket(zmq.PULL)
        pipe_in.linger = 0

        # random inproc endpoint name
        _uuid = b2a_hex(os.urandom(16)).decode('ascii')
        iface = self._event_interface = 'inproc://%s' % _uuid
        pipe_in.bind(iface)
        self._event_puller = ZMQStream(pipe_in, self.io_loop)
        self._event_puller.on_recv(self._handle_event)

    @property
    def _event_pipe(self):
        """thread-local event pipe for signaling events that should be processed in the thread"""
        try:
            event_pipe = self._local.event_pipe
        except AttributeError:
            # new thread, new event pipe
            ctx = self.socket.context
            event_pipe = ctx.socket(zmq.PUSH)
            event_pipe.linger = 0
            event_pipe.connect(self._event_interface)
            self._local.event_pipe = event_pipe
        return event_pipe

    def _handle_event(self, msg):
        """Handle an event on the event pipe

        Content of the message is ignored.

        Whenever *an* event arrives on the event stream,
        *all* waiting events are processed in order.
        """
        # freeze event count so new writes don't extend the queue
        # while we are processing
        n_events = len(self._events)
        for _ in range(n_events):
            event_f = self._events.popleft()
            event_f()

    def _setup_pipe_in(self):
        """setup listening pipe for IOPub from forked subprocesses

        If binding to 127.0.0.1 fails, a warning is issued and piping
        is disabled by clearing `_pipe_flag`.
        """
        ctx = self.socket.context

        # use UUID to authenticate pipe messages
        self._pipe_uuid = os.urandom(16)

        pipe_in = ctx.socket(zmq.PULL)
        pipe_in.linger = 0

        try:
            self._pipe_port = pipe_in.bind_to_random_port("tcp://127.0.0.1")
        except zmq.ZMQError as e:
            warnings.warn("Couldn't bind IOPub Pipe to 127.0.0.1: %s" % e +
                "\nsubprocess output will be unavailable."
            )
            self._pipe_flag = False
            pipe_in.close()
            return
        self._pipe_in = ZMQStream(pipe_in, self.io_loop)
        self._pipe_in.on_recv(self._handle_pipe_msg)

    def _handle_pipe_msg(self, msg):
        """handle a pipe message from a subprocess

        The first frame must equal `_pipe_uuid`; the remaining frames are
        forwarded with `send_multipart`. Messages with a different first
        frame are reported on `sys.__stderr__` and dropped.
        """
        if not self._pipe_flag or not self._is_master_process():
            return
        if msg[0] != self._pipe_uuid:
            # wrap in a tuple so the message is formatted as a single value
            print("Bad pipe message: %s" % (msg,), file=sys.__stderr__)
            return
        self.send_multipart(msg[1:])

    def _setup_pipe_out(self):
        """Create a new context and PUSH socket connected to the pipe port

        Returns
        -------
        (ctx, pipe_out): the new zmq Context and the connected PUSH socket.
        """
        # must be new context after fork
        ctx = zmq.Context()
        pipe_out = ctx.socket(zmq.PUSH)
        pipe_out.linger = 3000  # 3s timeout for pipe_out sends before discarding the message
        pipe_out.connect("tcp://127.0.0.1:%i" % self._pipe_port)
        return ctx, pipe_out

    def _is_master_process(self):
        """Return True if the current pid is the pid that created this object"""
        return os.getpid() == self._master_pid

    def _check_mp_mode(self):
        """check for forks, and switch to zmq pipeline if necessary

        Returns CHILD only when piping is enabled and the current process
        is not the one that created this object; MASTER otherwise.
        """
        if not self._pipe_flag or self._is_master_process():
            return MASTER
        else:
            return CHILD

    def start(self):
        """Start the IOPub thread"""
        self.thread.start()
        # make sure we don't prevent process exit
        # I'm not sure why setting daemon=True above isn't enough, but it doesn't appear to be.
        atexit.register(self.stop)

    def stop(self):
        """Stop the IOPub thread

        Does nothing if the thread is not alive. Otherwise asks the IO loop
        to stop, waits for the thread to finish, and closes the calling
        thread's event pipe if it has one.
        """
        if not self.thread.is_alive():
            return
        self.io_loop.add_callback(self.io_loop.stop)
        self.thread.join()
        if hasattr(self._local, 'event_pipe'):
            self._local.event_pipe.close()

    def close(self):
        """Close the wrapped socket; does nothing if already closed"""
        if self.closed:
            return
        self.socket.close()
        self.socket = None

    @property
    def closed(self):
        """True once `close` has dropped the socket"""
        return self.socket is None

    def schedule(self, f):
        """Schedule a function to be called in our IO thread.

        If the thread is not running, call immediately.
        """
        if self.thread.is_alive():
            self._events.append(f)
            # wake event thread (message content is ignored)
            self._event_pipe.send(b'')
        else:
            f()

    def send_multipart(self, *args, **kwargs):
        """send_multipart schedules actual zmq send in my thread.

        If my thread isn't running (e.g. forked process), send immediately.
        """
        self.schedule(lambda: self._really_send(*args, **kwargs))

    def _really_send(self, msg, *args, **kwargs):
        """The callback that actually sends messages"""
        mp_mode = self._check_mp_mode()

        if mp_mode != CHILD:
            # we are master, do a regular send
            self.socket.send_multipart(msg, *args, **kwargs)
        else:
            # we are a child, pipe to master
            # new context/socket for every pipe-out
            # since forks don't teardown politely, use ctx.term to ensure send has completed
            ctx, pipe_out = self._setup_pipe_out()
            pipe_out.send_multipart([self._pipe_uuid] + msg, *args, **kwargs)
            pipe_out.close()
            ctx.term()


class BackgroundSocket(object):
    """Wrapper around IOPub thread that provides zmq send[_multipart]"""
    io_thread = None

    def __init__(self, io_thread):
        self.io_thread = io_thread

    def __getattr__(self, attr):
        """Wrap socket attr access for backward-compatibility

        Attributes found on the wrapped socket are returned with a
        DeprecationWarning. Dunder names and names the socket does not
        have raise AttributeError.
        """
        missing = "%r object has no attribute %r" % (type(self).__name__, attr)
        if attr.startswith('__') and attr.endswith('__'):
            # don't wrap magic methods
            raise AttributeError(missing)
        if hasattr(self.io_thread.socket, attr):
            warnings.warn("Accessing zmq Socket attribute %s on BackgroundSocket" % attr,
                DeprecationWarning, stacklevel=2)
            return getattr(self.io_thread.socket, attr)
        raise AttributeError(missing)

    def __setattr__(self, attr, value):
        """Set `io_thread` and dunder names on self; forward the rest to the socket

        Forwarded assignments emit a DeprecationWarning.
        """
        if attr == 'io_thread' or (attr.startswith('__') and attr.endswith('__')):
            super(BackgroundSocket, self).__setattr__(attr, value)
        else:
            warnings.warn("Setting zmq Socket attribute %s on BackgroundSocket" % attr,
                DeprecationWarning, stacklevel=2)
            setattr(self.io_thread.socket, attr, value)

    def send(self, msg, *args, **kwargs):
        """Send a single frame by wrapping it in a one-element multipart message"""
        return self.send_multipart([msg], *args, **kwargs)

    def send_multipart(self, *args, **kwargs):
        """Schedule send in IO thread"""
        return self.io_thread.send_multipart(*args, **kwargs)


class OutStream(TextIOBase):
    """A file like object that publishes the stream to a 0MQ PUB socket.

    Output is handed off to an IO Thread
    """

    # timeout for flush to avoid infinite hang
    # in case of misbehavior
    flush_timeout = 10
    # The time interval between automatic flushes, in seconds.
    flush_interval = 0.2
    topic = None
    encoding = 'UTF-8'

    def __init__(self, session, pub_thread, name, pipe=None, echo=None):
        """Create the stream

        Parameters
        ----------
        session:
            object whose `send` method is used to publish 'stream' messages.
        pub_thread: IOPubThread
            the thread through which messages are sent. Anything else is
            treated as a socket (deprecated) and wrapped in a new, started
            IOPubThread.
        name: str
            stream name; used in the message content and in the topic.
        pipe:
            deprecated and ignored.
        echo: file-like, optional
            object with `read` and `write` attributes that also receives
            everything written to this stream.

        Raises
        ------
        ValueError
            if `echo` is truthy but lacks `read` or `write`.
        """
        if pipe is not None:
            warnings.warn("pipe argument to OutStream is deprecated and ignored",
                DeprecationWarning)
        # This is necessary for compatibility with Python built-in streams
        self.session = session
        if not isinstance(pub_thread, IOPubThread):
            # Backward-compat: given socket, not thread. Wrap in a thread.
            warnings.warn("OutStream should be created with IOPubThread, not %r" % pub_thread,
                DeprecationWarning, stacklevel=2)
            pub_thread = IOPubThread(pub_thread)
            pub_thread.start()
        self.pub_thread = pub_thread
        self.name = name
        self.topic = b'stream.' + py3compat.cast_bytes(name)
        self.parent_header = {}
        self._master_pid = os.getpid()
        self._flush_pending = False
        self._subprocess_flush_pending = False
        self._io_loop = pub_thread.io_loop
        self._new_buffer()
        self.echo = None

        if echo:
            if hasattr(echo, 'read') and hasattr(echo, 'write'):
                self.echo = echo
            else:
                raise ValueError("echo argument must be a file like object")

    def _is_master_process(self):
        """Return True if the current pid is the pid that created this stream"""
        return os.getpid() == self._master_pid

    def set_parent(self, parent):
        """Store the header extracted from `parent` for use in later messages"""
        self.parent_header = extract_header(parent)

    def close(self):
        """Mark the stream closed by dropping the reference to the IOPub thread"""
        self.pub_thread = None

    @property
    def closed(self):
        """True once `close` has been called"""
        return self.pub_thread is None

    def _schedule_flush(self):
        """schedule a flush in the IO thread

        call this on write, to indicate that flush should be called soon.
        """
        if self._flush_pending:
            return
        self._flush_pending = True

        # add_timeout has to be handed to the io thread via event pipe
        def _schedule_in_thread():
            self._io_loop.call_later(self.flush_interval, self._flush)
        self.pub_thread.schedule(_schedule_in_thread)

    def flush(self):
        """trigger actual zmq send

        send will happen in the background thread
        """
        if self.pub_thread and self.pub_thread.thread is not None and self.pub_thread.thread.is_alive():
            # request flush on the background thread
            self.pub_thread.schedule(self._flush)
            # wait for flush to actually get through, if we can.
            # waiting across threads during import can cause deadlocks
            # so only wait if import lock is not held
            if not import_lock_held():
                evt = threading.Event()
                self.pub_thread.schedule(evt.set)
                # and give a timeout to avoid waiting forever
                if not evt.wait(self.flush_timeout):
                    # write directly to __stderr__ instead of warning because
                    # if this is happening sys.stderr may be the problem.
                    print("IOStream.flush timed out", file=sys.__stderr__)
        else:
            self._flush()

    def _flush(self):
        """This is where the actual send happens.

        _flush should generally be called in the IO thread,
        unless the thread has been destroyed (e.g. forked subprocess).
        """
        self._flush_pending = False
        self._subprocess_flush_pending = False

        if self.echo is not None:
            try:
                self.echo.flush()
            except OSError as e:
                # avoid reporting a failure of __stderr__ on __stderr__
                if self.echo is not sys.__stderr__:
                    print("Flush failed: {}".format(e),
                          file=sys.__stderr__)

        data = self._flush_buffer()
        if data:
            # FIXME: this disables Session's fork-safe check,
            # since pub_thread is itself fork-safe.
            # There should be a better way to do this.
            self.session.pid = os.getpid()
            content = {'name': self.name, 'text': data}
            self.session.send(self.pub_thread, 'stream', content=content,
                parent=self.parent_header, ident=self.topic)

    def write(self, string):
        """Buffer `string` for publishing and arrange for a flush

        The text is first written to `echo`, if one is set. Input that is
        not a `str` is decoded with `self.encoding`, replacing
        undecodable bytes.

        Returns
        -------
        int
            the number of characters accepted, as `io.TextIOBase.write`
            specifies.

        Raises
        ------
        ValueError
            if the stream has been closed.
        """
        if self.echo is not None:
            try:
                self.echo.write(string)
            except OSError as e:
                # avoid reporting a failure of __stderr__ on __stderr__
                if self.echo is not sys.__stderr__:
                    print("Write failed: {}".format(e),
                          file=sys.__stderr__)

        if self.pub_thread is None:
            raise ValueError('I/O operation on closed file')

        # Make sure that we're handling unicode
        if not isinstance(string, str):
            string = string.decode(self.encoding, 'replace')

        is_child = (not self._is_master_process())
        # only touch the buffer in the IO thread to avoid races
        self.pub_thread.schedule(lambda: self._buffer.write(string))
        if is_child:
            # mp.Pool cannot be trusted to flush promptly (or ever),
            # and this helps.
            if not self._subprocess_flush_pending:
                self._subprocess_flush_pending = True
                # We can not rely on self._io_loop.call_later from a subprocess
                self.pub_thread.schedule(self._flush)
        else:
            self._schedule_flush()
        return len(string)

    def writelines(self, sequence):
        """Write every string in `sequence`, in order

        Raises ValueError if the stream has been closed.
        """
        if self.pub_thread is None:
            raise ValueError('I/O operation on closed file')
        for string in sequence:
            self.write(string)

    def writable(self):
        """This stream always accepts writes"""
        return True

    def _flush_buffer(self):
        """clear the current buffer and return the current buffer data.

        This should only be called in the IO thread.
        """
        data = ''
        if self._buffer is not None:
            buf = self._buffer
            # swap in a fresh buffer before reading the old one
            self._new_buffer()
            data = buf.getvalue()
            buf.close()
        return data

    def _new_buffer(self):
        """Replace the current buffer with an empty StringIO"""
        self._buffer = StringIO()