1# Copyright 2019, David Wilson 2# 3# Redistribution and use in source and binary forms, with or without 4# modification, are permitted provided that the following conditions are met: 5# 6# 1. Redistributions of source code must retain the above copyright notice, 7# this list of conditions and the following disclaimer. 8# 9# 2. Redistributions in binary form must reproduce the above copyright notice, 10# this list of conditions and the following disclaimer in the documentation 11# and/or other materials provided with the distribution. 12# 13# 3. Neither the name of the copyright holder nor the names of its contributors 14# may be used to endorse or promote products derived from this software without 15# specific prior written permission. 16# 17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 18# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 21# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27# POSSIBILITY OF SUCH DAMAGE. 28 29# !mitogen: minify_safe 30 31""" 32This module defines functionality common to master and parent processes. It is 33sent to any child context that is due to become a parent, due to recursive 34connection. 
35""" 36 37import codecs 38import errno 39import fcntl 40import getpass 41import heapq 42import inspect 43import logging 44import os 45import re 46import signal 47import socket 48import struct 49import subprocess 50import sys 51import termios 52import textwrap 53import threading 54import zlib 55 56# Absolute imports for <2.5. 57select = __import__('select') 58 59try: 60 import thread 61except ImportError: 62 import threading as thread 63 64import mitogen.core 65from mitogen.core import b 66from mitogen.core import bytes_partition 67from mitogen.core import IOLOG 68 69 70LOG = logging.getLogger(__name__) 71 72# #410: we must avoid the use of socketpairs if SELinux is enabled. 73try: 74 fp = open('/sys/fs/selinux/enforce', 'rb') 75 try: 76 SELINUX_ENABLED = bool(int(fp.read())) 77 finally: 78 fp.close() 79except IOError: 80 SELINUX_ENABLED = False 81 82 83try: 84 next 85except NameError: 86 # Python 2.4/2.5 87 from mitogen.core import next 88 89 90itervalues = getattr(dict, 'itervalues', dict.values) 91 92if mitogen.core.PY3: 93 xrange = range 94 closure_attr = '__closure__' 95 IM_SELF_ATTR = '__self__' 96else: 97 closure_attr = 'func_closure' 98 IM_SELF_ATTR = 'im_self' 99 100 101try: 102 SC_OPEN_MAX = os.sysconf('SC_OPEN_MAX') 103except ValueError: 104 SC_OPEN_MAX = 1024 105 106BROKER_SHUTDOWN_MSG = ( 107 'Connection cancelled because the associated Broker began to shut down.' 108) 109 110OPENPTY_MSG = ( 111 "Failed to create a PTY: %s. It is likely the maximum number of PTYs has " 112 "been reached. Consider increasing the 'kern.tty.ptmx_max' sysctl on OS " 113 "X, the 'kernel.pty.max' sysctl on Linux, or modifying your configuration " 114 "to avoid PTY use." 115) 116 117SYS_EXECUTABLE_MSG = ( 118 "The Python sys.executable variable is unset, indicating Python was " 119 "unable to determine its original program name. 
SYS_EXECUTABLE_MSG = (
    "The Python sys.executable variable is unset, indicating Python was "
    "unable to determine its original program name. Unless explicitly "
    "configured otherwise, child contexts will be started using "
    "'/usr/bin/python'"
)
# Set to True by get_sys_executable() once SYS_EXECUTABLE_MSG has been logged,
# so later calls do not repeat the warning.
_sys_executable_warning_logged = False


def _ioctl_cast(n):
    """
    Linux ioctl() request parameter is unsigned, whereas on BSD/Darwin it is
    signed. Until 2.5 Python exclusively implemented the BSD behaviour,
    preventing use of large unsigned int requests like the TTY layer uses
    below. So on 2.4, we cast our unsigned to look like signed for Python.

    :param int n:
        Unsigned ioctl request number.
    :returns:
        `n` unchanged on Python 2.5 and newer, otherwise the same bit pattern
        reinterpreted as a signed native int.
    """
    if sys.version_info < (2, 5):
        n, = struct.unpack('i', struct.pack('I', n))
    return n


# If not :data:`None`, called prior to exec() of any new child process. Used by
# :func:`mitogen.utils.reset_affinity` to allow the child to be freely
# scheduled.
_preexec_hook = None

# Get PTY number; asm-generic/ioctls.h
LINUX_TIOCGPTN = _ioctl_cast(2147767344)

# Lock/unlock PTY; asm-generic/ioctls.h
LINUX_TIOCSPTLCK = _ioctl_cast(1074025521)

IS_LINUX = os.uname()[0] == 'Linux'

# Map signal number -> signal name. Names are visited in reverse alphabetical
# order and later entries overwrite earlier ones, so when several names share
# a number the alphabetically first name is the one kept.
SIGNAL_BY_NUM = dict(
    (getattr(signal, name), name)
    for name in sorted(vars(signal), reverse=True)
    if name.startswith('SIG') and not name.startswith('SIG_')
)

# Guards lazy construction of _core_source_partial.
_core_source_lock = threading.Lock()
_core_source_partial = None


def get_log_level():
    """
    Return the effective level of this module's logger, or
    :data:`logging.INFO` when that level is falsy.
    """
    return (LOG.getEffectiveLevel() or logging.INFO)


def get_sys_executable():
    """
    Return :data:`sys.executable` if it is set, otherwise return
    ``"/usr/bin/python"`` and log a warning.

    The warning is only logged the first time the fallback is used;
    :data:`_sys_executable_warning_logged` records that it was emitted.
    """
    global _sys_executable_warning_logged

    if sys.executable:
        return sys.executable

    if not _sys_executable_warning_logged:
        # Logger.warn() is a deprecated alias that newer Python releases no
        # longer provide; Logger.warning() is available on every version.
        LOG.warning(SYS_EXECUTABLE_MSG)
        _sys_executable_warning_logged = True

    return '/usr/bin/python'
def _get_core_source():
    """
    In non-masters, simply fetch the cached mitogen.core source code via the
    import mechanism. In masters, this function is replaced with a version that
    performs minification directly.

    :returns:
        Source text of :mod:`mitogen.core` as returned by
        :func:`inspect.getsource`.
    """
    return inspect.getsource(mitogen.core)


def get_core_source_partial():
    """
    Return the shared :class:`PartialZlib` wrapping the UTF-8 encoded output
    of :func:`_get_core_source`, building it on first use.

    _get_core_source() is expensive, even with @lru_cache in minify.py, threads
    can enter it simultaneously causing severe slowdowns. Construction is
    therefore serialized with :data:`_core_source_lock`, and the cached value
    is re-checked after the lock is taken so it is only built once.
    """
    global _core_source_partial

    if _core_source_partial is None:
        _core_source_lock.acquire()
        try:
            if _core_source_partial is None:
                _core_source_partial = PartialZlib(
                    _get_core_source().encode('utf-8')
                )
        finally:
            _core_source_lock.release()

    return _core_source_partial


def get_default_remote_name():
    """
    Return the default name appearing in argv[0] of remote machines, of the
    form ``user@hostname:pid``.
    """
    s = u'%s@%s:%d'
    s %= (getpass.getuser(), socket.gethostname(), os.getpid())
    # In mixed UNIX/Windows environments, the username may contain slashes.
    return s.translate({
        ord(u'\\'): ord(u'_'),
        ord(u'/'): ord(u'_')
    })


def is_immediate_child(msg, stream):
    """
    Handler policy that requires messages to arrive only from immediately
    connected children.

    :returns:
        :data:`True` if `msg.src_id` equals the remote ID of `stream`'s
        protocol.
    """
    return msg.src_id == stream.protocol.remote_id


def flags(names):
    """
    Return the result of ORing a set of (space separated) :py:mod:`termios`
    module constants together.

    :param str names:
        Whitespace separated constant names. Names missing from
        :py:mod:`termios` on this platform contribute nothing.
    :returns:
        Integer bit mask; 0 if `names` is empty.
    """
    mask = 0
    for name in names.split():
        # Bitwise OR rather than addition: a repeated name, or two constants
        # sharing bits, must not carry into unrelated bits.
        mask |= getattr(termios, name, 0)
    return mask


def cfmakeraw(tflags):
    """
    Given a list returned by :py:func:`termios.tcgetattr`, return a list
    modified in a manner similar to the `cfmakeraw()` C library function, but
    additionally disabling local echo.

    :param list tflags:
        ``[iflag, oflag, cflag, lflag, ispeed, ospeed, cc]``.
    :returns:
        New list of the same shape; `ispeed`, `ospeed` and `cc` are passed
        through untouched.
    """
    # BSD: github.com/freebsd/freebsd/blob/master/lib/libc/gen/termios.c#L162
    # Linux: github.com/lattera/glibc/blob/master/termios/cfmakeraw.c#L20
    iflag, oflag, cflag, lflag, ispeed, ospeed, cc = tflags
    iflag &= ~flags('IMAXBEL IXOFF INPCK BRKINT PARMRK '
                    'ISTRIP INLCR ICRNL IXON IGNPAR IGNBRK')
    oflag &= ~flags('OPOST')
    lflag &= ~flags('ECHO ECHOE ECHOK ECHONL ICANON ISIG '
                    'IEXTEN NOFLSH TOSTOP PENDIN')
    cflag &= ~flags('CSIZE PARENB')
    cflag |= flags('CS8 CREAD')
    return [iflag, oflag, cflag, lflag, ispeed, ospeed, cc]
def disable_echo(fd):
    """
    Read the terminal attributes of `fd`, transform them with
    :func:`cfmakeraw`, and write them back.

    :param int fd:
        File descriptor open on a terminal device.
    """
    old = termios.tcgetattr(fd)
    new = cfmakeraw(old)
    # Named `when` (the tcsetattr() argument it feeds) so the module-level
    # flags() helper is not shadowed inside this function.
    when = getattr(termios, 'TCSASOFT', 0)
    if not mitogen.core.IS_WSL:
        # issue #319: Windows Subsystem for Linux as of July 2018 throws EINVAL
        # if TCSAFLUSH is specified.
        when |= termios.TCSAFLUSH
    termios.tcsetattr(fd, when, new)


def create_socketpair(size=None):
    """
    Create a :func:`socket.socketpair` for use as a child's UNIX stdio
    channels. As socketpairs are bidirectional, they are economical on file
    descriptor usage as one descriptor can be used for ``stdin`` and
    ``stdout``. As they are sockets their buffers are tunable, allowing large
    buffers to improve file transfer throughput and reduce IO loop iterations.

    :param int size:
        Value applied to ``SO_SNDBUF`` on both sockets. Defaults to
        :data:`mitogen.core.CHUNK_SIZE`.
    :returns:
        `(parentfp, childfp)` socket objects.
    """
    if size is None:
        size = mitogen.core.CHUNK_SIZE

    parentfp, childfp = socket.socketpair()
    try:
        for fp in parentfp, childfp:
            fp.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, size)
    except:
        # Do not leak the freshly created descriptors if tuning them fails.
        parentfp.close()
        childfp.close()
        raise

    return parentfp, childfp


def create_best_pipe(escalates_privilege=False):
    """
    By default we prefer to communicate with children over a UNIX socket, as a
    single file descriptor can represent bidirectional communication, and a
    cross-platform API exists to align buffer sizes with the needs of the
    library.

    SELinux prevents us setting up a privileged process to inherit an AF_UNIX
    socket, a facility explicitly designed as a better replacement for pipes,
    because at some point in the mid 90s it might have been commonly possible
    for AF_INET sockets to end up undesirably connected to a privileged
    process, so let's make up arbitrary rules breaking all sockets instead.

    If SELinux is detected, fall back to using pipes.

    :param bool escalates_privilege:
        If :data:`True`, the target program may escalate privileges, causing
        SELinux to disconnect AF_UNIX sockets, so avoid those.
    :returns:
        `(parent_rfp, child_wfp, child_rfp, parent_wfp)`. In the socketpair
        case the two parent entries are one object, as are the two child
        entries.
    """
    if (not escalates_privilege) or (not SELINUX_ENABLED):
        parentfp, childfp = create_socketpair()
        return parentfp, childfp, childfp, parentfp

    parent_rfp, child_wfp = mitogen.core.pipe()
    try:
        child_rfp, parent_wfp = mitogen.core.pipe()
        return parent_rfp, child_wfp, child_rfp, parent_wfp
    except:
        # The second pipe could not be created; release the first.
        parent_rfp.close()
        child_wfp.close()
        raise
def popen(**kwargs):
    """
    Wrap :class:`subprocess.Popen` to ensure any global :data:`_preexec_hook`
    is invoked in the child.

    :param kwargs:
        Keyword arguments for :class:`subprocess.Popen`. A caller-supplied
        `preexec_fn` is still honoured; it runs after :data:`_preexec_hook`.
    :returns:
        The :class:`subprocess.Popen` instance.
    """
    real_preexec_fn = kwargs.pop('preexec_fn', None)

    def preexec_fn():
        if _preexec_hook:
            _preexec_hook()
        if real_preexec_fn:
            real_preexec_fn()

    return subprocess.Popen(preexec_fn=preexec_fn, **kwargs)


def create_child(args, merge_stdio=False, stderr_pipe=False,
                 escalates_privilege=False, preexec_fn=None):
    """
    Create a child process whose stdin/stdout is connected to a socket.

    :param list args:
        Program argument vector.
    :param bool merge_stdio:
        If :data:`True`, arrange for `stderr` to be connected to the `stdout`
        socketpair, rather than inherited from the parent process. This may be
        necessary to ensure that no TTY is connected to any stdio handle, for
        instance when using LXC.
    :param bool stderr_pipe:
        If :data:`True` and `merge_stdio` is :data:`False`, arrange for
        `stderr` to be connected to a separate pipe, to allow any ongoing debug
        logs generated by e.g. SSH to be output as the session progresses,
        without interfering with `stdout`.
    :param bool escalates_privilege:
        If :data:`True`, the target program may escalate privileges, causing
        SELinux to disconnect AF_UNIX sockets, so avoid those.
    :param function preexec_fn:
        If not :data:`None`, a function to run within the post-fork child
        before executing the target program.
    :returns:
        :class:`Process` instance.
    """
    parent_rfp, child_wfp, child_rfp, parent_wfp = create_best_pipe(
        escalates_privilege=escalates_privilege
    )

    stderr = None
    # Only non-None when a dedicated stderr pipe was really created; this, not
    # the `stderr_pipe` argument, decides what must be closed below, because
    # `merge_stdio` takes precedence over `stderr_pipe`.
    stderr_r = None

    try:
        if merge_stdio:
            stderr = child_wfp
        elif stderr_pipe:
            # Created inside the try block so the stdio pipes are released if
            # this fails.
            stderr_r, stderr = mitogen.core.pipe()
            mitogen.core.set_cloexec(stderr_r.fileno())

        proc = popen(
            args=args,
            stdin=child_rfp,
            stdout=child_wfp,
            stderr=stderr,
            close_fds=True,
            preexec_fn=preexec_fn,
        )
    except:
        child_rfp.close()
        child_wfp.close()
        parent_rfp.close()
        parent_wfp.close()
        if stderr_r is not None:
            stderr.close()
            stderr_r.close()
        raise

    # The child owns its ends now; drop the parent's copies.
    child_rfp.close()
    child_wfp.close()
    if stderr_r is not None:
        stderr.close()

    return PopenProcess(
        proc=proc,
        stdin=parent_wfp,
        stdout=parent_rfp,
        stderr=stderr_r,
    )
def _acquire_controlling_tty():
    """
    Start a new session, then attach the terminal open on file descriptor 2
    to it as the controlling TTY. Used as a `preexec_fn`, after the child's
    stderr has been pointed at a PTY slave.
    """
    os.setsid()

    if sys.platform in ('linux', 'linux2'):
        # On Linux, the controlling tty becomes the first tty opened by a
        # process lacking any prior tty.
        tty_path = os.ttyname(2)
        tty_fd = os.open(tty_path, os.O_RDWR)
        os.close(tty_fd)

    # #550: prehistoric WSL does not like TIOCSCTTY.
    needs_ioctl = hasattr(termios, 'TIOCSCTTY') and not mitogen.core.IS_WSL
    if needs_ioctl:
        # On BSD an explicit ioctl is required. For some inexplicable reason,
        # Python 2.6 on Travis also requires it.
        fcntl.ioctl(2, termios.TIOCSCTTY)
def _linux_broken_devpts_openpty():
    """
    #462: On broken Linux hosts with mismatched configuration (e.g. old
    /etc/fstab template installed), /dev/pts may be mounted without the gid=
    mount option, causing new slave devices to be created with the group ID of
    the calling process. This upsets glibc, whose openpty() is required by
    specification to produce a slave owned by a special group ID (which is
    always the 'tty' group).

    Glibc attempts to use "pt_chown" to fix ownership. If that fails, it
    chown()s the PTY directly, which fails due to non-root, causing openpty()
    to fail with EPERM ("Operation not permitted"). Since we don't need the
    magical TTY group to run sudo and su, open the PTY ourselves in this case.

    :raises mitogen.core.StreamError:
        Any step of the manual allocation failed.
    :returns:
        `(master_fd, slave_fd)` integer file descriptors.
    """
    master_fd = None
    try:
        # Opening /dev/ptmx causes a PTY pair to be allocated, and the
        # corresponding slave /dev/pts/* device to be created, owned by UID/GID
        # matching this process.
        master_fd = os.open('/dev/ptmx', os.O_RDWR)
        # Clear the lock bit from the PTY. This is a prehistoric feature from
        # a time when slave device files were persistent.
        fcntl.ioctl(master_fd, LINUX_TIOCSPTLCK, struct.pack('i', 0))
        # Since v4.13 TIOCGPTPEER exists to open the slave in one step, but we
        # must support older kernels. Ask for the PTY number.
        pty_num_s = fcntl.ioctl(master_fd, LINUX_TIOCGPTN,
                                struct.pack('i', 0))
        pty_num, = struct.unpack('i', pty_num_s)
        pty_name = '/dev/pts/%d' % (pty_num,)
        # Now open it with O_NOCTTY to ensure it doesn't change our controlling
        # TTY. Otherwise when we close the FD we get killed by the kernel, and
        # the child we spawn that should really attach to it will get EPERM
        # during _acquire_controlling_tty().
        slave_fd = os.open(pty_name, os.O_RDWR|os.O_NOCTTY)
        return master_fd, slave_fd
    except (IOError, OSError):
        # os.open() reports failure as OSError, but on Python 2 fcntl.ioctl()
        # raises IOError, which is unrelated to OSError there. Catch both so
        # the master descriptor is always released and callers always see
        # StreamError.
        e = sys.exc_info()[1]
        if master_fd is not None:
            os.close(master_fd)
        raise mitogen.core.StreamError(OPENPTY_MSG, e)
def openpty():
    """
    Call :func:`os.openpty`, raising a descriptive error if the call fails.
    On Linux an EPERM failure is retried through
    :func:`_linux_broken_devpts_openpty`.

    Both ends are passed through :func:`disable_echo`, and the slave end
    through :func:`mitogen.core.set_block`.

    :raises mitogen.core.StreamError:
        Creating a PTY failed.
    :returns:
        `(master_fp, slave_fp)` file-like objects.
    """
    try:
        master_fd, slave_fd = os.openpty()
    except OSError:
        exc = sys.exc_info()[1]
        if IS_LINUX and exc.args[0] == errno.EPERM:
            master_fd, slave_fd = _linux_broken_devpts_openpty()
        else:
            raise mitogen.core.StreamError(OPENPTY_MSG, exc)

    # Unbuffered binary file objects wrapping the raw descriptors.
    master_fp = os.fdopen(master_fd, 'r+b', 0)
    slave_fp = os.fdopen(slave_fd, 'r+b', 0)
    for fd in master_fd, slave_fd:
        disable_echo(fd)
    mitogen.core.set_block(slave_fd)
    return master_fp, slave_fp


def tty_create_child(args):
    """
    Return a file descriptor connected to the master end of a pseudo-terminal,
    whose slave end is connected to stdin/stdout/stderr of a new child process.
    The child is created such that the pseudo-terminal becomes its controlling
    TTY, ensuring access to /dev/tty returns a new file descriptor open on the
    slave end.

    :param list args:
        Program argument vector.
    :returns:
        :class:`Process` instance.
    """
    master_fp, slave_fp = openpty()
    try:
        proc = popen(
            args=args,
            stdin=slave_fp,
            stdout=slave_fp,
            stderr=slave_fp,
            preexec_fn=_acquire_controlling_tty,
            close_fds=True,
        )
    except:
        for fp in master_fp, slave_fp:
            fp.close()
        raise

    # The child holds the slave end now; the parent keeps only the master.
    slave_fp.close()
    return PopenProcess(proc=proc, stdin=master_fp, stdout=master_fp)
def hybrid_tty_create_child(args, escalates_privilege=False):
    """
    Like :func:`tty_create_child`, except attach stdin/stdout to a socketpair
    like :func:`create_child`, but leave stderr and the controlling TTY
    attached to a TTY.

    This permits high throughput communication with programs that are reached
    via some program that requires a TTY for password input, like many
    configurations of sudo. The UNIX TTY layer tends to have tiny (no more than
    14KiB) buffers, forcing many IO loop iterations when transferring bulk
    data, causing significant performance loss.

    :param bool escalates_privilege:
        If :data:`True`, the target program may escalate privileges, causing
        SELinux to disconnect AF_UNIX sockets, so avoid those.
    :param list args:
        Program argument vector.
    :returns:
        :class:`Process` instance.
    """
    master_fp, slave_fp = openpty()
    try:
        parent_rfp, child_wfp, child_rfp, parent_wfp = create_best_pipe(
            escalates_privilege=escalates_privilege,
        )
        try:
            for fp in child_rfp, child_wfp:
                mitogen.core.set_block(fp)
            proc = popen(
                args=args,
                stdin=child_rfp,
                stdout=child_wfp,
                stderr=slave_fp,
                preexec_fn=_acquire_controlling_tty,
                close_fds=True,
            )
        except:
            # Release the stdio channel; the PTY is released by the outer
            # handler as the exception propagates.
            for fp in parent_rfp, child_wfp, parent_wfp, child_rfp:
                fp.close()
            raise
    except:
        for fp in master_fp, slave_fp:
            fp.close()
        raise

    # Everything handed to the child is closed in the parent.
    for fp in slave_fp, child_rfp, child_wfp:
        fp.close()

    return PopenProcess(
        proc=proc,
        stdin=parent_wfp,
        stdout=parent_rfp,
        stderr=master_fp,
    )
class Timer(object):
    """
    Represents a future event: a callable paired with the time it is due.

    Comparisons look only at :attr:`when`, which is what allows instances to
    be kept in a :mod:`heapq` ordered by due time.
    """
    #: Set to :data:`False` if :meth:`cancel` has been called, or immediately
    #: prior to being executed by :meth:`TimerList.expire`.
    active = True

    def __init__(self, when, func):
        """
        :param when:
            Due time; the only value used when comparing timers.
        :param func:
            Object stored as :attr:`func` for the owner to invoke.
        """
        self.when = when
        self.func = func

    def __repr__(self):
        fields = (self.when, self.func)
        return 'Timer(%r, %r)' % fields

    def __eq__(self, other):
        return self.when == other.when

    def __lt__(self, other):
        return self.when < other.when

    def __le__(self, other):
        return self.when <= other.when

    def cancel(self):
        """
        Cancel this event. If it has not yet executed, it will not execute
        during any subsequent :meth:`TimerList.expire` call.
        """
        self.active = False
class TimerList(object):
    """
    Efficiently manage a list of cancellable future events relative to wall
    clock time. An instance of this class is installed as
    :attr:`mitogen.master.Broker.timers` by default, and as
    :attr:`mitogen.core.Broker.timers` in children after a call to
    :func:`mitogen.parent.upgrade_router`.

    You can use :class:`TimerList` to cause the broker to wake at arbitrary
    future moments, useful for implementing timeouts and polling in an
    asynchronous context.

    :class:`TimerList` methods can only be called from asynchronous context,
    for example via :meth:`mitogen.core.Broker.defer`.

    The broker automatically adjusts its sleep delay according to the installed
    timer list, and arranges for timers to expire via automatic calls to
    :meth:`expire`. The main user interface to :class:`TimerList` is
    :meth:`schedule`.
    """
    # Clock used for every "current time" reading.
    _now = mitogen.core.now

    def __init__(self):
        # heapq-managed list of Timer objects, earliest `when` first.
        self._lst = []

    def get_timeout(self):
        """
        Return the floating point seconds until the next event is due.
        Cancelled timers at the front of the queue are discarded first.

        :returns:
            Floating point delay, or 0.0, or :data:`None` if no events are
            scheduled.
        """
        while self._lst and not self._lst[0].active:
            heapq.heappop(self._lst)

        if not self._lst:
            return None

        remaining = self._lst[0].when - self._now()
        return max(0, remaining)

    def schedule(self, when, func):
        """
        Schedule a future event.

        :param float when:
            UNIX time in seconds when event should occur.
        :param callable func:
            Callable to invoke on expiry.
        :returns:
            A :class:`Timer` instance, exposing :meth:`Timer.cancel`, which may
            be used to cancel the future invocation.
        """
        timer = Timer(when, func)
        heapq.heappush(self._lst, timer)
        return timer

    def expire(self):
        """
        Invoke callbacks for any events in the past.
        """
        deadline = self._now()
        while self._lst and self._lst[0].when <= deadline:
            timer = heapq.heappop(self._lst)
            if not timer.active:
                # Cancelled before it became due.
                continue
            # Mark inactive before invoking the callback.
            timer.active = False
            timer.func()
class PartialZlib(object):
    """
    Because the mitogen.core source has a line appended to it during bootstrap,
    it must be recompressed for each connection. This is not a problem for a
    small number of connections, but it amounts to 30 seconds CPU time by the
    time 500 targets are in use.

    For that reason, build a compressor containing mitogen.core and flush as
    much of it as possible into an initial buffer. Then to append the custom
    line, clone the compressor and compress just that line.

    A full compression costs ~6ms on a modern machine, this method costs ~35
    usec.
    """
    def __init__(self, s):
        """
        :param bytes s:
            Fixed prefix shared by every later :meth:`append` call.
        """
        self.s = s
        if sys.version_info <= (2, 5):
            # No usable compressor; append() recompresses everything each
            # time it is called.
            self._compressor = None
        else:
            self._compressor = zlib.compressobj(9)
            prefix = self._compressor.compress(s)
            # Z_SYNC_FLUSH pushes out all pending output while leaving the
            # compressor usable for more input.
            prefix += self._compressor.flush(zlib.Z_SYNC_FLUSH)
            self._out = prefix

    def append(self, s):
        """
        Append the bytestring `s` to the compressor state and return the
        final compressed output.

        The stored state is not modified, so this may be called repeatedly
        with different suffixes.
        """
        if self._compressor is None:
            return zlib.compress(self.s + s, 9)

        clone = self._compressor.copy()
        suffix = clone.compress(s)
        suffix += clone.flush()
        return self._out + suffix
def _upgrade_broker(broker):
    """
    Extract the poller state from Broker and replace it with the industrial
    strength poller for this OS. Must run on the Broker thread.

    A fresh :class:`TimerList` is also installed as `broker.timers`.
    """
    # This function is deadly! The act of calling start_receive() generates log
    # messages which must be silenced as the upgrade progresses, otherwise the
    # poller state will change as it is copied, resulting in write fds that are
    # lost. (Due to LogHandler->Router->Stream->Protocol->Broker->Poller, where
    # Stream only calls start_transmit() when transitioning from empty to
    # non-empty buffer. If the start_transmit() is lost, writes from the child
    # hang permanently).
    root_logger = logging.getLogger()
    saved_level = root_logger.level
    root_logger.setLevel(logging.CRITICAL)
    try:
        old_poller = broker.poller
        new_poller = PREFERRED_POLLER()
        for fd, data in old_poller.readers:
            new_poller.start_receive(fd, data)
        for fd, data in old_poller.writers:
            new_poller.start_transmit(fd, data)

        old_poller.close()
        broker.poller = new_poller
    finally:
        root_logger.setLevel(saved_level)

    broker.timers = TimerList()
    LOG.debug('upgraded %r with %r (new: %d readers, %d writers; '
              'old: %d readers, %d writers)', old_poller, new_poller,
              len(new_poller.readers), len(new_poller.writers),
              len(old_poller.readers), len(old_poller.writers))


@mitogen.core.takes_econtext
def upgrade_router(econtext):
    """
    Turn `econtext.router` into a :class:`Router` if it is not one already:
    queue :func:`_upgrade_broker` on the broker, swap the router's class in
    place, then call its `upgrade()` with the context's importer and parent.
    """
    if isinstance(econtext.router, Router):  # TODO
        return

    econtext.broker.defer(_upgrade_broker, econtext.broker)
    econtext.router.__class__ = Router  # TODO
    econtext.router.upgrade(
        importer=econtext.importer,
        parent=econtext.parent,
    )


def get_connection_class(name):
    """
    Given the name of a Mitogen connection method, import its implementation
    module and return its Stream subclass.

    :param str name:
        Method name; ``local`` is served by the ``mitogen.parent`` module.
    :returns:
        The `Connection` attribute of the imported module.
    """
    module_name = name
    if module_name == u'local':
        module_name = u'parent'
    module = mitogen.core.import_module(u'mitogen.' + module_name)
    return module.Connection
@mitogen.core.takes_econtext
def _proxy_connect(name, method_name, kwargs, econtext):
    """
    Implements the target portion of Router._proxy_connect() by upgrading the
    local process to a parent if it was not already, then calling back into
    Router._connect() using the arguments passed to the parent's
    Router.connect().

    :returns:
        Dict containing:
        * ``id``: :data:`None`, or integer new context ID.
        * ``name``: :data:`None`, or string name attribute of new Context.
        * ``msg``: :data:`None`, or StreamError exception text.
    """
    upgrade_router(econtext)

    try:
        context = econtext.router._connect(
            klass=get_connection_class(method_name),
            name=name,
            **kwargs
        )
    except mitogen.core.StreamError:
        # Report the failure as data rather than raising, prefixed with the
        # host on which it occurred.
        failure = 'error occurred on host %s: %s' % (
            socket.gethostname(),
            sys.exc_info()[1],
        )
        return {
            u'id': None,
            u'name': None,
            u'msg': failure,
        }

    return {
        u'id': context.context_id,
        u'name': context.name,
        u'msg': None,
    }


def returncode_to_str(n):
    """
    Format a process return code as text. A negative `n` is described as
    termination by signal ``-n``, with its name looked up in
    :data:`SIGNAL_BY_NUM`; any other value as a plain return code.
    """
    if n >= 0:
        return 'exited with return code %d' % (n,)

    signum = -n
    return 'exited due to signal %d (%s)' % (signum, SIGNAL_BY_NUM.get(signum))


class EofError(mitogen.core.StreamError):
    """
    Raised by :class:`Connection` when an empty read is detected from the
    remote process before bootstrap completes.
    """
    # inherits from StreamError to maintain compatibility.
    pass


class CancelledError(mitogen.core.StreamError):
    """
    Raised by :class:`Connection` when :meth:`mitogen.core.Broker.shutdown` is
    called before bootstrap completes.
    """
    pass


class Argv(object):
    """
    Wrapper to defer argv formatting when debug logging is disabled.
    """
    # Characters that receive a backslash inside a quoted argument.
    must_escape = frozenset('\\$"`!')
    # Any of these appearing in an argument causes it to be quoted.
    must_escape_or_space = must_escape | frozenset(' ')

    def __init__(self, argv):
        self.argv = argv

    def escape(self, x):
        """
        Return `x` unchanged if it contains no space or escape-worthy
        character, otherwise wrapped in double quotes with each character in
        :attr:`must_escape` preceded by a backslash.
        """
        if not self.must_escape_or_space.intersection(x):
            return x

        quoted = '"'
        for ch in x:
            if ch in self.must_escape:
                quoted += '\\'
            quoted += ch
        return quoted + '"'

    def __str__(self):
        return ' '.join(map(self.escape, self.argv))
class CallSpec(object):
    """
    Wrapper to defer call argument formatting when debug logging is disabled.
    """
    def __init__(self, func, args, kwargs):
        self.func = func
        self.args = args
        self.kwargs = kwargs

    def _get_name(self):
        """
        Return the dotted name of the callable: its module, then for bound
        methods the name of the object (or of its type) it is bound to, then
        the function name.
        """
        func = self.func
        parts = [func.__module__]
        if inspect.ismethod(func):
            owner = getattr(func, IM_SELF_ATTR)
            # Objects with their own __name__ (e.g. classes, for
            # classmethods) are named directly; plain instances fall back to
            # their type's name.
            owner_name = (getattr(owner, '__name__', None) or
                          getattr(type(owner), '__name__', None))
            parts.append(owner_name)
        parts.append(func.__name__)
        return u'.'.join(parts)

    def _get_args(self):
        """
        Return the positional arguments as comma separated reprs.
        """
        return u', '.join(repr(a) for a in self.args)

    def _get_kwargs(self):
        """
        Return the keyword arguments as comma separated ``key=repr`` pairs,
        with a leading separator when positional arguments precede them.
        """
        if not self.kwargs:
            return u''

        rendered = u', '.join('%s=%r' % (k, v)
                              for k, v in self.kwargs.items())
        if self.args:
            rendered = u', ' + rendered
        return rendered

    def __repr__(self):
        return '%s(%s%s)' % (
            self._get_name(),
            self._get_args(),
            self._get_kwargs(),
        )
class PollPoller(mitogen.core.Poller):
    """
    Poller based on the POSIX :linux:man2:`poll` interface. Not available on
    some versions of OS X, otherwise it is the preferred poller for small FD
    counts, as there is no setup/teardown/configuration system call overhead.
    """
    SUPPORTED = hasattr(select, 'poll')
    _repr = 'PollPoller()'

    # Event bits treated as "readable".
    # TODO: no proof we dont need writemask too
    _readmask = (
        getattr(select, 'POLLIN', 0) |
        getattr(select, 'POLLHUP', 0)
    )

    def __init__(self):
        super(PollPoller, self).__init__()
        self._pollobj = select.poll()

    def _update(self, fd):
        """
        Bring the poll object's registration for `fd` in line with whether
        `fd` currently appears in `_rfds` and `_wfds`.
        """
        mask = 0
        if fd in self._rfds:
            mask |= self._readmask
        if fd in self._wfds:
            mask |= select.POLLOUT

        if mask:
            self._pollobj.register(fd, mask)
            return

        try:
            self._pollobj.unregister(fd)
        except KeyError:
            # Was not registered; nothing to undo.
            pass

    def _poll(self, timeout):
        """
        Wait for events and yield the data registered for each ready
        descriptor.

        :param timeout:
            Seconds to wait; converted to the milliseconds poll() expects
            when truthy, otherwise passed through unchanged.
        """
        if timeout:
            timeout *= 1000

        events, _ = mitogen.core.io_op(self._pollobj.poll, timeout)
        for fd, event in events:
            if event & self._readmask:
                IOLOG.debug('%r: POLLIN|POLLHUP for %r', self, fd)
                data, gen = self._rfds.get(fd, (None, None))
                # Skip descriptors that are no longer registered, or whose
                # generation is not older than the poller's current one.
                if gen and gen < self._generation:
                    yield data
            if event & select.POLLOUT:
                IOLOG.debug('%r: POLLOUT for %r', self, fd)
                data, gen = self._wfds.get(fd, (None, None))
                if gen and gen < self._generation:
                    yield data
class KqueuePoller(mitogen.core.Poller):
    """
    Poller based on the FreeBSD/Darwin :freebsd:man2:`kqueue` interface.
    """
    SUPPORTED = hasattr(select, 'kqueue')
    _repr = 'KqueuePoller()'

    def __init__(self):
        super(KqueuePoller, self).__init__()
        self._kqueue = select.kqueue()
        # Changes to submit with the next _poll(); currently always empty
        # because _control() applies each change immediately.
        self._changelist = []

    def close(self):
        super(KqueuePoller, self).close()
        self._kqueue.close()

    def _control(self, fd, filters, flags):
        """
        Apply a single filter change for `fd` to the kqueue straight away.
        """
        mitogen.core._vv and IOLOG.debug(
            '%r._control(%r, %r, %r)', self, fd, filters, flags)
        # TODO: at shutdown it is currently possible for KQ_EV_ADD/KQ_EV_DEL
        # pairs to be pending after the associated file descriptor has already
        # been closed. Fixing this requires maintaining extra state, or perhaps
        # making fd closure the poller's responsibility. In the meantime,
        # simply apply changes immediately.
        # self._changelist.append(select.kevent(fd, filters, flags))
        change = select.kevent(fd, filters, flags)
        events, _ = mitogen.core.io_op(self._kqueue.control, [change], 0, 0)
        assert not events

    def start_receive(self, fd, data=None):
        mitogen.core._vv and IOLOG.debug('%r.start_receive(%r, %r)',
                                         self, fd, data)
        if fd not in self._rfds:
            self._control(fd, select.KQ_FILTER_READ, select.KQ_EV_ADD)
        self._rfds[fd] = (data or fd, self._generation)

    def stop_receive(self, fd):
        mitogen.core._vv and IOLOG.debug('%r.stop_receive(%r)', self, fd)
        if fd in self._rfds:
            self._control(fd, select.KQ_FILTER_READ, select.KQ_EV_DELETE)
            del self._rfds[fd]

    def start_transmit(self, fd, data=None):
        mitogen.core._vv and IOLOG.debug('%r.start_transmit(%r, %r)',
                                         self, fd, data)
        if fd not in self._wfds:
            self._control(fd, select.KQ_FILTER_WRITE, select.KQ_EV_ADD)
        self._wfds[fd] = (data or fd, self._generation)

    def stop_transmit(self, fd):
        mitogen.core._vv and IOLOG.debug('%r.stop_transmit(%r)', self, fd)
        if fd in self._wfds:
            self._control(fd, select.KQ_FILTER_WRITE, select.KQ_EV_DELETE)
            del self._wfds[fd]

    def _poll(self, timeout):
        """
        Submit any queued changes, wait up to `timeout` for at most 32
        events, and yield the data registered for each ready descriptor.
        """
        pending = self._changelist
        self._changelist = []
        events, _ = mitogen.core.io_op(self._kqueue.control,
                                       pending, 32, timeout)
        for event in events:
            fd = event.ident
            if event.flags & select.KQ_EV_ERROR:
                LOG.debug('ignoring stale event for fd %r: errno=%d: %s',
                          fd, event.data, errno.errorcode.get(event.data))
            elif event.filter == select.KQ_FILTER_READ:
                data, gen = self._rfds.get(fd, (None, None))
                # Events can still be read for an already-discarded fd.
                if gen and gen < self._generation:
                    mitogen.core._vv and IOLOG.debug('%r: POLLIN: %r', self, fd)
                    yield data
            elif event.filter == select.KQ_FILTER_WRITE and fd in self._wfds:
                data, gen = self._wfds.get(fd, (None, None))
                if gen and gen < self._generation:
                    mitogen.core._vv and IOLOG.debug('%r: POLLOUT: %r', self, fd)
                    yield data
self._wfds.pop(fd, None) 1078 self._control(fd) 1079 1080 _inmask = (getattr(select, 'EPOLLIN', 0) | 1081 getattr(select, 'EPOLLHUP', 0)) 1082 1083 def _poll(self, timeout): 1084 the_timeout = -1 1085 if timeout is not None: 1086 the_timeout = timeout 1087 1088 events, _ = mitogen.core.io_op(self._epoll.poll, the_timeout, 32) 1089 for fd, event in events: 1090 if event & self._inmask: 1091 data, gen = self._rfds.get(fd, (None, None)) 1092 if gen and gen < self._generation: 1093 # Events can still be read for an already-discarded fd. 1094 mitogen.core._vv and IOLOG.debug('%r: POLLIN: %r', self, fd) 1095 yield data 1096 if event & select.EPOLLOUT: 1097 data, gen = self._wfds.get(fd, (None, None)) 1098 if gen and gen < self._generation: 1099 mitogen.core._vv and IOLOG.debug('%r: POLLOUT: %r', self, fd) 1100 yield data 1101 1102 1103# 2.4 and 2.5 only had select.select() and select.poll(). 1104for _klass in mitogen.core.Poller, PollPoller, KqueuePoller, EpollPoller: 1105 if _klass.SUPPORTED: 1106 PREFERRED_POLLER = _klass 1107 1108# For processes that start many threads or connections, it's possible Latch 1109# will also get high-numbered FDs, and so select() becomes useless there too. 1110# So swap in our favourite poller. 
if PollPoller.SUPPORTED:
    mitogen.core.Latch.poller_class = PollPoller
else:
    mitogen.core.Latch.poller_class = PREFERRED_POLLER


class LineLoggingProtocolMixin(object):
    """
    Mixin that keeps a timestamped record of received lines in
    :attr:`logged_lines`, trimmed to the most recent 100 entries each time a
    complete line arrives, plus the latest partial line in
    :attr:`logged_partial`. Every handler forwards to the next class in the
    MRO after recording.
    """
    def __init__(self, **kwargs):
        super(LineLoggingProtocolMixin, self).__init__(**kwargs)
        # List of (timestamp, line) tuples.
        self.logged_lines = []
        # Most recent incomplete line, or None.
        self.logged_partial = None

    def on_line_received(self, line):
        """
        Record a complete line, discarding any remembered partial line.
        """
        self.logged_partial = None
        self.logged_lines.append((mitogen.core.now(), line))
        # Slice-assign so the same list object is kept while being trimmed.
        self.logged_lines[:] = self.logged_lines[-100:]
        return super(LineLoggingProtocolMixin, self).on_line_received(line)

    def on_partial_line_received(self, line):
        """
        Remember the latest partial line so it can be logged on disconnect.
        """
        self.logged_partial = line
        return super(LineLoggingProtocolMixin, self).on_partial_line_received(line)

    def on_disconnect(self, broker):
        """
        Flush any remembered partial line into :attr:`logged_lines` before
        delegating disconnect handling.
        """
        if self.logged_partial:
            self.logged_lines.append((mitogen.core.now(), self.logged_partial))
            self.logged_partial = None
        super(LineLoggingProtocolMixin, self).on_disconnect(broker)


def get_history(streams):
    """
    Combine the `logged_lines` of every truthy stream's protocol, sort the
    ``(timestamp, line)`` tuples, and return the lines joined by newlines as
    text. Protocols without a `logged_lines` attribute contribute nothing.

    :param streams:
        Iterable of stream objects; falsy entries (e.g. :data:`None`) are
        skipped.
    """
    history = []
    for stream in streams:
        if stream:
            history.extend(getattr(stream.protocol, 'logged_lines', []))
    history.sort()

    s = b('\n').join(h[1] for h in history)
    return mitogen.core.to_text(s)


class RegexProtocol(LineLoggingProtocolMixin, mitogen.core.DelimitedProtocol):
    """
    Implement a delimited protocol where messages matching a set of regular
    expressions are dispatched to individual handler methods. Input is
    dispatched using :attr:`PATTERNS` and :attr:`PARTIAL_PATTERNS`, before
    falling back to :meth:`on_unrecognized_line_received` and
    :meth:`on_unrecognized_partial_line_received`.
    """
    #: A sequence of 2-tuples of the form `(compiled pattern, method)` for
    #: patterns that should be matched against complete (delimited) messages,
    #: i.e. full lines.
    PATTERNS = []

    #: Like :attr:`PATTERNS`, but patterns that are matched against incomplete
    #: lines.
    PARTIAL_PATTERNS = []

    def on_line_received(self, line):
        """
        Record the line, then call the handler paired with the first entry
        of :attr:`PATTERNS` whose pattern is found in it, returning the
        handler's result. Falls back to
        :meth:`on_unrecognized_line_received`.
        """
        super(RegexProtocol, self).on_line_received(line)
        for pattern, func in self.PATTERNS:
            match = pattern.search(line)
            if match is not None:
                # Handlers are stored as plain functions, so `self` is passed
                # explicitly.
                return func(self, line, match)

        return self.on_unrecognized_line_received(line)

    def on_unrecognized_line_received(self, line):
        """
        Default handler for complete lines matching no pattern: log at debug
        level.
        """
        LOG.debug('%s: (unrecognized): %s',
            self.stream.name, line.decode('utf-8', 'replace'))

    def on_partial_line_received(self, line):
        """
        As :meth:`on_line_received`, but for incomplete lines and using
        :attr:`PARTIAL_PATTERNS`. Every partial line is also logged at debug
        level before matching.
        """
        super(RegexProtocol, self).on_partial_line_received(line)
        LOG.debug('%s: (partial): %s',
            self.stream.name, line.decode('utf-8', 'replace'))
        for pattern, func in self.PARTIAL_PATTERNS:
            match = pattern.search(line)
            if match is not None:
                return func(self, line, match)

        return self.on_unrecognized_partial_line_received(line)

    def on_unrecognized_partial_line_received(self, line):
        """
        Default handler for partial lines matching no pattern: log at debug
        level.
        """
        LOG.debug('%s: (unrecognized partial): %s',
            self.stream.name, line.decode('utf-8', 'replace'))


class BootstrapProtocol(RegexProtocol):
    """
    Respond to stdout of a child during bootstrap. Wait for :attr:`EC0_MARKER`
    to be written by the first stage to indicate it can receive the bootstrap,
    then await :attr:`EC1_MARKER` to indicate success, and
    :class:`MitogenProtocol` can be enabled.
    """
    #: Sentinel value emitted by the first stage to indicate it is ready to
    #: receive the compressed bootstrap. For :mod:`mitogen.ssh` this must have
    #: length of at least `max(len('password'), len('debug1:'))`
    EC0_MARKER = b('MITO000')
    EC1_MARKER = b('MITO001')
    EC2_MARKER = b('MITO002')

    def __init__(self, broker):
        super(BootstrapProtocol, self).__init__()
        self._writer = mitogen.core.BufferedWriter(broker, self)

    def on_transmit(self, broker):
        """
        Delegate writability handling to the buffered writer.
        """
        self._writer.on_transmit(broker)

    def _on_ec0_received(self, line, match):
        """
        :attr:`EC0_MARKER` seen: write the connection's preamble to the
        child.
        """
        LOG.debug('%r: first stage started succcessfully', self)
        self._writer.write(self.stream.conn.get_preamble())

    def _on_ec1_received(self, line, match):
        """
        :attr:`EC1_MARKER` seen: only logged.
        """
        LOG.debug('%r: first stage received mitogen.core source', self)

    def _on_ec2_received(self, line, match):
        """
        :attr:`EC2_MARKER` seen: tell the connection that bootstrap is
        complete.
        """
        LOG.debug('%r: new child booted successfully', self)
        self.stream.conn._complete_connection()
        # NOTE(review): presumably False tells the delimited-protocol reader
        # to stop handing further buffered lines to this protocol, since
        # _complete_connection() may have replaced it -- confirm against
        # mitogen.core.DelimitedProtocol.
        return False

    def on_unrecognized_line_received(self, line):
        """
        Log any other child stdout line at debug level.
        """
        LOG.debug('%s: stdout: %s', self.stream.name,
            line.decode('utf-8', 'replace'))

    # Built at class-definition time from the plain functions defined above.
    PATTERNS = [
        (re.compile(EC0_MARKER), _on_ec0_received),
        (re.compile(EC1_MARKER), _on_ec1_received),
        (re.compile(EC2_MARKER), _on_ec2_received),
    ]


class LogProtocol(LineLoggingProtocolMixin, mitogen.core.DelimitedProtocol):
    """
    For "hybrid TTY/socketpair" mode, after connection setup a spare TTY master
    FD exists that cannot be closed, and to which SSH or sudo may continue
    writing log messages.

    The descriptor cannot be closed since the UNIX TTY layer sends SIGHUP to
    processes whose controlling TTY is the slave whose master side was closed.
    LogProtocol takes over this FD and creates log messages for anything
    written to it.
    """
    def on_line_received(self, line):
        """
        Read a line, decode it as UTF-8, and log it.
        """
        super(LogProtocol, self).on_line_received(line)
        LOG.info(u'%s: %s', self.stream.name, line.decode('utf-8', 'replace'))


class MitogenProtocol(mitogen.core.MitogenProtocol):
    """
    Extend core.MitogenProtocol to cause SHUTDOWN to be sent to the child
    during graceful shutdown.
    """
    def on_shutdown(self, broker):
        """
        Respond to the broker's request for the stream to shut down by sending
        SHUTDOWN to the child.
        """
        LOG.debug('%r: requesting child shutdown', self)
        self._send(
            mitogen.core.Message(
                src_id=mitogen.context_id,
                dst_id=self.remote_id,
                handle=mitogen.core.SHUTDOWN,
            )
        )


class Options(object):
    """
    Settings for one connection attempt. The class attributes below provide
    the defaults; the constructor overrides them per instance.
    """
    #: Stream name requested by the caller, or :data:`None`.
    name = None

    #: The path to the remote Python interpreter.
    python_path = get_sys_executable()

    #: Maximum time to wait for a connection attempt.
    connect_timeout = 30.0

    #: True to cause context to write verbose /tmp/mitogen.<pid>.log.
    debug = False

    #: True to cause context to write /tmp/mitogen.stats.<pid>.<thread>.log.
    profiling = False

    #: True if unidirectional routing is enabled in the new child.
    unidirectional = False

    #: Passed via Router wrapper methods, must eventually be passed to
    #: ExternalContext.main().
    max_message_size = None

    #: Remote name.
    remote_name = None

    #: Derived from :py:attr:`connect_timeout`; absolute floating point
    #: UNIX timestamp after which the connection attempt should be abandoned.
    connect_deadline = None

    def __init__(self, max_message_size, name=None, remote_name=None,
                 python_path=None, debug=False, connect_timeout=None,
                 profiling=False, unidirectional=False, old_router=None):
        """
        Store the supplied settings, leaving the class-level default in place
        for any of `python_path`, `connect_timeout` or `remote_name` that is
        falsy.

        :param str remote_name:
            If :data:`None`, :func:`get_default_remote_name` is used.
        :param old_router:
            Accepted but not referenced by this method.
        :raises ValueError:
            `remote_name` contains ``/`` or ``\\``.
        """
        self.name = name
        self.max_message_size = max_message_size
        if python_path:
            self.python_path = python_path
        # A timeout of None or 0 keeps the class default.
        if connect_timeout:
            self.connect_timeout = connect_timeout
        if remote_name is None:
            remote_name = get_default_remote_name()
        if '/' in remote_name or '\\' in remote_name:
            raise ValueError('remote_name= cannot contain slashes')
        if remote_name:
            self.remote_name = mitogen.core.to_text(remote_name)
        self.debug = debug
        self.profiling = profiling
        self.unidirectional = unidirectional
        # Repeats the assignment made at the top of this method.
        self.max_message_size = max_message_size
        # The deadline is fixed when the Options object is constructed.
        self.connect_deadline = mitogen.core.now() + self.connect_timeout


class Connection(object):
    """
    Manage the lifetime of a set of :class:`Streams <Stream>` connecting to a
    remote Python interpreter, including bootstrap, disconnection, and external
    tool integration.

    Base for streams capable of starting children.
    """
    options_class = Options

    #: The protocol attached to stdio of the child.
    stream_protocol_class = BootstrapProtocol

    #: The protocol attached to stderr of the child.
    diag_protocol_class = LogProtocol

    #: :class:`Process`
    proc = None

    #: :class:`mitogen.core.Stream` with sides connected to stdin/stdout.
    stdio_stream = None

    #: If `proc.stderr` is set, referencing either a plain pipe or the
    #: controlling TTY, this references the corresponding
    #: :class:`LogProtocol`'s stream, allowing it to be disconnected when this
    #: stream is disconnected.
    stderr_stream = None

    #: Function with the semantics of :func:`create_child` used to create the
    #: child process.
    create_child = staticmethod(create_child)

    #: Dictionary of extra kwargs passed to :attr:`create_child`.
    create_child_args = {}

    #: :data:`True` if the remote has indicated that it intends to detach, and
    #: should not be killed on disconnect.
    detached = False

    #: If :data:`True`, indicates the child should not be killed during
    #: graceful detachment, as it is the actual process implementing the child
    #: context. In all other cases, the subprocess is SSH, sudo, or a similar
    #: tool that should be reminded to quit during disconnection.
    child_is_immediate_subprocess = True

    #: Prefix given to default names generated by :meth:`connect`.
    name_prefix = u'local'

    #: :class:`Timer` that runs :meth:`_on_timer_expired` when connection
    #: timeout occurs.
    _timer = None

    #: When disconnection completes, instance of :class:`Reaper` used to wait
    #: on the exit status of the subprocess.
    _reaper = None

    #: On failure, the exception object that should be propagated back to the
    #: user.
    exception = None

    #: Extra text appended to :class:`EofError` if that exception is raised on
    #: a failed connection attempt. May be used in subclasses to hint at common
    #: problems with a particular connection method.
    eof_error_hint = None

    def __init__(self, options, router):
        #: :class:`Options`
        self.options = options
        self._router = router

    def __repr__(self):
        return 'Connection(%r)' % (self.stdio_stream,)

    # Minimised, gzipped, base64'd and passed to 'python -c'. It forks, dups
    # file descriptor 0 as 100, creates a pipe, then execs a new interpreter
    # with a custom argv.
    #   * Optimized for minimum byte count after minification & compression.
    #   * 'CONTEXT_NAME' and 'PREAMBLE_COMPRESSED_LEN' are substituted with
    #     their respective values.
    #   * CONTEXT_NAME must be prefixed with the name of the Python binary in
    #     order to allow virtualenvs to detect their install prefix.
    #   * For Darwin, OS X installs a craptacular argv0-introspecting Python
    #     version switcher as /usr/bin/python. Override attempts to call it
    #     with an explicit call to python2.7
    #
    # Locals:
    #   R: read side of interpreter stdin.
    #   W: write side of interpreter stdin.
    #   r: read side of core_src FD.
    #   w: write side of core_src FD.
    #   C: the decompressed core source.
    #
    # NOTE: get_boot_command() reads the *source text* of this function with
    # inspect.getsource(), drops its first two lines and ships the rest to the
    # child. Anything added inside the function (docstring, comments, blank
    # lines) therefore becomes part of the boot command; `_`, `os` and `sys`
    # are supplied by the one-liner built in get_boot_command().

    # Final os.close(2) to avoid --py-debug build from corrupting stream with
    # "[1234 refs]" during exit.
    @staticmethod
    def _first_stage():
        R,W=os.pipe()
        r,w=os.pipe()
        if os.fork():
            os.dup2(0,100)
            os.dup2(R,0)
            os.dup2(r,101)
            os.close(R)
            os.close(r)
            os.close(W)
            os.close(w)
            if sys.platform == 'darwin' and sys.executable == '/usr/bin/python':
                sys.executable += sys.version[:3]
            os.environ['ARGV0']=sys.executable
            os.execl(sys.executable,sys.executable+'(mitogen:CONTEXT_NAME)')
        os.write(1,'MITO000\n'.encode())
        C=_(os.fdopen(0,'rb').read(PREAMBLE_COMPRESSED_LEN),'zip')
        fp=os.fdopen(W,'wb',0)
        fp.write(C)
        fp.close()
        fp=os.fdopen(w,'wb',0)
        fp.write(C)
        fp.close()
        os.write(1,'MITO001\n'.encode())
        os.close(2)

    def get_python_argv(self):
        """
        Return the initial argument vector elements necessary to invoke Python,
        by returning a 1-element list containing :attr:`python_path` if it is a
        string, or simply returning it if it is already a list.

        This allows emulation of existing tools where the Python invocation may
        be set to e.g. `['/usr/bin/env', 'python']`.
        """
        if isinstance(self.options.python_path, list):
            return self.options.python_path
        return [self.options.python_path]

    def get_boot_command(self):
        """
        Build the argument vector that starts the first stage in the child:
        the Python invocation from :meth:`get_python_argv` followed by ``-c``
        and a one-liner that base64-decodes, decompresses and executes the
        source of :meth:`_first_stage`.
        """
        source = inspect.getsource(self._first_stage)
        # [2:] drops the '@staticmethod' and 'def' lines, keeping the body.
        source = textwrap.dedent('\n'.join(source.strip().split('\n')[2:]))
        # NOTE(review): this copy of the file has had runs of whitespace
        # collapsed, so the first argument below may originally have been the
        # four-space indent unit rather than a single space -- confirm against
        # a pristine copy before relying on it.
        source = source.replace(' ', '\t')
        source = source.replace('CONTEXT_NAME', self.options.remote_name)
        preamble_compressed = self.get_preamble()
        # The first stage reads exactly this many bytes from its stdin.
        source = source.replace('PREAMBLE_COMPRESSED_LEN',
                                str(len(preamble_compressed)))
        compressed = zlib.compress(source.encode(), 9)
        encoded = codecs.encode(compressed, 'base64').replace(b('\n'), b(''))
        # We can't use bytes.decode() in 3.x since it was restricted to always
        # return unicode, so codecs.decode() is used instead. In 3.x
        # codecs.decode() requires a bytes object. Since we must be compatible
        # with 2.4 (no bytes literal), an extra .encode() either returns the
        # same str (2.x) or an equivalent bytes (3.x).
        return self.get_python_argv() + [
            '-c',
            'import codecs,os,sys;_=codecs.decode;'
            'exec(_(_("%s".encode(),"base64"),"zip"))' % (encoded.decode(),)
        ]

    def get_econtext_config(self):
        """
        Return the configuration dictionary that :meth:`get_preamble` embeds
        in the ``ExternalContext(...)`` call appended to the core source.
        """
        assert self.options.max_message_size is not None
        # Copy before inserting so the module-level list is not modified.
        parent_ids = mitogen.parent_ids[:]
        parent_ids.insert(0, mitogen.context_id)
        return {
            'parent_ids': parent_ids,
            'context_id': self.context.context_id,
            'debug': self.options.debug,
            'profiling': self.options.profiling,
            'unidirectional': self.options.unidirectional,
            'log_level': get_log_level(),
            'whitelist': self._router.get_module_whitelist(),
            'blacklist': self._router.get_module_blacklist(),
            'max_message_size': self.options.max_message_size,
            'version': mitogen.__version__,
        }

    def get_preamble(self):
        """
        Return the core source with an ``ExternalContext(config).main()``
        call appended, in the form produced by the object returned from
        :func:`get_core_source_partial`.
        """
        suffix = (
            '\nExternalContext(%r).main()\n' %\
            (self.get_econtext_config(),)
        )
        partial = get_core_source_partial()
        # NOTE(review): presumably append() returns the complete compressed
        # payload as bytes (callers take len() of it and write it to the
        # child) -- confirm against get_core_source_partial().
        return partial.append(suffix.encode('utf-8'))

    def _get_name(self):
        """
        Called by :meth:`connect` after :attr:`pid` is known. Subclasses can
        override it to specify a default stream name, or set
        :attr:`name_prefix` to generate a default format.
        """
        return u'%s.%s' % (self.name_prefix, self.proc.pid)

    def start_child(self):
        """
        Start the child using :attr:`create_child` and the boot command.

        :raises mitogen.core.StreamError:
            :attr:`create_child` raised :class:`OSError`.
        """
        args = self.get_boot_command()
        LOG.debug('command line for %r: %s', self, Argv(args))
        try:
            return self.create_child(args=args, **self.create_child_args)
        except OSError:
            # sys.exc_info() rather than "except ... as e" for compatibility
            # with old Python versions.
            e = sys.exc_info()[1]
            msg = 'Child start failed: %s. Command was: %s' % (e, Argv(args))
            raise mitogen.core.StreamError(msg)

    def _adorn_eof_error(self, e):
        """
        Subclasses may provide additional information in the case of a failed
        connection.
        """
        if self.eof_error_hint:
            e.args = ('%s\n\n%s' % (e.args[0], self.eof_error_hint),)

    def _complete_connection(self):
        """
        Finish the connection attempt: cancel the timer and wake the thread
        blocked in :meth:`connect`. When no failure was recorded, also
        register the context with the router and switch the stdio stream to
        :class:`MitogenProtocol`.
        """
        self._timer.cancel()
        if not self.exception:
            mitogen.core.unlisten(self._router.broker, 'shutdown',
                                  self._on_broker_shutdown)
            self._router.register(self.context, self.stdio_stream)
            self.stdio_stream.set_protocol(
                MitogenProtocol(
                    router=self._router,
                    remote_id=self.context.context_id,
                )
            )
            self._router.route_monitor.notice_stream(self.stdio_stream)
        # Runs for success and failure alike; connect() inspects
        # self.exception after waking.
        self.latch.put()

    def _fail_connection(self, exc):
        """
        Fail the connection attempt.
        """
        LOG.debug('failing connection %s due to %r',
                  self.stdio_stream and self.stdio_stream.name, exc)
        # Only the first failure is recorded.
        if self.exception is None:
            self._adorn_eof_error(exc)
            self.exception = exc
            mitogen.core.unlisten(self._router.broker, 'shutdown',
                                  self._on_broker_shutdown)
        for stream in self.stdio_stream, self.stderr_stream:
            if stream and not stream.receive_side.closed:
                stream.on_disconnect(self._router.broker)
        self._complete_connection()

    eof_error_msg = 'EOF on stream; last 100 lines received:\n'

    def on_stdio_disconnect(self):
        """
        Handle stdio stream disconnection by failing the Connection if the
        stderr stream has already been closed. Otherwise, wait for it to close
        (or timeout), to allow buffered diagnostic logs to be consumed.

        It is normal that when a subprocess aborts, stdio has nothing buffered
        when it is closed, thus signalling readability, causing an empty read
        (interpreted as indicating disconnection) on the next loop iteration,
        even if its stderr pipe has lots of diagnostic logs still buffered in
        the kernel. Therefore we must wait for both pipes to indicate they are
        empty before triggering connection failure.
        """
        stderr = self.stderr_stream
        if stderr is None or stderr.receive_side.closed:
            self._on_streams_disconnected()

    def on_stderr_disconnect(self):
        """
        Inverse of :func:`on_stdio_disconnect`.
        """
        if self.stdio_stream.receive_side.closed:
            self._on_streams_disconnected()

    def _on_streams_disconnected(self):
        """
        When disconnection has been detected for both streams, cancel the
        connection timer, mark the connection failed, and reap the child
        process. Do nothing if the timer has already been cancelled, indicating
        some existing failure has already been noticed.
        """
        if self._timer.active:
            self._timer.cancel()
            self._fail_connection(EofError(
                self.eof_error_msg + get_history(
                    [self.stdio_stream, self.stderr_stream]
                )
            ))

        # A reaper already exists, so this disconnect was handled before.
        if self._reaper:
            return

        self._reaper = Reaper(
            broker=self._router.broker,
            proc=self.proc,
            kill=not (
                (self.detached and self.child_is_immediate_subprocess) or
                # Avoid killing so child has chance to write cProfile data
                self._router.profiling
            ),
            # Don't delay shutdown waiting for a detached child, since the
            # detached child may expect to live indefinitely after its parent
            # exited.
            wait_on_shutdown=(not self.detached),
        )
        self._reaper.reap()

    def _on_broker_shutdown(self):
        """
        Respond to broker.shutdown() being called by failing the connection
        attempt.
        """
        self._fail_connection(CancelledError(BROKER_SHUTDOWN_MSG))

    def stream_factory(self):
        """
        Build the stream for the child's stdio using
        :attr:`stream_protocol_class`.
        """
        return self.stream_protocol_class.build_stream(
            broker=self._router.broker,
        )

    def stderr_stream_factory(self):
        """
        Build the stream for the child's stderr using
        :attr:`diag_protocol_class`.
        """
        return self.diag_protocol_class.build_stream()

    def _setup_stdio_stream(self):
        """
        Create, name and start receiving on the stream attached to the
        child's stdout/stdin, subscribing to its ``disconnect`` event.
        """
        stream = self.stream_factory()
        stream.conn = self
        stream.name = self.options.name or self._get_name()
        stream.accept(self.proc.stdout, self.proc.stdin)

        mitogen.core.listen(stream, 'disconnect', self.on_stdio_disconnect)
        self._router.broker.start_receive(stream)
        return stream

    def _setup_stderr_stream(self):
        """
        Create, name and start receiving on the stream attached to the
        child's stderr, subscribing to its ``disconnect`` event.
        """
        stream = self.stderr_stream_factory()
        stream.conn = self
        stream.name = self.options.name or self._get_name()
        # The same file object is supplied for both sides of the stream.
        stream.accept(self.proc.stderr, self.proc.stderr)

        mitogen.core.listen(stream, 'disconnect', self.on_stderr_disconnect)
        self._router.broker.start_receive(stream)
        return stream

    def _on_timer_expired(self):
        """
        Fail the connection attempt with :class:`mitogen.core.TimeoutError`
        when the connect deadline passes.
        """
        # NOTE(review): the format string and its argument are passed
        # separately; presumably the mitogen.core exception base class
        # interpolates them -- confirm.
        self._fail_connection(
            mitogen.core.TimeoutError(
                'Failed to setup connection after %.2f seconds',
                self.options.connect_timeout,
            )
        )

    def _async_connect(self):
        """
        Start the connection attempt: subscribe to broker shutdown, schedule
        the timeout timer, start the child, then attach streams to its
        stdio and (if present) stderr. Scheduled onto the broker by
        :meth:`connect`.
        """
        LOG.debug('creating connection to context %d using %s',
                  self.context.context_id, self.__class__.__module__)
        mitogen.core.listen(self._router.broker, 'shutdown',
                            self._on_broker_shutdown)
        self._timer = self._router.broker.timers.schedule(
            when=self.options.connect_deadline,
            func=self._on_timer_expired,
        )

        try:
            self.proc = self.start_child()
        except Exception:
            LOG.debug('failed to start child', exc_info=True)
            self._fail_connection(sys.exc_info()[1])
            return

        LOG.debug('child for %r started: pid:%r stdin:%r stdout:%r stderr:%r',
                  self, self.proc.pid,
                  self.proc.stdin.fileno(),
                  self.proc.stdout.fileno(),
                  self.proc.stderr and self.proc.stderr.fileno())

        self.stdio_stream = self._setup_stdio_stream()
        if self.context.name is None:
            self.context.name = self.stdio_stream.name
        self.proc.name = self.stdio_stream.name
        if self.proc.stderr:
            self.stderr_stream = self._setup_stderr_stream()

    def connect(self, context):
        """
        Connect `context`, blocking the calling thread until the attempt
        completes, and re-raising any exception recorded in
        :attr:`exception`.
        """
        self.context = context
        self.latch = mitogen.core.Latch()
        self._router.broker.defer(self._async_connect)
        self.latch.get()
        if self.exception:
            raise self.exception


class ChildIdAllocator(object):
    """
    Allocate new context IDs from a block of unique context IDs allocated by
    the master process.
    """
    def __init__(self, router):
        self.router = router
        self.lock = threading.Lock()
        # Start with an exhausted iterator so the first allocate() fetches a
        # block.
        self.it = iter(xrange(0))

    def allocate(self):
        """
        Allocate an ID, requesting a fresh block from the master if the
        existing block is exhausted.

        :returns:
            The new context ID.

        .. warning::

            This method is not safe to call from the :class:`Broker` thread, as
            it may block on IO of its own.
        """
        self.lock.acquire()
        try:
            # Returns the next ID if one remains; falls through when the
            # current block is exhausted.
            for id_ in self.it:
                return id_

            master = self.router.context_by_id(0)
            start, end = master.send_await(
                mitogen.core.Message(dst_id=0, handle=mitogen.core.ALLOCATE_ID)
            )
            self.it = iter(xrange(start, end))
        finally:
            self.lock.release()

        # Retry against the freshly fetched block, after releasing the lock.
        return self.allocate()


class CallChain(object):
    """
    Deliver :data:`mitogen.core.CALL_FUNCTION` messages to a target context,
    optionally threading related calls so an exception in an earlier call
    cancels subsequent calls.

    :param mitogen.core.Context context:
        Target context.
    :param bool pipelined:
        Enable pipelining.

    :meth:`call`, :meth:`call_no_reply` and :meth:`call_async`
    normally issue calls and produce responses with no memory of prior
    exceptions. If a call made with :meth:`call_no_reply` fails, the exception
    is logged to the target context's logging framework.

    **Pipelining**

    When pipelining is enabled, if an exception occurs during a call,
    subsequent calls made by the same :class:`CallChain` fail with the same
    exception, including those already in-flight on the network, and no further
    calls execute until :meth:`reset` is invoked.

    No exception is logged for calls made with :meth:`call_no_reply`, instead
    the exception is saved and reported as the result of subsequent
    :meth:`call` or :meth:`call_async` calls.

    Sequences of asynchronous calls can be made without wasting network
    round-trips to discover if prior calls succeed, and chains originating from
    multiple unrelated source contexts may overlap concurrently at a target
    context without interference.

    In this example, 4 calls complete in one round-trip::

        chain = mitogen.parent.CallChain(context, pipelined=True)
        chain.call_no_reply(os.mkdir, '/tmp/foo')

        # If previous mkdir() failed, this never runs:
        chain.call_no_reply(os.mkdir, '/tmp/foo/bar')

        # If either mkdir() failed, this never runs, and the exception is
        # asynchronously delivered to the receiver.
        recv = chain.call_async(subprocess.check_output, '/tmp/foo')

        # If anything so far failed, this never runs, and raises the exception.
        chain.call(do_something)

        # If this code was executed, the exception would also be raised.
        if recv.get().unpickle() == 'baz':
            pass

    When pipelining is enabled, :meth:`reset` must be invoked to ensure any
    exception is discarded, otherwise unbounded memory usage is possible in
    long-running programs. The context manager protocol is supported to ensure
    :meth:`reset` is always invoked::

        with mitogen.parent.CallChain(context, pipelined=True) as chain:
            chain.call_no_reply(...)
            chain.call_no_reply(...)
            chain.call_no_reply(...)
            chain.call(...)

        # chain.reset() automatically invoked.
    """
    def __init__(self, context, pipelined=False):
        self.context = context
        # chain_id is None when pipelining is disabled.
        if pipelined:
            self.chain_id = self.make_chain_id()
        else:
            self.chain_id = None

    @classmethod
    def make_chain_id(cls):
        """
        Return a chain identifier string built from the hostname, process
        ID, thread identifier and current time in microseconds.
        """
        return '%s-%s-%x-%x' % (
            socket.gethostname(),
            os.getpid(),
            thread.get_ident(),
            int(1e6 * mitogen.core.now()),
        )

    def __repr__(self):
        return '%s(%s)' % (self.__class__.__name__, self.context)

    def __enter__(self):
        return self

    def __exit__(self, _1, _2, _3):
        self.reset()

    def reset(self):
        """
        Instruct the target to forget any related exception.
        """
        if not self.chain_id:
            return

        # Temporarily clear chain_id so the forget_chain call itself is sent
        # outside the chain, then restore it whatever happens.
        saved, self.chain_id = self.chain_id, None
        try:
            self.call_no_reply(mitogen.core.Dispatcher.forget_chain, saved)
        finally:
            self.chain_id = saved

    closures_msg = (
        'Mitogen cannot invoke closures, as doing so would require '
        'serializing arbitrary program state, and no universal '
        'method exists to recover a reference to them.'
    )

    lambda_msg = (
        'Mitogen cannot invoke anonymous functions, as no universal method '
        'exists to recover a reference to an anonymous function.'
    )

    method_msg = (
        'Mitogen cannot invoke instance methods, as doing so would require '
        'serializing arbitrary program state.'
    )

    def make_msg(self, fn, *args, **kwargs):
        """
        Build the :data:`mitogen.core.CALL_FUNCTION` message describing a
        call to `fn` with the given arguments.

        :raises TypeError:
            `fn` is a closure, a lambda, or a method bound to something other
            than a class.
        """
        if getattr(fn, closure_attr, None) is not None:
            raise TypeError(self.closures_msg)
        if fn.__name__ == '<lambda>':
            raise TypeError(self.lambda_msg)

        if inspect.ismethod(fn):
            # Only methods bound to a class (classmethods) are accepted.
            im_self = getattr(fn, IM_SELF_ATTR)
            if not inspect.isclass(im_self):
                raise TypeError(self.method_msg)
            klass = mitogen.core.to_text(im_self.__name__)
        else:
            klass = None

        tup = (
            self.chain_id,
            mitogen.core.to_text(fn.__module__),
            klass,
            mitogen.core.to_text(fn.__name__),
            args,
            mitogen.core.Kwargs(kwargs)
        )
        return mitogen.core.Message.pickled(tup,
            handle=mitogen.core.CALL_FUNCTION)

    def call_no_reply(self, fn, *args, **kwargs):
        """
        Like :meth:`call_async`, but do not wait for a return value, and inform
        the target context no reply is expected. If the call fails and
        pipelining is disabled, the exception will be logged to the target
        context's logging framework.
        """
        LOG.debug('starting no-reply function call to %r: %r',
                  self.context.name or self.context.context_id,
                  CallSpec(fn, args, kwargs))
        self.context.send(self.make_msg(fn, *args, **kwargs))

    def call_async(self, fn, *args, **kwargs):
        """
        Arrange for `fn(*args, **kwargs)` to be invoked on the context's main
        thread.

        :param fn:
            A free function in module scope or a class method of a class
            directly reachable from module scope:

            .. code-block:: python

                # mymodule.py

                def my_func():
                    '''A free function reachable as mymodule.my_func'''

                class MyClass:
                    @classmethod
                    def my_classmethod(cls):
                        '''Reachable as mymodule.MyClass.my_classmethod'''

                    def my_instancemethod(self):
                        '''Unreachable: requires a class instance!'''

                    class MyEmbeddedClass:
                        @classmethod
                        def my_classmethod(cls):
                            '''Not directly reachable from module scope!'''

        :param tuple args:
            Function arguments, if any. See :ref:`serialization-rules` for
            permitted types.
        :param dict kwargs:
            Function keyword arguments, if any. See :ref:`serialization-rules`
            for permitted types.
        :returns:
            :class:`mitogen.core.Receiver` configured to receive the result of
            the invocation:

            .. code-block:: python

                recv = context.call_async(subprocess.check_output, 'ls /tmp/')
                try:
                    # Prints output once it is received.
                    msg = recv.get()
                    print(msg.unpickle())
                except mitogen.core.CallError, e:
                    print('Call failed:', str(e))

            Asynchronous calls may be dispatched in parallel to multiple
            contexts and consumed as they complete using
            :class:`mitogen.select.Select`.
        """
        LOG.debug('starting function call to %s: %r',
                  self.context.name or self.context.context_id,
                  CallSpec(fn, args, kwargs))
        return self.context.send_async(self.make_msg(fn, *args, **kwargs))

    def call(self, fn, *args, **kwargs):
        """
        Like :meth:`call_async`, but block until the return value is available.
        Equivalent to::

            call_async(fn, *args, **kwargs).get().unpickle()

        :returns:
            The function's return value.
        :raises mitogen.core.CallError:
            An exception was raised in the remote context during execution.
        """
        receiver = self.call_async(fn, *args, **kwargs)
        return receiver.get().unpickle(throw_dead=False)


class Context(mitogen.core.Context):
    """
    Extend :class:`mitogen.core.Context` with functionality useful to masters,
    and child contexts who later become parents. Currently when this class is
    required, the target context's router is upgraded at runtime.
    """
    #: A :class:`CallChain` instance constructed by default, with pipelining
    #: disabled. :meth:`call`, :meth:`call_async` and :meth:`call_no_reply` use
    #: this instance.
    call_chain_class = CallChain

    via = None

    def __init__(self, *args, **kwargs):
        super(Context, self).__init__(*args, **kwargs)
        self.default_call_chain = self.call_chain_class(self)

    def __ne__(self, other):
        return not (self == other)

    def __eq__(self, other):
        # Equal when `other` is any core Context with the same ID and router.
        return (
            isinstance(other, mitogen.core.Context) and
            (other.context_id == self.context_id) and
            (other.router == self.router)
        )

    def __hash__(self):
        # Hashes the same pair of fields that __eq__ compares.
        return hash((self.router, self.context_id))

    def call_async(self, fn, *args, **kwargs):
        """
        See :meth:`CallChain.call_async`.
        """
        return self.default_call_chain.call_async(fn, *args, **kwargs)

    def call(self, fn, *args, **kwargs):
        """
        See :meth:`CallChain.call`.
        """
        return self.default_call_chain.call(fn, *args, **kwargs)

    def call_no_reply(self, fn, *args, **kwargs):
        """
        See :meth:`CallChain.call_no_reply`.
        """
        self.default_call_chain.call_no_reply(fn, *args, **kwargs)

    def shutdown(self, wait=False):
        """
        Arrange for the context to receive a ``SHUTDOWN`` message, triggering
        graceful shutdown.

        Due to a lack of support for timers, no attempt is made yet to force
        terminate a hung context using this method. This will be fixed shortly.
2032 2033 :param bool wait: 2034 If :data:`True`, block the calling thread until the context has 2035 completely terminated. 2036 2037 :returns: 2038 If `wait` is :data:`False`, returns a :class:`mitogen.core.Latch` 2039 whose :meth:`get() <mitogen.core.Latch.get>` method returns 2040 :data:`None` when shutdown completes. The `timeout` parameter may 2041 be used to implement graceful timeouts. 2042 """ 2043 LOG.debug('%r.shutdown() sending SHUTDOWN', self) 2044 latch = mitogen.core.Latch() 2045 mitogen.core.listen(self, 'disconnect', lambda: latch.put(None)) 2046 self.send( 2047 mitogen.core.Message( 2048 handle=mitogen.core.SHUTDOWN, 2049 ) 2050 ) 2051 2052 if wait: 2053 latch.get() 2054 else: 2055 return latch 2056 2057 2058class RouteMonitor(object): 2059 """ 2060 Generate and respond to :data:`mitogen.core.ADD_ROUTE` and 2061 :data:`mitogen.core.DEL_ROUTE` messages sent to the local context by 2062 maintaining a table of available routes, and propagating messages towards 2063 parents and siblings as appropriate. 2064 2065 :class:`RouteMonitor` is responsible for generating routing messages for 2066 directly attached children. It learns of new children via 2067 :meth:`notice_stream` called by :class:`Router`, and subscribes to their 2068 ``disconnect`` event to learn when they disappear. 2069 2070 In children, constructing this class overwrites the stub 2071 :data:`mitogen.core.DEL_ROUTE` handler installed by 2072 :class:`mitogen.core.ExternalContext`, which is expected behaviour when a 2073 child is beging upgraded in preparation to become a parent of children of 2074 its own. 2075 2076 By virtue of only being active while responding to messages from a handler, 2077 RouteMonitor lives entirely on the broker thread, so its data requires no 2078 locking. 2079 2080 :param mitogen.master.Router router: 2081 Router to install handlers on. 
2082 :param mitogen.core.Context parent: 2083 :data:`None` in the master process, or reference to the parent context 2084 we should propagate route updates towards. 2085 """ 2086 def __init__(self, router, parent=None): 2087 self.router = router 2088 self.parent = parent 2089 self._log = logging.getLogger('mitogen.route_monitor') 2090 #: Mapping of Stream instance to integer context IDs reachable via the 2091 #: stream; used to cleanup routes during disconnection. 2092 self._routes_by_stream = {} 2093 self.router.add_handler( 2094 fn=self._on_add_route, 2095 handle=mitogen.core.ADD_ROUTE, 2096 persist=True, 2097 policy=is_immediate_child, 2098 overwrite=True, 2099 ) 2100 self.router.add_handler( 2101 fn=self._on_del_route, 2102 handle=mitogen.core.DEL_ROUTE, 2103 persist=True, 2104 policy=is_immediate_child, 2105 overwrite=True, 2106 ) 2107 2108 def __repr__(self): 2109 return 'RouteMonitor()' 2110 2111 def _send_one(self, stream, handle, target_id, name): 2112 """ 2113 Compose and send an update message on a stream. 2114 2115 :param mitogen.core.Stream stream: 2116 Stream to send it on. 2117 :param int handle: 2118 :data:`mitogen.core.ADD_ROUTE` or :data:`mitogen.core.DEL_ROUTE` 2119 :param int target_id: 2120 ID of the connecting or disconnecting context. 2121 :param str name: 2122 Context name or :data:`None`. 2123 """ 2124 if not stream: 2125 # We may not have a stream during shutdown. 2126 return 2127 2128 data = str(target_id) 2129 if name: 2130 data = '%s:%s' % (target_id, name) 2131 stream.protocol.send( 2132 mitogen.core.Message( 2133 handle=handle, 2134 data=data.encode('utf-8'), 2135 dst_id=stream.protocol.remote_id, 2136 ) 2137 ) 2138 2139 def _propagate_up(self, handle, target_id, name=None): 2140 """ 2141 In a non-master context, propagate an update towards the master. 2142 2143 :param int handle: 2144 :data:`mitogen.core.ADD_ROUTE` or :data:`mitogen.core.DEL_ROUTE` 2145 :param int target_id: 2146 ID of the connecting or disconnecting context. 
2147 :param str name: 2148 For :data:`mitogen.core.ADD_ROUTE`, the name of the new context 2149 assigned by its parent. This is used by parents to assign the 2150 :attr:`mitogen.core.Context.name` attribute. 2151 """ 2152 if self.parent: 2153 stream = self.router.stream_by_id(self.parent.context_id) 2154 self._send_one(stream, handle, target_id, name) 2155 2156 def _propagate_down(self, handle, target_id): 2157 """ 2158 For DEL_ROUTE, we additionally want to broadcast the message to any 2159 stream that has ever communicated with the disconnecting ID, so 2160 core.py's :meth:`mitogen.core.Router._on_del_route` can turn the 2161 message into a disconnect event. 2162 2163 :param int handle: 2164 :data:`mitogen.core.ADD_ROUTE` or :data:`mitogen.core.DEL_ROUTE` 2165 :param int target_id: 2166 ID of the connecting or disconnecting context. 2167 """ 2168 for stream in self.router.get_streams(): 2169 if target_id in stream.protocol.egress_ids and ( 2170 (self.parent is None) or 2171 (self.parent.context_id != stream.protocol.remote_id) 2172 ): 2173 self._send_one(stream, mitogen.core.DEL_ROUTE, target_id, None) 2174 2175 def notice_stream(self, stream): 2176 """ 2177 When this parent is responsible for a new directly connected child 2178 stream, we're also responsible for broadcasting 2179 :data:`mitogen.core.DEL_ROUTE` upstream when that child disconnects. 2180 """ 2181 self._routes_by_stream[stream] = set([stream.protocol.remote_id]) 2182 self._propagate_up(mitogen.core.ADD_ROUTE, stream.protocol.remote_id, 2183 stream.name) 2184 mitogen.core.listen( 2185 obj=stream, 2186 name='disconnect', 2187 func=lambda: self._on_stream_disconnect(stream), 2188 ) 2189 2190 def get_routes(self, stream): 2191 """ 2192 Return the set of context IDs reachable on a stream. 
2193 2194 :param mitogen.core.Stream stream: 2195 :returns: set([int]) 2196 """ 2197 return self._routes_by_stream.get(stream) or set() 2198 2199 def _on_stream_disconnect(self, stream): 2200 """ 2201 Respond to disconnection of a local stream by propagating DEL_ROUTE for 2202 any contexts we know were attached to it. 2203 """ 2204 # During a stream crash it is possible for disconnect signal to fire 2205 # twice, in which case ignore the second instance. 2206 routes = self._routes_by_stream.pop(stream, None) 2207 if routes is None: 2208 return 2209 2210 self._log.debug('stream %s is gone; propagating DEL_ROUTE for %r', 2211 stream.name, routes) 2212 for target_id in routes: 2213 self.router.del_route(target_id) 2214 self._propagate_up(mitogen.core.DEL_ROUTE, target_id) 2215 self._propagate_down(mitogen.core.DEL_ROUTE, target_id) 2216 2217 context = self.router.context_by_id(target_id, create=False) 2218 if context: 2219 mitogen.core.fire(context, 'disconnect') 2220 2221 def _on_add_route(self, msg): 2222 """ 2223 Respond to :data:`mitogen.core.ADD_ROUTE` by validating the source of 2224 the message, updating the local table, and propagating the message 2225 upwards. 
2226 """ 2227 if msg.is_dead: 2228 return 2229 2230 target_id_s, _, target_name = bytes_partition(msg.data, b(':')) 2231 target_name = target_name.decode() 2232 target_id = int(target_id_s) 2233 self.router.context_by_id(target_id).name = target_name 2234 stream = self.router.stream_by_id(msg.src_id) 2235 current = self.router.stream_by_id(target_id) 2236 if current and current.protocol.remote_id != mitogen.parent_id: 2237 self._log.error('Cannot add duplicate route to %r via %r, ' 2238 'already have existing route via %r', 2239 target_id, stream, current) 2240 return 2241 2242 self._log.debug('Adding route to %d via %r', target_id, stream) 2243 self._routes_by_stream[stream].add(target_id) 2244 self.router.add_route(target_id, stream) 2245 self._propagate_up(mitogen.core.ADD_ROUTE, target_id, target_name) 2246 2247 def _on_del_route(self, msg): 2248 """ 2249 Respond to :data:`mitogen.core.DEL_ROUTE` by validating the source of 2250 the message, updating the local table, propagating the message 2251 upwards, and downwards towards any stream that every had a message 2252 forwarded from it towards the disconnecting context. 
2253 """ 2254 if msg.is_dead: 2255 return 2256 2257 target_id = int(msg.data) 2258 registered_stream = self.router.stream_by_id(target_id) 2259 if registered_stream is None: 2260 return 2261 2262 stream = self.router.stream_by_id(msg.src_id) 2263 if registered_stream != stream: 2264 self._log.error('received DEL_ROUTE for %d from %r, expected %r', 2265 target_id, stream, registered_stream) 2266 return 2267 2268 context = self.router.context_by_id(target_id, create=False) 2269 if context: 2270 self._log.debug('firing local disconnect signal for %r', context) 2271 mitogen.core.fire(context, 'disconnect') 2272 2273 self._log.debug('deleting route to %d via %r', target_id, stream) 2274 routes = self._routes_by_stream.get(stream) 2275 if routes: 2276 routes.discard(target_id) 2277 2278 self.router.del_route(target_id) 2279 if stream.protocol.remote_id != mitogen.parent_id: 2280 self._propagate_up(mitogen.core.DEL_ROUTE, target_id) 2281 self._propagate_down(mitogen.core.DEL_ROUTE, target_id) 2282 2283 2284class Router(mitogen.core.Router): 2285 context_class = Context 2286 debug = False 2287 profiling = False 2288 2289 id_allocator = None 2290 responder = None 2291 log_forwarder = None 2292 route_monitor = None 2293 2294 def upgrade(self, importer, parent): 2295 LOG.debug('upgrading %r with capabilities to start new children', self) 2296 self.id_allocator = ChildIdAllocator(router=self) 2297 self.responder = ModuleForwarder( 2298 router=self, 2299 parent_context=parent, 2300 importer=importer, 2301 ) 2302 self.route_monitor = RouteMonitor(self, parent) 2303 self.add_handler( 2304 fn=self._on_detaching, 2305 handle=mitogen.core.DETACHING, 2306 persist=True, 2307 ) 2308 2309 def _on_detaching(self, msg): 2310 if msg.is_dead: 2311 return 2312 stream = self.stream_by_id(msg.src_id) 2313 if stream.protocol.remote_id != msg.src_id or stream.conn.detached: 2314 LOG.warning('bad DETACHING received on %r: %r', stream, msg) 2315 return 2316 LOG.debug('%r: marking as detached', 
stream) 2317 stream.conn.detached = True 2318 msg.reply(None) 2319 2320 def get_streams(self): 2321 """ 2322 Return an atomic snapshot of all streams in existence at time of call. 2323 This is safe to call from any thread. 2324 """ 2325 self._write_lock.acquire() 2326 try: 2327 return itervalues(self._stream_by_id) 2328 finally: 2329 self._write_lock.release() 2330 2331 def disconnect(self, context): 2332 """ 2333 Disconnect a context and forget its stream, assuming the context is 2334 directly connected. 2335 """ 2336 stream = self.stream_by_id(context) 2337 if stream is None or stream.protocol.remote_id != context.context_id: 2338 return 2339 2340 l = mitogen.core.Latch() 2341 mitogen.core.listen(stream, 'disconnect', l.put) 2342 def disconnect(): 2343 LOG.debug('Starting disconnect of %r', stream) 2344 stream.on_disconnect(self.broker) 2345 self.broker.defer(disconnect) 2346 l.get() 2347 2348 def add_route(self, target_id, stream): 2349 """ 2350 Arrange for messages whose `dst_id` is `target_id` to be forwarded on a 2351 directly connected :class:`Stream`. Safe to call from any thread. 2352 2353 This is called automatically by :class:`RouteMonitor` in response to 2354 :data:`mitogen.core.ADD_ROUTE` messages, but remains public while the 2355 design has not yet settled, and situations may arise where routing is 2356 not fully automatic. 2357 2358 :param int target_id: 2359 Target context ID to add a route for. 2360 :param mitogen.core.Stream stream: 2361 Stream over which messages to the target should be routed. 2362 """ 2363 LOG.debug('%r: adding route to context %r via %r', 2364 self, target_id, stream) 2365 assert isinstance(target_id, int) 2366 assert isinstance(stream, mitogen.core.Stream) 2367 2368 self._write_lock.acquire() 2369 try: 2370 self._stream_by_id[target_id] = stream 2371 finally: 2372 self._write_lock.release() 2373 2374 def del_route(self, target_id): 2375 """ 2376 Delete any route that exists for `target_id`. 
It is not an error to 2377 delete a route that does not currently exist. Safe to call from any 2378 thread. 2379 2380 This is called automatically by :class:`RouteMonitor` in response to 2381 :data:`mitogen.core.DEL_ROUTE` messages, but remains public while the 2382 design has not yet settled, and situations may arise where routing is 2383 not fully automatic. 2384 2385 :param int target_id: 2386 Target context ID to delete route for. 2387 """ 2388 LOG.debug('%r: deleting route to %r', self, target_id) 2389 # DEL_ROUTE may be sent by a parent if it knows this context sent 2390 # messages to a peer that has now disconnected, to let us raise 2391 # 'disconnect' event on the appropriate Context instance. In that case, 2392 # we won't a matching _stream_by_id entry for the disappearing route, 2393 # so don't raise an error for a missing key here. 2394 self._write_lock.acquire() 2395 try: 2396 self._stream_by_id.pop(target_id, None) 2397 finally: 2398 self._write_lock.release() 2399 2400 def get_module_blacklist(self): 2401 if mitogen.context_id == 0: 2402 return self.responder.blacklist 2403 return self.importer.master_blacklist 2404 2405 def get_module_whitelist(self): 2406 if mitogen.context_id == 0: 2407 return self.responder.whitelist 2408 return self.importer.master_whitelist 2409 2410 def allocate_id(self): 2411 return self.id_allocator.allocate() 2412 2413 connection_timeout_msg = u"Connection timed out." 
2414 2415 def _connect(self, klass, **kwargs): 2416 context_id = self.allocate_id() 2417 context = self.context_class(self, context_id) 2418 context.name = kwargs.get('name') 2419 2420 kwargs['old_router'] = self 2421 kwargs['max_message_size'] = self.max_message_size 2422 conn = klass(klass.options_class(**kwargs), self) 2423 try: 2424 conn.connect(context=context) 2425 except mitogen.core.TimeoutError: 2426 raise mitogen.core.StreamError(self.connection_timeout_msg) 2427 2428 return context 2429 2430 def connect(self, method_name, name=None, **kwargs): 2431 if name: 2432 name = mitogen.core.to_text(name) 2433 2434 klass = get_connection_class(method_name) 2435 kwargs.setdefault(u'debug', self.debug) 2436 kwargs.setdefault(u'profiling', self.profiling) 2437 kwargs.setdefault(u'unidirectional', self.unidirectional) 2438 kwargs.setdefault(u'name', name) 2439 2440 via = kwargs.pop(u'via', None) 2441 if via is not None: 2442 return self.proxy_connect(via, method_name, 2443 **mitogen.core.Kwargs(kwargs)) 2444 return self._connect(klass, **mitogen.core.Kwargs(kwargs)) 2445 2446 def proxy_connect(self, via_context, method_name, name=None, **kwargs): 2447 resp = via_context.call(_proxy_connect, 2448 name=name, 2449 method_name=method_name, 2450 kwargs=mitogen.core.Kwargs(kwargs), 2451 ) 2452 if resp['msg'] is not None: 2453 raise mitogen.core.StreamError(resp['msg']) 2454 2455 name = u'%s.%s' % (via_context.name, resp['name']) 2456 context = self.context_class(self, resp['id'], name=name) 2457 context.via = via_context 2458 self._write_lock.acquire() 2459 try: 2460 self._context_by_id[context.context_id] = context 2461 finally: 2462 self._write_lock.release() 2463 return context 2464 2465 def buildah(self, **kwargs): 2466 return self.connect(u'buildah', **kwargs) 2467 2468 def doas(self, **kwargs): 2469 return self.connect(u'doas', **kwargs) 2470 2471 def docker(self, **kwargs): 2472 return self.connect(u'docker', **kwargs) 2473 2474 def kubectl(self, **kwargs): 2475 
return self.connect(u'kubectl', **kwargs) 2476 2477 def fork(self, **kwargs): 2478 return self.connect(u'fork', **kwargs) 2479 2480 def jail(self, **kwargs): 2481 return self.connect(u'jail', **kwargs) 2482 2483 def local(self, **kwargs): 2484 return self.connect(u'local', **kwargs) 2485 2486 def lxc(self, **kwargs): 2487 return self.connect(u'lxc', **kwargs) 2488 2489 def lxd(self, **kwargs): 2490 return self.connect(u'lxd', **kwargs) 2491 2492 def setns(self, **kwargs): 2493 return self.connect(u'setns', **kwargs) 2494 2495 def su(self, **kwargs): 2496 return self.connect(u'su', **kwargs) 2497 2498 def sudo(self, **kwargs): 2499 return self.connect(u'sudo', **kwargs) 2500 2501 def ssh(self, **kwargs): 2502 return self.connect(u'ssh', **kwargs) 2503 2504 2505class Reaper(object): 2506 """ 2507 Asynchronous logic for reaping :class:`Process` objects. This is necessary 2508 to prevent uncontrolled buildup of zombie processes in long-lived parents 2509 that will eventually reach an OS limit, preventing creation of new threads 2510 and processes, and to log the exit status of the child in the case of an 2511 error. 2512 2513 To avoid modifying process-global state such as with 2514 :func:`signal.set_wakeup_fd` or installing a :data:`signal.SIGCHLD` handler 2515 that might interfere with the user's ability to use those facilities, 2516 Reaper polls for exit with backoff using timers installed on an associated 2517 :class:`Broker`. 2518 2519 :param mitogen.core.Broker broker: 2520 The :class:`Broker` on which to install timers 2521 :param mitogen.parent.Process proc: 2522 The process to reap. 2523 :param bool kill: 2524 If :data:`True`, send ``SIGTERM`` and ``SIGKILL`` to the process. 2525 :param bool wait_on_shutdown: 2526 If :data:`True`, delay :class:`Broker` shutdown if child has not yet 2527 exited. If :data:`False` simply forget the child. 2528 """ 2529 #: :class:`Timer` that invokes :meth:`reap` after some polling delay. 
2530 _timer = None 2531 2532 def __init__(self, broker, proc, kill, wait_on_shutdown): 2533 self.broker = broker 2534 self.proc = proc 2535 self.kill = kill 2536 self.wait_on_shutdown = wait_on_shutdown 2537 self._tries = 0 2538 2539 def _signal_child(self, signum): 2540 # For processes like sudo we cannot actually send sudo a signal, 2541 # because it is setuid, so this is best-effort only. 2542 LOG.debug('%r: sending %s', self.proc, SIGNAL_BY_NUM[signum]) 2543 try: 2544 os.kill(self.proc.pid, signum) 2545 except OSError: 2546 e = sys.exc_info()[1] 2547 if e.args[0] != errno.EPERM: 2548 raise 2549 2550 def _calc_delay(self, count): 2551 """ 2552 Calculate a poll delay given `count` attempts have already been made. 2553 These constants have no principle, they just produce rapid but still 2554 relatively conservative retries. 2555 """ 2556 delay = 0.05 2557 for _ in xrange(count): 2558 delay *= 1.72 2559 return delay 2560 2561 def _on_broker_shutdown(self): 2562 """ 2563 Respond to :class:`Broker` shutdown by cancelling the reap timer if 2564 :attr:`Router.await_children_at_shutdown` is disabled. Otherwise 2565 shutdown is delayed for up to :attr:`Broker.shutdown_timeout` for 2566 subprocesses may have no intention of exiting any time soon. 2567 """ 2568 if not self.wait_on_shutdown: 2569 self._timer.cancel() 2570 2571 def _install_timer(self, delay): 2572 new = self._timer is None 2573 self._timer = self.broker.timers.schedule( 2574 when=mitogen.core.now() + delay, 2575 func=self.reap, 2576 ) 2577 if new: 2578 mitogen.core.listen(self.broker, 'shutdown', 2579 self._on_broker_shutdown) 2580 2581 def _remove_timer(self): 2582 if self._timer and self._timer.active: 2583 self._timer.cancel() 2584 mitogen.core.unlisten(self.broker, 'shutdown', 2585 self._on_broker_shutdown) 2586 2587 def reap(self): 2588 """ 2589 Reap the child process during disconnection. 
2590 """ 2591 status = self.proc.poll() 2592 if status is not None: 2593 LOG.debug('%r: %s', self.proc, returncode_to_str(status)) 2594 mitogen.core.fire(self.proc, 'exit') 2595 self._remove_timer() 2596 return 2597 2598 self._tries += 1 2599 if self._tries > 20: 2600 LOG.warning('%r: child will not exit, giving up', self) 2601 self._remove_timer() 2602 return 2603 2604 delay = self._calc_delay(self._tries - 1) 2605 LOG.debug('%r still running after IO disconnect, recheck in %.03fs', 2606 self.proc, delay) 2607 self._install_timer(delay) 2608 2609 if not self.kill: 2610 pass 2611 elif self._tries == 2: 2612 self._signal_child(signal.SIGTERM) 2613 elif self._tries == 6: # roughly 4 seconds 2614 self._signal_child(signal.SIGKILL) 2615 2616 2617class Process(object): 2618 """ 2619 Process objects provide a uniform interface to the :mod:`subprocess` and 2620 :mod:`mitogen.fork`. This class is extended by :class:`PopenProcess` and 2621 :class:`mitogen.fork.Process`. 2622 2623 :param int pid: 2624 The process ID. 2625 :param file stdin: 2626 File object attached to standard input. 2627 :param file stdout: 2628 File object attached to standard output. 2629 :param file stderr: 2630 File object attached to standard error, or :data:`None`. 2631 """ 2632 #: Name of the process used in logs. Set to the stream/context name by 2633 #: :class:`Connection`. 2634 name = None 2635 2636 def __init__(self, pid, stdin, stdout, stderr=None): 2637 #: The process ID. 2638 self.pid = pid 2639 #: File object attached to standard input. 2640 self.stdin = stdin 2641 #: File object attached to standard output. 2642 self.stdout = stdout 2643 #: File object attached to standard error. 2644 self.stderr = stderr 2645 2646 def __repr__(self): 2647 return '%s %s pid %d' % ( 2648 type(self).__name__, 2649 self.name, 2650 self.pid, 2651 ) 2652 2653 def poll(self): 2654 """ 2655 Fetch the child process exit status, or :data:`None` if it is still 2656 running. This should be overridden by subclasses. 
2657 2658 :returns: 2659 Exit status in the style of the :attr:`subprocess.Popen.returncode` 2660 attribute, i.e. with signals represented by a negative integer. 2661 """ 2662 raise NotImplementedError() 2663 2664 2665class PopenProcess(Process): 2666 """ 2667 :class:`Process` subclass wrapping a :class:`subprocess.Popen` object. 2668 2669 :param subprocess.Popen proc: 2670 The subprocess. 2671 """ 2672 def __init__(self, proc, stdin, stdout, stderr=None): 2673 super(PopenProcess, self).__init__(proc.pid, stdin, stdout, stderr) 2674 #: The subprocess. 2675 self.proc = proc 2676 2677 def poll(self): 2678 return self.proc.poll() 2679 2680 2681class ModuleForwarder(object): 2682 """ 2683 Respond to :data:`mitogen.core.GET_MODULE` requests in a child by 2684 forwarding the request to our parent context, or satisfying the request 2685 from our local Importer cache. 2686 """ 2687 def __init__(self, router, parent_context, importer): 2688 self.router = router 2689 self.parent_context = parent_context 2690 self.importer = importer 2691 router.add_handler( 2692 fn=self._on_forward_module, 2693 handle=mitogen.core.FORWARD_MODULE, 2694 persist=True, 2695 policy=mitogen.core.has_parent_authority, 2696 ) 2697 router.add_handler( 2698 fn=self._on_get_module, 2699 handle=mitogen.core.GET_MODULE, 2700 persist=True, 2701 policy=is_immediate_child, 2702 ) 2703 2704 def __repr__(self): 2705 return 'ModuleForwarder' 2706 2707 def _on_forward_module(self, msg): 2708 if msg.is_dead: 2709 return 2710 2711 context_id_s, _, fullname = bytes_partition(msg.data, b('\x00')) 2712 fullname = mitogen.core.to_text(fullname) 2713 context_id = int(context_id_s) 2714 stream = self.router.stream_by_id(context_id) 2715 if stream.protocol.remote_id == mitogen.parent_id: 2716 LOG.error('%r: dropping FORWARD_MODULE(%d, %r): no route to child', 2717 self, context_id, fullname) 2718 return 2719 2720 if fullname in stream.protocol.sent_modules: 2721 return 2722 2723 LOG.debug('%r._on_forward_module() 
sending %r to %r via %r', 2724 self, fullname, context_id, stream.protocol.remote_id) 2725 self._send_module_and_related(stream, fullname) 2726 if stream.protocol.remote_id != context_id: 2727 stream.protocol._send( 2728 mitogen.core.Message( 2729 data=msg.data, 2730 handle=mitogen.core.FORWARD_MODULE, 2731 dst_id=stream.protocol.remote_id, 2732 ) 2733 ) 2734 2735 def _on_get_module(self, msg): 2736 if msg.is_dead: 2737 return 2738 2739 fullname = msg.data.decode('utf-8') 2740 LOG.debug('%r: %s requested by context %d', self, fullname, msg.src_id) 2741 callback = lambda: self._on_cache_callback(msg, fullname) 2742 self.importer._request_module(fullname, callback) 2743 2744 def _on_cache_callback(self, msg, fullname): 2745 stream = self.router.stream_by_id(msg.src_id) 2746 LOG.debug('%r: sending %s to %r', self, fullname, stream) 2747 self._send_module_and_related(stream, fullname) 2748 2749 def _send_module_and_related(self, stream, fullname): 2750 tup = self.importer._cache[fullname] 2751 for related in tup[4]: 2752 rtup = self.importer._cache.get(related) 2753 if rtup: 2754 self._send_one_module(stream, rtup) 2755 else: 2756 LOG.debug('%r: %s not in cache (for %s)', 2757 self, related, fullname) 2758 2759 self._send_one_module(stream, tup) 2760 2761 def _send_one_module(self, stream, tup): 2762 if tup[0] not in stream.protocol.sent_modules: 2763 stream.protocol.sent_modules.add(tup[0]) 2764 self.router._async_route( 2765 mitogen.core.Message.pickled( 2766 tup, 2767 dst_id=stream.protocol.remote_id, 2768 handle=mitogen.core.LOAD_MODULE, 2769 ) 2770 ) 2771