1# Copyright 2019, David Wilson
2#
3# Redistribution and use in source and binary forms, with or without
4# modification, are permitted provided that the following conditions are met:
5#
6# 1. Redistributions of source code must retain the above copyright notice,
7# this list of conditions and the following disclaimer.
8#
9# 2. Redistributions in binary form must reproduce the above copyright notice,
10# this list of conditions and the following disclaimer in the documentation
11# and/or other materials provided with the distribution.
12#
13# 3. Neither the name of the copyright holder nor the names of its contributors
14# may be used to endorse or promote products derived from this software without
15# specific prior written permission.
16#
17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27# POSSIBILITY OF SUCH DAMAGE.
28
29# !mitogen: minify_safe
30
31"""
32This module defines functionality common to master and parent processes. It is
33sent to any child context that is due to become a parent, due to recursive
34connection.
35"""
36
37import codecs
38import errno
39import fcntl
40import getpass
41import heapq
42import inspect
43import logging
44import os
45import re
46import signal
47import socket
48import struct
49import subprocess
50import sys
51import termios
52import textwrap
53import threading
54import zlib
55
56# Absolute imports for <2.5.
57select = __import__('select')
58
59try:
60    import thread
61except ImportError:
62    import threading as thread
63
64import mitogen.core
65from mitogen.core import b
66from mitogen.core import bytes_partition
67from mitogen.core import IOLOG
68
69
70LOG = logging.getLogger(__name__)
71
72# #410: we must avoid the use of socketpairs if SELinux is enabled.
73try:
74    fp = open('/sys/fs/selinux/enforce', 'rb')
75    try:
76        SELINUX_ENABLED = bool(int(fp.read()))
77    finally:
78        fp.close()
79except IOError:
80    SELINUX_ENABLED = False
81
82
83try:
84    next
85except NameError:
86    # Python 2.4/2.5
87    from mitogen.core import next
88
89
90itervalues = getattr(dict, 'itervalues', dict.values)
91
92if mitogen.core.PY3:
93    xrange = range
94    closure_attr = '__closure__'
95    IM_SELF_ATTR = '__self__'
96else:
97    closure_attr = 'func_closure'
98    IM_SELF_ATTR = 'im_self'
99
100
101try:
102    SC_OPEN_MAX = os.sysconf('SC_OPEN_MAX')
103except ValueError:
104    SC_OPEN_MAX = 1024
105
106BROKER_SHUTDOWN_MSG = (
107    'Connection cancelled because the associated Broker began to shut down.'
108)
109
110OPENPTY_MSG = (
111    "Failed to create a PTY: %s. It is likely the maximum number of PTYs has "
112    "been reached. Consider increasing the 'kern.tty.ptmx_max' sysctl on OS "
113    "X, the 'kernel.pty.max' sysctl on Linux, or modifying your configuration "
114    "to avoid PTY use."
115)
116
117SYS_EXECUTABLE_MSG = (
118    "The Python sys.executable variable is unset, indicating Python was "
119    "unable to determine its original program name. Unless explicitly "
120    "configured otherwise, child contexts will be started using "
121    "'/usr/bin/python'"
122)
123_sys_executable_warning_logged = False
124
125
126def _ioctl_cast(n):
127    """
128    Linux ioctl() request parameter is unsigned, whereas on BSD/Darwin it is
129    signed. Until 2.5 Python exclusively implemented the BSD behaviour,
130    preventing use of large unsigned int requests like the TTY layer uses
131    below. So on 2.4, we cast our unsigned to look like signed for Python.
132    """
133    if sys.version_info < (2, 5):
134        n, = struct.unpack('i', struct.pack('I', n))
135    return n
136
137
138# If not :data:`None`, called prior to exec() of any new child process. Used by
139# :func:`mitogen.utils.reset_affinity` to allow the child to be freely
140# scheduled.
141_preexec_hook = None
142
143# Get PTY number; asm-generic/ioctls.h
144LINUX_TIOCGPTN = _ioctl_cast(2147767344)
145
146# Lock/unlock PTY; asm-generic/ioctls.h
147LINUX_TIOCSPTLCK = _ioctl_cast(1074025521)
148
149IS_LINUX = os.uname()[0] == 'Linux'
150
151SIGNAL_BY_NUM = dict(
152    (getattr(signal, name), name)
153    for name in sorted(vars(signal), reverse=True)
154    if name.startswith('SIG') and not name.startswith('SIG_')
155)
156
157_core_source_lock = threading.Lock()
158_core_source_partial = None
159
160
161def get_log_level():
162    return (LOG.getEffectiveLevel() or logging.INFO)
163
164
165def get_sys_executable():
166    """
167    Return :data:`sys.executable` if it is set, otherwise return
168    ``"/usr/bin/python"`` and log a warning.
169    """
170    if sys.executable:
171        return sys.executable
172
173    global _sys_executable_warning_logged
174    if not _sys_executable_warning_logged:
175        LOG.warn(SYS_EXECUTABLE_MSG)
176        _sys_executable_warning_logged = True
177
178    return '/usr/bin/python'
179
180
181def _get_core_source():
182    """
183    In non-masters, simply fetch the cached mitogen.core source code via the
184    import mechanism. In masters, this function is replaced with a version that
185    performs minification directly.
186    """
187    return inspect.getsource(mitogen.core)
188
189
190def get_core_source_partial():
191    """
192    _get_core_source() is expensive, even with @lru_cache in minify.py, threads
193    can enter it simultaneously causing severe slowdowns.
194    """
195    global _core_source_partial
196
197    if _core_source_partial is None:
198        _core_source_lock.acquire()
199        try:
200            if _core_source_partial is None:
201                _core_source_partial = PartialZlib(
202                    _get_core_source().encode('utf-8')
203                )
204        finally:
205            _core_source_lock.release()
206
207    return _core_source_partial
208
209
210def get_default_remote_name():
211    """
212    Return the default name appearing in argv[0] of remote machines.
213    """
214    s = u'%s@%s:%d'
215    s %= (getpass.getuser(), socket.gethostname(), os.getpid())
216    # In mixed UNIX/Windows environments, the username may contain slashes.
217    return s.translate({
218        ord(u'\\'): ord(u'_'),
219        ord(u'/'): ord(u'_')
220    })
221
222
223def is_immediate_child(msg, stream):
224    """
225    Handler policy that requires messages to arrive only from immediately
226    connected children.
227    """
228    return msg.src_id == stream.protocol.remote_id
229
230
231def flags(names):
232    """
233    Return the result of ORing a set of (space separated) :py:mod:`termios`
234    module constants together.
235    """
236    return sum(getattr(termios, name, 0)
237               for name in names.split())
238
239
240def cfmakeraw(tflags):
241    """
242    Given a list returned by :py:func:`termios.tcgetattr`, return a list
243    modified in a manner similar to the `cfmakeraw()` C library function, but
244    additionally disabling local echo.
245    """
246    # BSD: github.com/freebsd/freebsd/blob/master/lib/libc/gen/termios.c#L162
247    # Linux: github.com/lattera/glibc/blob/master/termios/cfmakeraw.c#L20
248    iflag, oflag, cflag, lflag, ispeed, ospeed, cc = tflags
249    iflag &= ~flags('IMAXBEL IXOFF INPCK BRKINT PARMRK '
250                    'ISTRIP INLCR ICRNL IXON IGNPAR')
251    iflag &= ~flags('IGNBRK BRKINT PARMRK')
252    oflag &= ~flags('OPOST')
253    lflag &= ~flags('ECHO ECHOE ECHOK ECHONL ICANON ISIG '
254                    'IEXTEN NOFLSH TOSTOP PENDIN')
255    cflag &= ~flags('CSIZE PARENB')
256    cflag |= flags('CS8 CREAD')
257    return [iflag, oflag, cflag, lflag, ispeed, ospeed, cc]
258
259
260def disable_echo(fd):
261    old = termios.tcgetattr(fd)
262    new = cfmakeraw(old)
263    flags = getattr(termios, 'TCSASOFT', 0)
264    if not mitogen.core.IS_WSL:
265        # issue #319: Windows Subsystem for Linux as of July 2018 throws EINVAL
266        # if TCSAFLUSH is specified.
267        flags |= termios.TCSAFLUSH
268    termios.tcsetattr(fd, flags, new)
269
270
271def create_socketpair(size=None):
272    """
273    Create a :func:`socket.socketpair` for use as a child's UNIX stdio
274    channels. As socketpairs are bidirectional, they are economical on file
275    descriptor usage as one descriptor can be used for ``stdin`` and
276    ``stdout``. As they are sockets their buffers are tunable, allowing large
277    buffers to improve file transfer throughput and reduce IO loop iterations.
278    """
279    if size is None:
280        size = mitogen.core.CHUNK_SIZE
281
282    parentfp, childfp = socket.socketpair()
283    for fp in parentfp, childfp:
284        fp.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, size)
285
286    return parentfp, childfp
287
288
289def create_best_pipe(escalates_privilege=False):
290    """
291    By default we prefer to communicate with children over a UNIX socket, as a
292    single file descriptor can represent bidirectional communication, and a
293    cross-platform API exists to align buffer sizes with the needs of the
294    library.
295
296    SELinux prevents us setting up a privileged process to inherit an AF_UNIX
297    socket, a facility explicitly designed as a better replacement for pipes,
298    because at some point in the mid 90s it might have been commonly possible
299    for AF_INET sockets to end up undesirably connected to a privileged
300    process, so let's make up arbitrary rules breaking all sockets instead.
301
302    If SELinux is detected, fall back to using pipes.
303
304    :param bool escalates_privilege:
305        If :data:`True`, the target program may escalate privileges, causing
306        SELinux to disconnect AF_UNIX sockets, so avoid those.
307    :returns:
308        `(parent_rfp, child_wfp, child_rfp, parent_wfp)`
309    """
310    if (not escalates_privilege) or (not SELINUX_ENABLED):
311        parentfp, childfp = create_socketpair()
312        return parentfp, childfp, childfp, parentfp
313
314    parent_rfp, child_wfp = mitogen.core.pipe()
315    try:
316        child_rfp, parent_wfp = mitogen.core.pipe()
317        return parent_rfp, child_wfp, child_rfp, parent_wfp
318    except:
319        parent_rfp.close()
320        child_wfp.close()
321        raise
322
323
324def popen(**kwargs):
325    """
326    Wrap :class:`subprocess.Popen` to ensure any global :data:`_preexec_hook`
327    is invoked in the child.
328    """
329    real_preexec_fn = kwargs.pop('preexec_fn', None)
330    def preexec_fn():
331        if _preexec_hook:
332            _preexec_hook()
333        if real_preexec_fn:
334            real_preexec_fn()
335    return subprocess.Popen(preexec_fn=preexec_fn, **kwargs)
336
337
338def create_child(args, merge_stdio=False, stderr_pipe=False,
339                 escalates_privilege=False, preexec_fn=None):
340    """
341    Create a child process whose stdin/stdout is connected to a socket.
342
343    :param list args:
344        Program argument vector.
345    :param bool merge_stdio:
346        If :data:`True`, arrange for `stderr` to be connected to the `stdout`
347        socketpair, rather than inherited from the parent process. This may be
348        necessary to ensure that no TTY is connected to any stdio handle, for
349        instance when using LXC.
350    :param bool stderr_pipe:
351        If :data:`True` and `merge_stdio` is :data:`False`, arrange for
352        `stderr` to be connected to a separate pipe, to allow any ongoing debug
353        logs generated by e.g. SSH to be output as the session progresses,
354        without interfering with `stdout`.
355    :param bool escalates_privilege:
356        If :data:`True`, the target program may escalate privileges, causing
357        SELinux to disconnect AF_UNIX sockets, so avoid those.
358    :param function preexec_fn:
359        If not :data:`None`, a function to run within the post-fork child
360        before executing the target program.
361    :returns:
362        :class:`Process` instance.
363    """
364    parent_rfp, child_wfp, child_rfp, parent_wfp = create_best_pipe(
365        escalates_privilege=escalates_privilege
366    )
367
368    stderr = None
369    stderr_r = None
370    if merge_stdio:
371        stderr = child_wfp
372    elif stderr_pipe:
373        stderr_r, stderr = mitogen.core.pipe()
374        mitogen.core.set_cloexec(stderr_r.fileno())
375
376    try:
377        proc = popen(
378            args=args,
379            stdin=child_rfp,
380            stdout=child_wfp,
381            stderr=stderr,
382            close_fds=True,
383            preexec_fn=preexec_fn,
384        )
385    except:
386        child_rfp.close()
387        child_wfp.close()
388        parent_rfp.close()
389        parent_wfp.close()
390        if stderr_pipe:
391            stderr.close()
392            stderr_r.close()
393        raise
394
395    child_rfp.close()
396    child_wfp.close()
397    if stderr_pipe:
398        stderr.close()
399
400    return PopenProcess(
401        proc=proc,
402        stdin=parent_wfp,
403        stdout=parent_rfp,
404        stderr=stderr_r,
405    )
406
407
408def _acquire_controlling_tty():
409    os.setsid()
410    if sys.platform in ('linux', 'linux2'):
411        # On Linux, the controlling tty becomes the first tty opened by a
412        # process lacking any prior tty.
413        os.close(os.open(os.ttyname(2), os.O_RDWR))
414    if hasattr(termios, 'TIOCSCTTY') and not mitogen.core.IS_WSL:
415        # #550: prehistoric WSL does not like TIOCSCTTY.
416        # On BSD an explicit ioctl is required. For some inexplicable reason,
417        # Python 2.6 on Travis also requires it.
418        fcntl.ioctl(2, termios.TIOCSCTTY)
419
420
421def _linux_broken_devpts_openpty():
422    """
423    #462: On broken Linux hosts with mismatched configuration (e.g. old
424    /etc/fstab template installed), /dev/pts may be mounted without the gid=
425    mount option, causing new slave devices to be created with the group ID of
426    the calling process. This upsets glibc, whose openpty() is required by
427    specification to produce a slave owned by a special group ID (which is
428    always the 'tty' group).
429
430    Glibc attempts to use "pt_chown" to fix ownership. If that fails, it
431    chown()s the PTY directly, which fails due to non-root, causing openpty()
432    to fail with EPERM ("Operation not permitted"). Since we don't need the
433    magical TTY group to run sudo and su, open the PTY ourselves in this case.
434    """
435    master_fd = None
436    try:
437        # Opening /dev/ptmx causes a PTY pair to be allocated, and the
438        # corresponding slave /dev/pts/* device to be created, owned by UID/GID
439        # matching this process.
440        master_fd = os.open('/dev/ptmx', os.O_RDWR)
441        # Clear the lock bit from the PTY. This a prehistoric feature from a
442        # time when slave device files were persistent.
443        fcntl.ioctl(master_fd, LINUX_TIOCSPTLCK, struct.pack('i', 0))
444        # Since v4.13 TIOCGPTPEER exists to open the slave in one step, but we
445        # must support older kernels. Ask for the PTY number.
446        pty_num_s = fcntl.ioctl(master_fd, LINUX_TIOCGPTN,
447                                struct.pack('i', 0))
448        pty_num, = struct.unpack('i', pty_num_s)
449        pty_name = '/dev/pts/%d' % (pty_num,)
450        # Now open it with O_NOCTTY to ensure it doesn't change our controlling
451        # TTY. Otherwise when we close the FD we get killed by the kernel, and
452        # the child we spawn that should really attach to it will get EPERM
453        # during _acquire_controlling_tty().
454        slave_fd = os.open(pty_name, os.O_RDWR|os.O_NOCTTY)
455        return master_fd, slave_fd
456    except OSError:
457        if master_fd is not None:
458            os.close(master_fd)
459        e = sys.exc_info()[1]
460        raise mitogen.core.StreamError(OPENPTY_MSG, e)
461
462
463def openpty():
464    """
465    Call :func:`os.openpty`, raising a descriptive error if the call fails.
466
467    :raises mitogen.core.StreamError:
468        Creating a PTY failed.
469    :returns:
470        `(master_fp, slave_fp)` file-like objects.
471    """
472    try:
473        master_fd, slave_fd = os.openpty()
474    except OSError:
475        e = sys.exc_info()[1]
476        if not (IS_LINUX and e.args[0] == errno.EPERM):
477            raise mitogen.core.StreamError(OPENPTY_MSG, e)
478        master_fd, slave_fd = _linux_broken_devpts_openpty()
479
480    master_fp = os.fdopen(master_fd, 'r+b', 0)
481    slave_fp = os.fdopen(slave_fd, 'r+b', 0)
482    disable_echo(master_fd)
483    disable_echo(slave_fd)
484    mitogen.core.set_block(slave_fd)
485    return master_fp, slave_fp
486
487
488def tty_create_child(args):
489    """
490    Return a file descriptor connected to the master end of a pseudo-terminal,
491    whose slave end is connected to stdin/stdout/stderr of a new child process.
492    The child is created such that the pseudo-terminal becomes its controlling
493    TTY, ensuring access to /dev/tty returns a new file descriptor open on the
494    slave end.
495
496    :param list args:
497        Program argument vector.
498    :returns:
499        :class:`Process` instance.
500    """
501    master_fp, slave_fp = openpty()
502    try:
503        proc = popen(
504            args=args,
505            stdin=slave_fp,
506            stdout=slave_fp,
507            stderr=slave_fp,
508            preexec_fn=_acquire_controlling_tty,
509            close_fds=True,
510        )
511    except:
512        master_fp.close()
513        slave_fp.close()
514        raise
515
516    slave_fp.close()
517    return PopenProcess(
518        proc=proc,
519        stdin=master_fp,
520        stdout=master_fp,
521    )
522
523
524def hybrid_tty_create_child(args, escalates_privilege=False):
525    """
526    Like :func:`tty_create_child`, except attach stdin/stdout to a socketpair
527    like :func:`create_child`, but leave stderr and the controlling TTY
528    attached to a TTY.
529
530    This permits high throughput communication with programs that are reached
531    via some program that requires a TTY for password input, like many
532    configurations of sudo. The UNIX TTY layer tends to have tiny (no more than
533    14KiB) buffers, forcing many IO loop iterations when transferring bulk
534    data, causing significant performance loss.
535
536    :param bool escalates_privilege:
537        If :data:`True`, the target program may escalate privileges, causing
538        SELinux to disconnect AF_UNIX sockets, so avoid those.
539    :param list args:
540        Program argument vector.
541    :returns:
542        :class:`Process` instance.
543    """
544    master_fp, slave_fp = openpty()
545    try:
546        parent_rfp, child_wfp, child_rfp, parent_wfp = create_best_pipe(
547            escalates_privilege=escalates_privilege,
548        )
549        try:
550            mitogen.core.set_block(child_rfp)
551            mitogen.core.set_block(child_wfp)
552            proc = popen(
553                args=args,
554                stdin=child_rfp,
555                stdout=child_wfp,
556                stderr=slave_fp,
557                preexec_fn=_acquire_controlling_tty,
558                close_fds=True,
559            )
560        except:
561            parent_rfp.close()
562            child_wfp.close()
563            parent_wfp.close()
564            child_rfp.close()
565            raise
566    except:
567        master_fp.close()
568        slave_fp.close()
569        raise
570
571    slave_fp.close()
572    child_rfp.close()
573    child_wfp.close()
574    return PopenProcess(
575        proc=proc,
576        stdin=parent_wfp,
577        stdout=parent_rfp,
578        stderr=master_fp,
579    )
580
581
582class Timer(object):
583    """
584    Represents a future event.
585    """
586    #: Set to :data:`False` if :meth:`cancel` has been called, or immediately
587    #: prior to being executed by :meth:`TimerList.expire`.
588    active = True
589
590    def __init__(self, when, func):
591        self.when = when
592        self.func = func
593
594    def __repr__(self):
595        return 'Timer(%r, %r)' % (self.when, self.func)
596
597    def __eq__(self, other):
598        return self.when == other.when
599
600    def __lt__(self, other):
601        return self.when < other.when
602
603    def __le__(self, other):
604        return self.when <= other.when
605
606    def cancel(self):
607        """
608        Cancel this event. If it has not yet executed, it will not execute
609        during any subsequent :meth:`TimerList.expire` call.
610        """
611        self.active = False
612
613
614class TimerList(object):
615    """
616    Efficiently manage a list of cancellable future events relative to wall
617    clock time. An instance of this class is installed as
618    :attr:`mitogen.master.Broker.timers` by default, and as
619    :attr:`mitogen.core.Broker.timers` in children after a call to
620    :func:`mitogen.parent.upgrade_router`.
621
622    You can use :class:`TimerList` to cause the broker to wake at arbitrary
623    future moments, useful for implementing timeouts and polling in an
624    asynchronous context.
625
626    :class:`TimerList` methods can only be called from asynchronous context,
627    for example via :meth:`mitogen.core.Broker.defer`.
628
629    The broker automatically adjusts its sleep delay according to the installed
630    timer list, and arranges for timers to expire via automatic calls to
631    :meth:`expire`. The main user interface to :class:`TimerList` is
632    :meth:`schedule`.
633    """
634    _now = mitogen.core.now
635
636    def __init__(self):
637        self._lst = []
638
639    def get_timeout(self):
640        """
641        Return the floating point seconds until the next event is due.
642
643        :returns:
644            Floating point delay, or 0.0, or :data:`None` if no events are
645            scheduled.
646        """
647        while self._lst and not self._lst[0].active:
648            heapq.heappop(self._lst)
649        if self._lst:
650            return max(0, self._lst[0].when - self._now())
651
652    def schedule(self, when, func):
653        """
654        Schedule a future event.
655
656        :param float when:
657            UNIX time in seconds when event should occur.
658        :param callable func:
659            Callable to invoke on expiry.
660        :returns:
661            A :class:`Timer` instance, exposing :meth:`Timer.cancel`, which may
662            be used to cancel the future invocation.
663        """
664        timer = Timer(when, func)
665        heapq.heappush(self._lst, timer)
666        return timer
667
668    def expire(self):
669        """
670        Invoke callbacks for any events in the past.
671        """
672        now = self._now()
673        while self._lst and self._lst[0].when <= now:
674            timer = heapq.heappop(self._lst)
675            if timer.active:
676                timer.active = False
677                timer.func()
678
679
680class PartialZlib(object):
681    """
682    Because the mitogen.core source has a line appended to it during bootstrap,
683    it must be recompressed for each connection. This is not a problem for a
684    small number of connections, but it amounts to 30 seconds CPU time by the
685    time 500 targets are in use.
686
687    For that reason, build a compressor containing mitogen.core and flush as
688    much of it as possible into an initial buffer. Then to append the custom
689    line, clone the compressor and compress just that line.
690
691    A full compression costs ~6ms on a modern machine, this method costs ~35
692    usec.
693    """
694    def __init__(self, s):
695        self.s = s
696        if sys.version_info > (2, 5):
697            self._compressor = zlib.compressobj(9)
698            self._out = self._compressor.compress(s)
699            self._out += self._compressor.flush(zlib.Z_SYNC_FLUSH)
700        else:
701            self._compressor = None
702
703    def append(self, s):
704        """
705        Append the bytestring `s` to the compressor state and return the
706        final compressed output.
707        """
708        if self._compressor is None:
709            return zlib.compress(self.s + s, 9)
710        else:
711            compressor = self._compressor.copy()
712            out = self._out
713            out += compressor.compress(s)
714            return out + compressor.flush()
715
716
717def _upgrade_broker(broker):
718    """
719    Extract the poller state from Broker and replace it with the industrial
720    strength poller for this OS. Must run on the Broker thread.
721    """
722    # This function is deadly! The act of calling start_receive() generates log
723    # messages which must be silenced as the upgrade progresses, otherwise the
724    # poller state will change as it is copied, resulting in write fds that are
725    # lost. (Due to LogHandler->Router->Stream->Protocol->Broker->Poller, where
726    # Stream only calls start_transmit() when transitioning from empty to
727    # non-empty buffer. If the start_transmit() is lost, writes from the child
728    # hang permanently).
729    root = logging.getLogger()
730    old_level = root.level
731    root.setLevel(logging.CRITICAL)
732    try:
733        old = broker.poller
734        new = PREFERRED_POLLER()
735        for fd, data in old.readers:
736            new.start_receive(fd, data)
737        for fd, data in old.writers:
738            new.start_transmit(fd, data)
739
740        old.close()
741        broker.poller = new
742    finally:
743        root.setLevel(old_level)
744
745    broker.timers = TimerList()
746    LOG.debug('upgraded %r with %r (new: %d readers, %d writers; '
747              'old: %d readers, %d writers)', old, new,
748              len(new.readers), len(new.writers),
749              len(old.readers), len(old.writers))
750
751
752@mitogen.core.takes_econtext
753def upgrade_router(econtext):
754    if not isinstance(econtext.router, Router):  # TODO
755        econtext.broker.defer(_upgrade_broker, econtext.broker)
756        econtext.router.__class__ = Router  # TODO
757        econtext.router.upgrade(
758            importer=econtext.importer,
759            parent=econtext.parent,
760        )
761
762
763def get_connection_class(name):
764    """
765    Given the name of a Mitogen connection method, import its implementation
766    module and return its Stream subclass.
767    """
768    if name == u'local':
769        name = u'parent'
770    module = mitogen.core.import_module(u'mitogen.' + name)
771    return module.Connection
772
773
774@mitogen.core.takes_econtext
775def _proxy_connect(name, method_name, kwargs, econtext):
776    """
777    Implements the target portion of Router._proxy_connect() by upgrading the
778    local process to a parent if it was not already, then calling back into
779    Router._connect() using the arguments passed to the parent's
780    Router.connect().
781
782    :returns:
783        Dict containing:
784        * ``id``: :data:`None`, or integer new context ID.
785        * ``name``: :data:`None`, or string name attribute of new Context.
786        * ``msg``: :data:`None`, or StreamError exception text.
787    """
788    upgrade_router(econtext)
789
790    try:
791        context = econtext.router._connect(
792            klass=get_connection_class(method_name),
793            name=name,
794            **kwargs
795        )
796    except mitogen.core.StreamError:
797        return {
798            u'id': None,
799            u'name': None,
800            u'msg': 'error occurred on host %s: %s' % (
801                socket.gethostname(),
802                sys.exc_info()[1],
803            ),
804        }
805
806    return {
807        u'id': context.context_id,
808        u'name': context.name,
809        u'msg': None,
810    }
811
812
813def returncode_to_str(n):
814    """
815    Parse and format a :func:`os.waitpid` exit status.
816    """
817    if n < 0:
818        return 'exited due to signal %d (%s)' % (-n, SIGNAL_BY_NUM.get(-n))
819    return 'exited with return code %d' % (n,)
820
821
822class EofError(mitogen.core.StreamError):
823    """
824    Raised by :class:`Connection` when an empty read is detected from the
825    remote process before bootstrap completes.
826    """
827    # inherits from StreamError to maintain compatibility.
828    pass
829
830
831class CancelledError(mitogen.core.StreamError):
832    """
833    Raised by :class:`Connection` when :meth:`mitogen.core.Broker.shutdown` is
834    called before bootstrap completes.
835    """
836    pass
837
838
839class Argv(object):
840    """
841    Wrapper to defer argv formatting when debug logging is disabled.
842    """
843    def __init__(self, argv):
844        self.argv = argv
845
846    must_escape = frozenset('\\$"`!')
847    must_escape_or_space = must_escape | frozenset(' ')
848
849    def escape(self, x):
850        if not self.must_escape_or_space.intersection(x):
851            return x
852
853        s = '"'
854        for c in x:
855            if c in self.must_escape:
856                s += '\\'
857            s += c
858        s += '"'
859        return s
860
861    def __str__(self):
862        return ' '.join(map(self.escape, self.argv))
863
864
865class CallSpec(object):
866    """
867    Wrapper to defer call argument formatting when debug logging is disabled.
868    """
869    def __init__(self, func, args, kwargs):
870        self.func = func
871        self.args = args
872        self.kwargs = kwargs
873
874    def _get_name(self):
875        bits = [self.func.__module__]
876        if inspect.ismethod(self.func):
877            im_self = getattr(self.func, IM_SELF_ATTR)
878            bits.append(getattr(im_self, '__name__', None) or
879                        getattr(type(im_self), '__name__', None))
880        bits.append(self.func.__name__)
881        return u'.'.join(bits)
882
883    def _get_args(self):
884        return u', '.join(repr(a) for a in self.args)
885
886    def _get_kwargs(self):
887        s = u''
888        if self.kwargs:
889            s = u', '.join('%s=%r' % (k, v) for k, v in self.kwargs.items())
890            if self.args:
891                s = u', ' + s
892        return s
893
894    def __repr__(self):
895        return '%s(%s%s)' % (
896            self._get_name(),
897            self._get_args(),
898            self._get_kwargs(),
899        )
900
901
902class PollPoller(mitogen.core.Poller):
903    """
904    Poller based on the POSIX :linux:man2:`poll` interface. Not available on
905    some versions of OS X, otherwise it is the preferred poller for small FD
906    counts, as there is no setup/teardown/configuration system call overhead.
907    """
908    SUPPORTED = hasattr(select, 'poll')
909    _repr = 'PollPoller()'
910
911    def __init__(self):
912        super(PollPoller, self).__init__()
913        self._pollobj = select.poll()
914
915    # TODO: no proof we dont need writemask too
916    _readmask = (
917        getattr(select, 'POLLIN', 0) |
918        getattr(select, 'POLLHUP', 0)
919    )
920
921    def _update(self, fd):
922        mask = (((fd in self._rfds) and self._readmask) |
923                ((fd in self._wfds) and select.POLLOUT))
924        if mask:
925            self._pollobj.register(fd, mask)
926        else:
927            try:
928                self._pollobj.unregister(fd)
929            except KeyError:
930                pass
931
932    def _poll(self, timeout):
933        if timeout:
934            timeout *= 1000
935
936        events, _ = mitogen.core.io_op(self._pollobj.poll, timeout)
937        for fd, event in events:
938            if event & self._readmask:
939                IOLOG.debug('%r: POLLIN|POLLHUP for %r', self, fd)
940                data, gen = self._rfds.get(fd, (None, None))
941                if gen and gen < self._generation:
942                    yield data
943            if event & select.POLLOUT:
944                IOLOG.debug('%r: POLLOUT for %r', self, fd)
945                data, gen = self._wfds.get(fd, (None, None))
946                if gen and gen < self._generation:
947                    yield data
948
949
950class KqueuePoller(mitogen.core.Poller):
951    """
952    Poller based on the FreeBSD/Darwin :freebsd:man2:`kqueue` interface.
953    """
954    SUPPORTED = hasattr(select, 'kqueue')
955    _repr = 'KqueuePoller()'
956
957    def __init__(self):
958        super(KqueuePoller, self).__init__()
959        self._kqueue = select.kqueue()
960        self._changelist = []
961
962    def close(self):
963        super(KqueuePoller, self).close()
964        self._kqueue.close()
965
966    def _control(self, fd, filters, flags):
967        mitogen.core._vv and IOLOG.debug(
968            '%r._control(%r, %r, %r)', self, fd, filters, flags)
969        # TODO: at shutdown it is currently possible for KQ_EV_ADD/KQ_EV_DEL
970        # pairs to be pending after the associated file descriptor has already
971        # been closed. Fixing this requires maintaining extra state, or perhaps
972        # making fd closure the poller's responsibility. In the meantime,
973        # simply apply changes immediately.
974        # self._changelist.append(select.kevent(fd, filters, flags))
975        changelist = [select.kevent(fd, filters, flags)]
976        events, _ = mitogen.core.io_op(self._kqueue.control, changelist, 0, 0)
977        assert not events
978
979    def start_receive(self, fd, data=None):
980        mitogen.core._vv and IOLOG.debug('%r.start_receive(%r, %r)',
981            self, fd, data)
982        if fd not in self._rfds:
983            self._control(fd, select.KQ_FILTER_READ, select.KQ_EV_ADD)
984        self._rfds[fd] = (data or fd, self._generation)
985
986    def stop_receive(self, fd):
987        mitogen.core._vv and IOLOG.debug('%r.stop_receive(%r)', self, fd)
988        if fd in self._rfds:
989            self._control(fd, select.KQ_FILTER_READ, select.KQ_EV_DELETE)
990            del self._rfds[fd]
991
992    def start_transmit(self, fd, data=None):
993        mitogen.core._vv and IOLOG.debug('%r.start_transmit(%r, %r)',
994            self, fd, data)
995        if fd not in self._wfds:
996            self._control(fd, select.KQ_FILTER_WRITE, select.KQ_EV_ADD)
997        self._wfds[fd] = (data or fd, self._generation)
998
999    def stop_transmit(self, fd):
1000        mitogen.core._vv and IOLOG.debug('%r.stop_transmit(%r)', self, fd)
1001        if fd in self._wfds:
1002            self._control(fd, select.KQ_FILTER_WRITE, select.KQ_EV_DELETE)
1003            del self._wfds[fd]
1004
1005    def _poll(self, timeout):
1006        changelist = self._changelist
1007        self._changelist = []
1008        events, _ = mitogen.core.io_op(self._kqueue.control,
1009            changelist, 32, timeout)
1010        for event in events:
1011            fd = event.ident
1012            if event.flags & select.KQ_EV_ERROR:
1013                LOG.debug('ignoring stale event for fd %r: errno=%d: %s',
1014                          fd, event.data, errno.errorcode.get(event.data))
1015            elif event.filter == select.KQ_FILTER_READ:
1016                data, gen = self._rfds.get(fd, (None, None))
1017                # Events can still be read for an already-discarded fd.
1018                if gen and gen < self._generation:
1019                    mitogen.core._vv and IOLOG.debug('%r: POLLIN: %r', self, fd)
1020                    yield data
1021            elif event.filter == select.KQ_FILTER_WRITE and fd in self._wfds:
1022                data, gen = self._wfds.get(fd, (None, None))
1023                if gen and gen < self._generation:
1024                    mitogen.core._vv and IOLOG.debug('%r: POLLOUT: %r', self, fd)
1025                    yield data
1026
1027
1028class EpollPoller(mitogen.core.Poller):
1029    """
1030    Poller based on the Linux :linux:man2:`epoll` interface.
1031    """
1032    SUPPORTED = hasattr(select, 'epoll')
1033    _repr = 'EpollPoller()'
1034
1035    def __init__(self):
1036        super(EpollPoller, self).__init__()
1037        self._epoll = select.epoll(32)
1038        self._registered_fds = set()
1039
1040    def close(self):
1041        super(EpollPoller, self).close()
1042        self._epoll.close()
1043
1044    def _control(self, fd):
1045        mitogen.core._vv and IOLOG.debug('%r._control(%r)', self, fd)
1046        mask = (((fd in self._rfds) and select.EPOLLIN) |
1047                ((fd in self._wfds) and select.EPOLLOUT))
1048        if mask:
1049            if fd in self._registered_fds:
1050                self._epoll.modify(fd, mask)
1051            else:
1052                self._epoll.register(fd, mask)
1053                self._registered_fds.add(fd)
1054        elif fd in self._registered_fds:
1055            self._epoll.unregister(fd)
1056            self._registered_fds.remove(fd)
1057
1058    def start_receive(self, fd, data=None):
1059        mitogen.core._vv and IOLOG.debug('%r.start_receive(%r, %r)',
1060            self, fd, data)
1061        self._rfds[fd] = (data or fd, self._generation)
1062        self._control(fd)
1063
1064    def stop_receive(self, fd):
1065        mitogen.core._vv and IOLOG.debug('%r.stop_receive(%r)', self, fd)
1066        self._rfds.pop(fd, None)
1067        self._control(fd)
1068
1069    def start_transmit(self, fd, data=None):
1070        mitogen.core._vv and IOLOG.debug('%r.start_transmit(%r, %r)',
1071            self, fd, data)
1072        self._wfds[fd] = (data or fd, self._generation)
1073        self._control(fd)
1074
1075    def stop_transmit(self, fd):
1076        mitogen.core._vv and IOLOG.debug('%r.stop_transmit(%r)', self, fd)
1077        self._wfds.pop(fd, None)
1078        self._control(fd)
1079
1080    _inmask = (getattr(select, 'EPOLLIN', 0) |
1081               getattr(select, 'EPOLLHUP', 0))
1082
1083    def _poll(self, timeout):
1084        the_timeout = -1
1085        if timeout is not None:
1086            the_timeout = timeout
1087
1088        events, _ = mitogen.core.io_op(self._epoll.poll, the_timeout, 32)
1089        for fd, event in events:
1090            if event & self._inmask:
1091                data, gen = self._rfds.get(fd, (None, None))
1092                if gen and gen < self._generation:
1093                    # Events can still be read for an already-discarded fd.
1094                    mitogen.core._vv and IOLOG.debug('%r: POLLIN: %r', self, fd)
1095                    yield data
1096            if event & select.EPOLLOUT:
1097                data, gen = self._wfds.get(fd, (None, None))
1098                if gen and gen < self._generation:
1099                    mitogen.core._vv and IOLOG.debug('%r: POLLOUT: %r', self, fd)
1100                    yield data
1101
1102
1103# 2.4 and 2.5 only had select.select() and select.poll().
1104for _klass in mitogen.core.Poller, PollPoller, KqueuePoller, EpollPoller:
1105    if _klass.SUPPORTED:
1106        PREFERRED_POLLER = _klass
1107
1108# For processes that start many threads or connections, it's possible Latch
1109# will also get high-numbered FDs, and so select() becomes useless there too.
1110# So swap in our favourite poller.
1111if PollPoller.SUPPORTED:
1112    mitogen.core.Latch.poller_class = PollPoller
1113else:
1114    mitogen.core.Latch.poller_class = PREFERRED_POLLER
1115
1116
1117class LineLoggingProtocolMixin(object):
1118    def __init__(self, **kwargs):
1119        super(LineLoggingProtocolMixin, self).__init__(**kwargs)
1120        self.logged_lines = []
1121        self.logged_partial = None
1122
1123    def on_line_received(self, line):
1124        self.logged_partial = None
1125        self.logged_lines.append((mitogen.core.now(), line))
1126        self.logged_lines[:] = self.logged_lines[-100:]
1127        return super(LineLoggingProtocolMixin, self).on_line_received(line)
1128
1129    def on_partial_line_received(self, line):
1130        self.logged_partial = line
1131        return super(LineLoggingProtocolMixin, self).on_partial_line_received(line)
1132
1133    def on_disconnect(self, broker):
1134        if self.logged_partial:
1135            self.logged_lines.append((mitogen.core.now(), self.logged_partial))
1136            self.logged_partial = None
1137        super(LineLoggingProtocolMixin, self).on_disconnect(broker)
1138
1139
1140def get_history(streams):
1141    history = []
1142    for stream in streams:
1143        if stream:
1144            history.extend(getattr(stream.protocol, 'logged_lines', []))
1145    history.sort()
1146
1147    s = b('\n').join(h[1] for h in history)
1148    return mitogen.core.to_text(s)
1149
1150
1151class RegexProtocol(LineLoggingProtocolMixin, mitogen.core.DelimitedProtocol):
1152    """
1153    Implement a delimited protocol where messages matching a set of regular
1154    expressions are dispatched to individual handler methods. Input is
1155    dispatches using :attr:`PATTERNS` and :attr:`PARTIAL_PATTERNS`, before
1156    falling back to :meth:`on_unrecognized_line_received` and
1157    :meth:`on_unrecognized_partial_line_received`.
1158    """
1159    #: A sequence of 2-tuples of the form `(compiled pattern, method)` for
1160    #: patterns that should be matched against complete (delimited) messages,
1161    #: i.e. full lines.
1162    PATTERNS = []
1163
1164    #: Like :attr:`PATTERNS`, but patterns that are matched against incomplete
1165    #: lines.
1166    PARTIAL_PATTERNS = []
1167
1168    def on_line_received(self, line):
1169        super(RegexProtocol, self).on_line_received(line)
1170        for pattern, func in self.PATTERNS:
1171            match = pattern.search(line)
1172            if match is not None:
1173                return func(self, line, match)
1174
1175        return self.on_unrecognized_line_received(line)
1176
1177    def on_unrecognized_line_received(self, line):
1178        LOG.debug('%s: (unrecognized): %s',
1179            self.stream.name, line.decode('utf-8', 'replace'))
1180
1181    def on_partial_line_received(self, line):
1182        super(RegexProtocol, self).on_partial_line_received(line)
1183        LOG.debug('%s: (partial): %s',
1184            self.stream.name, line.decode('utf-8', 'replace'))
1185        for pattern, func in self.PARTIAL_PATTERNS:
1186            match = pattern.search(line)
1187            if match is not None:
1188                return func(self, line, match)
1189
1190        return self.on_unrecognized_partial_line_received(line)
1191
1192    def on_unrecognized_partial_line_received(self, line):
1193        LOG.debug('%s: (unrecognized partial): %s',
1194            self.stream.name, line.decode('utf-8', 'replace'))
1195
1196
1197class BootstrapProtocol(RegexProtocol):
1198    """
1199    Respond to stdout of a child during bootstrap. Wait for :attr:`EC0_MARKER`
1200    to be written by the first stage to indicate it can receive the bootstrap,
1201    then await :attr:`EC1_MARKER` to indicate success, and
1202    :class:`MitogenProtocol` can be enabled.
1203    """
1204    #: Sentinel value emitted by the first stage to indicate it is ready to
1205    #: receive the compressed bootstrap. For :mod:`mitogen.ssh` this must have
1206    #: length of at least `max(len('password'), len('debug1:'))`
1207    EC0_MARKER = b('MITO000')
1208    EC1_MARKER = b('MITO001')
1209    EC2_MARKER = b('MITO002')
1210
1211    def __init__(self, broker):
1212        super(BootstrapProtocol, self).__init__()
1213        self._writer = mitogen.core.BufferedWriter(broker, self)
1214
1215    def on_transmit(self, broker):
1216        self._writer.on_transmit(broker)
1217
1218    def _on_ec0_received(self, line, match):
1219        LOG.debug('%r: first stage started succcessfully', self)
1220        self._writer.write(self.stream.conn.get_preamble())
1221
1222    def _on_ec1_received(self, line, match):
1223        LOG.debug('%r: first stage received mitogen.core source', self)
1224
1225    def _on_ec2_received(self, line, match):
1226        LOG.debug('%r: new child booted successfully', self)
1227        self.stream.conn._complete_connection()
1228        return False
1229
1230    def on_unrecognized_line_received(self, line):
1231        LOG.debug('%s: stdout: %s', self.stream.name,
1232            line.decode('utf-8', 'replace'))
1233
1234    PATTERNS = [
1235        (re.compile(EC0_MARKER), _on_ec0_received),
1236        (re.compile(EC1_MARKER), _on_ec1_received),
1237        (re.compile(EC2_MARKER), _on_ec2_received),
1238    ]
1239
1240
1241class LogProtocol(LineLoggingProtocolMixin, mitogen.core.DelimitedProtocol):
1242    """
1243    For "hybrid TTY/socketpair" mode, after connection setup a spare TTY master
1244    FD exists that cannot be closed, and to which SSH or sudo may continue
1245    writing log messages.
1246
1247    The descriptor cannot be closed since the UNIX TTY layer sends SIGHUP to
1248    processes whose controlling TTY is the slave whose master side was closed.
1249    LogProtocol takes over this FD and creates log messages for anything
1250    written to it.
1251    """
1252    def on_line_received(self, line):
1253        """
1254        Read a line, decode it as UTF-8, and log it.
1255        """
1256        super(LogProtocol, self).on_line_received(line)
1257        LOG.info(u'%s: %s', self.stream.name, line.decode('utf-8', 'replace'))
1258
1259
1260class MitogenProtocol(mitogen.core.MitogenProtocol):
1261    """
1262    Extend core.MitogenProtocol to cause SHUTDOWN to be sent to the child
1263    during graceful shutdown.
1264    """
1265    def on_shutdown(self, broker):
1266        """
1267        Respond to the broker's request for the stream to shut down by sending
1268        SHUTDOWN to the child.
1269        """
1270        LOG.debug('%r: requesting child shutdown', self)
1271        self._send(
1272            mitogen.core.Message(
1273                src_id=mitogen.context_id,
1274                dst_id=self.remote_id,
1275                handle=mitogen.core.SHUTDOWN,
1276            )
1277        )
1278
1279
1280class Options(object):
1281    name = None
1282
1283    #: The path to the remote Python interpreter.
1284    python_path = get_sys_executable()
1285
1286    #: Maximum time to wait for a connection attempt.
1287    connect_timeout = 30.0
1288
1289    #: True to cause context to write verbose /tmp/mitogen.<pid>.log.
1290    debug = False
1291
1292    #: True to cause context to write /tmp/mitogen.stats.<pid>.<thread>.log.
1293    profiling = False
1294
1295    #: True if unidirectional routing is enabled in the new child.
1296    unidirectional = False
1297
1298    #: Passed via Router wrapper methods, must eventually be passed to
1299    #: ExternalContext.main().
1300    max_message_size = None
1301
1302    #: Remote name.
1303    remote_name = None
1304
1305    #: Derived from :py:attr:`connect_timeout`; absolute floating point
1306    #: UNIX timestamp after which the connection attempt should be abandoned.
1307    connect_deadline = None
1308
1309    def __init__(self, max_message_size, name=None, remote_name=None,
1310                 python_path=None, debug=False, connect_timeout=None,
1311                 profiling=False, unidirectional=False, old_router=None):
1312        self.name = name
1313        self.max_message_size = max_message_size
1314        if python_path:
1315            self.python_path = python_path
1316        if connect_timeout:
1317            self.connect_timeout = connect_timeout
1318        if remote_name is None:
1319            remote_name = get_default_remote_name()
1320        if '/' in remote_name or '\\' in remote_name:
1321            raise ValueError('remote_name= cannot contain slashes')
1322        if remote_name:
1323            self.remote_name = mitogen.core.to_text(remote_name)
1324        self.debug = debug
1325        self.profiling = profiling
1326        self.unidirectional = unidirectional
1327        self.max_message_size = max_message_size
1328        self.connect_deadline = mitogen.core.now() + self.connect_timeout
1329
1330
1331class Connection(object):
1332    """
1333    Manage the lifetime of a set of :class:`Streams <Stream>` connecting to a
1334    remote Python interpreter, including bootstrap, disconnection, and external
1335    tool integration.
1336
1337    Base for streams capable of starting children.
1338    """
1339    options_class = Options
1340
1341    #: The protocol attached to stdio of the child.
1342    stream_protocol_class = BootstrapProtocol
1343
1344    #: The protocol attached to stderr of the child.
1345    diag_protocol_class = LogProtocol
1346
1347    #: :class:`Process`
1348    proc = None
1349
1350    #: :class:`mitogen.core.Stream` with sides connected to stdin/stdout.
1351    stdio_stream = None
1352
1353    #: If `proc.stderr` is set, referencing either a plain pipe or the
1354    #: controlling TTY, this references the corresponding
1355    #: :class:`LogProtocol`'s stream, allowing it to be disconnected when this
1356    #: stream is disconnected.
1357    stderr_stream = None
1358
1359    #: Function with the semantics of :func:`create_child` used to create the
1360    #: child process.
1361    create_child = staticmethod(create_child)
1362
1363    #: Dictionary of extra kwargs passed to :attr:`create_child`.
1364    create_child_args = {}
1365
1366    #: :data:`True` if the remote has indicated that it intends to detach, and
1367    #: should not be killed on disconnect.
1368    detached = False
1369
1370    #: If :data:`True`, indicates the child should not be killed during
1371    #: graceful detachment, as it the actual process implementing the child
1372    #: context. In all other cases, the subprocess is SSH, sudo, or a similar
1373    #: tool that should be reminded to quit during disconnection.
1374    child_is_immediate_subprocess = True
1375
1376    #: Prefix given to default names generated by :meth:`connect`.
1377    name_prefix = u'local'
1378
1379    #: :class:`Timer` that runs :meth:`_on_timer_expired` when connection
1380    #: timeout occurs.
1381    _timer = None
1382
1383    #: When disconnection completes, instance of :class:`Reaper` used to wait
1384    #: on the exit status of the subprocess.
1385    _reaper = None
1386
1387    #: On failure, the exception object that should be propagated back to the
1388    #: user.
1389    exception = None
1390
1391    #: Extra text appended to :class:`EofError` if that exception is raised on
1392    #: a failed connection attempt. May be used in subclasses to hint at common
1393    #: problems with a particular connection method.
1394    eof_error_hint = None
1395
1396    def __init__(self, options, router):
1397        #: :class:`Options`
1398        self.options = options
1399        self._router = router
1400
1401    def __repr__(self):
1402        return 'Connection(%r)' % (self.stdio_stream,)
1403
1404    # Minimised, gzipped, base64'd and passed to 'python -c'. It forks, dups
1405    # file descriptor 0 as 100, creates a pipe, then execs a new interpreter
1406    # with a custom argv.
1407    #   * Optimized for minimum byte count after minification & compression.
1408    #   * 'CONTEXT_NAME' and 'PREAMBLE_COMPRESSED_LEN' are substituted with
1409    #     their respective values.
1410    #   * CONTEXT_NAME must be prefixed with the name of the Python binary in
1411    #     order to allow virtualenvs to detect their install prefix.
1412    #   * For Darwin, OS X installs a craptacular argv0-introspecting Python
1413    #     version switcher as /usr/bin/python. Override attempts to call it
1414    #     with an explicit call to python2.7
1415    #
1416    # Locals:
1417    #   R: read side of interpreter stdin.
1418    #   W: write side of interpreter stdin.
1419    #   r: read side of core_src FD.
1420    #   w: write side of core_src FD.
1421    #   C: the decompressed core source.
1422
1423    # Final os.close(2) to avoid --py-debug build from corrupting stream with
1424    # "[1234 refs]" during exit.
1425    @staticmethod
1426    def _first_stage():
1427        R,W=os.pipe()
1428        r,w=os.pipe()
1429        if os.fork():
1430            os.dup2(0,100)
1431            os.dup2(R,0)
1432            os.dup2(r,101)
1433            os.close(R)
1434            os.close(r)
1435            os.close(W)
1436            os.close(w)
1437            if sys.platform == 'darwin' and sys.executable == '/usr/bin/python':
1438                sys.executable += sys.version[:3]
1439            os.environ['ARGV0']=sys.executable
1440            os.execl(sys.executable,sys.executable+'(mitogen:CONTEXT_NAME)')
1441        os.write(1,'MITO000\n'.encode())
1442        C=_(os.fdopen(0,'rb').read(PREAMBLE_COMPRESSED_LEN),'zip')
1443        fp=os.fdopen(W,'wb',0)
1444        fp.write(C)
1445        fp.close()
1446        fp=os.fdopen(w,'wb',0)
1447        fp.write(C)
1448        fp.close()
1449        os.write(1,'MITO001\n'.encode())
1450        os.close(2)
1451
1452    def get_python_argv(self):
1453        """
1454        Return the initial argument vector elements necessary to invoke Python,
1455        by returning a 1-element list containing :attr:`python_path` if it is a
1456        string, or simply returning it if it is already a list.
1457
1458        This allows emulation of existing tools where the Python invocation may
1459        be set to e.g. `['/usr/bin/env', 'python']`.
1460        """
1461        if isinstance(self.options.python_path, list):
1462            return self.options.python_path
1463        return [self.options.python_path]
1464
1465    def get_boot_command(self):
1466        source = inspect.getsource(self._first_stage)
1467        source = textwrap.dedent('\n'.join(source.strip().split('\n')[2:]))
1468        source = source.replace('    ', '\t')
1469        source = source.replace('CONTEXT_NAME', self.options.remote_name)
1470        preamble_compressed = self.get_preamble()
1471        source = source.replace('PREAMBLE_COMPRESSED_LEN',
1472                                str(len(preamble_compressed)))
1473        compressed = zlib.compress(source.encode(), 9)
1474        encoded = codecs.encode(compressed, 'base64').replace(b('\n'), b(''))
1475        # We can't use bytes.decode() in 3.x since it was restricted to always
1476        # return unicode, so codecs.decode() is used instead. In 3.x
1477        # codecs.decode() requires a bytes object. Since we must be compatible
1478        # with 2.4 (no bytes literal), an extra .encode() either returns the
1479        # same str (2.x) or an equivalent bytes (3.x).
1480        return self.get_python_argv() + [
1481            '-c',
1482            'import codecs,os,sys;_=codecs.decode;'
1483            'exec(_(_("%s".encode(),"base64"),"zip"))' % (encoded.decode(),)
1484        ]
1485
1486    def get_econtext_config(self):
1487        assert self.options.max_message_size is not None
1488        parent_ids = mitogen.parent_ids[:]
1489        parent_ids.insert(0, mitogen.context_id)
1490        return {
1491            'parent_ids': parent_ids,
1492            'context_id': self.context.context_id,
1493            'debug': self.options.debug,
1494            'profiling': self.options.profiling,
1495            'unidirectional': self.options.unidirectional,
1496            'log_level': get_log_level(),
1497            'whitelist': self._router.get_module_whitelist(),
1498            'blacklist': self._router.get_module_blacklist(),
1499            'max_message_size': self.options.max_message_size,
1500            'version': mitogen.__version__,
1501        }
1502
1503    def get_preamble(self):
1504        suffix = (
1505            '\nExternalContext(%r).main()\n' %\
1506            (self.get_econtext_config(),)
1507        )
1508        partial = get_core_source_partial()
1509        return partial.append(suffix.encode('utf-8'))
1510
1511    def _get_name(self):
1512        """
1513        Called by :meth:`connect` after :attr:`pid` is known. Subclasses can
1514        override it to specify a default stream name, or set
1515        :attr:`name_prefix` to generate a default format.
1516        """
1517        return u'%s.%s' % (self.name_prefix, self.proc.pid)
1518
1519    def start_child(self):
1520        args = self.get_boot_command()
1521        LOG.debug('command line for %r: %s', self, Argv(args))
1522        try:
1523            return self.create_child(args=args, **self.create_child_args)
1524        except OSError:
1525            e = sys.exc_info()[1]
1526            msg = 'Child start failed: %s. Command was: %s' % (e, Argv(args))
1527            raise mitogen.core.StreamError(msg)
1528
1529    def _adorn_eof_error(self, e):
1530        """
1531        Subclasses may provide additional information in the case of a failed
1532        connection.
1533        """
1534        if self.eof_error_hint:
1535            e.args = ('%s\n\n%s' % (e.args[0], self.eof_error_hint),)
1536
1537    def _complete_connection(self):
1538        self._timer.cancel()
1539        if not self.exception:
1540            mitogen.core.unlisten(self._router.broker, 'shutdown',
1541                                  self._on_broker_shutdown)
1542            self._router.register(self.context, self.stdio_stream)
1543            self.stdio_stream.set_protocol(
1544                MitogenProtocol(
1545                    router=self._router,
1546                    remote_id=self.context.context_id,
1547                )
1548            )
1549            self._router.route_monitor.notice_stream(self.stdio_stream)
1550        self.latch.put()
1551
1552    def _fail_connection(self, exc):
1553        """
1554        Fail the connection attempt.
1555        """
1556        LOG.debug('failing connection %s due to %r',
1557                  self.stdio_stream and self.stdio_stream.name, exc)
1558        if self.exception is None:
1559            self._adorn_eof_error(exc)
1560            self.exception = exc
1561            mitogen.core.unlisten(self._router.broker, 'shutdown',
1562                                  self._on_broker_shutdown)
1563        for stream in self.stdio_stream, self.stderr_stream:
1564            if stream and not stream.receive_side.closed:
1565                stream.on_disconnect(self._router.broker)
1566        self._complete_connection()
1567
1568    eof_error_msg = 'EOF on stream; last 100 lines received:\n'
1569
1570    def on_stdio_disconnect(self):
1571        """
1572        Handle stdio stream disconnection by failing the Connection if the
1573        stderr stream has already been closed. Otherwise, wait for it to close
1574        (or timeout), to allow buffered diagnostic logs to be consumed.
1575
1576        It is normal that when a subprocess aborts, stdio has nothing buffered
1577        when it is closed, thus signalling readability, causing an empty read
1578        (interpreted as indicating disconnection) on the next loop iteration,
1579        even if its stderr pipe has lots of diagnostic logs still buffered in
1580        the kernel. Therefore we must wait for both pipes to indicate they are
1581        empty before triggering connection failure.
1582        """
1583        stderr = self.stderr_stream
1584        if stderr is None or stderr.receive_side.closed:
1585            self._on_streams_disconnected()
1586
1587    def on_stderr_disconnect(self):
1588        """
1589        Inverse of :func:`on_stdio_disconnect`.
1590        """
1591        if self.stdio_stream.receive_side.closed:
1592            self._on_streams_disconnected()
1593
1594    def _on_streams_disconnected(self):
1595        """
1596        When disconnection has been detected for both streams, cancel the
1597        connection timer, mark the connection failed, and reap the child
1598        process. Do nothing if the timer has already been cancelled, indicating
1599        some existing failure has already been noticed.
1600        """
1601        if self._timer.active:
1602            self._timer.cancel()
1603            self._fail_connection(EofError(
1604                self.eof_error_msg + get_history(
1605                    [self.stdio_stream, self.stderr_stream]
1606                )
1607            ))
1608
1609        if self._reaper:
1610            return
1611
1612        self._reaper = Reaper(
1613            broker=self._router.broker,
1614            proc=self.proc,
1615            kill=not (
1616                (self.detached and self.child_is_immediate_subprocess) or
1617                # Avoid killing so child has chance to write cProfile data
1618                self._router.profiling
1619            ),
1620            # Don't delay shutdown waiting for a detached child, since the
1621            # detached child may expect to live indefinitely after its parent
1622            # exited.
1623            wait_on_shutdown=(not self.detached),
1624        )
1625        self._reaper.reap()
1626
1627    def _on_broker_shutdown(self):
1628        """
1629        Respond to broker.shutdown() being called by failing the connection
1630        attempt.
1631        """
1632        self._fail_connection(CancelledError(BROKER_SHUTDOWN_MSG))
1633
1634    def stream_factory(self):
1635        return self.stream_protocol_class.build_stream(
1636            broker=self._router.broker,
1637        )
1638
1639    def stderr_stream_factory(self):
1640        return self.diag_protocol_class.build_stream()
1641
1642    def _setup_stdio_stream(self):
1643        stream = self.stream_factory()
1644        stream.conn = self
1645        stream.name = self.options.name or self._get_name()
1646        stream.accept(self.proc.stdout, self.proc.stdin)
1647
1648        mitogen.core.listen(stream, 'disconnect', self.on_stdio_disconnect)
1649        self._router.broker.start_receive(stream)
1650        return stream
1651
1652    def _setup_stderr_stream(self):
1653        stream = self.stderr_stream_factory()
1654        stream.conn = self
1655        stream.name = self.options.name or self._get_name()
1656        stream.accept(self.proc.stderr, self.proc.stderr)
1657
1658        mitogen.core.listen(stream, 'disconnect', self.on_stderr_disconnect)
1659        self._router.broker.start_receive(stream)
1660        return stream
1661
1662    def _on_timer_expired(self):
1663        self._fail_connection(
1664            mitogen.core.TimeoutError(
1665                'Failed to setup connection after %.2f seconds',
1666                self.options.connect_timeout,
1667            )
1668        )
1669
1670    def _async_connect(self):
1671        LOG.debug('creating connection to context %d using %s',
1672                  self.context.context_id, self.__class__.__module__)
1673        mitogen.core.listen(self._router.broker, 'shutdown',
1674                            self._on_broker_shutdown)
1675        self._timer = self._router.broker.timers.schedule(
1676            when=self.options.connect_deadline,
1677            func=self._on_timer_expired,
1678        )
1679
1680        try:
1681            self.proc = self.start_child()
1682        except Exception:
1683            LOG.debug('failed to start child', exc_info=True)
1684            self._fail_connection(sys.exc_info()[1])
1685            return
1686
1687        LOG.debug('child for %r started: pid:%r stdin:%r stdout:%r stderr:%r',
1688                  self, self.proc.pid,
1689                  self.proc.stdin.fileno(),
1690                  self.proc.stdout.fileno(),
1691                  self.proc.stderr and self.proc.stderr.fileno())
1692
1693        self.stdio_stream = self._setup_stdio_stream()
1694        if self.context.name is None:
1695            self.context.name = self.stdio_stream.name
1696        self.proc.name = self.stdio_stream.name
1697        if self.proc.stderr:
1698            self.stderr_stream = self._setup_stderr_stream()
1699
1700    def connect(self, context):
1701        self.context = context
1702        self.latch = mitogen.core.Latch()
1703        self._router.broker.defer(self._async_connect)
1704        self.latch.get()
1705        if self.exception:
1706            raise self.exception
1707
1708
1709class ChildIdAllocator(object):
1710    """
1711    Allocate new context IDs from a block of unique context IDs allocated by
1712    the master process.
1713    """
1714    def __init__(self, router):
1715        self.router = router
1716        self.lock = threading.Lock()
1717        self.it = iter(xrange(0))
1718
1719    def allocate(self):
1720        """
1721        Allocate an ID, requesting a fresh block from the master if the
1722        existing block is exhausted.
1723
1724        :returns:
1725            The new context ID.
1726
1727        .. warning::
1728
1729            This method is not safe to call from the :class:`Broker` thread, as
1730            it may block on IO of its own.
1731        """
1732        self.lock.acquire()
1733        try:
1734            for id_ in self.it:
1735                return id_
1736
1737            master = self.router.context_by_id(0)
1738            start, end = master.send_await(
1739                mitogen.core.Message(dst_id=0, handle=mitogen.core.ALLOCATE_ID)
1740            )
1741            self.it = iter(xrange(start, end))
1742        finally:
1743            self.lock.release()
1744
1745        return self.allocate()
1746
1747
1748class CallChain(object):
1749    """
1750    Deliver :data:`mitogen.core.CALL_FUNCTION` messages to a target context,
1751    optionally threading related calls so an exception in an earlier call
1752    cancels subsequent calls.
1753
1754    :param mitogen.core.Context context:
1755        Target context.
1756    :param bool pipelined:
1757        Enable pipelining.
1758
1759    :meth:`call`, :meth:`call_no_reply` and :meth:`call_async`
1760    normally issue calls and produce responses with no memory of prior
1761    exceptions. If a call made with :meth:`call_no_reply` fails, the exception
1762    is logged to the target context's logging framework.
1763
1764    **Pipelining**
1765
1766    When pipelining is enabled, if an exception occurs during a call,
1767    subsequent calls made by the same :class:`CallChain` fail with the same
1768    exception, including those already in-flight on the network, and no further
1769    calls execute until :meth:`reset` is invoked.
1770
1771    No exception is logged for calls made with :meth:`call_no_reply`, instead
1772    the exception is saved and reported as the result of subsequent
1773    :meth:`call` or :meth:`call_async` calls.
1774
1775    Sequences of asynchronous calls can be made without wasting network
1776    round-trips to discover if prior calls succeed, and chains originating from
1777    multiple unrelated source contexts may overlap concurrently at a target
1778    context without interference.
1779
1780    In this example, 4 calls complete in one round-trip::
1781
1782        chain = mitogen.parent.CallChain(context, pipelined=True)
1783        chain.call_no_reply(os.mkdir, '/tmp/foo')
1784
1785        # If previous mkdir() failed, this never runs:
1786        chain.call_no_reply(os.mkdir, '/tmp/foo/bar')
1787
1788        # If either mkdir() failed, this never runs, and the exception is
1789        # asynchronously delivered to the receiver.
1790        recv = chain.call_async(subprocess.check_output, '/tmp/foo')
1791
1792        # If anything so far failed, this never runs, and raises the exception.
1793        chain.call(do_something)
1794
1795        # If this code was executed, the exception would also be raised.
1796        if recv.get().unpickle() == 'baz':
1797            pass
1798
1799    When pipelining is enabled, :meth:`reset` must be invoked to ensure any
1800    exception is discarded, otherwise unbounded memory usage is possible in
1801    long-running programs. The context manager protocol is supported to ensure
1802    :meth:`reset` is always invoked::
1803
1804        with mitogen.parent.CallChain(context, pipelined=True) as chain:
1805            chain.call_no_reply(...)
1806            chain.call_no_reply(...)
1807            chain.call_no_reply(...)
1808            chain.call(...)
1809
1810        # chain.reset() automatically invoked.
1811    """
1812    def __init__(self, context, pipelined=False):
1813        self.context = context
1814        if pipelined:
1815            self.chain_id = self.make_chain_id()
1816        else:
1817            self.chain_id = None
1818
1819    @classmethod
1820    def make_chain_id(cls):
1821        return '%s-%s-%x-%x' % (
1822            socket.gethostname(),
1823            os.getpid(),
1824            thread.get_ident(),
1825            int(1e6 * mitogen.core.now()),
1826        )
1827
1828    def __repr__(self):
1829        return '%s(%s)' % (self.__class__.__name__, self.context)
1830
1831    def __enter__(self):
1832        return self
1833
1834    def __exit__(self, _1, _2, _3):
1835        self.reset()
1836
1837    def reset(self):
1838        """
1839        Instruct the target to forget any related exception.
1840        """
1841        if not self.chain_id:
1842            return
1843
1844        saved, self.chain_id = self.chain_id, None
1845        try:
1846            self.call_no_reply(mitogen.core.Dispatcher.forget_chain, saved)
1847        finally:
1848            self.chain_id = saved
1849
1850    closures_msg = (
1851        'Mitogen cannot invoke closures, as doing so would require '
1852        'serializing arbitrary program state, and no universal '
1853        'method exists to recover a reference to them.'
1854    )
1855
1856    lambda_msg = (
1857        'Mitogen cannot invoke anonymous functions, as no universal method '
1858        'exists to recover a reference to an anonymous function.'
1859    )
1860
1861    method_msg = (
1862        'Mitogen cannot invoke instance methods, as doing so would require '
1863        'serializing arbitrary program state.'
1864    )
1865
1866    def make_msg(self, fn, *args, **kwargs):
1867        if getattr(fn, closure_attr, None) is not None:
1868            raise TypeError(self.closures_msg)
1869        if fn.__name__ == '<lambda>':
1870            raise TypeError(self.lambda_msg)
1871
1872        if inspect.ismethod(fn):
1873            im_self = getattr(fn, IM_SELF_ATTR)
1874            if not inspect.isclass(im_self):
1875                raise TypeError(self.method_msg)
1876            klass = mitogen.core.to_text(im_self.__name__)
1877        else:
1878            klass = None
1879
1880        tup = (
1881            self.chain_id,
1882            mitogen.core.to_text(fn.__module__),
1883            klass,
1884            mitogen.core.to_text(fn.__name__),
1885            args,
1886            mitogen.core.Kwargs(kwargs)
1887        )
1888        return mitogen.core.Message.pickled(tup,
1889            handle=mitogen.core.CALL_FUNCTION)
1890
1891    def call_no_reply(self, fn, *args, **kwargs):
1892        """
1893        Like :meth:`call_async`, but do not wait for a return value, and inform
1894        the target context no reply is expected. If the call fails and
1895        pipelining is disabled, the exception will be logged to the target
1896        context's logging framework.
1897        """
1898        LOG.debug('starting no-reply function call to %r: %r',
1899                  self.context.name or self.context.context_id,
1900                  CallSpec(fn, args, kwargs))
1901        self.context.send(self.make_msg(fn, *args, **kwargs))
1902
1903    def call_async(self, fn, *args, **kwargs):
1904        """
1905        Arrange for `fn(*args, **kwargs)` to be invoked on the context's main
1906        thread.
1907
1908        :param fn:
1909            A free function in module scope or a class method of a class
1910            directly reachable from module scope:
1911
1912            .. code-block:: python
1913
1914                # mymodule.py
1915
1916                def my_func():
1917                    '''A free function reachable as mymodule.my_func'''
1918
1919                class MyClass:
1920                    @classmethod
1921                    def my_classmethod(cls):
1922                        '''Reachable as mymodule.MyClass.my_classmethod'''
1923
1924                    def my_instancemethod(self):
1925                        '''Unreachable: requires a class instance!'''
1926
1927                    class MyEmbeddedClass:
1928                        @classmethod
1929                        def my_classmethod(cls):
1930                            '''Not directly reachable from module scope!'''
1931
1932        :param tuple args:
1933            Function arguments, if any. See :ref:`serialization-rules` for
1934            permitted types.
1935        :param dict kwargs:
1936            Function keyword arguments, if any. See :ref:`serialization-rules`
1937            for permitted types.
1938        :returns:
1939            :class:`mitogen.core.Receiver` configured to receive the result of
1940            the invocation:
1941
1942            .. code-block:: python
1943
1944                recv = context.call_async(os.check_output, 'ls /tmp/')
1945                try:
1946                    # Prints output once it is received.
1947                    msg = recv.get()
1948                    print(msg.unpickle())
1949                except mitogen.core.CallError, e:
1950                    print('Call failed:', str(e))
1951
1952            Asynchronous calls may be dispatched in parallel to multiple
1953            contexts and consumed as they complete using
1954            :class:`mitogen.select.Select`.
1955        """
1956        LOG.debug('starting function call to %s: %r',
1957                  self.context.name or self.context.context_id,
1958                  CallSpec(fn, args, kwargs))
1959        return self.context.send_async(self.make_msg(fn, *args, **kwargs))
1960
1961    def call(self, fn, *args, **kwargs):
1962        """
1963        Like :meth:`call_async`, but block until the return value is available.
1964        Equivalent to::
1965
1966            call_async(fn, *args, **kwargs).get().unpickle()
1967
1968        :returns:
1969            The function's return value.
1970        :raises mitogen.core.CallError:
1971            An exception was raised in the remote context during execution.
1972        """
1973        receiver = self.call_async(fn, *args, **kwargs)
1974        return receiver.get().unpickle(throw_dead=False)
1975
1976
1977class Context(mitogen.core.Context):
1978    """
1979    Extend :class:`mitogen.core.Context` with functionality useful to masters,
1980    and child contexts who later become parents. Currently when this class is
1981    required, the target context's router is upgraded at runtime.
1982    """
1983    #: A :class:`CallChain` instance constructed by default, with pipelining
1984    #: disabled. :meth:`call`, :meth:`call_async` and :meth:`call_no_reply` use
1985    #: this instance.
1986    call_chain_class = CallChain
1987
1988    via = None
1989
1990    def __init__(self, *args, **kwargs):
1991        super(Context, self).__init__(*args, **kwargs)
1992        self.default_call_chain = self.call_chain_class(self)
1993
1994    def __ne__(self, other):
1995        return not (self == other)
1996
1997    def __eq__(self, other):
1998        return (
1999            isinstance(other, mitogen.core.Context) and
2000            (other.context_id == self.context_id) and
2001            (other.router == self.router)
2002        )
2003
2004    def __hash__(self):
2005        return hash((self.router, self.context_id))
2006
2007    def call_async(self, fn, *args, **kwargs):
2008        """
2009        See :meth:`CallChain.call_async`.
2010        """
2011        return self.default_call_chain.call_async(fn, *args, **kwargs)
2012
2013    def call(self, fn, *args, **kwargs):
2014        """
2015        See :meth:`CallChain.call`.
2016        """
2017        return self.default_call_chain.call(fn, *args, **kwargs)
2018
2019    def call_no_reply(self, fn, *args, **kwargs):
2020        """
2021        See :meth:`CallChain.call_no_reply`.
2022        """
2023        self.default_call_chain.call_no_reply(fn, *args, **kwargs)
2024
2025    def shutdown(self, wait=False):
2026        """
2027        Arrange for the context to receive a ``SHUTDOWN`` message, triggering
2028        graceful shutdown.
2029
2030        Due to a lack of support for timers, no attempt is made yet to force
2031        terminate a hung context using this method. This will be fixed shortly.
2032
2033        :param bool wait:
2034            If :data:`True`, block the calling thread until the context has
2035            completely terminated.
2036
2037        :returns:
2038            If `wait` is :data:`False`, returns a :class:`mitogen.core.Latch`
2039            whose :meth:`get() <mitogen.core.Latch.get>` method returns
2040            :data:`None` when shutdown completes. The `timeout` parameter may
2041            be used to implement graceful timeouts.
2042        """
2043        LOG.debug('%r.shutdown() sending SHUTDOWN', self)
2044        latch = mitogen.core.Latch()
2045        mitogen.core.listen(self, 'disconnect', lambda: latch.put(None))
2046        self.send(
2047            mitogen.core.Message(
2048                handle=mitogen.core.SHUTDOWN,
2049            )
2050        )
2051
2052        if wait:
2053            latch.get()
2054        else:
2055            return latch
2056
2057
2058class RouteMonitor(object):
2059    """
2060    Generate and respond to :data:`mitogen.core.ADD_ROUTE` and
2061    :data:`mitogen.core.DEL_ROUTE` messages sent to the local context by
2062    maintaining a table of available routes, and propagating messages towards
2063    parents and siblings as appropriate.
2064
2065    :class:`RouteMonitor` is responsible for generating routing messages for
2066    directly attached children. It learns of new children via
2067    :meth:`notice_stream` called by :class:`Router`, and subscribes to their
2068    ``disconnect`` event to learn when they disappear.
2069
2070    In children, constructing this class overwrites the stub
2071    :data:`mitogen.core.DEL_ROUTE` handler installed by
2072    :class:`mitogen.core.ExternalContext`, which is expected behaviour when a
2073    child is beging upgraded in preparation to become a parent of children of
2074    its own.
2075
2076    By virtue of only being active while responding to messages from a handler,
2077    RouteMonitor lives entirely on the broker thread, so its data requires no
2078    locking.
2079
2080    :param mitogen.master.Router router:
2081        Router to install handlers on.
2082    :param mitogen.core.Context parent:
2083        :data:`None` in the master process, or reference to the parent context
2084        we should propagate route updates towards.
2085    """
2086    def __init__(self, router, parent=None):
2087        self.router = router
2088        self.parent = parent
2089        self._log = logging.getLogger('mitogen.route_monitor')
2090        #: Mapping of Stream instance to integer context IDs reachable via the
2091        #: stream; used to cleanup routes during disconnection.
2092        self._routes_by_stream = {}
2093        self.router.add_handler(
2094            fn=self._on_add_route,
2095            handle=mitogen.core.ADD_ROUTE,
2096            persist=True,
2097            policy=is_immediate_child,
2098            overwrite=True,
2099        )
2100        self.router.add_handler(
2101            fn=self._on_del_route,
2102            handle=mitogen.core.DEL_ROUTE,
2103            persist=True,
2104            policy=is_immediate_child,
2105            overwrite=True,
2106        )
2107
2108    def __repr__(self):
2109        return 'RouteMonitor()'
2110
2111    def _send_one(self, stream, handle, target_id, name):
2112        """
2113        Compose and send an update message on a stream.
2114
2115        :param mitogen.core.Stream stream:
2116            Stream to send it on.
2117        :param int handle:
2118            :data:`mitogen.core.ADD_ROUTE` or :data:`mitogen.core.DEL_ROUTE`
2119        :param int target_id:
2120            ID of the connecting or disconnecting context.
2121        :param str name:
2122            Context name or :data:`None`.
2123        """
2124        if not stream:
2125            # We may not have a stream during shutdown.
2126            return
2127
2128        data = str(target_id)
2129        if name:
2130            data = '%s:%s' % (target_id, name)
2131        stream.protocol.send(
2132            mitogen.core.Message(
2133                handle=handle,
2134                data=data.encode('utf-8'),
2135                dst_id=stream.protocol.remote_id,
2136            )
2137        )
2138
2139    def _propagate_up(self, handle, target_id, name=None):
2140        """
2141        In a non-master context, propagate an update towards the master.
2142
2143        :param int handle:
2144            :data:`mitogen.core.ADD_ROUTE` or :data:`mitogen.core.DEL_ROUTE`
2145        :param int target_id:
2146            ID of the connecting or disconnecting context.
2147        :param str name:
2148            For :data:`mitogen.core.ADD_ROUTE`, the name of the new context
2149            assigned by its parent. This is used by parents to assign the
2150            :attr:`mitogen.core.Context.name` attribute.
2151        """
2152        if self.parent:
2153            stream = self.router.stream_by_id(self.parent.context_id)
2154            self._send_one(stream, handle, target_id, name)
2155
2156    def _propagate_down(self, handle, target_id):
2157        """
2158        For DEL_ROUTE, we additionally want to broadcast the message to any
2159        stream that has ever communicated with the disconnecting ID, so
2160        core.py's :meth:`mitogen.core.Router._on_del_route` can turn the
2161        message into a disconnect event.
2162
2163        :param int handle:
2164            :data:`mitogen.core.ADD_ROUTE` or :data:`mitogen.core.DEL_ROUTE`
2165        :param int target_id:
2166            ID of the connecting or disconnecting context.
2167        """
2168        for stream in self.router.get_streams():
2169            if target_id in stream.protocol.egress_ids and (
2170                    (self.parent is None) or
2171                    (self.parent.context_id != stream.protocol.remote_id)
2172                ):
2173                self._send_one(stream, mitogen.core.DEL_ROUTE, target_id, None)
2174
2175    def notice_stream(self, stream):
2176        """
2177        When this parent is responsible for a new directly connected child
2178        stream, we're also responsible for broadcasting
2179        :data:`mitogen.core.DEL_ROUTE` upstream when that child disconnects.
2180        """
2181        self._routes_by_stream[stream] = set([stream.protocol.remote_id])
2182        self._propagate_up(mitogen.core.ADD_ROUTE, stream.protocol.remote_id,
2183                        stream.name)
2184        mitogen.core.listen(
2185            obj=stream,
2186            name='disconnect',
2187            func=lambda: self._on_stream_disconnect(stream),
2188        )
2189
2190    def get_routes(self, stream):
2191        """
2192        Return the set of context IDs reachable on a stream.
2193
2194        :param mitogen.core.Stream stream:
2195        :returns: set([int])
2196        """
2197        return self._routes_by_stream.get(stream) or set()
2198
2199    def _on_stream_disconnect(self, stream):
2200        """
2201        Respond to disconnection of a local stream by propagating DEL_ROUTE for
2202        any contexts we know were attached to it.
2203        """
2204        # During a stream crash it is possible for disconnect signal to fire
2205        # twice, in which case ignore the second instance.
2206        routes = self._routes_by_stream.pop(stream, None)
2207        if routes is None:
2208            return
2209
2210        self._log.debug('stream %s is gone; propagating DEL_ROUTE for %r',
2211                        stream.name, routes)
2212        for target_id in routes:
2213            self.router.del_route(target_id)
2214            self._propagate_up(mitogen.core.DEL_ROUTE, target_id)
2215            self._propagate_down(mitogen.core.DEL_ROUTE, target_id)
2216
2217            context = self.router.context_by_id(target_id, create=False)
2218            if context:
2219                mitogen.core.fire(context, 'disconnect')
2220
2221    def _on_add_route(self, msg):
2222        """
2223        Respond to :data:`mitogen.core.ADD_ROUTE` by validating the source of
2224        the message, updating the local table, and propagating the message
2225        upwards.
2226        """
2227        if msg.is_dead:
2228            return
2229
2230        target_id_s, _, target_name = bytes_partition(msg.data, b(':'))
2231        target_name = target_name.decode()
2232        target_id = int(target_id_s)
2233        self.router.context_by_id(target_id).name = target_name
2234        stream = self.router.stream_by_id(msg.src_id)
2235        current = self.router.stream_by_id(target_id)
2236        if current and current.protocol.remote_id != mitogen.parent_id:
2237            self._log.error('Cannot add duplicate route to %r via %r, '
2238                            'already have existing route via %r',
2239                            target_id, stream, current)
2240            return
2241
2242        self._log.debug('Adding route to %d via %r', target_id, stream)
2243        self._routes_by_stream[stream].add(target_id)
2244        self.router.add_route(target_id, stream)
2245        self._propagate_up(mitogen.core.ADD_ROUTE, target_id, target_name)
2246
2247    def _on_del_route(self, msg):
2248        """
2249        Respond to :data:`mitogen.core.DEL_ROUTE` by validating the source of
2250        the message, updating the local table, propagating the message
2251        upwards, and downwards towards any stream that every had a message
2252        forwarded from it towards the disconnecting context.
2253        """
2254        if msg.is_dead:
2255            return
2256
2257        target_id = int(msg.data)
2258        registered_stream = self.router.stream_by_id(target_id)
2259        if registered_stream is None:
2260            return
2261
2262        stream = self.router.stream_by_id(msg.src_id)
2263        if registered_stream != stream:
2264            self._log.error('received DEL_ROUTE for %d from %r, expected %r',
2265                            target_id, stream, registered_stream)
2266            return
2267
2268        context = self.router.context_by_id(target_id, create=False)
2269        if context:
2270            self._log.debug('firing local disconnect signal for %r', context)
2271            mitogen.core.fire(context, 'disconnect')
2272
2273        self._log.debug('deleting route to %d via %r', target_id, stream)
2274        routes = self._routes_by_stream.get(stream)
2275        if routes:
2276            routes.discard(target_id)
2277
2278        self.router.del_route(target_id)
2279        if stream.protocol.remote_id != mitogen.parent_id:
2280            self._propagate_up(mitogen.core.DEL_ROUTE, target_id)
2281        self._propagate_down(mitogen.core.DEL_ROUTE, target_id)
2282
2283
2284class Router(mitogen.core.Router):
2285    context_class = Context
2286    debug = False
2287    profiling = False
2288
2289    id_allocator = None
2290    responder = None
2291    log_forwarder = None
2292    route_monitor = None
2293
2294    def upgrade(self, importer, parent):
2295        LOG.debug('upgrading %r with capabilities to start new children', self)
2296        self.id_allocator = ChildIdAllocator(router=self)
2297        self.responder = ModuleForwarder(
2298            router=self,
2299            parent_context=parent,
2300            importer=importer,
2301        )
2302        self.route_monitor = RouteMonitor(self, parent)
2303        self.add_handler(
2304            fn=self._on_detaching,
2305            handle=mitogen.core.DETACHING,
2306            persist=True,
2307        )
2308
2309    def _on_detaching(self, msg):
2310        if msg.is_dead:
2311            return
2312        stream = self.stream_by_id(msg.src_id)
2313        if stream.protocol.remote_id != msg.src_id or stream.conn.detached:
2314            LOG.warning('bad DETACHING received on %r: %r', stream, msg)
2315            return
2316        LOG.debug('%r: marking as detached', stream)
2317        stream.conn.detached = True
2318        msg.reply(None)
2319
2320    def get_streams(self):
2321        """
2322        Return an atomic snapshot of all streams in existence at time of call.
2323        This is safe to call from any thread.
2324        """
2325        self._write_lock.acquire()
2326        try:
2327            return itervalues(self._stream_by_id)
2328        finally:
2329            self._write_lock.release()
2330
2331    def disconnect(self, context):
2332        """
2333        Disconnect a context and forget its stream, assuming the context is
2334        directly connected.
2335        """
2336        stream = self.stream_by_id(context)
2337        if stream is None or stream.protocol.remote_id != context.context_id:
2338            return
2339
2340        l = mitogen.core.Latch()
2341        mitogen.core.listen(stream, 'disconnect', l.put)
2342        def disconnect():
2343            LOG.debug('Starting disconnect of %r', stream)
2344            stream.on_disconnect(self.broker)
2345        self.broker.defer(disconnect)
2346        l.get()
2347
2348    def add_route(self, target_id, stream):
2349        """
2350        Arrange for messages whose `dst_id` is `target_id` to be forwarded on a
2351        directly connected :class:`Stream`. Safe to call from any thread.
2352
2353        This is called automatically by :class:`RouteMonitor` in response to
2354        :data:`mitogen.core.ADD_ROUTE` messages, but remains public while the
2355        design has not yet settled, and situations may arise where routing is
2356        not fully automatic.
2357
2358        :param int target_id:
2359            Target context ID to add a route for.
2360        :param mitogen.core.Stream stream:
2361            Stream over which messages to the target should be routed.
2362        """
2363        LOG.debug('%r: adding route to context %r via %r',
2364                  self, target_id, stream)
2365        assert isinstance(target_id, int)
2366        assert isinstance(stream, mitogen.core.Stream)
2367
2368        self._write_lock.acquire()
2369        try:
2370            self._stream_by_id[target_id] = stream
2371        finally:
2372            self._write_lock.release()
2373
2374    def del_route(self, target_id):
2375        """
2376        Delete any route that exists for `target_id`. It is not an error to
2377        delete a route that does not currently exist. Safe to call from any
2378        thread.
2379
2380        This is called automatically by :class:`RouteMonitor` in response to
2381        :data:`mitogen.core.DEL_ROUTE` messages, but remains public while the
2382        design has not yet settled, and situations may arise where routing is
2383        not fully automatic.
2384
2385        :param int target_id:
2386            Target context ID to delete route for.
2387        """
2388        LOG.debug('%r: deleting route to %r', self, target_id)
2389        # DEL_ROUTE may be sent by a parent if it knows this context sent
2390        # messages to a peer that has now disconnected, to let us raise
2391        # 'disconnect' event on the appropriate Context instance. In that case,
2392        # we won't a matching _stream_by_id entry for the disappearing route,
2393        # so don't raise an error for a missing key here.
2394        self._write_lock.acquire()
2395        try:
2396            self._stream_by_id.pop(target_id, None)
2397        finally:
2398            self._write_lock.release()
2399
2400    def get_module_blacklist(self):
2401        if mitogen.context_id == 0:
2402            return self.responder.blacklist
2403        return self.importer.master_blacklist
2404
2405    def get_module_whitelist(self):
2406        if mitogen.context_id == 0:
2407            return self.responder.whitelist
2408        return self.importer.master_whitelist
2409
2410    def allocate_id(self):
2411        return self.id_allocator.allocate()
2412
2413    connection_timeout_msg = u"Connection timed out."
2414
2415    def _connect(self, klass, **kwargs):
2416        context_id = self.allocate_id()
2417        context = self.context_class(self, context_id)
2418        context.name = kwargs.get('name')
2419
2420        kwargs['old_router'] = self
2421        kwargs['max_message_size'] = self.max_message_size
2422        conn = klass(klass.options_class(**kwargs), self)
2423        try:
2424            conn.connect(context=context)
2425        except mitogen.core.TimeoutError:
2426            raise mitogen.core.StreamError(self.connection_timeout_msg)
2427
2428        return context
2429
2430    def connect(self, method_name, name=None, **kwargs):
2431        if name:
2432            name = mitogen.core.to_text(name)
2433
2434        klass = get_connection_class(method_name)
2435        kwargs.setdefault(u'debug', self.debug)
2436        kwargs.setdefault(u'profiling', self.profiling)
2437        kwargs.setdefault(u'unidirectional', self.unidirectional)
2438        kwargs.setdefault(u'name', name)
2439
2440        via = kwargs.pop(u'via', None)
2441        if via is not None:
2442            return self.proxy_connect(via, method_name,
2443                **mitogen.core.Kwargs(kwargs))
2444        return self._connect(klass, **mitogen.core.Kwargs(kwargs))
2445
2446    def proxy_connect(self, via_context, method_name, name=None, **kwargs):
2447        resp = via_context.call(_proxy_connect,
2448            name=name,
2449            method_name=method_name,
2450            kwargs=mitogen.core.Kwargs(kwargs),
2451        )
2452        if resp['msg'] is not None:
2453            raise mitogen.core.StreamError(resp['msg'])
2454
2455        name = u'%s.%s' % (via_context.name, resp['name'])
2456        context = self.context_class(self, resp['id'], name=name)
2457        context.via = via_context
2458        self._write_lock.acquire()
2459        try:
2460            self._context_by_id[context.context_id] = context
2461        finally:
2462            self._write_lock.release()
2463        return context
2464
2465    def buildah(self, **kwargs):
2466        return self.connect(u'buildah', **kwargs)
2467
2468    def doas(self, **kwargs):
2469        return self.connect(u'doas', **kwargs)
2470
2471    def docker(self, **kwargs):
2472        return self.connect(u'docker', **kwargs)
2473
2474    def kubectl(self, **kwargs):
2475        return self.connect(u'kubectl', **kwargs)
2476
2477    def fork(self, **kwargs):
2478        return self.connect(u'fork', **kwargs)
2479
2480    def jail(self, **kwargs):
2481        return self.connect(u'jail', **kwargs)
2482
2483    def local(self, **kwargs):
2484        return self.connect(u'local', **kwargs)
2485
2486    def lxc(self, **kwargs):
2487        return self.connect(u'lxc', **kwargs)
2488
2489    def lxd(self, **kwargs):
2490        return self.connect(u'lxd', **kwargs)
2491
2492    def setns(self, **kwargs):
2493        return self.connect(u'setns', **kwargs)
2494
2495    def su(self, **kwargs):
2496        return self.connect(u'su', **kwargs)
2497
2498    def sudo(self, **kwargs):
2499        return self.connect(u'sudo', **kwargs)
2500
2501    def ssh(self, **kwargs):
2502        return self.connect(u'ssh', **kwargs)
2503
2504
2505class Reaper(object):
2506    """
2507    Asynchronous logic for reaping :class:`Process` objects. This is necessary
2508    to prevent uncontrolled buildup of zombie processes in long-lived parents
2509    that will eventually reach an OS limit, preventing creation of new threads
2510    and processes, and to log the exit status of the child in the case of an
2511    error.
2512
2513    To avoid modifying process-global state such as with
2514    :func:`signal.set_wakeup_fd` or installing a :data:`signal.SIGCHLD` handler
2515    that might interfere with the user's ability to use those facilities,
2516    Reaper polls for exit with backoff using timers installed on an associated
2517    :class:`Broker`.
2518
2519    :param mitogen.core.Broker broker:
2520        The :class:`Broker` on which to install timers
2521    :param mitogen.parent.Process proc:
2522        The process to reap.
2523    :param bool kill:
2524        If :data:`True`, send ``SIGTERM`` and ``SIGKILL`` to the process.
2525    :param bool wait_on_shutdown:
2526        If :data:`True`, delay :class:`Broker` shutdown if child has not yet
2527        exited. If :data:`False` simply forget the child.
2528    """
2529    #: :class:`Timer` that invokes :meth:`reap` after some polling delay.
2530    _timer = None
2531
2532    def __init__(self, broker, proc, kill, wait_on_shutdown):
2533        self.broker = broker
2534        self.proc = proc
2535        self.kill = kill
2536        self.wait_on_shutdown = wait_on_shutdown
2537        self._tries = 0
2538
2539    def _signal_child(self, signum):
2540        # For processes like sudo we cannot actually send sudo a signal,
2541        # because it is setuid, so this is best-effort only.
2542        LOG.debug('%r: sending %s', self.proc, SIGNAL_BY_NUM[signum])
2543        try:
2544            os.kill(self.proc.pid, signum)
2545        except OSError:
2546            e = sys.exc_info()[1]
2547            if e.args[0] != errno.EPERM:
2548                raise
2549
2550    def _calc_delay(self, count):
2551        """
2552        Calculate a poll delay given `count` attempts have already been made.
2553        These constants have no principle, they just produce rapid but still
2554        relatively conservative retries.
2555        """
2556        delay = 0.05
2557        for _ in xrange(count):
2558            delay *= 1.72
2559        return delay
2560
2561    def _on_broker_shutdown(self):
2562        """
2563        Respond to :class:`Broker` shutdown by cancelling the reap timer if
2564        :attr:`Router.await_children_at_shutdown` is disabled. Otherwise
2565        shutdown is delayed for up to :attr:`Broker.shutdown_timeout` for
2566        subprocesses may have no intention of exiting any time soon.
2567        """
2568        if not self.wait_on_shutdown:
2569            self._timer.cancel()
2570
2571    def _install_timer(self, delay):
2572        new = self._timer is None
2573        self._timer = self.broker.timers.schedule(
2574            when=mitogen.core.now() + delay,
2575            func=self.reap,
2576        )
2577        if new:
2578            mitogen.core.listen(self.broker, 'shutdown',
2579                                self._on_broker_shutdown)
2580
2581    def _remove_timer(self):
2582        if self._timer and self._timer.active:
2583            self._timer.cancel()
2584            mitogen.core.unlisten(self.broker, 'shutdown',
2585                                  self._on_broker_shutdown)
2586
2587    def reap(self):
2588        """
2589        Reap the child process during disconnection.
2590        """
2591        status = self.proc.poll()
2592        if status is not None:
2593            LOG.debug('%r: %s', self.proc, returncode_to_str(status))
2594            mitogen.core.fire(self.proc, 'exit')
2595            self._remove_timer()
2596            return
2597
2598        self._tries += 1
2599        if self._tries > 20:
2600            LOG.warning('%r: child will not exit, giving up', self)
2601            self._remove_timer()
2602            return
2603
2604        delay = self._calc_delay(self._tries - 1)
2605        LOG.debug('%r still running after IO disconnect, recheck in %.03fs',
2606                  self.proc, delay)
2607        self._install_timer(delay)
2608
2609        if not self.kill:
2610            pass
2611        elif self._tries == 2:
2612            self._signal_child(signal.SIGTERM)
2613        elif self._tries == 6:  # roughly 4 seconds
2614            self._signal_child(signal.SIGKILL)
2615
2616
2617class Process(object):
2618    """
2619    Process objects provide a uniform interface to the :mod:`subprocess` and
2620    :mod:`mitogen.fork`. This class is extended by :class:`PopenProcess` and
2621    :class:`mitogen.fork.Process`.
2622
2623    :param int pid:
2624        The process ID.
2625    :param file stdin:
2626        File object attached to standard input.
2627    :param file stdout:
2628        File object attached to standard output.
2629    :param file stderr:
2630        File object attached to standard error, or :data:`None`.
2631    """
2632    #: Name of the process used in logs. Set to the stream/context name by
2633    #: :class:`Connection`.
2634    name = None
2635
2636    def __init__(self, pid, stdin, stdout, stderr=None):
2637        #: The process ID.
2638        self.pid = pid
2639        #: File object attached to standard input.
2640        self.stdin = stdin
2641        #: File object attached to standard output.
2642        self.stdout = stdout
2643        #: File object attached to standard error.
2644        self.stderr = stderr
2645
2646    def __repr__(self):
2647        return '%s %s pid %d' % (
2648            type(self).__name__,
2649            self.name,
2650            self.pid,
2651        )
2652
2653    def poll(self):
2654        """
2655        Fetch the child process exit status, or :data:`None` if it is still
2656        running. This should be overridden by subclasses.
2657
2658        :returns:
2659            Exit status in the style of the :attr:`subprocess.Popen.returncode`
2660            attribute, i.e. with signals represented by a negative integer.
2661        """
2662        raise NotImplementedError()
2663
2664
2665class PopenProcess(Process):
2666    """
2667    :class:`Process` subclass wrapping a :class:`subprocess.Popen` object.
2668
2669    :param subprocess.Popen proc:
2670        The subprocess.
2671    """
2672    def __init__(self, proc, stdin, stdout, stderr=None):
2673        super(PopenProcess, self).__init__(proc.pid, stdin, stdout, stderr)
2674        #: The subprocess.
2675        self.proc = proc
2676
2677    def poll(self):
2678        return self.proc.poll()
2679
2680
2681class ModuleForwarder(object):
2682    """
2683    Respond to :data:`mitogen.core.GET_MODULE` requests in a child by
2684    forwarding the request to our parent context, or satisfying the request
2685    from our local Importer cache.
2686    """
2687    def __init__(self, router, parent_context, importer):
2688        self.router = router
2689        self.parent_context = parent_context
2690        self.importer = importer
2691        router.add_handler(
2692            fn=self._on_forward_module,
2693            handle=mitogen.core.FORWARD_MODULE,
2694            persist=True,
2695            policy=mitogen.core.has_parent_authority,
2696        )
2697        router.add_handler(
2698            fn=self._on_get_module,
2699            handle=mitogen.core.GET_MODULE,
2700            persist=True,
2701            policy=is_immediate_child,
2702        )
2703
2704    def __repr__(self):
2705        return 'ModuleForwarder'
2706
2707    def _on_forward_module(self, msg):
2708        if msg.is_dead:
2709            return
2710
2711        context_id_s, _, fullname = bytes_partition(msg.data, b('\x00'))
2712        fullname = mitogen.core.to_text(fullname)
2713        context_id = int(context_id_s)
2714        stream = self.router.stream_by_id(context_id)
2715        if stream.protocol.remote_id == mitogen.parent_id:
2716            LOG.error('%r: dropping FORWARD_MODULE(%d, %r): no route to child',
2717                      self, context_id, fullname)
2718            return
2719
2720        if fullname in stream.protocol.sent_modules:
2721            return
2722
2723        LOG.debug('%r._on_forward_module() sending %r to %r via %r',
2724                  self, fullname, context_id, stream.protocol.remote_id)
2725        self._send_module_and_related(stream, fullname)
2726        if stream.protocol.remote_id != context_id:
2727            stream.protocol._send(
2728                mitogen.core.Message(
2729                    data=msg.data,
2730                    handle=mitogen.core.FORWARD_MODULE,
2731                    dst_id=stream.protocol.remote_id,
2732                )
2733            )
2734
2735    def _on_get_module(self, msg):
2736        if msg.is_dead:
2737            return
2738
2739        fullname = msg.data.decode('utf-8')
2740        LOG.debug('%r: %s requested by context %d', self, fullname, msg.src_id)
2741        callback = lambda: self._on_cache_callback(msg, fullname)
2742        self.importer._request_module(fullname, callback)
2743
2744    def _on_cache_callback(self, msg, fullname):
2745        stream = self.router.stream_by_id(msg.src_id)
2746        LOG.debug('%r: sending %s to %r', self, fullname, stream)
2747        self._send_module_and_related(stream, fullname)
2748
2749    def _send_module_and_related(self, stream, fullname):
2750        tup = self.importer._cache[fullname]
2751        for related in tup[4]:
2752            rtup = self.importer._cache.get(related)
2753            if rtup:
2754                self._send_one_module(stream, rtup)
2755            else:
2756                LOG.debug('%r: %s not in cache (for %s)',
2757                          self, related, fullname)
2758
2759        self._send_one_module(stream, tup)
2760
2761    def _send_one_module(self, stream, tup):
2762        if tup[0] not in stream.protocol.sent_modules:
2763            stream.protocol.sent_modules.add(tup[0])
2764            self.router._async_route(
2765                mitogen.core.Message.pickled(
2766                    tup,
2767                    dst_id=stream.protocol.remote_id,
2768                    handle=mitogen.core.LOAD_MODULE,
2769                )
2770            )
2771