1# Copyright 2019, David Wilson
2#
3# Redistribution and use in source and binary forms, with or without
4# modification, are permitted provided that the following conditions are met:
5#
6# 1. Redistributions of source code must retain the above copyright notice,
7# this list of conditions and the following disclaimer.
8#
9# 2. Redistributions in binary form must reproduce the above copyright notice,
10# this list of conditions and the following disclaimer in the documentation
11# and/or other materials provided with the distribution.
12#
13# 3. Neither the name of the copyright holder nor the names of its contributors
14# may be used to endorse or promote products derived from this software without
15# specific prior written permission.
16#
17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27# POSSIBILITY OF SUCH DAMAGE.
28
29# !mitogen: minify_safe
30
31"""
This module implements most package functionality, but remains separate from
non-essential code in order to reduce its size, since it also serves as the
bootstrap implementation sent to every new slave context.
35"""
36
37import binascii
38import collections
39import encodings.latin_1
40import encodings.utf_8
41import errno
42import fcntl
43import itertools
44import linecache
45import logging
46import os
47import pickle as py_pickle
48import pstats
49import signal
50import socket
51import struct
52import sys
53import syslog
54import threading
55import time
56import traceback
57import warnings
58import weakref
59import zlib
60
61# Python >3.7 deprecated the imp module.
62warnings.filterwarnings('ignore', message='the imp module is deprecated')
63import imp
64
65# Absolute imports for <2.5.
66select = __import__('select')
67
68try:
69    import cProfile
70except ImportError:
71    cProfile = None
72
73try:
74    import thread
75except ImportError:
76    import threading as thread
77
78try:
79    import cPickle as pickle
80except ImportError:
81    import pickle
82
83try:
84    from cStringIO import StringIO as BytesIO
85except ImportError:
86    from io import BytesIO
87
# Compatibility shim for interpreters without BaseException (Python < 2.5):
# fall back to Exception so `except BaseException` and isinstance() checks
# below remain valid.
try:
    BaseException
except NameError:
    BaseException = Exception

# Compatibility shim for interpreters without ModuleNotFoundError: alias it to
# ImportError so it can be caught uniformly.
try:
    ModuleNotFoundError
except NameError:
    ModuleNotFoundError = ImportError
97
# TODO: an 'import' executed after __name__ has been rewritten, but before
# sys.modules has been fixed up, emits the warning silenced below. This
# happens when profiling = True.
warnings.filterwarnings(
    'ignore',
    message="Parent module 'mitogen' not found while handling absolute import"
)

#: Logger for general Mitogen messages.
LOG = logging.getLogger('mitogen')

#: Logger for IO tracing. Held at INFO here; :func:`enable_debug_logging`
#: lowers it to DEBUG.
IOLOG = logging.getLogger('mitogen.io')
IOLOG.setLevel(logging.INFO)

# str.encode() may take the import lock, so a deadlock is possible if the
# broker calls .encode() on behalf of a thread that is currently waiting for
# a module. A codec instance is therefore constructed up front.
LATIN1_CODEC = encodings.latin_1.Codec()

# Verbosity flags; enable_debug_logging() switches both on.
_v = _vv = False

# Standard handles.
GET_MODULE = 100
CALL_FUNCTION = 101
FORWARD_LOG = 102
ADD_ROUTE = 103
DEL_ROUTE = 104
ALLOCATE_ID = 105
SHUTDOWN = 106
LOAD_MODULE = 107
FORWARD_MODULE = 108
DETACHING = 109
CALL_SERVICE = 110
STUB_CALL_SERVICE = 111

#: Special value placed in the `reply_to` field to signal disconnection, or
#: that a message could not be routed. Receiving it usually causes
#: :class:`mitogen.core.ChannelError` to be raised.
#:
#: It tells the receiver that the sender either could not process the
#: message or wants no further messages delivered to it. It is used when:
#:
#:  * a remote receiver is disconnected or explicitly closed.
#:  * a related message could not be delivered due to no route existing for it.
#:  * a router is being torn down, as a sentinel value to notify
#:    :meth:`mitogen.core.Router.add_handler` callbacks to clean up.
IS_DEAD = 999
139
PY24 = sys.version_info < (2, 5)
PY3 = sys.version_info > (3,)
if PY3:
    # b() converts a native str literal to bytes. BufferType(buf, start)
    # returns a view of `buf` beginning at offset `start`.
    b = str.encode
    BytesType = bytes
    UnicodeType = str
    FsPathTypes = (str,)
    BufferType = lambda buf, start: memoryview(buf)[start:]
    long = int
else:
    b = str
    BytesType = str
    FsPathTypes = (str, unicode)
    BufferType = buffer
    UnicodeType = unicode

#: Types accepted wherever either bytes or text is allowed.
AnyTextType = (BytesType, UnicodeType)

# Python < 2.6 has no next() builtin.
try:
    next
except NameError:
    next = lambda it: it.next()
167
# #550: prehistoric WSL did not advertise itself in uname output, so inspect
# the kernel release string instead.
try:
    fp = open('/proc/sys/kernel/osrelease')
    try:
        IS_WSL = 'Microsoft' in fp.read()
    finally:
        # Close even when read() fails. The try/finally is nested because a
        # combined try/except/finally is unavailable before Python 2.5.
        fp.close()
except IOError:
    IS_WSL = False
175
176
#: Default size for calls to :meth:`Side.read` or :meth:`Side.write`, and the
#: size of buffers configured by :func:`mitogen.parent.create_socketpair`. This
#: value has many performance implications; 128KiB seems to be a sweet spot.
#:
#: * When set low, large messages cause many :class:`Broker` IO loop
#:   iterations, burning CPU and reducing throughput.
#: * When set high, the OS reserves excessive RAM for socket buffers (2x per
#:   child), and an identically sized temporary userspace buffer is allocated
#:   on each read. That buffer requires zeroing, and above a certain size it
#:   may need two system calls to allocate and deallocate.
#:
#: Care must be taken to ensure the underlying kernel object and receiving
#: program support the desired size. For example,
#:
#: * Most UNIXes have TTYs with fixed 2KiB-4KiB buffers, making them unsuitable
#:   for efficient IO.
#: * Different UNIXes have varying presets for pipes, which may not be
#:   configurable. On recent Linux the default pipe buffer size is 64KiB, but
#:   under memory pressure may be as low as 4KiB for unprivileged processes.
#: * When communication is via an intermediary process, its internal buffers
#:   affect the speed at which OS buffers drain. For example, OpenSSH uses
#:   64KiB reads.
#:
#: An ideal :class:`Message` has a size that is a multiple of
#: :data:`CHUNK_SIZE` inclusive of headers, to avoid wasting IO loop iterations
#: writing small trailer chunks.
CHUNK_SIZE = 128 * 1024

# Thread-local storage for module-internal per-thread state.
_tls = threading.local()
206
207
# Give this module the same identity however it was loaded, either as a
# normal import or as the bootstrap __main__.
if __name__ == 'mitogen.core':
    # When loaded using import mechanism, ExternalContext.main() will not have
    # a chance to set the synthetic mitogen global, so just import it here.
    import mitogen
else:
    # When loaded as __main__, ensure classes and functions gain a __module__
    # attribute consistent with the host process, so that pickling succeeds.
    # NOTE(review): the `mitogen` global is not bound on this path; per the
    # comment above it is presumably set later by ExternalContext.main().
    __name__ = 'mitogen.core'
216
217
class Error(Exception):
    """
    Base for all exceptions raised by Mitogen.

    :param str fmt:
        Exception text, or format string if `args` is non-empty. Byte strings
        are decoded as UTF-8. Undecodable sequences are replaced rather than
        raising :class:`UnicodeDecodeError` from inside the constructor.
        Objects that are neither bytes nor text are passed to
        :class:`Exception` unchanged.
    :param tuple args:
        Format string arguments.
    """
    def __init__(self, fmt=None, *args):
        if args:
            fmt %= args
        # Only bytes can be decoded. Text, None and arbitrary objects used to
        # reach .decode() and fail with AttributeError; they now pass through.
        if fmt and isinstance(fmt, BytesType):
            fmt = fmt.decode('utf-8', 'replace')
        Exception.__init__(self, fmt)
233
234
class LatchError(Error):
    """
    Raised on any attempt to use a :class:`mitogen.core.Latch` after it has
    been marked closed.
    """
241
242
class Blob(BytesType):
    """
    Bytes subclass for binary payloads. Its repr() reports only the length,
    so it can be logged without dumping its content. It pickles back to a
    :class:`Blob`.
    """
    def __repr__(self):
        size = len(self)
        return '[blob: %d bytes]' % (size,)

    def __reduce__(self):
        plain = BytesType(self)
        return (Blob, (plain,))
253
254
class Secret(UnicodeType):
    """
    Text subclass for sensitive values such as passwords. Its repr() is
    masked, so it may appear in logged reprs. It pickles back to a
    :class:`Secret`.
    """
    def __repr__(self):
        return '[secret]'

    def __reduce__(self):
        plain = UnicodeType(self)
        return (Secret, (plain,))

    if not PY3:
        # TODO: what is this needed for in 2.x?
        def __str__(self):
            return UnicodeType(self)
270
271
class Kwargs(dict):
    """
    Dict subclass marking its contents as keyword arguments. Keys are
    normalized for the receiving interpreter: decoded to Unicode on Python 3,
    and encoded to UTF-8 bytes on Python < 2.6.5.

    Python 2 builds keyword-argument dicts with bytes keys, which Python 3
    cannot accept. Python 3 builds them with Unicode keys, which old Python 2
    releases cannot accept.
    """
    if PY3:
        def __init__(self, dct):
            for key, value in dct.items():
                if type(key) is bytes:
                    key = key.decode()
                self[key] = value
    elif sys.version_info < (2, 6, 5):
        def __init__(self, dct):
            for key, value in dct.iteritems():
                if type(key) is unicode:
                    key = encodings.utf_8.encode(key)[0]
                self[key] = value

    def __repr__(self):
        return 'Kwargs(%s)' % (dict.__repr__(self),)

    def __reduce__(self):
        return (Kwargs, (dict(self),))
301
302
class CallError(Error):
    """
    Serializable :class:`Error` subclass raised when :meth:`Context.call()
    <mitogen.parent.Context.call>` fails. A copy of the traceback from the
    external context is appended to the exception message.

    :param fmt:
        Either an exception instance, whose qualified class name, message and
        traceback are rendered into the message, or a format string as
        accepted by :class:`Error`.
    :param args:
        Format string arguments, used only when `fmt` is not an exception.
    """
    def __init__(self, fmt=None, *args):
        if not isinstance(fmt, BaseException):
            Error.__init__(self, fmt, *args)
        else:
            e = fmt
            cls = e.__class__
            fmt = '%s.%s: %s' % (cls.__module__, cls.__name__, e)
            # Prefer the traceback attached to the exception itself
            # (Python 3). A wrapped exception then reports its own frames
            # even when constructed outside its except block, or while an
            # unrelated exception is being handled. Fall back to the
            # in-flight traceback where __traceback__ is unavailable.
            tb = getattr(e, '__traceback__', None) or sys.exc_info()[2]
            if tb:
                fmt += '\n'
                fmt += ''.join(traceback.format_tb(tb))
            Error.__init__(self, fmt)

    def __reduce__(self):
        # Serialize as the rendered message only; see _unpickle_call_error().
        return (_unpickle_call_error, (self.args[0],))
324
325
def _unpickle_call_error(s):
    """
    Unpickling constructor for :class:`CallError`. Only accepts plain text
    shorter than 10000 characters, because its argument comes from message
    data being unpickled.

    :raises TypeError:
        `s` is not exactly the text type, or is too long.
    """
    is_plain_text = type(s) is UnicodeType
    if is_plain_text and len(s) < 10000:
        return CallError(s)
    raise TypeError('cannot unpickle CallError: bad input')
330
331
class ChannelError(Error):
    """
    Raised when a channel dies or has been closed.
    """
    #: Message text used when the remote end closed the channel.
    remote_msg = 'Channel closed by remote end.'

    #: Message text used when the local end closed the channel.
    local_msg = 'Channel closed by local end.'
338
339
class StreamError(Error):
    """
    Raised when a stream cannot be established.
    """
345
346
class TimeoutError(Error):
    """
    Raised when a timeout occurs on a stream.
    """
352
353
def to_text(o):
    """
    Return `o` as plain Unicode text. Bytes are decoded as UTF-8. Anything
    else is passed to the text type's constructor, so the result is always
    the plain :class:`str` type, with any subclass removed.
    """
    if not isinstance(o, BytesType):
        return UnicodeType(o)
    return o.decode('utf-8')
363
364
# Documented in api.rst to work around Sphinx limitation.
# Use the monotonic clock when the interpreter provides one, else wall time.
try:
    now = time.monotonic
except AttributeError:
    now = time.time
367
368
# Python 2.4 has no any() builtin.
try:
    any
except NameError:
    def any(it):
        """
        Return :data:`True` if any element of `it` is truthy, otherwise
        :data:`False`, like the builtin. Previously an exhausted iterable
        fell off the end and returned :data:`None`.
        """
        for elem in it:
            if elem:
                return True
        return False
377
378
def _partition(s, sep, find):
    """
    Backport of (str|unicode).(partition|rpartition) for Python 2.4/2.5.

    `find` is the bound search method (``s.find`` or ``s.rfind``), selecting
    which occurrence of `sep` to split on. Returns ``(head, sep, tail)``, or
    :data:`None` when `sep` is absent, so callers can supply the appropriate
    "not found" tuple.
    """
    pos = find(sep)
    if pos == -1:
        return None
    return s[:pos], sep, s[pos + len(sep):]
387
388
if not hasattr(UnicodeType, 'rpartition'):
    # Interpreters lacking the native methods: build them on _partition(),
    # returning the same "not found" tuples the native methods would.
    def str_partition(s, sep):
        return _partition(s, sep, s.find) or (s, u'', u'')

    def str_rpartition(s, sep):
        return _partition(s, sep, s.rfind) or (u'', u'', s)

    def bytes_partition(s, sep):
        return _partition(s, sep, s.find) or (s, '', '')
else:
    str_partition = UnicodeType.partition
    str_rpartition = UnicodeType.rpartition
    bytes_partition = BytesType.partition
400
401
def _has_parent_authority(context_id):
    """
    Return :data:`True` if `context_id` is the current context's ID or
    appears in :data:`mitogen.parent_ids`.
    """
    if context_id == mitogen.context_id:
        return True
    return context_id in mitogen.parent_ids
407
def has_parent_authority(msg, _stream=None):
    """
    Policy function for :class:`Receiver` and :meth:`Router.add_handler`.
    It accepts only messages originating from a parent context, or arriving
    on a :class:`Stream` whose :attr:`auth_id <Stream.auth_id>` has been set
    to that of a parent context or the current context. `_stream` is unused.
    """
    auth_id = msg.auth_id
    return _has_parent_authority(auth_id)
417
418
def _signals(obj, signal):
    """
    Return the list of callbacks registered for `signal` on `obj`. The
    per-object ``_signals`` dict and the list are created on first use.
    """
    return (
        obj.__dict__
        .setdefault('_signals', {})
        .setdefault(signal, [])
    )


def listen(obj, name, func):
    """
    Arrange for `func()` to be invoked when signal `name` is fired on `obj`.
    """
    _signals(obj, name).append(func)


def unlisten(obj, name, func):
    """
    Remove `func()` from the list of functions invoked when signal `name` is
    fired by `obj`.

    :raises ValueError:
        `func()` was not on the list.
    """
    _signals(obj, name).remove(func)


def fire(obj, name, *args, **kwargs):
    """
    Arrange for `func(*args, **kwargs)` to be invoked for every function
    registered for signal `name` on `obj`.

    The callback list is snapshotted before iteration. A callback may
    therefore call :func:`listen` or :func:`unlisten` for the same signal
    without causing later callbacks to be skipped. Callbacks registered during
    a fire are first invoked by the next :func:`fire`.
    """
    for func in list(_signals(obj, name)):
        func(*args, **kwargs)
452
453
def takes_econtext(func):
    """
    Decorator that marks a function or class method to automatically receive
    an `econtext` keyword argument: the :class:`mitogen.core.ExternalContext`
    active in the context where the function runs. The mark only takes effect
    when the function is invoked via
    :data:`CALL_FUNCTION <mitogen.core.CALL_FUNCTION>`. Direct callers must
    still pass `econtext` explicitly.

    Returns `func` itself after setting ``mitogen_takes_econtext``.
    """
    setattr(func, 'mitogen_takes_econtext', True)
    return func
467
468
def takes_router(func):
    """
    Decorator that marks a function or class method to automatically receive
    a `router` keyword argument: the :class:`mitogen.core.Router` active in
    the context where the function runs. The mark only takes effect when the
    function is invoked via :data:`CALL_FUNCTION <mitogen.core.CALL_FUNCTION>`.
    Direct callers must still pass `router` explicitly.

    Returns `func` itself after setting ``mitogen_takes_router``.
    """
    setattr(func, 'mitogen_takes_router', True)
    return func
482
483
def is_blacklisted_import(importer, fullname):
    """
    Return :data:`True` if `fullname` is part of a blacklisted package, or if
    any packages have been whitelisted and `fullname` is not part of one.

    "Part of" is a plain prefix test: `fullname` matches an entry when it
    starts with that entry's string.

    NB:
      - If a package is on both lists, then it is treated as blacklisted.
      - If any package is whitelisted, then all non-whitelisted packages are
        treated as blacklisted.
      - An empty whitelist imposes no restriction. Previously it caused every
        name to be reported as blacklisted, contradicting the rule above.
    """
    if any(fullname.startswith(s) for s in importer.blacklist):
        return True
    if not importer.whitelist:
        return False
    return not any(fullname.startswith(s) for s in importer.whitelist)
496
497
def set_cloexec(fd):
    """
    Mark the file descriptor `fd` close-on-exec, so it is not inherited by
    programs started with :func:`os.execve`. Descriptors inherited across
    :func:`os.fork` are unaffected and must be closed by other means, such
    as :func:`mitogen.fork.on_fork`.

    `fd` must not be one of the standard descriptors 0-2 (asserted).
    """
    wanted = fcntl.fcntl(fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
    assert fd > 2, 'fd %r <= 2' % (fd,)
    fcntl.fcntl(fd, fcntl.F_SETFD, wanted)
508
509
def set_nonblock(fd):
    """
    Switch the file descriptor `fd` to non-blocking mode by setting
    :data:`os.O_NONBLOCK`. For most file types, :func:`os.read` and
    :func:`os.write` then raise :class:`OSError` with :data:`errno.EAGAIN`
    instead of blocking the thread when the kernel buffer is exhausted.
    """
    status = fcntl.fcntl(fd, fcntl.F_GETFL)
    status |= os.O_NONBLOCK
    fcntl.fcntl(fd, fcntl.F_SETFL, status)
519
520
def set_block(fd):
    """
    Undo :func:`set_nonblock` by clearing :data:`os.O_NONBLOCK` on `fd`, so
    IO blocks the thread when the kernel buffer is exhausted.
    """
    status = fcntl.fcntl(fd, fcntl.F_GETFL)
    status &= ~os.O_NONBLOCK
    fcntl.fcntl(fd, fcntl.F_SETFL, status)
528
529
def io_op(func, *args):
    """
    Wrap `func(*args)` that may raise :class:`select.error`, :class:`IOError`,
    or :class:`OSError`, trapping UNIX error codes relating to disconnection
    and retry events in various subsystems:

    * When a signal is delivered to the process on Python 2, system call retry
      is signalled through :data:`errno.EINTR`. The invocation is automatically
      restarted.
    * When performing IO against a TTY, disconnection of the remote end is
      signalled by :data:`errno.EIO`.
    * When performing IO against a socket, disconnection of the remote end is
      signalled by :data:`errno.ECONNRESET`.
    * When performing IO against a pipe, disconnection of the remote end is
      signalled by :data:`errno.EPIPE`.

    Any other error is re-raised unchanged.

    :returns:
        Tuple of `(return_value, disconnect_reason)`. `return_value` is the
        return value of `func(*args)`. `disconnect_reason` is the exception
        instance when disconnection was detected, otherwise :data:`None`.
    """
    disconnect_errnos = (errno.EIO, errno.ECONNRESET, errno.EPIPE)
    while True:
        try:
            return func(*args), None
        except (select.error, OSError, IOError):
            # sys.exc_info() rather than "as e" keeps Python 2.4 support.
            e = sys.exc_info()[1]
            _vv and IOLOG.debug('io_op(%r) -> OSError: %s', func, e)
            code = e.args[0]
            if code == errno.EINTR:
                continue
            if code in disconnect_errnos:
                return None, e
            raise
562
563
class PidfulStreamHandler(logging.StreamHandler):
    """
    :class:`logging.StreamHandler` subclass installed when
    :meth:`Router.enable_debug() <mitogen.master.Router.enable_debug>` has been
    called, or the `debug` parameter was given at context construction.
    Before each :meth:`emit`, it checks whether the process ID has changed and
    opens a fresh per-process log file if so.

    Per-process output files therefore stay correct even when uncooperative
    third party components call :func:`os.fork`.
    """
    #: PID that last opened the log file.
    open_pid = None

    #: Output path template.
    template = '/tmp/mitogen.%s.%s.log'

    def _reopen(self):
        # Under the handler lock, open a new log file for the current PID
        # unless one is already open, recording the parent PID and the
        # creating stack as a header.
        self.acquire()
        try:
            pid = os.getpid()
            if self.open_pid == pid:
                return
            stamp = time.strftime('%Y%m%d_%H%M%S')
            self.stream = open(self.template % (pid, stamp), 'w', 1)
            set_cloexec(self.stream.fileno())
            self.stream.write('Parent PID: %s\n' % (os.getppid(),))
            stack = ''.join(traceback.format_stack())
            self.stream.write('Created by:\n\n%s\n' % (stack,))
            self.open_pid = pid
        finally:
            self.release()

    def emit(self, record):
        # Reopen first if this process did not open the current stream.
        if os.getpid() != self.open_pid:
            self._reopen()
        logging.StreamHandler.emit(self, record)
602
603
def enable_debug_logging():
    """
    Turn on verbose tracing. This sets the module's `_v` and `_vv` flags,
    lowers the root logger and :data:`IOLOG` to DEBUG, and inserts a
    :class:`PidfulStreamHandler` at the front of the root logger's handlers.
    """
    global _v, _vv
    _v = _vv = True
    formatter = logging.Formatter(
        '%(asctime)s %(levelname).1s %(name)s: %(message)s',
        '%H:%M:%S'
    )
    handler = PidfulStreamHandler()
    handler.formatter = formatter
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    IOLOG.setLevel(logging.DEBUG)
    root.handlers.insert(0, handler)
617
618
#: Template for profiler output paths, overridable via the MITOGEN_PROFILE_FMT
#: environment variable. :func:`_real_profile_hook` expands ``%(ext)s`` once
#: per output file ('pstats' and 'log').
_profile_fmt = os.environ.get(
    'MITOGEN_PROFILE_FMT',
    '/tmp/mitogen.stats.%(pid)s.%(identity)s.%(now)s.%(ext)s',
)


def _profile_hook(name, func, *args):
    """
    Call `func(*args)` and return its result. This function is replaced by
    :func:`_real_profile_hook` when :func:`enable_profiling` is called. This
    interface is obsolete and will be replaced by a signals-based integration
    later on.

    (A redundant lambda assignment to this name, immediately shadowed by this
    definition, was removed.)
    """
    return func(*args)
634
635
def _real_profile_hook(name, func, *args):
    """
    Profiling replacement for :func:`_profile_hook`. Runs `func(*args)` under
    :mod:`cProfile` and returns its result. Afterwards, even on error, it
    writes raw stats and a cumulative-time text report to paths derived from
    :data:`_profile_fmt`, with ext 'pstats' and 'log' respectively.
    """
    profiler = cProfile.Profile()
    profiler.enable()
    try:
        return func(*args)
    finally:
        # Leave '%s' in place of ext so one template serves both files.
        template = _profile_fmt % {
            'pid': os.getpid(),
            'identity': name,
            'now': int(1e6 * now()),
            'ext': '%s',
        }
        profiler.dump_stats(template % ('pstats',))
        profiler.create_stats()
        report = open(template % ('log',), 'w')
        try:
            stats = pstats.Stats(profiler, stream=report)
            stats.sort_stats('cumulative')
            stats.print_stats()
        finally:
            report.close()
657
658
def enable_profiling(econtext=None):
    """
    Route subsequent :func:`_profile_hook` calls through
    :func:`_real_profile_hook`. `econtext` is accepted but not used.
    """
    global _profile_hook
    _profile_hook = _real_profile_hook
662
663
def import_module(modname):
    """
    Import the module named `modname` and return it.

    Passing a non-empty `fromlist` to :func:`__import__` makes it return the
    named submodule itself (e.g. ``a.b``) rather than the top-level package
    ``a``.
    """
    return __import__(modname, None, None, [''])
669
670
def pipe():
    """
    Create a UNIX pipe with :func:`os.pipe` and return ``(reader, writer)``
    as unbuffered binary file objects. The file objects own the descriptors,
    so these are closed when the objects are discarded, if not closed
    explicitly first.
    """
    read_fd, write_fd = os.pipe()
    reader = os.fdopen(read_fd, 'rb', 0)
    writer = os.fdopen(write_fd, 'wb', 0)
    return reader, writer
683
684
def iter_split(buf, delim, func):
    """
    Call `func(s)` for each `delim`-terminated chunk of `buf`, in order,
    without building intermediate lists or repeatedly slicing the remainder.
    Stops early if `func(s)` returns :data:`False`. Other return values,
    including :data:`None`, continue.

    :returns:
        `(trailer, cont)`. `trailer` is the unterminated tail of `buf`, or
        everything after the chunk that stopped iteration. `cont` is
        :data:`False` only if `func(s)` stopped iteration.
    """
    step = len(delim)
    pos = 0
    while True:
        end = buf.find(delim, pos)
        if end == -1:
            return buf[pos:], True
        keep_going = func(buf[pos:end]) is not False
        pos = end + step
        if not keep_going:
            return buf[pos:], False
706
707
class Py24Pickler(py_pickle.Pickler):
    """
    Exceptions were classic classes until Python 2.5. Sadly for 2.4, cPickle
    offers little control over how a classic instance is pickled. Therefore 2.4
    uses a pure-Python pickler, so CallError can be made to look as it does on
    newer Pythons.

    This mess will go away once proper serialization exists.
    """
    @classmethod
    def dumps(cls, obj, protocol):
        # Counterpart of pickle.dumps(): serialize `obj` into an in-memory
        # buffer using this pickler class and return the resulting bytes.
        bio = BytesIO()
        self = cls(bio, protocol=protocol)
        self.dump(obj)
        return bio.getvalue()

    def save_exc_inst(self, obj):
        # Dispatch handler for classic-class instances (installed below on
        # 2.4 only). CallError is written as a REDUCE of its __reduce__()
        # result, i.e. (_unpickle_call_error, (message,)); any other instance
        # falls through to the stock save_inst().
        # NOTE(review): py_pickle.Pickler.save_inst presumably exists only on
        # Python 2; this path is reached only when PY24 is true.
        if isinstance(obj, CallError):
            func, args = obj.__reduce__()
            self.save(func)
            self.save(args)
            self.write(py_pickle.REDUCE)
        else:
            py_pickle.Pickler.save_inst(self, obj)

    if PY24:
        # Copy the class-level dispatch table, so the override does not leak
        # into py_pickle.Pickler itself, then route InstanceType objects
        # through save_exc_inst().
        dispatch = py_pickle.Pickler.dispatch.copy()
        dispatch[py_pickle.InstanceType] = save_exc_inst
736
737
# Select the pickle entry points for this interpreter. pickle__dumps(obj,
# protocol=...) serializes. _Unpickler(fp, **kwargs) is instantiated by
# Message.unpickle(), which then overwrites its find_global attribute to
# restrict which globals message data may reference.
if PY3:
    # In 3.x Unpickler is a class exposing find_class as an overridable, but it
    # cannot be overridden without subclassing.
    class _Unpickler(pickle.Unpickler):
        def find_class(self, module, func):
            # Delegate to the per-instance find_global set by the caller.
            return self.find_global(module, func)
    pickle__dumps = pickle.dumps
elif PY24:
    # On Python 2.4, we must use a pure-Python pickler.
    pickle__dumps = Py24Pickler.dumps
    _Unpickler = pickle.Unpickler
else:
    pickle__dumps = pickle.dumps
    # In 2.x Unpickler is a function exposing a writeable find_global
    # attribute.
    _Unpickler = pickle.Unpickler
754
755
class Message(object):
    """
    Messages are the fundamental unit of communication, comprising fields from
    the :ref:`stream-protocol` header, an optional reference to the receiving
    :class:`mitogen.core.Router` for ingress messages, and helper methods for
    deserialization and generating replies.
    """
    #: Integer target context ID. :class:`Router` delivers messages locally
    #: when their :attr:`dst_id` matches :data:`mitogen.context_id`, otherwise
    #: they are routed up or downstream.
    dst_id = None

    #: Integer source context ID. Used as the target of replies if any are
    #: generated.
    src_id = None

    #: Context ID under whose authority the message is acting. See
    #: :ref:`source-verification`.
    auth_id = None

    #: Integer target handle in the destination context. This is one of the
    #: :ref:`standard-handles`, or a dynamically generated handle used to
    #: receive a one-time reply, such as the return value of a function call.
    handle = None

    #: Integer target handle to direct any reply to this message. Used to
    #: receive a one-time reply, such as the return value of a function call.
    #: :data:`IS_DEAD` has a special meaning when it appears in this field.
    reply_to = None

    #: Raw message data bytes.
    data = b('')

    #: Class-level sentinel meaning "not yet unpickled". :meth:`unpickle`
    #: compares against it and caches its result on the instance under the
    #: same attribute name.
    _unpickled = object()

    #: The :class:`Router` responsible for routing the message. This is
    #: :data:`None` for locally originated messages.
    router = None

    #: The :class:`Receiver` over which the message was last received. Part of
    #: the :class:`mitogen.select.Select` interface. Defaults to :data:`None`.
    receiver = None

    #: :mod:`struct` format of the big-endian wire header: the 16-bit magic
    #: followed by six unsigned 32-bit fields (dst_id, src_id, auth_id,
    #: handle, reply_to and the length of :attr:`data`); see :meth:`pack`.
    HEADER_FMT = '>hLLLLLL'
    #: Size in bytes of the packed header.
    HEADER_LEN = struct.calcsize(HEADER_FMT)
    HEADER_MAGIC = 0x4d49  # 'MI'

    def __init__(self, **kwargs):
        """
        Construct a message from the supplied `kwargs`. :attr:`src_id` and
        :attr:`auth_id` default to :data:`mitogen.context_id`, but are
        overridden by `kwargs` when supplied there.
        """
        self.src_id = mitogen.context_id
        self.auth_id = mitogen.context_id
        vars(self).update(kwargs)
        assert isinstance(self.data, BytesType), 'Message data is not Bytes'

    def pack(self):
        """
        Return the wire encoding of the message: the :attr:`HEADER_FMT`
        header followed by :attr:`data`. A falsy :attr:`reply_to` (e.g.
        :data:`None`) is encoded as 0.
        """
        return (
            struct.pack(self.HEADER_FMT, self.HEADER_MAGIC, self.dst_id,
                        self.src_id, self.auth_id, self.handle,
                        self.reply_to or 0, len(self.data))
            + self.data
        )

    def _unpickle_context(self, context_id, name):
        # Unpickling hook returned by _find_global(): forwards to the
        # module-level _unpickle_context(), supplying this message's router.
        return _unpickle_context(context_id, name, router=self.router)

    def _unpickle_sender(self, context_id, dst_handle):
        # Unpickling hook returned by _find_global(): forwards to the
        # module-level _unpickle_sender(), supplying this message's router.
        return _unpickle_sender(self.router, context_id, dst_handle)

    def _unpickle_bytes(self, s, encoding):
        # Stand-in for _codecs.encode, which _find_global() maps here. It
        # always encodes `s` as Latin-1 and ignores `encoding`.
        # NOTE(review): presumably this is how Python 3 represents bytes in
        # protocol-2 pickles; confirm against the pickle implementation.
        s, n = LATIN1_CODEC.encode(s)
        return s

    def _find_global(self, module, func):
        """
        Unpickler ``find_global`` hook restricting which globals message data
        may reference. Return the callable to use for `module`.`func` when it
        appears in the fixed whitelist below, otherwise raise
        :class:`StreamError`.

        Both ``CallError`` and ``_unpickle_call_error`` resolve to
        :func:`_unpickle_call_error`, which validates its input.
        """
        if module == __name__:
            if func == '_unpickle_call_error' or func == 'CallError':
                return _unpickle_call_error
            elif func == '_unpickle_sender':
                return self._unpickle_sender
            elif func == '_unpickle_context':
                return self._unpickle_context
            elif func == 'Blob':
                return Blob
            elif func == 'Secret':
                return Secret
            elif func == 'Kwargs':
                return Kwargs
        elif module == '_codecs' and func == 'encode':
            return self._unpickle_bytes
        elif module == '__builtin__' and func == 'bytes':
            return BytesType
        raise StreamError('cannot unpickle %r/%r', module, func)

    @property
    def is_dead(self):
        """
        :data:`True` if :attr:`reply_to` is set to the magic value
        :data:`IS_DEAD`, indicating the sender considers the channel dead. Dead
        messages can be raised in a variety of circumstances, see
        :data:`IS_DEAD` for more information.
        """
        return self.reply_to == IS_DEAD

    @classmethod
    def dead(cls, reason=None, **kwargs):
        """
        Syntax helper to construct a dead message. :attr:`reply_to` is set to
        :data:`IS_DEAD` and :attr:`data` to the UTF-8 encoding of `reason`
        (empty when omitted). Remaining fields come from `kwargs`.
        """
        kwargs['data'], _ = encodings.utf_8.encode(reason or u'')
        return cls(reply_to=IS_DEAD, **kwargs)

    @classmethod
    def pickled(cls, obj, **kwargs):
        """
        Construct a pickled message, setting :attr:`data` to the serialization
        of `obj`, and setting remaining fields using `kwargs`. If `obj` raises
        :class:`pickle.PicklingError`, :attr:`data` instead holds a pickled
        :class:`CallError` describing the failure.

        :returns:
            The new message.
        """
        self = cls(**kwargs)
        try:
            self.data = pickle__dumps(obj, protocol=2)
        except pickle.PicklingError:
            e = sys.exc_info()[1]
            self.data = pickle__dumps(CallError(e), protocol=2)
        return self

    def reply(self, msg, router=None, **kwargs):
        """
        Compose a reply to this message and send it using :attr:`router`, or
        `router` if :attr:`router` is :data:`None`.

        The reply is addressed to :attr:`src_id` and :attr:`reply_to` of this
        message before `kwargs` are applied. A :class:`Message` passed as
        `msg` is modified in place. If the reply ends up without a handle, it
        is logged and dropped rather than routed.

        :param msg:
            Either a :class:`Message`, or an object to be serialized in order
            to construct a new message.
        :param router:
            Optional router to use if :attr:`router` is :data:`None`.
        :param kwargs:
            Optional keyword parameters overriding message fields in the reply.
        """
        if not isinstance(msg, Message):
            msg = Message.pickled(msg)
        msg.dst_id = self.src_id
        msg.handle = self.reply_to
        vars(msg).update(kwargs)
        if msg.handle:
            (self.router or router).route(msg)
        else:
            LOG.debug('dropping reply to message with no return address: %r',
                      msg)

    # Extra keyword arguments for _Unpickler. On Python 3, Python 2 str
    # objects in the stream are loaded as bytes rather than decoded to text.
    if PY3:
        UNPICKLER_KWARGS = {'encoding': 'bytes'}
    else:
        UNPICKLER_KWARGS = {}

    def _throw_dead(self):
        # Raise ChannelError for a dead message. Use the UTF-8 reason carried
        # in data when present, else a generic text chosen by whether this
        # context originated the message.
        if len(self.data):
            raise ChannelError(self.data.decode('utf-8', 'replace'))
        elif self.src_id == mitogen.context_id:
            raise ChannelError(ChannelError.local_msg)
        else:
            raise ChannelError(ChannelError.remote_msg)

    def unpickle(self, throw=True, throw_dead=True):
        """
        Unpickle :attr:`data`, optionally raising any exceptions present. A
        successful result is cached on the instance, so later calls do not
        deserialize again.

        :param bool throw:
            If :data:`True`, raise the unpickled object when it is a
            :class:`CallError`, otherwise return it.
        :param bool throw_dead:
            If :data:`True` and the message is dead (see :attr:`is_dead`),
            raise :class:`ChannelError` before unpickling.

        :raises CallError:
            `throw` is :data:`True` and the serialized data contained a
            CallError exception.
        :raises ChannelError:
            `throw_dead` is :data:`True` and :attr:`reply_to` is
            :data:`IS_DEAD`.
        :raises StreamError:
            Unpickling raised :class:`TypeError` or :class:`ValueError`, or
            the data referenced a global rejected by :meth:`_find_global`.
        """
        _vv and IOLOG.debug('%r.unpickle()', self)
        if throw_dead and self.is_dead:
            self._throw_dead()

        obj = self._unpickled
        if obj is Message._unpickled:
            fp = BytesIO(self.data)
            unpickler = _Unpickler(fp, **self.UNPICKLER_KWARGS)
            # Restrict resolvable globals to the whitelist in _find_global().
            unpickler.find_global = self._find_global
            try:
                # Must occur off the broker thread.
                try:
                    obj = unpickler.load()
                except:
                    # Log the raw payload for diagnosis, then re-raise the
                    # original exception unchanged.
                    LOG.error('raw pickle was: %r', self.data)
                    raise
                self._unpickled = obj
            except (TypeError, ValueError):
                e = sys.exc_info()[1]
                raise StreamError('invalid message: %s', e)

        if throw:
            if isinstance(obj, CallError):
                raise obj

        return obj

    def __repr__(self):
        # Show header fields and at most the first 50 bytes of data.
        return 'Message(%r, %r, %r, %r, %r, %r..%d)' % (
            self.dst_id, self.src_id, self.auth_id, self.handle,
            self.reply_to, (self.data or '')[:50], len(self.data)
        )
972
973
class Sender(object):
    """
    A Sender delivers pickled messages to a handle in another context; it is
    the counterpart of :class:`mitogen.core.Receiver`.

    Senders are themselves serializable, which makes them convenient for
    wiring up data flows between contexts. See
    :meth:`mitogen.core.Receiver.to_sender` for more information.

    :param mitogen.core.Context context:
        Context to send messages to.
    :param int dst_handle:
        Destination handle to send messages to.
    """
    #: Reason carried by the dead message that :meth:`close` sends.
    explicit_close_msg = 'Sender was explicitly closed'

    def __init__(self, context, dst_handle):
        self.context = context
        self.dst_handle = dst_handle

    def __repr__(self):
        return 'Sender(%r, %r)' % (self.context, self.dst_handle)

    def __reduce__(self):
        # Only the context ID and handle are serialized; _unpickle_sender()
        # rebuilds a Sender from them together with a Router.
        args = (self.context.context_id, self.dst_handle)
        return _unpickle_sender, args

    def send(self, data):
        """
        Pickle `data` and send it to the remote handle.
        """
        _vv and IOLOG.debug('%r.send(%r..)', self, repr(data)[:100])
        msg = Message.pickled(data, handle=self.dst_handle)
        self.context.send(msg)

    def close(self):
        """
        Send a dead message to the remote handle, causing
        :class:`ChannelError` to be raised in any thread waiting on it.
        """
        _vv and IOLOG.debug('%r.close()', self)
        dead_msg = Message.dead(
            reason=self.explicit_close_msg,
            handle=self.dst_handle,
        )
        self.context.send(dead_msg)
1018
1019
def _unpickle_sender(router, context_id, dst_handle):
    """
    Rebuild a :class:`Sender` from the values produced by
    :meth:`Sender.__reduce__`, after validating every argument.

    :raises TypeError:
        `router` is not a :class:`Router`, `context_id` is not a non-negative
        integer, or `dst_handle` is not a positive integer.
    """
    valid = (
        isinstance(router, Router)
        and isinstance(context_id, (int, long)) and context_id >= 0
        and isinstance(dst_handle, (int, long)) and dst_handle > 0
    )
    if not valid:
        raise TypeError('cannot unpickle Sender: bad input or missing router')
    context = Context(router, context_id)
    return Sender(context, dst_handle)
1026
1027
class Receiver(object):
    """
    A Receiver holds a thread-safe queue of messages delivered to one handle
    of this context by other contexts.

    :param mitogen.core.Router router:
        Router on which the handler is registered.

    :param int handle:
        Explicit handle to register; if :data:`None`, an unused handle is
        allocated.

    :param bool persist:
        If :data:`False`, the handler is unregistered once the first message
        arrives. Such single-message receivers suit RPC-like transactions,
        as in the case of :meth:`mitogen.parent.Context.call_async`.

    :param mitogen.core.Context respondent:
        Context this receiver is receiving from. If not :data:`None`, arranges
        for the receiver to receive a dead message if messages can no longer be
        routed to the context due to disconnection, and ignores messages that
        did not originate from the respondent context.

    :param policy:
        Passed unchanged to :meth:`Router.add_handler`.

    :param bool overwrite:
        Passed unchanged to :meth:`Router.add_handler`.
    """
    #: If not :data:`None`, a function invoked as `notify(receiver)` after a
    #: message has been received. The function is invoked on :class:`Broker`
    #: thread, therefore it must not block. Used by
    #: :class:`mitogen.select.Select` to efficiently implement waiting on
    #: multiple event sources.
    notify = None

    raise_channelerror = True

    #: Text of the :class:`ChannelError` raised by :meth:`get` once closed.
    closed_msg = 'the Receiver has been closed'

    def __init__(self, router, handle=None, persist=True,
                 respondent=None, policy=None, overwrite=False):
        self.router = router
        #: The handle. Assigned early so __repr__ works inside add_handler().
        self.handle = handle
        # Must exist before add_handler() is called.
        self._latch = Latch()
        self.handle = router.add_handler(
            fn=self._on_receive,
            handle=handle,
            policy=policy,
            persist=persist,
            respondent=respondent,
            overwrite=overwrite,
        )

    def __repr__(self):
        return 'Receiver(%r, %r)' % (self.router, self.handle)

    def __enter__(self):
        return self

    def __exit__(self, _1, _2, _3):
        self.close()

    def to_sender(self):
        """
        Return a :class:`Sender` that delivers messages to this receiver.
        Since senders are serializable, this makes it convenient to pass
        `(context_id, handle)` pairs around::

            def deliver_monthly_report(sender):
                for line in open('monthly_report.txt'):
                    sender.send(line)
                sender.close()

            @mitogen.main()
            def main(router):
                remote = router.ssh(hostname='mainframe')
                recv = mitogen.core.Receiver(router)
                remote.call(deliver_monthly_report, recv.to_sender())
                for msg in recv:
                    print(msg)
        """
        myself = self.router.myself()
        return Sender(myself, self.handle)

    def _on_receive(self, msg):
        """
        Handler registered with :class:`Router` for this receiver's handle:
        queue `msg` on the latch, then invoke :attr:`notify` if it is set.
        """
        _vv and IOLOG.debug('%r._on_receive(%r)', self, msg)
        self._latch.put(msg)
        if self.notify:
            self.notify(self)

    def close(self):
        """
        Unregister this receiver's handle from its router (if still
        registered) and close the latch, so any thread blocked in :meth:`get`
        receives :class:`ChannelError`.
        """
        handle = self.handle
        if handle:
            self.router.del_handler(handle)
            self.handle = None
        self._latch.close()

    def size(self):
        """
        Return how many messages are currently buffered.

        Like :class:`Queue.Queue`, the answer may be stale immediately: `0`
        may be returned although a following :meth:`get` succeeds (a message
        arrived in between), and `>0` may be returned although a following
        :meth:`get` blocks (another waiting thread consumed the message).

        :raises LatchError:
            The underlying latch has already been marked closed.
        """
        return self._latch.size()

    def empty(self):
        """
        Return `size() == 0`.

        .. deprecated:: 0.2.8
           Use :meth:`size` instead.

        :raises LatchError:
            The latch has already been marked closed.
        """
        return self._latch.empty()

    def get(self, timeout=None, block=True, throw_dead=True):
        """
        Wait for the next message to arrive on this receiver.

        :param float timeout:
            If not :data:`None`, the maximum number of seconds to wait.
        :param bool block:
            Passed to the underlying latch's ``get()``.
        :param bool throw_dead:
            If :data:`True`, a dead message raises :class:`ChannelError`
            instead of being returned.

        :raises mitogen.core.ChannelError:
            The remote end indicated the channel should be closed,
            communication with it was lost, or :meth:`close` was called in the
            local process.

        :raises mitogen.core.TimeoutError:
            Timeout was reached.

        :returns:
            :class:`Message` that was received.
        """
        _vv and IOLOG.debug('%r.get(timeout=%r, block=%r)', self, timeout, block)
        try:
            msg = self._latch.get(timeout=timeout, block=block)
        except LatchError:
            raise ChannelError(self.closed_msg)
        if msg.is_dead and throw_dead:
            msg._throw_dead()
        return msg

    def __iter__(self):
        """
        Yield each :class:`Message` delivered to this receiver, stopping
        quietly once :meth:`get` raises :class:`ChannelError`.
        """
        while True:
            try:
                msg = self.get()
            except ChannelError:
                return
            yield msg
1195
1196
class Channel(Sender, Receiver):
    """
    Bidirectional channel combining :class:`mitogen.core.Sender` and
    :class:`mitogen.core.Receiver`.

    .. deprecated:: 0.2.0
        This class is incomplete and obsolete, it will be removed in Mitogen
        0.3.

    Channels were an early attempt at syntax sugar. It is always easier to pass
    around unidirectional pairs of senders/receivers, even though the syntax is
    baroque:

    .. literalinclude:: ../examples/ping_pong.py

    Because neither end's handle is known until both ends exist, one end must
    fetch the handle allocated to the other and reconfigure its own channel
    to match before they can communicate. Currently this is a manual task.
    """
    def __init__(self, router, context, dst_handle, handle=None):
        Sender.__init__(self, context, dst_handle)
        Receiver.__init__(self, router, handle)

    def close(self):
        """
        Close the receiving half, then send a dead message via the sending
        half.
        """
        Receiver.close(self)
        Sender.close(self)

    def __repr__(self):
        sender_part = Sender.__repr__(self)
        receiver_part = Receiver.__repr__(self)
        return 'Channel(%s, %s)' % (sender_part, receiver_part)
1230
1231
class Importer(object):
    """
    Import protocol implementation that fetches modules from the parent
    process.

    Implements the PEP-302 finder/loader interface (:meth:`find_module`,
    :meth:`load_module`). Missing modules are requested from the parent with
    ``GET_MODULE`` messages; replies arrive on the ``LOAD_MODULE`` handle and
    are kept in an internal cache keyed by module name.

    :param router: Router on which the ``LOAD_MODULE`` handler is installed.
    :param context: Context to communicate via.
    :param core_src: If non-empty, the source of :mod:`mitogen.core`, which
        is pre-seeded into the cache so it is never requested.
    :param whitelist: Module name prefixes that are always fetched from the
        parent, even if available locally. Empty means no override.
    :param blacklist: Module names this context refuses to request, in
        addition to :attr:`ALWAYS_BLACKLIST`.
    """
    # The Mitogen package is handled specially, since the child context must
    # construct it manually during startup. These are the submodule names
    # find_module() will request for the ``mitogen`` package.
    MITOGEN_PKG_CONTENT = [
        'buildah',
        'compat',
        'debug',
        'doas',
        'docker',
        'kubectl',
        'fakessh',
        'fork',
        'jail',
        'lxc',
        'lxd',
        'master',
        'minify',
        'os_fork',
        'parent',
        'select',
        'service',
        'setns',
        'ssh',
        'su',
        'sudo',
        'utils',
    ]

    ALWAYS_BLACKLIST = [
        # 2.x generates needless imports for 'builtins', while 3.x does the
        # same for '__builtin__'. The correct one is built-in, the other always
        # a negative round-trip.
        'builtins',
        '__builtin__',
        'thread',

        # org.python.core imported by copy, pickle, xml.sax; breaks Jython, but
        # very unlikely to trigger a bug report.
        'org',
    ]

    # cStringIO does not exist on Python 3, so never ask the parent for it.
    if PY3:
        ALWAYS_BLACKLIST += ['cStringIO']

    def __init__(self, router, context, core_src, whitelist=(), blacklist=()):
        self._log = logging.getLogger('mitogen.importer')
        self._context = context
        # Maps package name -> list of submodule names the parent reported.
        self._present = {'mitogen': self.MITOGEN_PKG_CONTENT}
        # Guards _cache and _callbacks, which are touched both by the
        # LOAD_MODULE handler and by importing threads.
        self._lock = threading.Lock()
        # An empty whitelist is normalized to ['']; find_module() treats that
        # exact value as "no explicit whitelist".
        self.whitelist = list(whitelist) or ['']
        self.blacklist = list(blacklist) + self.ALWAYS_BLACKLIST

        # Preserve copies of the original server-supplied whitelist/blacklist
        # for later use by children.
        self.master_whitelist = self.whitelist[:]
        self.master_blacklist = self.blacklist[:]

        # Presence of an entry in this map indicates in-flight GET_MODULE.
        # Maps module name -> list of callbacks to run when it arrives.
        self._callbacks = {}
        # Maps module name -> tuple as received in LOAD_MODULE:
        # (fullname, package submodule list or None, path or None,
        #  zlib-compressed source, ...). The meaning of the final element is
        # not visible here -- NOTE(review): presumably related module names.
        self._cache = {}
        if core_src:
            self._update_linecache('x/mitogen/core.py', core_src)
            self._cache['mitogen.core'] = (
                'mitogen.core',
                None,
                'x/mitogen/core.py',
                zlib.compress(core_src, 9),
                [],
            )
        self._install_handler(router)

    def _update_linecache(self, path, data):
        """
        The Python 2.4 linecache module, used to fetch source code for
        tracebacks and :func:`inspect.getsource`, does not support PEP-302,
        meaning it needs extra help for Mitogen-loaded modules. Directly
        populate its cache if a loaded module belongs to the Mitogen package.
        No-op on other Python versions.
        """
        if PY24 and 'mitogen' in path:
            linecache.cache[path] = (
                len(data),
                0.0,
                [line+'\n' for line in data.splitlines()],
                path,
            )

    def _install_handler(self, router):
        """
        Register :meth:`_on_load_module` for ``LOAD_MODULE`` messages, only
        accepting them from contexts with parent authority.
        """
        router.add_handler(
            fn=self._on_load_module,
            handle=LOAD_MODULE,
            policy=has_parent_authority,
        )

    def __repr__(self):
        return 'Importer'

    def builtin_find_module(self, fullname):
        """
        Check whether `fullname` can be found by the interpreter's built-in
        import machinery. Returns :data:`None` if so; otherwise raises an
        ImportError (:func:`imp.find_module` failing, or
        ModuleNotFoundError for ``__main__``).
        """
        # imp.find_module() will always succeed for __main__, because it is a
        # built-in module. That means it exists on a special linked list deep
        # within the bowels of the interpreter. We must special case it.
        if fullname == '__main__':
            raise ModuleNotFoundError()

        parent, _, modname = str_rpartition(fullname, '.')
        if parent:
            path = sys.modules[parent].__path__
        else:
            path = None

        fp, pathname, description = imp.find_module(modname, path)
        if fp:
            fp.close()

    def find_module(self, fullname, path=None):
        """
        PEP-302 finder hook: return this importer if `fullname` should be
        fetched from the parent, otherwise :data:`None` so that other finders
        handle it.

        A thread-local flag (``_tls.running``) makes re-entrant calls on the
        same thread return :data:`None` immediately.
        """
        if hasattr(_tls, 'running'):
            return None

        _tls.running = True
        try:
            #_v and self._log.debug('Python requested %r', fullname)
            fullname = to_text(fullname)
            pkgname, dot, _ = str_rpartition(fullname, '.')
            pkg = sys.modules.get(pkgname)
            # Submodules of packages not loaded by this importer are left to
            # the normal import machinery.
            if pkgname and getattr(pkg, '__loader__', None) is not self:
                self._log.debug('%s is submodule of a locally loaded package',
                                fullname)
                return None

            # Only request submodules the parent said the package contains.
            suffix = fullname[len(pkgname+dot):]
            if pkgname and suffix not in self._present.get(pkgname, ()):
                self._log.debug('%s has no submodule %s', pkgname, suffix)
                return None

            # #114: explicitly whitelisted prefixes override any
            # system-installed package.
            if self.whitelist != ['']:
                if any(fullname.startswith(s) for s in self.whitelist):
                    return self

            # Prefer a locally installed copy; only fall back to the parent
            # when the built-in machinery cannot find the module.
            try:
                self.builtin_find_module(fullname)
                _vv and self._log.debug('%r is available locally', fullname)
            except ImportError:
                _vv and self._log.debug('we will try to load %r', fullname)
                return self
        finally:
            del _tls.running

    blacklisted_msg = (
        '%r is present in the Mitogen importer blacklist, therefore this '
        'context will not attempt to request it from the master, as the '
        'request will always be refused.'
    )
    pkg_resources_msg = (
        'pkg_resources is prohibited from importing __main__, as it causes '
        'problems in applications whose main module is not designed to be '
        're-imported by children.'
    )
    absent_msg = (
        'The Mitogen master process was unable to serve %r. It may be a '
        'native Python extension, or it may be missing entirely. Check the '
        'importer debug logs on the master for more information.'
    )

    def _refuse_imports(self, fullname):
        """
        Raise ModuleNotFoundError for imports that must never be requested
        from the parent: blacklisted names, and ``__main__`` when requested
        from ``pkg_resources``. Also sets ``PBR_VERSION`` in the environment
        when ``pbr`` is imported.

        Must be called directly from :meth:`load_module`: the
        ``sys._getframe(2)`` lookup below assumes exactly that call depth.
        """
        if is_blacklisted_import(self, fullname):
            raise ModuleNotFoundError(self.blacklisted_msg % (fullname,))

        # Frame 0 is this method, frame 1 is load_module(), frame 2 is its
        # caller. NOTE(review): on Python 3 that caller may be an importlib
        # frame rather than the importing module -- confirm.
        f = sys._getframe(2)
        requestee = f.f_globals['__name__']

        if fullname == '__main__' and requestee == 'pkg_resources':
            # Anything that imports pkg_resources will eventually cause
            # pkg_resources to try and scan __main__ for its __requires__
            # attribute (pkg_resources/__init__.py::_build_master()). This
            # breaks any app that is not expecting its __main__ to suddenly be
            # sucked over a network and injected into a remote process, like
            # py.test.
            raise ModuleNotFoundError(self.pkg_resources_msg)

        if fullname == 'pbr':
            # It claims to use pkg_resources to read version information, which
            # would result in PEP-302 being used, but it actually does direct
            # filesystem access. So instead smodge the environment to override
            # any version that was defined. This will probably break something
            # later.
            os.environ['PBR_VERSION'] = '0.0.0'

    def _on_load_module(self, msg):
        """
        Handler for ``LOAD_MODULE`` replies from the parent: cache the module
        tuple, populate :mod:`linecache` on Python 2.4, then fire every
        callback waiting on that module.
        """
        # NOTE(review): returning here leaves any pending callbacks in
        # _callbacks unfired, so a load_module() waiting on them keeps
        # blocking.
        if msg.is_dead:
            return

        tup = msg.unpickle()
        fullname = tup[0]
        _v and self._log.debug('received %s', fullname)

        self._lock.acquire()
        try:
            self._cache[fullname] = tup
            if tup[2] is not None and PY24:
                self._update_linecache(
                    path='master:' + tup[2],
                    data=zlib.decompress(tup[3])
                )
            callbacks = self._callbacks.pop(fullname, [])
        finally:
            self._lock.release()

        # Invoked after releasing the (non-reentrant) lock, so a callback
        # that re-enters the importer cannot deadlock on it.
        for callback in callbacks:
            callback()

    def _request_module(self, fullname, callback):
        """
        Arrange for `callback` to be invoked once `fullname` is in the cache.

        If it is already cached, `callback` runs immediately on the calling
        thread. Otherwise it is queued, and a ``GET_MODULE`` request is sent
        to the parent unless one for that name is already in flight.
        """
        self._lock.acquire()
        try:
            present = fullname in self._cache
            if not present:
                funcs = self._callbacks.get(fullname)
                if funcs is not None:
                    _v and self._log.debug('existing request for %s in flight',
                                           fullname)
                    funcs.append(callback)
                else:
                    _v and self._log.debug('sending new %s request to parent',
                                           fullname)
                    self._callbacks[fullname] = [callback]
                    self._context.send(
                        Message(data=b(fullname), handle=GET_MODULE)
                    )
        finally:
            self._lock.release()

        if present:
            callback()

    def load_module(self, fullname):
        """
        PEP-302 loader hook: fetch `fullname` from the parent (blocking until
        the reply arrives), then create, populate and execute the module.

        :raises ModuleNotFoundError:
            The import is refused locally, or the parent could not serve it.
        :raises SyntaxError:
            The received source failed to compile (logged first).
        :returns:
            The entry in :data:`sys.modules` after execution, which may differ
            from the module object created here (#590).
        """
        fullname = to_text(fullname)
        _v and self._log.debug('requesting %s', fullname)
        self._refuse_imports(fullname)

        # Blocks with no timeout until _on_load_module() fires the callback.
        event = threading.Event()
        self._request_module(fullname, event.set)
        event.wait()

        ret = self._cache[fullname]
        if ret[2] is None:
            raise ModuleNotFoundError(self.absent_msg % (fullname,))

        pkg_present = ret[1]
        mod = sys.modules.setdefault(fullname, imp.new_module(fullname))
        mod.__file__ = self.get_filename(fullname)
        mod.__loader__ = self
        if pkg_present is not None:  # it's a package.
            mod.__path__ = []
            mod.__package__ = fullname
            self._present[fullname] = pkg_present
        else:
            mod.__package__ = str_rpartition(fullname, '.')[0] or None

        if mod.__package__ and not PY3:
            # 2.x requires __package__ to be exactly a string.
            mod.__package__, _ = encodings.utf_8.encode(mod.__package__)

        source = self.get_source(fullname)
        try:
            # dont_inherit=1: do not apply this file's __future__ flags to the
            # fetched module's code.
            code = compile(source, mod.__file__, 'exec', 0, 1)
        except SyntaxError:
            LOG.exception('while importing %r', fullname)
            raise

        if PY3:
            exec(code, vars(mod))
        else:
            # Python 2 exec statement hidden in a string so this file still
            # parses on Python 3; it depends on the local names `code` and
            # `mod`.
            exec('exec code in vars(mod)')

        # #590: if a module replaces itself in sys.modules during import, below
        # is necessary. This matches PyImport_ExecCodeModuleEx()
        return sys.modules.get(fullname, mod)

    def get_filename(self, fullname):
        """
        Return the ``master:``-prefixed path of a cached module, or
        :data:`None` if `fullname` is not cached.

        :raises ModuleNotFoundError:
            The parent reported the module as absent.
        """
        if fullname in self._cache:
            path = self._cache[fullname][2]
            if path is None:
                # If find_loader() returns self but a subsequent master RPC
                # reveals the module can't be loaded, and so load_module()
                # throws ImportError, on Python 3.x it is still possible for
                # the loader to be called to fetch metadata.
                raise ModuleNotFoundError(self.absent_msg % (fullname,))
            return u'master:' + self._cache[fullname][2]

    def get_source(self, fullname):
        """
        Return the decompressed source of a cached module (text on Python 3,
        a byte string on Python 2), or :data:`None` if `fullname` is not
        cached.

        :raises ModuleNotFoundError:
            The cached entry has no source.
        """
        if fullname in self._cache:
            compressed = self._cache[fullname][3]
            if compressed is None:
                raise ModuleNotFoundError(self.absent_msg % (fullname,))

            source = zlib.decompress(self._cache[fullname][3])
            if PY3:
                return to_text(source)
            return source
1537
1538
class LogHandler(logging.Handler):
    """
    A :class:`logging.Handler` that forwards log records generated in this
    context to a parent context as :data:`FORWARD_LOG` messages. It is
    installed by default in child contexts during bootstrap, so that
    :mod:`logging` events can be viewed and managed centrally in the master
    process.

    After construction the handler is *corked*: messages are buffered until
    :meth:`uncork` is called. This lets logging be installed before the
    target is reachable, without any risk of dropping early messages.

    :param mitogen.core.Context context:
        The context to send log messages towards. At present this is always
        the master process.
    """
    def __init__(self, context):
        logging.Handler.__init__(self)
        self.context = context
        # Per-thread state; holds the ``in_emit`` recursion guard.
        self.local = threading.local()
        # Messages queued while corked; replaced by None in uncork().
        self._buffer = []
        # Private synchronization is needed while corked, to ensure no
        # concurrent call to _send() exists during uncork().
        self._buffer_lock = threading.Lock()

    def uncork(self):
        """
        #305: during startup :class:`LogHandler` may be installed before it is
        possible to route messages, therefore messages are buffered until
        :meth:`uncork` is called by :class:`ExternalContext`. This flushes the
        buffer and routes all later messages straight to the context.
        """
        lock = self._buffer_lock
        lock.acquire()
        try:
            # Shadow the _send() method with the context's own send(), so
            # records emitted from now on bypass the buffer.
            self._send = self.context.send
            pending = self._buffer
            for queued in pending:
                self._send(queued)
            self._buffer = None
        finally:
            lock.release()

    def _send(self, msg):
        self._buffer_lock.acquire()
        try:
            if self._buffer is not None:
                self._buffer.append(msg)
            else:
                # uncork() ran after this bound method was looked up; the
                # instance attribute now refers to context.send.
                self._send(msg)
        finally:
            self._buffer_lock.release()

    def emit(self, rec):
        """
        Send a :data:`FORWARD_LOG` message towards the target context.
        Records from ``mitogen.io``, and records logged while this thread is
        already inside :meth:`emit`, are ignored.
        """
        if rec.name == 'mitogen.io':
            return
        if getattr(self.local, 'in_emit', False):
            return

        self.local.in_emit = True
        try:
            text = self.format(rec)
            encoded = '%s\x00%s\x00%s' % (rec.name, rec.levelno, text)
            if isinstance(encoded, UnicodeType):
                # Logging package emits both :(
                encoded = encoded.encode('utf-8')
            self._send(Message(data=encoded, handle=FORWARD_LOG))
        finally:
            self.local.in_emit = False
1609
1610
class Stream(object):
    """
    A :class:`Stream` pairs one readable and optionally one writeable file
    descriptor (each represented by :class:`Side`) with a :class:`Protocol`
    that knows how to respond to IO readiness events on them.

    Streams are registered with :class:`Broker`, whose thread invokes the
    stream's callbacks in response to IO activity. When registered using
    :meth:`Broker.start_receive` or :meth:`Broker._start_transmit`, the broker
    may call any of :meth:`on_receive`, :meth:`on_transmit`,
    :meth:`on_shutdown` or :meth:`on_disconnect`.

    The :class:`Protocol` bound to a stream is expected to change over its
    life. For example, during connection setup the initial protocol may be
    :class:`mitogen.parent.BootstrapProtocol`, which knows how to enter SSH
    and sudo passwords and transmit the :mod:`mitogen.core` source to the
    target, before handing off to :class:`MitogenProtocol` once the target
    process is initialized.

    Streams connecting to children are in turn aggregated by
    :class:`mitogen.parent.Connection`, which contains additional logic for
    managing any child process, and a reference to any separate ``stderr``
    :class:`Stream` connected to that process.
    """
    #: A :class:`Side` representing the stream's receive file descriptor.
    receive_side = None

    #: A :class:`Side` representing the stream's transmit file descriptor.
    transmit_side = None

    #: A :class:`Protocol` representing the protocol active on the stream.
    protocol = None

    #: In parents, the :class:`mitogen.parent.Connection` instance.
    conn = None

    #: The stream name. This is used in the :meth:`__repr__` output in any log
    #: messages, it may be any descriptive string.
    name = u'default'

    def set_protocol(self, protocol):
        """
        Make `protocol` the active protocol of this stream. The previous
        protocol, if any, has its :attr:`Protocol.stream` cleared to
        :data:`None`; `protocol` has its :attr:`Protocol.stream` pointed at
        this stream.
        """
        previous = self.protocol
        if previous:
            previous.stream = None
        self.protocol = protocol
        protocol.stream = self

    def accept(self, rfp, wfp):
        """
        Wrap a pair of file objects in :class:`Side` instances and store them
        as :attr:`receive_side` and :attr:`transmit_side`. The same file
        object may be used for both sides.

        :param file rfp:
            The file object to receive from.
        :param file wfp:
            The file object to transmit to.
        """
        self.receive_side = Side(self, rfp)
        self.transmit_side = Side(self, wfp)

    def __repr__(self):
        short_id = id(self) & 0xffff
        return "<Stream %s #%04x>" % (self.name, short_id)

    def on_receive(self, broker):
        """
        Invoked by :class:`Broker` when :attr:`receive_side` was registered
        using :meth:`Broker.start_receive` and its descriptor is readable.

        Reads up to :attr:`Protocol.read_size` bytes and passes them to
        :meth:`Protocol.on_receive`. An empty read means end-of-file, in which
        case :meth:`on_disconnect` is invoked and its result returned.
        """
        buf = self.receive_side.read(self.protocol.read_size)
        if buf:
            self.protocol.on_receive(broker, buf)
        else:
            LOG.debug('%r: empty read, disconnecting', self.receive_side)
            return self.on_disconnect(broker)

    def on_transmit(self, broker):
        """
        Invoked by :class:`Broker` when :attr:`transmit_side` was registered
        using :meth:`Broker._start_transmit` and its descriptor is writeable.
        Delegates to :meth:`Protocol.on_transmit`.
        """
        self.protocol.on_transmit(broker)

    def on_shutdown(self, broker):
        """
        Invoked by :meth:`Broker.shutdown` to give the stream time to shut
        down gracefully. Fires a ``shutdown`` signal on this stream, then
        delegates to :meth:`Protocol.on_shutdown`.
        """
        fire(self, 'shutdown')
        self.protocol.on_shutdown(broker)

    def on_disconnect(self, broker):
        """
        Invoked to disconnect the stream, e.g. by :meth:`on_receive` after a
        0-byte read. Fires a ``disconnect`` signal on this stream, then
        delegates to :meth:`Protocol.on_disconnect`.
        """
        fire(self, 'disconnect')
        self.protocol.on_disconnect(broker)
1745
1746
class Protocol(object):
    """
    Implement the program behaviour associated with activity on a
    :class:`Stream`. The protocol in use may vary over a stream's life, for
    example to allow :class:`mitogen.parent.BootstrapProtocol` to initialize
    the connected child before handing it off to :class:`MitogenProtocol`. A
    stream's active protocol is tracked in the :attr:`Stream.protocol`
    attribute, and modified via :meth:`Stream.set_protocol`.

    Protocols do not handle IO, they are entirely reliant on the interface
    provided by :class:`Stream` and :class:`Side`, allowing the underlying IO
    implementation to be replaced without modifying behavioural logic.
    """
    #: The :class:`Stream` subclass constructed by :meth:`build_stream`.
    stream_class = Stream

    #: The :class:`Stream` this protocol is currently bound to, or
    #: :data:`None`.
    stream = None

    #: The size of the read buffer used by :class:`Stream` when this is the
    #: active protocol for the stream.
    read_size = CHUNK_SIZE

    @classmethod
    def build_stream(cls, *args, **kwargs):
        """
        Construct a new :attr:`stream_class` instance, and install on it a new
        instance of this protocol constructed from `args` and `kwargs`.

        :returns:
            The new stream.
        """
        stream = cls.stream_class()
        stream.set_protocol(cls(*args, **kwargs))
        return stream

    def __repr__(self):
        return '%s(%s)' % (
            self.__class__.__name__,
            self.stream and self.stream.name,
        )

    def on_shutdown(self, broker):
        """
        Invoked via :meth:`Stream.on_shutdown` when the broker is shutting
        down. The default implementation disconnects the stream immediately
        by calling :meth:`Stream.on_disconnect`.
        """
        _v and LOG.debug('%r: shutting down', self)
        self.stream.on_disconnect(broker)

    def on_disconnect(self, broker):
        """
        Invoked via :meth:`Stream.on_disconnect`. Unregister the stream from
        the broker for both readability and (if present) writeability, then
        close each of its sides.
        """
        # Both sides often refer to the same FD (e.g. a socket), so it is
        # important that transmit_side is deregistered from the Poller before
        # the receive side is closed: pollers like epoll and kqueue unregister
        # all events on FD close, which would cause the subsequent attempt to
        # unregister the transmit side to fail.
        LOG.debug('%r: disconnecting', self)
        stream = self.stream
        broker.stop_receive(stream)
        if stream.transmit_side:
            broker._stop_transmit(stream)

        stream.receive_side.close()
        if stream.transmit_side:
            stream.transmit_side.close()
1799
1800
class DelimitedProtocol(Protocol):
    """
    Provide a :meth:`Protocol.on_receive` implementation for protocols whose
    messages are separated by a fixed delimiter string, such as text-based
    protocols. Each complete message is handed to :meth:`on_line_received` as
    it arrives, while an incomplete trailing message is handed to
    :meth:`on_partial_line_received`.

    Emulating user input often requires reacting to incomplete lines, for
    example when a "Password: " prompt is sent. For that reason
    :meth:`on_partial_line_received` may be called repeatedly with an
    increasingly complete message. Once the message is complete,
    :meth:`on_line_received` is called once for it and the buffer is
    discarded.

    When :meth:`on_line_received` returns :data:`False`, any remaining data
    is passed unprocessed to the :meth:`on_receive` method of the stream's
    current protocol. This allows switching from a line-oriented to a binary
    protocol while the input buffer holds both kinds of data.
    """
    #: The delimiter. Defaults to newline.
    delimiter = b('\n')
    # Unterminated data carried over between reads.
    _trailer = b('')

    def on_receive(self, broker, buf):
        _vv and IOLOG.debug('%r.on_receive()', self)
        stream = self.stream
        leftover, cont = mitogen.core.iter_split(
            buf=self._trailer + buf,
            delim=self.delimiter,
            func=self.on_line_received,
        )
        self._trailer = leftover
        if not leftover:
            return

        if cont:
            # Still line-oriented: report what has arrived of the next line.
            self.on_partial_line_received(leftover)
            return

        # Splitting stopped early, which per the class contract means
        # on_line_received() returned False after installing a new protocol;
        # the remainder belongs to that protocol.
        assert stream.protocol is not self, \
            'stream protocol is no longer %r' % (self,)
        stream.protocol.on_receive(broker, leftover)

    def on_line_received(self, line):
        """
        Handle one complete line read from the stream.

        :param bytes line:
            The encoded line, without its delimiter.
        :returns:
            :data:`False` if this call replaced the stream's active protocol,
            so any remaining buffered data should be forwarded to the new
            protocol's :meth:`on_receive` method.

            Any other return value is ignored.
        """

    def on_partial_line_received(self, line):
        """
        Handle the trailing, not yet terminated, portion of the stream.

        :param bytes line:
            The encoded partial line.
        """
1864
1865
class BufferedWriter(object):
    """
    Implement buffered output while avoiding quadratic string operations. This
    is currently constructed by each protocol, in future it may become fixed
    for each stream instead.

    :param broker:
        Broker whose ``_start_transmit()`` / ``_stop_transmit()`` methods are
        used to toggle writeability notifications for the protocol's stream.
    :param protocol:
        Protocol whose ``stream.transmit_side`` receives the output.
    """
    def __init__(self, broker, protocol):
        self._broker = broker
        self._protocol = protocol
        # Pending output chunks, oldest first.
        self._buf = collections.deque()
        # Total number of bytes held across every chunk in _buf.
        self._len = 0

    def write(self, s):
        """
        Transmit `s` immediately, falling back to enqueuing it and marking the
        stream writeable if no OS buffer space is available.

        Writing an empty string is a no-op. Queuing an empty chunk would leave
        :attr:`_len` at zero while :attr:`_buf` is non-empty, and a later
        :meth:`on_transmit` would treat the resulting 0-byte write as a
        disconnection.
        """
        if not s:
            return

        if not self._len:
            # Modifying epoll/Kqueue state is expensive, as are needless broker
            # loops. Rather than wait for writeability, just write immediately,
            # and fall back to the broker loop on error or full buffer.
            try:
                n = self._protocol.stream.transmit_side.write(s)
                if n:
                    if n == len(s):
                        return
                    s = s[n:]
            except OSError:
                # Best effort only: the data is queued below and the error
                # will resurface (or clear) when the broker retries the write.
                pass

            self._broker._start_transmit(self._protocol.stream)
        self._buf.append(s)
        self._len += len(s)

    def on_transmit(self, broker):
        """
        Respond to stream writeability by retrying previously buffered
        :meth:`write` calls.

        At most one queued chunk is written per call. A 0-byte or failed write
        is treated as disconnection. Once the queue drains, writeability
        notifications are disabled via ``broker._stop_transmit()``.
        """
        if self._buf:
            buf = self._buf.popleft()
            written = self._protocol.stream.transmit_side.write(buf)
            if not written:
                _v and LOG.debug('disconnected during write to %r', self)
                self._protocol.stream.on_disconnect(broker)
                return
            elif written != len(buf):
                # Requeue the unwritten tail at the front to preserve order.
                self._buf.appendleft(BufferType(buf, written))

            _vv and IOLOG.debug('transmitted %d bytes to %r', written, self)
            self._len -= written

        if not self._buf:
            broker._stop_transmit(self._protocol.stream)
1920
1921
class Side(object):
    """
    Represent one side of a :class:`Stream`. This allows unidirectional (e.g.
    pipe) and bidirectional (e.g. socket) streams to operate identically.

    Sides are also responsible for tracking the open/closed state of the
    underlying FD, preventing erroneous duplicate calls to :func:`os.close` due
    to duplicate :meth:`Stream.on_disconnect` calls, which would otherwise risk
    silently succeeding by closing an unrelated descriptor. For this reason, it
    is crucial only one file object exists per unique descriptor.

    :param mitogen.core.Stream stream:
        The stream this side is associated with.
    :param object fp:
        The file or socket object managing the underlying file descriptor. Any
        object may be used that supports `fileno()` and `close()` methods.
    :param bool cloexec:
        If :data:`True`, the descriptor has its :data:`fcntl.FD_CLOEXEC` flag
        enabled using :func:`fcntl.fcntl`.
    :param bool keep_alive:
        If :data:`True`, the continued existence of this side will extend the
        shutdown grace period until it has been unregistered from the broker.
    :param bool blocking:
        If :data:`False`, the descriptor has its :data:`os.O_NONBLOCK` flag
        enabled using :func:`fcntl.fcntl`.
    """
    # Every live Side, keyed by id(); entries vanish when a Side is garbage
    # collected. Used by _on_fork() to close inherited descriptors.
    _fork_refs = weakref.WeakValueDictionary()

    #: :data:`True` once :meth:`close` has been called. After that,
    #: :meth:`read` and :meth:`write` refuse to touch :attr:`fd`.
    closed = False

    def __init__(self, stream, fp, cloexec=True, keep_alive=True, blocking=False):
        #: The :class:`Stream` for which this is a read or write side.
        self.stream = stream
        # File or socket object responsible for the lifetime of its underlying
        # file descriptor.
        self.fp = fp
        #: Integer file descriptor to perform IO on. This is saved separately
        #: from the file object, since :meth:`file.fileno` cannot be called on
        #: it after it has been closed. The value is retained after
        #: :meth:`close`; check :attr:`closed` before using it.
        self.fd = fp.fileno()
        #: If :data:`True`, causes presence of this side in
        #: :class:`Broker`'s active reader set to defer shutdown until the
        #: side is disconnected.
        self.keep_alive = keep_alive
        self._fork_refs[id(self)] = self
        if cloexec:
            set_cloexec(self.fd)
        if not blocking:
            set_nonblock(self.fd)

    def __repr__(self):
        return '<Side of %s fd %s>' % (
            self.stream.name or repr(self.stream),
            self.fd
        )

    @classmethod
    def _on_fork(cls):
        """
        Close every :class:`Side` still tracked in :attr:`_fork_refs`.
        Intended to run after fork(), so the new process does not keep using
        descriptors it inherited.
        """
        while cls._fork_refs:
            _, side = cls._fork_refs.popitem()
            _vv and IOLOG.debug('Side._on_fork() closing %r', side)
            side.close()

    def close(self):
        """
        Call :meth:`file.close` on :attr:`fp` the first time this is invoked,
        setting :attr:`closed` to :data:`True`. Later calls do nothing, so the
        descriptor is never closed twice.
        """
        _vv and IOLOG.debug('%r.close()', self)
        if not self.closed:
            self.closed = True
            self.fp.close()

    def read(self, n=CHUNK_SIZE):
        """
        Read up to `n` bytes from the file descriptor, wrapping the underlying
        :func:`os.read` call with :func:`io_op` to trap common disconnection
        conditions.

        :meth:`read` always behaves as if it is reading from a regular UNIX
        file; socket, pipe, and TTY disconnection errors are masked and result
        in a 0-sized read like a regular file.

        :returns:
            Bytes read, or the empty string to indicate disconnection was
            detected or the side is already closed.
        """
        if self.closed:
            # Refuse to touch the handle after closed, it may have been reused
            # by another thread. TODO: synchronize read()/write()/close().
            return b('')
        s, disconnected = io_op(os.read, self.fd, n)
        if disconnected:
            LOG.debug('%r: disconnected during read: %s', self, disconnected)
            return b('')
        return s

    def write(self, s):
        """
        Write as much of the bytes from `s` as possible to the file descriptor,
        wrapping the underlying :func:`os.write` call with :func:`io_op` to
        trap common disconnection conditions.

        :returns:
            Number of bytes written, or :data:`None` if disconnection was
            detected or the side is already closed.
        """
        if self.closed:
            # Don't touch the handle after close, it may be reused elsewhere.
            return None

        written, disconnected = io_op(os.write, self.fd, s)
        if disconnected:
            LOG.debug('%r: disconnected during write: %s', self, disconnected)
            return None
        return written
2038
2039
class MitogenProtocol(Protocol):
    """
    :class:`Protocol` implementing mitogen's :ref:`stream protocol
    <stream-protocol>`.
    """
    #: If not :data:`False`, indicates the stream has :attr:`auth_id` set and
    #: its value is the same as :data:`mitogen.context_id` or appears in
    #: :data:`mitogen.parent_ids`.
    is_privileged = False

    #: Invoked as `on_message(stream, msg)` each message received from the
    #: peer.
    on_message = None

    def __init__(self, router, remote_id, auth_id=None,
                 local_id=None, parent_ids=None):
        self._router = router
        self.remote_id = remote_id
        #: If not :data:`None`, :class:`Router` stamps this into
        #: :attr:`Message.auth_id` of every message received on this stream.
        self.auth_id = auth_id

        if parent_ids is None:
            parent_ids = mitogen.parent_ids
        if local_id is None:
            local_id = mitogen.context_id

        self.is_privileged = (
            (remote_id in parent_ids) or
            auth_id in ([local_id] + parent_ids)
        )
        self.sent_modules = set(['mitogen', 'mitogen.core'])
        # Received bytes not yet parsed into messages, oldest first, and the
        # total length across all of them.
        self._input_buf = collections.deque()
        self._input_buf_len = 0
        self._writer = BufferedWriter(router.broker, self)

        #: Routing records the dst_id of every message arriving from this
        #: stream. Any arriving DEL_ROUTE is rebroadcast for any such ID.
        self.egress_ids = set()

    def on_receive(self, broker, buf):
        """
        Buffer `buf`, then parse and route every complete message now
        available on the stream. Streams whose framing is corrupt or whose
        messages are oversized are disconnected.
        """
        _vv and IOLOG.debug('%r.on_receive()', self)
        # While little data is buffered, coalesce into the first chunk so the
        # header can be unpacked from _input_buf[0] alone.
        # NOTE: appending to [0] rather than the tail only preserves byte
        # order while the deque holds a single element; that is expected to
        # hold whenever fewer than 128 bytes are buffered, since new chunks
        # are only appended at or above that threshold and _receive_one()
        # leaves just the unconsumed tail of the last chunk it touched.
        if self._input_buf and self._input_buf_len < 128:
            self._input_buf[0] += buf
        else:
            self._input_buf.append(buf)

        self._input_buf_len += len(buf)
        while self._receive_one(broker):
            pass

    corrupt_msg = (
        '%s: Corruption detected: frame signature incorrect. This likely means'
        ' some external process is interfering with the connection. Received:'
        '\n\n'
        '%r'
    )

    def _receive_one(self, broker):
        """
        Parse and route a single message from the head of the input buffer.

        :returns:
            :data:`True` if a message was consumed, or :data:`False` if not
            enough data is buffered yet or the stream was disconnected.
        """
        if self._input_buf_len < Message.HEADER_LEN:
            return False

        msg = Message()
        msg.router = self._router
        (magic, msg.dst_id, msg.src_id, msg.auth_id,
         msg.handle, msg.reply_to, msg_len) = struct.unpack(
            Message.HEADER_FMT,
            self._input_buf[0][:Message.HEADER_LEN],
        )

        if magic != Message.HEADER_MAGIC:
            LOG.error(self.corrupt_msg, self.stream.name, self._input_buf[0][:2048])
            self.stream.on_disconnect(broker)
            return False

        if msg_len > self._router.max_message_size:
            LOG.error('%r: Maximum message size exceeded (got %d, max %d)',
                      self, msg_len, self._router.max_message_size)
            self.stream.on_disconnect(broker)
            return False

        total_len = msg_len + Message.HEADER_LEN
        if self._input_buf_len < total_len:
            _vv and IOLOG.debug(
                '%r: Input too short (want %d, got %d)',
                self, msg_len, self._input_buf_len - Message.HEADER_LEN
            )
            return False

        # Collect the body across as many chunks as needed. `start` skips the
        # header in the first chunk only; `remain` counts the header+body
        # bytes still to consume, measured from the start of the current
        # chunk.
        start = Message.HEADER_LEN
        prev_start = start
        remain = total_len
        bits = []
        while remain:
            buf = self._input_buf.popleft()
            bit = buf[start:remain]
            bits.append(bit)
            remain -= len(bit) + start
            prev_start = start
            start = 0

        msg.data = b('').join(bits)
        # Return the unconsumed tail of the last chunk to the head of the
        # queue.
        self._input_buf.appendleft(buf[prev_start+len(bit):])
        self._input_buf_len -= total_len
        self._router._async_route(msg, self.stream)
        return True

    def pending_bytes(self):
        """
        Return the number of bytes queued for transmission on this stream. This
        can be used to limit the amount of data buffered in RAM by an otherwise
        unlimited consumer.

        For an accurate result, this method should be called from the Broker
        thread, for example by using :meth:`Broker.defer_sync`.
        """
        return self._writer._len

    def on_transmit(self, broker):
        """
        Transmit buffered messages.
        """
        _vv and IOLOG.debug('%r.on_transmit()', self)
        self._writer.on_transmit(broker)

    def _send(self, msg):
        # Serialize `msg` and hand it to the buffered writer.
        _vv and IOLOG.debug('%r._send(%r)', self, msg)
        self._writer.write(msg.pack())

    def send(self, msg):
        """
        Enqueue `msg` for transmission to the peer by arranging for
        :meth:`_send` to be invoked via :meth:`Broker.defer`. May be called
        from any thread.
        """
        self._router.broker.defer(self._send, msg)

    def on_shutdown(self, broker):
        """
        Disable :class:`Protocol` immediate disconnect behaviour.
        """
        _v and LOG.debug('%r: shutting down', self)
2185
2186
class Context(object):
    """
    Represent a remote context regardless of the underlying connection method.
    Context objects are simple facades that emit messages through an
    associated router, and have :ref:`signals` raised against them in response
    to various events relating to the context.

    **Note:** This is the somewhat limited core version, used by child
    contexts. The master subclass is documented below this one.

    Contexts maintain no internal state and are thread-safe.

    Prefer :meth:`Router.context_by_id` over constructing context objects
    explicitly, as that method is deduplicating, and returns the only context
    instance :ref:`signals` will be raised on.

    :param mitogen.core.Router router:
        Router to emit messages through.
    :param int context_id:
        Context ID.
    :param str name:
        Context name.
    """
    name = None
    remote_name = None

    def __init__(self, router, context_id, name=None):
        self.router = router
        self.context_id = context_id
        if name:
            self.name = to_text(name)

    def __reduce__(self):
        return _unpickle_context, (self.context_id, self.name)

    def on_disconnect(self):
        """
        Fire the ``disconnect`` signal on this context.
        """
        _v and LOG.debug('%r: disconnecting', self)
        fire(self, 'disconnect')

    def send_async(self, msg, persist=False):
        """
        Arrange for `msg` to be delivered to this context, with replies
        directed to a newly constructed receiver. :attr:`dst_id
        <Message.dst_id>` is set to the target context ID, and :attr:`reply_to
        <Message.reply_to>` is set to the newly constructed receiver's handle.

        :param bool persist:
            If :data:`False`, the handler will be unregistered after a single
            message has been received.

        :param mitogen.core.Message msg:
            The message.

        :returns:
            :class:`Receiver` configured to receive any replies sent to the
            message's `reply_to` handle.
        """
        receiver = Receiver(self.router, persist=persist, respondent=self)
        msg.dst_id = self.context_id
        msg.reply_to = receiver.handle

        _v and LOG.debug('sending message to %r: %r', self, msg)
        self.send(msg)
        return receiver

    def call_service_async(self, service_name, method_name, **kwargs):
        """
        Send a ``CALL_SERVICE`` message asking this context to invoke a
        service method, without waiting for the reply.

        :param service_name:
            Service name as a UTF-8 bytestring or unicode string, or an object
            whose ``name()`` method returns the service name.
        :param str method_name:
            Name of the method to invoke.
        :param kwargs:
            Keyword arguments for the method.
        :returns:
            :class:`Receiver` configured to receive the reply.
        """
        if isinstance(service_name, BytesType):
            # Bytes must be decoded to text. Calling .encode() here instead
            # fails on Python 3 (bytes has no encode()) and on Python 2 just
            # yields another bytestring.
            service_name = service_name.decode('utf-8')
        elif not isinstance(service_name, UnicodeType):
            service_name = service_name.name()  # Service.name()
        _v and LOG.debug('calling service %s.%s of %r, args: %r',
                         service_name, method_name, self, kwargs)
        tup = (service_name, to_text(method_name), Kwargs(kwargs))
        msg = Message.pickled(tup, handle=CALL_SERVICE)
        return self.send_async(msg)

    def send(self, msg):
        """
        Arrange for `msg` to be delivered to this context. :attr:`dst_id
        <Message.dst_id>` is set to the target context ID.

        :param Message msg:
            Message.
        """
        msg.dst_id = self.context_id
        self.router.route(msg)

    def call_service(self, service_name, method_name, **kwargs):
        """
        Like :meth:`call_service_async`, but block until the reply arrives
        and return its unpickled value.
        """
        recv = self.call_service_async(service_name, method_name, **kwargs)
        return recv.get().unpickle()

    def send_await(self, msg, deadline=None):
        """
        Like :meth:`send_async`, but expect a single reply (`persist=False`)
        delivered within `deadline` seconds.

        :param mitogen.core.Message msg:
            The message.
        :param float deadline:
            If not :data:`None`, seconds before timing out waiting for a reply.
        :returns:
            Deserialized reply.
        :raises TimeoutError:
            No message was received and `deadline` passed.
        """
        receiver = self.send_async(msg)
        response = receiver.get(deadline)
        data = response.unpickle()
        _vv and IOLOG.debug('%r._send_await() -> %r', self, data)
        return data

    def __repr__(self):
        return 'Context(%s, %r)' % (self.context_id, self.name)
2300
2301
def _unpickle_context(context_id, name, router=None):
    """
    Reconstruct a :class:`Context` from pickled state, after validating it.

    :param context_id:
        Non-negative integer context ID.
    :param name:
        :data:`None`, or a unicode string shorter than 100 characters.
    :param router:
        If a :class:`Router`, the deduplicated instance returned by
        :meth:`Router.context_by_id` is used; otherwise a bare, router-less
        :class:`Context` is built (plain :mod:`pickle` usage).
    :raises TypeError:
        The pickled state failed validation.
    """
    id_valid = isinstance(context_id, (int, long)) and context_id >= 0
    name_valid = (name is None) or (
        isinstance(name, UnicodeType) and len(name) < 100
    )
    if not (id_valid and name_valid):
        raise TypeError('cannot unpickle Context: bad input')

    if not isinstance(router, Router):
        return Context(None, context_id, name)  # For plain Jane pickle.
    return router.context_by_id(context_id, name=name)
2312
2313
class Poller(object):
    """
    A poller manages OS file descriptors the user is waiting to become
    available for IO. The :meth:`poll` method blocks the calling thread
    until one or more become ready. This base implementation is based on
    :func:`select.select`; subclasses may substitute other OS facilities.

    Each descriptor has an associated `data` element, which is unique for each
    readiness type, and defaults to being the same as the file descriptor. The
    :meth:`poll` method yields the data associated with a descriptor, rather
    than the descriptor itself, allowing concise loops like::

        p = Poller()
        p.start_receive(conn.fd, data=conn.on_read)
        p.start_transmit(conn.fd, data=conn.on_write)

        for callback in p.poll():
            callback()  # invoke appropriate bound instance method

    Pollers may be modified while :meth:`poll` is yielding results. Removals
    are processed immediately, causing pending events for the descriptor to be
    discarded.

    The :meth:`close` method must be called when a poller is discarded to avoid
    a resource leak.

    Pollers may only be used by one thread at a time.
    """
    SUPPORTED = True

    # This changed from select() to poll() in Mitogen 0.2.4. Since poll() has
    # no upper FD limit, it is suitable for use with Latch, which must handle
    # FDs larger than select's limit during many-host runs. We want this
    # because poll() requires no setup and teardown: just a single system call,
    # which is important because Latch.get() creates a Poller on each
    # invocation. In a microbenchmark, poll() vs. epoll_ctl() is 30% faster in
    # this scenario. If select() must return in future, it is important
    # Latch.poller_class is set from parent.py to point to the industrial
    # strength poller for the OS, otherwise Latch will fail randomly.

    #: Increments on every poll(). Used to version _rfds and _wfds.
    _generation = 1

    def __init__(self):
        # Maps fd -> (data, generation at registration time).
        self._rfds = {}
        self._wfds = {}

    def __repr__(self):
        return '%s' % (type(self).__name__,)

    def _update(self, fd):
        """
        Required by PollPoller subclass.
        """
        pass

    @property
    def readers(self):
        """
        Return a list of `(fd, data)` tuples for every FD registered for
        receive readiness.
        """
        return [(fd, data) for fd, (data, gen) in self._rfds.items()]

    @property
    def writers(self):
        """
        Return a list of `(fd, data)` tuples for every FD registered for
        transmit readiness.
        """
        return [(fd, data) for fd, (data, gen) in self._wfds.items()]

    def close(self):
        """
        Close any underlying OS resource used by the poller.
        """
        pass

    def start_receive(self, fd, data=None):
        """
        Cause :meth:`poll` to yield `data` when `fd` is readable.
        """
        self._rfds[fd] = (data or fd, self._generation)
        self._update(fd)

    def stop_receive(self, fd):
        """
        Stop yielding readability events for `fd`.

        Redundant calls to :meth:`stop_receive` are silently ignored, this may
        change in future.
        """
        self._rfds.pop(fd, None)
        self._update(fd)

    def start_transmit(self, fd, data=None):
        """
        Cause :meth:`poll` to yield `data` when `fd` is writeable.
        """
        self._wfds[fd] = (data or fd, self._generation)
        self._update(fd)

    def stop_transmit(self, fd):
        """
        Stop yielding writeability events for `fd`.

        Redundant calls to :meth:`stop_transmit` are silently ignored, this may
        change in future.
        """
        self._wfds.pop(fd, None)
        self._update(fd)

    def _poll(self, timeout):
        """
        Generator yielding the `data` of each ready FD.

        Registrations are re-read as each event is yielded, so FDs removed
        meanwhile are skipped. FDs registered during the current generation
        are also skipped.
        """
        (rfds, wfds, _), _ = io_op(select.select,
            self._rfds,
            self._wfds,
            (), timeout
        )

        for fd in rfds:
            _vv and IOLOG.debug('%r: POLLIN for %r', self, fd)
            data, gen = self._rfds.get(fd, (None, None))
            if gen and gen < self._generation:
                yield data

        for fd in wfds:
            _vv and IOLOG.debug('%r: POLLOUT for %r', self, fd)
            data, gen = self._wfds.get(fd, (None, None))
            if gen and gen < self._generation:
                yield data

    def poll(self, timeout=None):
        """
        Block the calling thread until one or more FDs are ready for IO.

        :param float timeout:
            If not :data:`None`, seconds to wait without an event before
            returning an empty iterable.
        :returns:
            Iterable of `data` elements associated with ready FDs.
        """
        _vv and IOLOG.debug('%r.poll(%r)', self, timeout)
        self._generation += 1
        return self._poll(timeout)
2458
2459
2460class Latch(object):
2461    """
2462    A latch is a :class:`Queue.Queue`-like object that supports mutation and
2463    waiting from multiple threads, however unlike :class:`Queue.Queue`,
2464    waiting threads always remain interruptible, so CTRL+C always succeeds, and
2465    waits where a timeout is set experience no wake up latency. These
2466    properties are not possible in combination using the built-in threading
2467    primitives available in Python 2.x.
2468
2469    Latches implement queues using the UNIX self-pipe trick, and a per-thread
2470    :func:`socket.socketpair` that is lazily created the first time any
2471    latch attempts to sleep on a thread, and dynamically associated with the
2472    waiting Latch only for duration of the wait.
2473
2474    See :ref:`waking-sleeping-threads` for further discussion.
2475    """
2476    #: The :class:`Poller` implementation to use for waiting. Since the poller
2477    #: will be very short-lived, we prefer :class:`mitogen.parent.PollPoller`
2478    #: if it is available, or :class:`mitogen.core.Poller` otherwise, since
2479    #: these implementations require no system calls to create, configure or
2480    #: destroy.
2481    poller_class = Poller
2482
2483    #: If not :data:`None`, a function invoked as `notify(latch)` after a
2484    #: successful call to :meth:`put`. The function is invoked on the
2485    #: :meth:`put` caller's thread, which may be the :class:`Broker` thread,
2486    #: therefore it must not block. Used by :class:`mitogen.select.Select` to
2487    #: efficiently implement waiting on multiple event sources.
2488    notify = None
2489
2490    # The _cls_ prefixes here are to make it crystal clear in the code which
2491    # state mutation isn't covered by :attr:`_lock`.
2492
2493    #: List of reusable :func:`socket.socketpair` tuples. The list is mutated
2494    #: from multiple threads, the only safe operations are `append()` and
2495    #: `pop()`.
2496    _cls_idle_socketpairs = []
2497
2498    #: List of every socket object that must be closed by :meth:`_on_fork`.
2499    #: Inherited descriptors cannot be reused, as the duplicated handles
2500    #: reference the same underlying kernel object in use by the parent.
2501    _cls_all_sockets = []
2502
2503    def __init__(self):
2504        self.closed = False
2505        self._lock = threading.Lock()
2506        #: List of unconsumed enqueued items.
2507        self._queue = []
2508        #: List of `(wsock, cookie)` awaiting an element, where `wsock` is the
2509        #: socketpair's write side, and `cookie` is the string to write.
2510        self._sleeping = []
2511        #: Number of elements of :attr:`_sleeping` that have already been
2512        #: woken, and have a corresponding element index from :attr:`_queue`
2513        #: assigned to them.
2514        self._waking = 0
2515
2516    @classmethod
2517    def _on_fork(cls):
2518        """
2519        Clean up any files belonging to the parent process after a fork.
2520        """
2521        cls._cls_idle_socketpairs = []
2522        while cls._cls_all_sockets:
2523            cls._cls_all_sockets.pop().close()
2524
2525    def close(self):
2526        """
2527        Mark the latch as closed, and cause every sleeping thread to be woken,
2528        with :class:`mitogen.core.LatchError` raised in each thread.
2529        """
2530        self._lock.acquire()
2531        try:
2532            self.closed = True
2533            while self._waking < len(self._sleeping):
2534                wsock, cookie = self._sleeping[self._waking]
2535                self._wake(wsock, cookie)
2536                self._waking += 1
2537        finally:
2538            self._lock.release()
2539
2540    def size(self):
2541        """
2542        Return the number of items currently buffered.
2543
2544        As with :class:`Queue.Queue`, `0` may be returned even though a
2545        subsequent call to :meth:`get` will succeed, since a message may be
2546        posted at any moment between :meth:`size` and :meth:`get`.
2547
2548        As with :class:`Queue.Queue`, `>0` may be returned even though a
2549        subsequent call to :meth:`get` will block, since another waiting thread
2550        may be woken at any moment between :meth:`size` and :meth:`get`.
2551
2552        :raises LatchError:
2553            The latch has already been marked closed.
2554        """
2555        self._lock.acquire()
2556        try:
2557            if self.closed:
2558                raise LatchError()
2559            return len(self._queue)
2560        finally:
2561            self._lock.release()
2562
2563    def empty(self):
2564        """
2565        Return `size() == 0`.
2566
2567        .. deprecated:: 0.2.8
2568           Use :meth:`size` instead.
2569
2570        :raises LatchError:
2571            The latch has already been marked closed.
2572        """
2573        return self.size() == 0
2574
2575    def _get_socketpair(self):
2576        """
2577        Return an unused socketpair, creating one if none exist.
2578        """
2579        try:
2580            return self._cls_idle_socketpairs.pop()  # pop() must be atomic
2581        except IndexError:
2582            rsock, wsock = socket.socketpair()
2583            rsock.setblocking(False)
2584            set_cloexec(rsock.fileno())
2585            set_cloexec(wsock.fileno())
2586            self._cls_all_sockets.extend((rsock, wsock))
2587            return rsock, wsock
2588
    #: Magic value leading every cookie: the bytes ``LTCH`` repeated to fill a
    #: native unsigned long, unpacked in native byte order.
    COOKIE_MAGIC, = struct.unpack('L', b('LTCH') * (struct.calcsize('L')//4))
    #: Big-endian cookie layout: magic, PID, id() of the latch and the
    #: sleeping thread's identifier (see :meth:`_make_cookie`).
    COOKIE_FMT = '>Qqqq'  # #545: id() and get_ident() may exceed long on armhfp.
    #: Length in bytes of a packed cookie; the number of bytes requested from
    #: the socketpair by :meth:`_get_sleep`.
    COOKIE_SIZE = struct.calcsize(COOKIE_FMT)
2592
2593    def _make_cookie(self):
2594        """
2595        Return a string encoding the ID of the process, instance and thread.
2596        This disambiguates legitimate wake-ups, accidental writes to the FD,
2597        and buggy internal FD sharing.
2598        """
2599        return struct.pack(self.COOKIE_FMT, self.COOKIE_MAGIC,
2600                           os.getpid(), id(self), thread.get_ident())
2601
    def get(self, timeout=None, block=True):
        """
        Return the next enqueued object, or sleep waiting for one.

        The first ``len(self._sleeping)`` elements of :attr:`_queue` are left
        for threads already sleeping in this method. A new caller therefore
        only returns immediately when an element exists beyond them.
        Otherwise it registers itself in :attr:`_sleeping` and sleeps in
        :meth:`_get_sleep` until it is woken by :meth:`put` or :meth:`close`,
        or until `timeout` expires.

        :param float timeout:
            If not :data:`None`, specifies a timeout in seconds.

        :param bool block:
            If :data:`False`, immediately raise
            :class:`mitogen.core.TimeoutError` if the latch is empty.

        :raises mitogen.core.LatchError:
            :meth:`close` has been called, and the object is no longer valid.

        :raises mitogen.core.TimeoutError:
            Timeout was reached.

        :returns:
            The de-queued object.
        """
        _vv and IOLOG.debug('%r.get(timeout=%r, block=%r)',
                            self, timeout, block)
        self._lock.acquire()
        try:
            if self.closed:
                raise LatchError()
            # Queue slots below len(_sleeping) belong to existing sleepers.
            i = len(self._sleeping)
            if len(self._queue) > i:
                _vv and IOLOG.debug('%r.get() -> %r', self, self._queue[i])
                return self._queue.pop(i)
            if not block:
                raise TimeoutError()
            # Register as a sleeper: put()/close() wake us by writing `cookie`
            # to `wsock`, which makes `rsock` readable.
            rsock, wsock = self._get_socketpair()
            cookie = self._make_cookie()
            self._sleeping.append((wsock, cookie))
        finally:
            self._lock.release()

        # NOTE(review): the poller is constructed and start_receive() called
        # outside the try/finally below. If either raises, the poller is not
        # closed, the (wsock, cookie) entry appended above is never removed
        # from _sleeping, and the socketpair never returns to the idle pool.
        # Confirm whether these calls can fail in practice (e.g. EMFILE).
        poller = self.poller_class()
        poller.start_receive(rsock.fileno())
        try:
            return self._get_sleep(poller, timeout, block, rsock, wsock, cookie)
        finally:
            poller.close()
2646
    def _get_sleep(self, poller, timeout, block, rsock, wsock, cookie):
        """
        When a result is not immediately available, sleep waiting for
        :meth:`put` (or :meth:`close`) to write `cookie` to our socket pair.

        After the poll returns, this thread does three things: it removes its
        entry from :attr:`_sleeping`, performs a non-blocking read of the
        cookie, and returns the socketpair to the idle pool. If no cookie is
        available (``EAGAIN``), the wait timed out.

        :param poller:
            Poller already watching `rsock`; the caller closes it.
        :param timeout:
            Passed unchanged to `poller.poll()`.
        :param block:
            Only used for debug logging here.
        :param rsock:
            Non-blocking read side of this sleeper's socketpair.
        :param wsock:
            Write side of the socketpair. Together with `cookie`, it
            identifies this sleeper's entry in :attr:`_sleeping`.
        :param cookie:
            Value produced by :meth:`_make_cookie` for this sleeper.
        :raises mitogen.core.TimeoutError:
            No cookie had arrived when the poll returned.
        :raises mitogen.core.LatchError:
            The latch was closed while sleeping.
        :returns:
            The :attr:`_queue` element assigned to this sleeper.
        """
        _vv and IOLOG.debug(
            '%r._get_sleep(timeout=%r, block=%r, fd=%d/%d)',
            self, timeout, block, rsock.fileno(), wsock.fileno()
        )

        e = None
        try:
            list(poller.poll(timeout))
        except Exception:
            # Deferred: the sleeper must still be unregistered below.
            e = sys.exc_info()[1]

        self._lock.acquire()
        try:
            # Our index in _sleeping is also the index of our assigned element
            # in _queue; entries below _waking have already been woken.
            i = self._sleeping.index((wsock, cookie))
            del self._sleeping[i]

            try:
                got_cookie = rsock.recv(self.COOKIE_SIZE)
            except socket.error:
                e2 = sys.exc_info()[1]
                # rsock is non-blocking: EAGAIN means nothing was written,
                # i.e. the poll ended without a wake-up.
                if e2.args[0] == errno.EAGAIN:
                    e = TimeoutError()
                else:
                    e = e2

            # NOTE(review): put() increments _waking under the lock but only
            # writes the cookie after releasing it. If the poll times out (or
            # raises) inside that window, the raise below happens without
            # decrementing _waking or popping _queue[i]. The late cookie is
            # then written into a socketpair already returned to the idle
            # pool, where the next sleeper would read it. Confirm whether this
            # window is reachable.
            self._cls_idle_socketpairs.append((rsock, wsock))
            if e:
                raise e

            assert cookie == got_cookie, (
                "Cookie incorrect; got %r, expected %r" \
                % (binascii.hexlify(got_cookie),
                   binascii.hexlify(cookie))
            )
            assert i < self._waking, (
                "Cookie correct, but no queue element assigned."
            )
            self._waking -= 1
            if self.closed:
                raise LatchError()
            _vv and IOLOG.debug('%r.get() wake -> %r', self, self._queue[i])
            return self._queue.pop(i)
        finally:
            self._lock.release()
2696
2697    def put(self, obj=None):
2698        """
2699        Enqueue an object, waking the first thread waiting for a result, if one
2700        exists.
2701
2702        :param obj:
2703            Object to enqueue. Defaults to :data:`None` as a convenience when
2704            using :class:`Latch` only for synchronization.
2705        :raises mitogen.core.LatchError:
2706            :meth:`close` has been called, and the object is no longer valid.
2707        """
2708        _vv and IOLOG.debug('%r.put(%r)', self, obj)
2709        self._lock.acquire()
2710        try:
2711            if self.closed:
2712                raise LatchError()
2713            self._queue.append(obj)
2714
2715            wsock = None
2716            if self._waking < len(self._sleeping):
2717                wsock, cookie = self._sleeping[self._waking]
2718                self._waking += 1
2719                _vv and IOLOG.debug('%r.put() -> waking wfd=%r',
2720                                    self, wsock.fileno())
2721            elif self.notify:
2722                self.notify(self)
2723        finally:
2724            self._lock.release()
2725
2726        if wsock:
2727            self._wake(wsock, cookie)
2728
2729    def _wake(self, wsock, cookie):
2730        written, disconnected = io_op(os.write, wsock.fileno(), cookie)
2731        assert written == len(cookie) and not disconnected
2732
2733    def __repr__(self):
2734        return 'Latch(%#x, size=%d, t=%r)' % (
2735            id(self),
2736            len(self._queue),
2737            threading.currentThread().getName(),
2738        )
2739
2740
class Waker(Protocol):
    """
    :class:`Protocol` implementing the `UNIX self-pipe trick`_. Used to wake
    :class:`Broker` when another thread needs to modify its state, by enqueuing
    a function call to run on the :class:`Broker` thread.

    .. _UNIX self-pipe trick: https://cr.yp.to/docs/selfpipe.html
    """
    # Presumably the number of bytes read per readiness event; the bytes'
    # content is never inspected by on_receive().
    read_size = 1
    # Thread identity compared against thread.get_ident() in defer(); calls
    # made from that thread run inline. NOTE(review): assumed to be assigned
    # by the Broker once its thread is running -- confirm.
    broker_ident = None

    @classmethod
    def build_stream(cls, broker):
        """
        Construct the stream for a new waker attached to `broker`, connected
        to the two descriptors returned by :func:`pipe`.
        """
        stream = super(Waker, cls).build_stream(broker)
        stream.accept(*pipe())
        return stream

    def __init__(self, broker):
        self._broker = broker
        # (func, args, kwargs) triples queued by defer() and consumed in FIFO
        # order by on_receive().
        self._deferred = collections.deque()

    def __repr__(self):
        return 'Waker(fd=%r/%r)' % (
            self.stream.receive_side and self.stream.receive_side.fd,
            self.stream.transmit_side and self.stream.transmit_side.fd,
        )

    @property
    def keep_alive(self):
        """
        Prevent immediate Broker shutdown while deferred functions remain.
        Returns the number of queued functions, so it is truthy while any
        are pending.
        """
        return len(self._deferred)

    def on_receive(self, broker, buf):
        """
        Run every function queued by :meth:`defer`, in FIFO order, until the
        queue is empty. The received bytes in `buf` are only a wake-up signal
        and are ignored.

        A single call drains the whole queue, so one pending wake-up byte is
        enough regardless of queue length. Extra bytes only cause passes that
        find the queue already empty.

        A function that raises is logged and triggers `broker.shutdown()`;
        the remaining queued functions still run.
        """
        _vv and IOLOG.debug('%r.on_receive()', self)
        while True:
            try:
                func, args, kwargs = self._deferred.popleft()
            except IndexError:
                return

            try:
                func(*args, **kwargs)
            except Exception:
                LOG.exception('defer() crashed: %r(*%r, **%r)',
                              func, args, kwargs)
                broker.shutdown()

    def _wake(self):
        """
        Wake the multiplexer by writing a byte to the pipe.

        The following errors are ignored:

        * ``EBADF``: the Broker is midway through teardown and the FD may
          already be closed.
        * ``EAGAIN``/``EWOULDBLOCK``: the pipe is full, so a wake-up byte is
          already pending.

        Any other error is re-raised.
        """
        try:
            self.stream.transmit_side.write(b(' '))
        except OSError:
            e = sys.exc_info()[1]
            # Re-raise only unexpected errors. POSIX permits EAGAIN and
            # EWOULDBLOCK to be distinct values, so both are listed.
            if e.args[0] not in (errno.EBADF, errno.EAGAIN,
                                 errno.EWOULDBLOCK):
                raise

    broker_shutdown_msg = (
        "An attempt was made to enqueue a message with a Broker that has "
        "already exitted. It is likely your program called Broker.shutdown() "
        "too early."
    )

    def defer(self, func, *args, **kwargs):
        """
        Arrange for `func(*args, **kwargs)` to execute on the broker thread.

        When called from any other thread, this function queues the call,
        wakes the broker and returns immediately without waiting for the
        result of `func()`. When called on the broker thread itself, `func()`
        runs inline and its return value is returned. Use :meth:`defer_sync`
        to block until a result is available.

        :raises mitogen.core.Error:
            :meth:`defer` was called after :class:`Broker` has exited.
        """
        if thread.get_ident() == self.broker_ident:
            _vv and IOLOG.debug('%r.defer() [immediate]', self)
            return func(*args, **kwargs)
        if self._broker._exitted:
            raise Error(self.broker_shutdown_msg)

        _vv and IOLOG.debug('%r.defer() [fd=%r]', self,
                            self.stream.transmit_side.fd)
        self._deferred.append((func, args, kwargs))
        self._wake()
2832
2833
class IoLoggerProtocol(DelimitedProtocol):
    """
    Protocol attached to one end of a socket pair whose other end replaces the
    standard ``stdout`` or ``stderr`` file descriptor in a child context. Each
    received line is decoded as UTF-8 and logged through the :mod:`logging`
    package, to either the ``stdout`` or the ``stderr`` logger.

    Logging in child contexts is in turn forwarded to the master process using
    :class:`LogHandler`.
    """
    @classmethod
    def build_stream(cls, name, dest_fd):
        """
        Create a socket pair, install its write end over `dest_fd`, and
        return a stream that reads from the other end.

        `dest_fd` alone would keep the write end of the socket open. However,
        a separate handle to it (`wsock`) is also retained as the stream's
        transmit side. If some code later overwrites `dest_fd`,
        :meth:`on_shutdown` can still call :meth:`shutdown()
        <socket.socket.shutdown>` through that handle.
        """
        pair = socket.socketpair()
        os.dup2(pair[1].fileno(), dest_fd)
        stream = super(IoLoggerProtocol, cls).build_stream(name)
        stream.name = name
        stream.accept(*pair)
        return stream

    def __init__(self, name):
        log = logging.getLogger(name)
        # #453: prevent accidental log initialization in a child creating a
        # feedback loop.
        log.propagate = False
        log.handlers = list(logging.getLogger().handlers)
        self._log = log

    def on_shutdown(self, broker):
        """
        Shut down the write end of the socket. This prevents any further
        writes to it by this process, or by any subprocess that inherited it.
        Remaining kernel-buffered data can then drain during graceful shutdown
        without an out-of-control child continuously refilling the buffer.
        """
        _v and LOG.debug('%r: shutting down', self)
        side = self.stream.transmit_side
        if not IS_WSL:
            # #333: WSL generates invalid readiness indication on shutdown().
            # This modifies the *kernel object* inherited by children, causing
            # EPIPE on subsequent writes to any dupped FD in any process. The
            # read side can then drain completely of prior buffered data.
            side.fp.shutdown(socket.SHUT_WR)
        side.close()

    def on_line_received(self, line):
        """
        Decode `line` as UTF-8, replacing invalid sequences, and log it at
        INFO level.
        """
        text = line.decode('utf-8', 'replace')
        self._log.info('%s', text)
2889
2890
2891class Router(object):
2892    """
2893    Route messages between contexts, and invoke local handlers for messages
2894    addressed to this context. :meth:`Router.route() <route>` straddles the
2895    :class:`Broker` thread and user threads, it is safe to call anywhere.
2896
2897    **Note:** This is the somewhat limited core version of the Router class
2898    used by child contexts. The master subclass is documented below this one.
2899    """
    #: The :class:`mitogen.core.Context` subclass to use when constructing new
    #: :class:`Context` objects in :meth:`myself` and :meth:`context_by_id`.
    #: Permits :class:`Router` subclasses to extend the :class:`Context`
    #: interface, as done in :class:`mitogen.parent.Router`.
    context_class = Context

    #: Largest permitted message data length in bytes (128 MiB). Larger
    #: messages are dropped by :meth:`_async_route`, which reports them with a
    #: dead message carrying :attr:`too_large_msg`.
    max_message_size = 128 * 1048576

    #: When :data:`True`, permit children to only communicate with the current
    #: context or a parent of the current context. Routing between siblings or
    #: children of parents is prohibited, ensuring no communication is possible
    #: between intentionally partitioned networks, such as when a program
    #: simultaneously manipulates hosts spread across a corporate and a
    #: production network, or production networks that are otherwise
    #: air-gapped.
    #:
    #: Sending a prohibited message causes an error to be logged and a dead
    #: message to be sent in reply to the errant message, if that message has
    #: ``reply_to`` set.
    #:
    #: The value of :data:`unidirectional` becomes the default for the
    #: :meth:`local() <mitogen.master.Router.local>` `unidirectional`
    #: parameter.
    unidirectional = False

    # Reason strings used when raising Error or composing dead messages.
    # Those containing %-placeholders are interpolated before use.
    duplicate_handle_msg = 'cannot register a handle that already exists'
    refused_msg = 'refused by policy'
    invalid_handle_msg = 'invalid handle'
    too_large_msg = 'message too large (max %d bytes)'
    respondent_disconnect_msg = 'the respondent Context has disconnected'
    broker_exit_msg = 'Broker has exitted'
    no_route_msg = 'no route to %r, my ID is %r'
    unidirectional_msg = (
        'routing mode prevents forward of message from context %d to '
        'context %d via context %d'
    )
2936
2937    def __init__(self, broker):
2938        self.broker = broker
2939        listen(broker, 'exit', self._on_broker_exit)
2940        self._setup_logging()
2941
2942        self._write_lock = threading.Lock()
2943        #: context ID -> Stream; must hold _write_lock to edit or iterate
2944        self._stream_by_id = {}
2945        #: List of contexts to notify of shutdown; must hold _write_lock
2946        self._context_by_id = {}
2947        self._last_handle = itertools.count(1000)
2948        #: handle -> (persistent?, func(msg))
2949        self._handle_map = {}
2950        #: Context -> set { handle, .. }
2951        self._handles_by_respondent = {}
2952        self.add_handler(self._on_del_route, DEL_ROUTE)
2953
2954    def __repr__(self):
2955        return 'Router(%r)' % (self.broker,)
2956
2957    def _setup_logging(self):
2958        """
2959        This is done in the :class:`Router` constructor for historical reasons.
2960        It must be called before ExternalContext logs its first messages, but
2961        after logging has been setup. It must also be called when any router is
2962        constructed for a consumer app.
2963        """
2964        # Here seems as good a place as any.
2965        global _v, _vv
2966        _v = logging.getLogger().level <= logging.DEBUG
2967        _vv = IOLOG.level <= logging.DEBUG
2968
2969    def _on_del_route(self, msg):
2970        """
2971        Stub :data:`DEL_ROUTE` handler; fires 'disconnect' events on the
2972        corresponding :attr:`_context_by_id` member. This is replaced by
2973        :class:`mitogen.parent.RouteMonitor` in an upgraded context.
2974        """
2975        if msg.is_dead:
2976            return
2977
2978        target_id_s, _, name = bytes_partition(msg.data, b(':'))
2979        target_id = int(target_id_s, 10)
2980        LOG.error('%r: deleting route to %s (%d)',
2981                  self, to_text(name), target_id)
2982        context = self._context_by_id.get(target_id)
2983        if context:
2984            fire(context, 'disconnect')
2985        else:
2986            LOG.debug('DEL_ROUTE for unknown ID %r: %r', target_id, msg)
2987
2988    def _on_stream_disconnect(self, stream):
2989        notify = []
2990        self._write_lock.acquire()
2991        try:
2992            for context in list(self._context_by_id.values()):
2993                stream_ = self._stream_by_id.get(context.context_id)
2994                if stream_ is stream:
2995                    del self._stream_by_id[context.context_id]
2996                    notify.append(context)
2997        finally:
2998            self._write_lock.release()
2999
3000        # Happens outside lock as e.g. RouteMonitor wants the same lock.
3001        for context in notify:
3002            context.on_disconnect()
3003
3004    def _on_broker_exit(self):
3005        """
3006        Called prior to broker exit, informs callbacks registered with
3007        :meth:`add_handler` the connection is dead.
3008        """
3009        _v and LOG.debug('%r: broker has exitted', self)
3010        while self._handle_map:
3011            _, (_, func, _, _) = self._handle_map.popitem()
3012            func(Message.dead(self.broker_exit_msg))
3013
3014    def myself(self):
3015        """
3016        Return a :class:`Context` referring to the current process. Since
3017        :class:`Context` is serializable, this is convenient to use in remote
3018        function call parameter lists.
3019        """
3020        return self.context_class(
3021            router=self,
3022            context_id=mitogen.context_id,
3023            name='self',
3024        )
3025
3026    def context_by_id(self, context_id, via_id=None, create=True, name=None):
3027        """
3028        Return or construct a :class:`Context` given its ID. An internal
3029        mapping of ID to the canonical :class:`Context` representing that ID,
3030        so that :ref:`signals` can be raised.
3031
3032        This may be called from any thread, lookup and construction are atomic.
3033
3034        :param int context_id:
3035            The context ID to look up.
3036        :param int via_id:
3037            If the :class:`Context` does not already exist, set its
3038            :attr:`Context.via` to the :class:`Context` matching this ID.
3039        :param bool create:
3040            If the :class:`Context` does not already exist, create it.
3041        :param str name:
3042            If the :class:`Context` does not already exist, set its name.
3043
3044        :returns:
3045            :class:`Context`, or return :data:`None` if `create` is
3046            :data:`False` and no :class:`Context` previously existed.
3047        """
3048        context = self._context_by_id.get(context_id)
3049        if context:
3050            return context
3051
3052        if create and via_id is not None:
3053            via = self.context_by_id(via_id)
3054        else:
3055            via = None
3056
3057        self._write_lock.acquire()
3058        try:
3059            context = self._context_by_id.get(context_id)
3060            if create and not context:
3061                context = self.context_class(self, context_id, name=name)
3062                context.via = via
3063                self._context_by_id[context_id] = context
3064        finally:
3065            self._write_lock.release()
3066
3067        return context
3068
3069    def register(self, context, stream):
3070        """
3071        Register a newly constructed context and its associated stream, and add
3072        the stream's receive side to the I/O multiplexer. This method remains
3073        public while the design has not yet settled.
3074        """
3075        _v and LOG.debug('%s: registering %r to stream %r',
3076                         self, context, stream)
3077        self._write_lock.acquire()
3078        try:
3079            self._stream_by_id[context.context_id] = stream
3080            self._context_by_id[context.context_id] = context
3081        finally:
3082            self._write_lock.release()
3083
3084        self.broker.start_receive(stream)
3085        listen(stream, 'disconnect', lambda: self._on_stream_disconnect(stream))
3086
3087    def stream_by_id(self, dst_id):
3088        """
3089        Return the :class:`Stream` that should be used to communicate with
3090        `dst_id`. If a specific route for `dst_id` is not known, a reference to
3091        the parent context's stream is returned. If the parent is disconnected,
3092        or when running in the master context, return :data:`None` instead.
3093
3094        This can be used from any thread, but its output is only meaningful
3095        from the context of the :class:`Broker` thread, as disconnection or
3096        replacement could happen in parallel on the broker thread at any
3097        moment.
3098        """
3099        return (
3100            self._stream_by_id.get(dst_id) or
3101            self._stream_by_id.get(mitogen.parent_id)
3102        )
3103
3104    def del_handler(self, handle):
3105        """
3106        Remove the handle registered for `handle`
3107
3108        :raises KeyError:
3109            The handle wasn't registered.
3110        """
3111        _, _, _, respondent = self._handle_map.pop(handle)
3112        if respondent:
3113            self._handles_by_respondent[respondent].discard(handle)
3114
3115    def add_handler(self, fn, handle=None, persist=True,
3116                    policy=None, respondent=None,
3117                    overwrite=False):
3118        """
3119        Invoke `fn(msg)` on the :class:`Broker` thread for each Message sent to
3120        `handle` from this context. Unregister after one invocation if
3121        `persist` is :data:`False`. If `handle` is :data:`None`, a new handle
3122        is allocated and returned.
3123
3124        :param int handle:
3125            If not :data:`None`, an explicit handle to register, usually one of
3126            the ``mitogen.core.*`` constants. If unspecified, a new unused
3127            handle will be allocated.
3128
3129        :param bool persist:
3130            If :data:`False`, the handler will be unregistered after a single
3131            message has been received.
3132
3133        :param mitogen.core.Context respondent:
3134            Context that messages to this handle are expected to be sent from.
3135            If specified, arranges for a dead message to be delivered to `fn`
3136            when disconnection of the context is detected.
3137
3138            In future `respondent` will likely also be used to prevent other
3139            contexts from sending messages to the handle.
3140
3141        :param function policy:
3142            Function invoked as `policy(msg, stream)` where `msg` is a
3143            :class:`mitogen.core.Message` about to be delivered, and `stream`
3144            is the :class:`mitogen.core.Stream` on which it was received. The
3145            function must return :data:`True`, otherwise an error is logged and
3146            delivery is refused.
3147
3148            Two built-in policy functions exist:
3149
3150            * :func:`has_parent_authority`: requires the message arrived from a
3151              parent context, or a context acting with a parent context's
3152              authority (``auth_id``).
3153
3154            * :func:`mitogen.parent.is_immediate_child`: requires the
3155              message arrived from an immediately connected child, for use in
3156              messaging patterns where either something becomes buggy or
3157              insecure by permitting indirect upstream communication.
3158
3159            In case of refusal, and the message's ``reply_to`` field is
3160            nonzero, a :class:`mitogen.core.CallError` is delivered to the
3161            sender indicating refusal occurred.
3162
3163        :param bool overwrite:
3164            If :data:`True`, allow existing handles to be silently overwritten.
3165
3166        :return:
3167            `handle`, or if `handle` was :data:`None`, the newly allocated
3168            handle.
3169        :raises Error:
3170            Attemp to register handle that was already registered.
3171        """
3172        handle = handle or next(self._last_handle)
3173        _vv and IOLOG.debug('%r.add_handler(%r, %r, %r)', self, fn, handle, persist)
3174        if handle in self._handle_map and not overwrite:
3175            raise Error(self.duplicate_handle_msg)
3176
3177        self._handle_map[handle] = persist, fn, policy, respondent
3178        if respondent:
3179            if respondent not in self._handles_by_respondent:
3180                self._handles_by_respondent[respondent] = set()
3181                listen(respondent, 'disconnect',
3182                       lambda: self._on_respondent_disconnect(respondent))
3183            self._handles_by_respondent[respondent].add(handle)
3184
3185        return handle
3186
3187    def _on_respondent_disconnect(self, context):
3188        for handle in self._handles_by_respondent.pop(context, ()):
3189            _, fn, _, _  = self._handle_map[handle]
3190            fn(Message.dead(self.respondent_disconnect_msg))
3191            del self._handle_map[handle]
3192
3193    def _maybe_send_dead(self, unreachable, msg, reason, *args):
3194        """
3195        Send a dead message to either the original sender or the intended
3196        recipient of `msg`, if the original sender was expecting a reply
3197        (because its `reply_to` was set), otherwise assume the message is a
3198        reply of some sort, and send the dead message to the original
3199        destination.
3200
3201        :param bool unreachable:
3202            If :data:`True`, the recipient is known to be dead or routing
3203            failed due to a security precaution, so don't attempt to fallback
3204            to sending the dead message to the recipient if the original sender
3205            did not include a reply address.
3206        :param mitogen.core.Message msg:
3207            Message that triggered the dead message.
3208        :param str reason:
3209            Human-readable error reason.
3210        :param tuple args:
3211            Elements to interpolate with `reason`.
3212        """
3213        if args:
3214            reason %= args
3215        LOG.debug('%r: %r is dead: %r', self, msg, reason)
3216        if msg.reply_to and not msg.is_dead:
3217            msg.reply(Message.dead(reason=reason), router=self)
3218        elif not unreachable:
3219            self._async_route(
3220                Message.dead(
3221                    dst_id=msg.dst_id,
3222                    handle=msg.handle,
3223                    reason=reason,
3224                )
3225            )
3226
3227    def _invoke(self, msg, stream):
3228        # IOLOG.debug('%r._invoke(%r)', self, msg)
3229        try:
3230            persist, fn, policy, respondent = self._handle_map[msg.handle]
3231        except KeyError:
3232            self._maybe_send_dead(True, msg, reason=self.invalid_handle_msg)
3233            return
3234
3235        if respondent and not (msg.is_dead or
3236                               msg.src_id == respondent.context_id):
3237            self._maybe_send_dead(True, msg, 'reply from unexpected context')
3238            return
3239
3240        if policy and not policy(msg, stream):
3241            self._maybe_send_dead(True, msg, self.refused_msg)
3242            return
3243
3244        if not persist:
3245            self.del_handler(msg.handle)
3246
3247        try:
3248            fn(msg)
3249        except Exception:
3250            LOG.exception('%r._invoke(%r): %r crashed', self, msg, fn)
3251
    def _async_route(self, msg, in_stream=None):
        """
        Arrange for `msg` to be forwarded towards its destination. If its
        destination is the local context, then arrange for it to be dispatched
        using the local handlers.

        This is a lower overhead version of :meth:`route` that may only be
        called from the :class:`Broker` thread.

        Messages larger than :attr:`max_message_size`, messages failing
        source route verification, messages with no usable route, and
        messages forbidden by :attr:`unidirectional` mode are not delivered;
        depending on the case they are logged or answered via
        :meth:`_maybe_send_dead`.

        :param Stream in_stream:
            If not :data:`None`, the stream the message arrived on. Used for
            performing source route verification, to ensure sensitive messages
            such as ``CALL_FUNCTION`` arrive only from trusted contexts.
        :returns:
            The result of :meth:`_invoke` when `msg` is addressed to this
            context, otherwise :data:`None`.
        """
        _vv and IOLOG.debug('%r._async_route(%r, %r)', self, msg, in_stream)

        # Reject oversized payloads before doing any routing work.
        if len(msg.data) > self.max_message_size:
            self._maybe_send_dead(False, msg, self.too_large_msg % (
                self.max_message_size,
            ))
            return

        # Unknown source IDs are assumed to live behind the parent stream.
        parent_stream = self._stream_by_id.get(mitogen.parent_id)
        src_stream = self._stream_by_id.get(msg.src_id, parent_stream)

        # When the ingress stream is known, verify the message was received on
        # the same stream we would expect to receive messages from for its
        # src_id and auth_id. This is like Reverse Path Filtering in IP, and
        # ensures messages from a privileged context cannot be spoofed by a
        # child.
        if in_stream:
            auth_stream = self._stream_by_id.get(msg.auth_id, parent_stream)
            if in_stream != auth_stream:
                LOG.error('%r: bad auth_id: got %r via %r, not %r: %r',
                          self, msg.auth_id, in_stream, auth_stream, msg)
                return

            # src_id is only checked separately when it differs from auth_id;
            # otherwise the auth_id check above already covered it.
            if msg.src_id != msg.auth_id and in_stream != src_stream:
                LOG.error('%r: bad src_id: got %r via %r, not %r: %r',
                          self, msg.src_id, in_stream, src_stream, msg)
                return

            # If the stream's MitogenProtocol has auth_id set, copy it to the
            # message. This allows subtrees to become privileged by stamping a
            # parent's context ID. It is used by mitogen.unix to mark client
            # streams (like Ansible WorkerProcess) as having the same rights as
            # the parent. This must happen only after verification above.
            if in_stream.protocol.auth_id is not None:
                msg.auth_id = in_stream.protocol.auth_id
            if in_stream.protocol.on_message is not None:
                in_stream.protocol.on_message(in_stream, msg)

            # Record the IDs the source ever communicated with.
            in_stream.protocol.egress_ids.add(msg.dst_id)

        # Local delivery: hand off to the registered handlers.
        if msg.dst_id == mitogen.context_id:
            return self._invoke(msg, in_stream)

        out_stream = self._stream_by_id.get(msg.dst_id)
        if (not out_stream) and (parent_stream != src_stream or not in_stream):
            # No downstream route exists. The message could be from a child or
            # ourselves for a parent, in which case we must forward it
            # upstream, or it could be from a parent for a dead child, in which
            # case its src_id/auth_id would fail verification if returned to
            # the parent, so in that case reply with a dead message instead.
            out_stream = parent_stream

        if out_stream is None:
            self._maybe_send_dead(True, msg, self.no_route_msg,
                                  msg.dst_id, mitogen.context_id)
            return

        # In unidirectional mode, traffic between two unprivileged streams is
        # refused; at least one end must be privileged.
        if in_stream and self.unidirectional and not \
                (in_stream.protocol.is_privileged or
                 out_stream.protocol.is_privileged):
            self._maybe_send_dead(True, msg, self.unidirectional_msg,
                                  in_stream.protocol.remote_id,
                                  out_stream.protocol.remote_id,
                                  mitogen.context_id)
            return

        out_stream.protocol._send(msg)
3334
3335    def route(self, msg):
3336        """
3337        Arrange for the :class:`Message` `msg` to be delivered to its
3338        destination using any relevant downstream context, or if none is found,
3339        by forwarding the message upstream towards the master context. If `msg`
3340        is destined for the local context, it is dispatched using the handles
3341        registered with :meth:`add_handler`.
3342
3343        This may be called from any thread.
3344        """
3345        self.broker.defer(self._async_route, msg)
3346
3347
class NullTimerList(object):
    """
    Placeholder timer list that never holds any timers. It serves as the
    default :attr:`Broker.timers` until a real timer list is installed.
    """
    def get_timeout(self):
        """
        Report the time until the next timer expires.

        :returns:
            Always :data:`None`, since no timer can ever be pending.
        """
        return
3351
3352
class Broker(object):
    """
    Responsible for handling I/O multiplexing in a private thread.

    **Note:** This somewhat limited core version is used by children. The
    master subclass is documented below.
    """
    poller_class = Poller
    _waker = None
    _thread = None

    # :func:`mitogen.parent._upgrade_broker` replaces this with
    # :class:`mitogen.parent.TimerList` during upgrade.
    timers = NullTimerList()

    #: Seconds grace to allow :class:`streams <Stream>` to shutdown gracefully
    #: before force-disconnecting them during :meth:`shutdown`.
    shutdown_timeout = 3.0

    def __init__(self, poller_class=None, activate_compat=True):
        """
        Construct the broker, register its waker with a new poller, and start
        the broker thread.

        :param poller_class:
            If not :data:`None`, the class instantiated to create
            :attr:`poller`, overriding the :attr:`poller_class` class
            attribute for this instance.
        :param bool activate_compat:
            If :data:`True`, run :meth:`_py24_25_compat` once the broker
            thread has been started.
        """
        self._alive = True
        self._exitted = False
        self._waker = Waker.build_stream(self)
        #: Arrange for `func(\*args, \**kwargs)` to be executed on the broker
        #: thread, or immediately if the current thread is the broker thread.
        #: Safe to call from any thread.
        self.defer = self._waker.protocol.defer
        # Honour the constructor argument; fall back to the class default.
        if poller_class is None:
            poller_class = self.poller_class
        self.poller = poller_class()
        self.poller.start_receive(
            self._waker.receive_side.fd,
            (self._waker.receive_side, self._waker.on_receive)
        )
        self._thread = threading.Thread(
            target=self._broker_main,
            name='mitogen.broker'
        )
        self._thread.start()
        if activate_compat:
            self._py24_25_compat()

    def _py24_25_compat(self):
        """
        Python 2.4/2.5 have grave difficulties with threads/fork. We
        mandatorily quiesce all running threads during fork using a
        monkey-patch there.
        """
        if sys.version_info < (2, 6):
            # import_module() is used to avoid dep scanner.
            os_fork = import_module('mitogen.os_fork')
            os_fork._notice_broker_or_pool(self)

    def start_receive(self, stream):
        """
        Mark the :attr:`receive_side <Stream.receive_side>` on `stream` as
        ready for reading. Safe to call from any thread. When the associated
        file descriptor becomes ready for reading,
        :meth:`BasicStream.on_receive` will be called.
        """
        _vv and IOLOG.debug('%r.start_receive(%r)', self, stream)
        side = stream.receive_side
        assert side and not side.closed
        self.defer(self.poller.start_receive,
                   side.fd, (side, stream.on_receive))

    def stop_receive(self, stream):
        """
        Mark the :attr:`receive_side <Stream.receive_side>` on `stream` as not
        ready for reading. Safe to call from any thread.
        """
        _vv and IOLOG.debug('%r.stop_receive(%r)', self, stream)
        self.defer(self.poller.stop_receive, stream.receive_side.fd)

    def _start_transmit(self, stream):
        """
        Mark the :attr:`transmit_side <Stream.transmit_side>` on `stream` as
        ready for writing. Must only be called from the Broker thread. When the
        associated file descriptor becomes ready for writing,
        :meth:`BasicStream.on_transmit` will be called.
        """
        _vv and IOLOG.debug('%r._start_transmit(%r)', self, stream)
        side = stream.transmit_side
        assert side and not side.closed
        self.poller.start_transmit(side.fd, (side, stream.on_transmit))

    def _stop_transmit(self, stream):
        """
        Mark the :attr:`transmit_side <Stream.transmit_side>` on `stream` as
        not ready for writing. Must only be called from the Broker thread.
        """
        _vv and IOLOG.debug('%r._stop_transmit(%r)', self, stream)
        self.poller.stop_transmit(stream.transmit_side.fd)

    def keep_alive(self):
        """
        Return :data:`True` if any reader's :attr:`Side.keep_alive` attribute
        is :data:`True`, or if :attr:`timers` reports a pending timeout. Used
        to delay shutdown while some important work is in progress (e.g. log
        draining).
        """
        it = (side.keep_alive for (_, (side, _)) in self.poller.readers)
        return sum(it, 0) > 0 or self.timers.get_timeout() is not None

    def defer_sync(self, func):
        """
        Arrange for `func()` to execute on :class:`Broker` thread, blocking the
        current thread until a result or exception is available.

        :returns:
            Return value of `func()`. This may itself be an exception
            instance; only exceptions actually raised by `func()` are
            re-raised.
        :raises Exception:
            Any :class:`Exception` raised by `func()`, re-raised in the
            calling thread.
        """
        latch = Latch()
        def wrapper():
            # Tag the outcome so a returned exception object is not mistaken
            # for a raised one.
            try:
                latch.put((True, func()))
            except Exception:
                latch.put((False, sys.exc_info()[1]))
        self.defer(wrapper)
        ok, value = latch.get()
        if not ok:
            raise value
        return value

    def _call(self, stream, func):
        """
        Call `func(self)`, catching any exception that might occur, logging it,
        and force-disconnecting the related `stream`.
        """
        try:
            func(self)
        except Exception:
            LOG.exception('%r crashed', stream)
            stream.on_disconnect(self)

    def _loop_once(self, timeout=None):
        """
        Execute a single :class:`Poller` wait, dispatching any IO events that
        caused the wait to complete.

        :param float timeout:
            If not :data:`None`, maximum time in seconds to wait for events.
            The effective timeout is shortened to the next timer deadline
            when that is sooner.
        """
        _vv and IOLOG.debug('%r._loop_once(%r, %r)',
                            self, timeout, self.poller)

        timer_to = self.timers.get_timeout()
        if timeout is None:
            timeout = timer_to
        elif timer_to is not None and timer_to < timeout:
            timeout = timer_to

        #IOLOG.debug('readers =\n%s', pformat(self.poller.readers))
        #IOLOG.debug('writers =\n%s', pformat(self.poller.writers))
        for side, func in self.poller.poll(timeout):
            self._call(side.stream, func)
        if timer_to is not None:
            self.timers.expire()

    def _broker_exit(self):
        """
        Forcefully call :meth:`Stream.on_disconnect` on any streams that failed
        to shut down gracefully, then discard the :class:`Poller`.
        """
        for _, (side, _) in self.poller.readers + self.poller.writers:
            LOG.debug('%r: force disconnecting %r', self, side)
            side.stream.on_disconnect(self)

        self.poller.close()

    def _broker_shutdown(self):
        """
        Invoke :meth:`Stream.on_shutdown` for every active stream, then allow
        up to :attr:`shutdown_timeout` seconds for the streams to unregister
        themselves, logging an error if any did not unregister during the grace
        period.
        """
        for _, (side, _) in self.poller.readers + self.poller.writers:
            self._call(side.stream, side.stream.on_shutdown)

        deadline = now() + self.shutdown_timeout
        while self.keep_alive() and now() < deadline:
            self._loop_once(max(0, deadline - now()))

        if self.keep_alive():
            LOG.error('%r: pending work still existed %d seconds after '
                      'shutdown began. This may be due to a timer that is yet '
                      'to expire, or a child connection that did not fully '
                      'shut down.', self, self.shutdown_timeout)

    def _do_broker_main(self):
        """
        Broker thread main function. Dispatches IO events until
        :meth:`shutdown` is called.
        """
        # For Python 2.4, no way to retrieve ident except on thread.
        self._waker.protocol.broker_ident = thread.get_ident()
        try:
            while self._alive:
                self._loop_once()

            fire(self, 'before_shutdown')
            fire(self, 'shutdown')
            self._broker_shutdown()
        except Exception:
            e = sys.exc_info()[1]
            LOG.exception('broker crashed')
            syslog.syslog(syslog.LOG_ERR, 'broker crashed: %s' % (e,))
            syslog.closelog()  # prevent test 'fd leak'.

        self._alive = False  # Ensure _alive is consistent on crash.
        self._exitted = True
        self._broker_exit()

    def _broker_main(self):
        """
        Thread entry point: run :meth:`_do_broker_main` under the profiling
        hook, always firing the ``exit`` signal afterwards.
        """
        try:
            _profile_hook('mitogen.broker', self._do_broker_main)
        finally:
            # 'finally' to ensure _on_broker_exit() can always SIGTERM.
            fire(self, 'exit')

    def shutdown(self):
        """
        Request broker gracefully disconnect streams and stop. Safe to call
        from any thread.
        """
        _v and LOG.debug('%r: shutting down', self)
        def _shutdown():
            self._alive = False
        if self._alive and not self._exitted:
            self.defer(_shutdown)

    def join(self):
        """
        Wait for the broker to stop, expected to be called after
        :meth:`shutdown`.
        """
        self._thread.join()

    def __repr__(self):
        return 'Broker(%04x)' % (id(self) & 0xffff,)
3592
3593
class Dispatcher(object):
    """
    Executor for :data:`CALL_FUNCTION` requests in a child context. It runs
    on the child's main thread, receiving the messages produced by
    :class:`mitogen.parent.CallChain`, decoding each into a function plus its
    arguments, and invoking it.

    For a pipelined :class:`mitogen.parent.CallChain`, the first failure seen
    for a `chain_id` is remembered. Every later call carrying that `chain_id`
    then fails with that same exception without being executed.
    """
    _service_recv = None

    def __repr__(self):
        return 'Dispatcher'

    def __init__(self, econtext):
        self.econtext = econtext
        router = econtext.router
        #: Maps a chain ID to the CallError of the first failed call in it.
        self._error_by_chain_id = {}
        self.recv = Receiver(
            router=router,
            handle=CALL_FUNCTION,
            policy=has_parent_authority,
        )
        #: The :data:`CALL_SERVICE` :class:`Receiver` that will eventually be
        #: reused by :class:`mitogen.service.Pool`, should it ever be loaded.
        #: Creating it up front means no service request is lost, whether it
        #: arrives before or after the real service pool is loaded
        #: (see #547). It is stored on the class, not the instance.
        Dispatcher._service_recv = Receiver(
            router=router,
            handle=CALL_SERVICE,
            policy=has_parent_authority,
        )
        self._service_recv.notify = self._on_call_service
        listen(econtext.broker, 'shutdown', self._on_broker_shutdown)

    def _on_broker_shutdown(self):
        """
        Broker ``shutdown`` handler. Removes the stub service notifier if it is
        still the one installed, then closes the :data:`CALL_FUNCTION`
        receiver.
        """
        service_recv = self._service_recv
        # Leave any notifier installed by someone else untouched.
        if service_recv.notify == self._on_call_service:
            service_recv.notify = None
        self.recv.close()

    @classmethod
    @takes_econtext
    def forget_chain(cls, chain_id, econtext):
        """
        Discard any failure recorded for `chain_id`. Calls in that chain will
        then execute again.
        """
        recorded = econtext.dispatcher._error_by_chain_id
        recorded.pop(chain_id, None)

    def _parse_request(self, msg):
        """
        Decode a :data:`CALL_FUNCTION` message into
        ``(chain_id, fn, args, kwargs)``, resolving the target function and
        injecting `econtext` / `router` keyword arguments for functions that
        request them.
        """
        request = msg.unpickle(throw=False)
        _v and LOG.debug('%r: dispatching %r', self, request)

        chain_id, modname, klass, func, args, kwargs = request
        target = import_module(modname)
        if klass:
            target = getattr(target, klass)
        fn = getattr(target, func)

        wants_econtext = getattr(fn, 'mitogen_takes_econtext', None)
        if wants_econtext:
            kwargs.setdefault('econtext', self.econtext)
        wants_router = getattr(fn, 'mitogen_takes_router', None)
        if wants_router:
            kwargs.setdefault('router', self.econtext.router)

        return chain_id, fn, args, kwargs

    def _dispatch_one(self, msg):
        """
        Execute the call described by `msg`.

        :returns:
            ``(chain_id, result)``, where `result` is either the function's
            return value or a :class:`CallError` wrapping a failure. When the
            request itself cannot be decoded, `chain_id` is :data:`None`.
        """
        try:
            chain_id, fn, args, kwargs = self._parse_request(msg)
        except Exception:
            return None, CallError(sys.exc_info()[1])

        # A chain that already failed short-circuits with its first error.
        if chain_id in self._error_by_chain_id:
            return chain_id, self._error_by_chain_id[chain_id]

        try:
            result = fn(*args, **kwargs)
        except Exception:
            failure = CallError(sys.exc_info()[1])
            if chain_id is not None:
                self._error_by_chain_id[chain_id] = failure
            return chain_id, failure
        return chain_id, result

    def _on_call_service(self, recv):
        """
        Notifier for the :data:`CALL_SERVICE` receiver. It runs on the
        :class:`Broker` thread for each service message arriving at this
        context, for as long as no real service pool implementation has been
        loaded.

        Instead of importing the service pool here, a sentinel message is
        pushed onto the :data:`CALL_FUNCTION` receiver. This wakes the main
        thread, which then performs the import without risk of deadlocking
        against concurrent users of the importer.

        Suppose the main thread is blocked indefinitely, waiting on a service
        call, so the import never runs. In that case :mod:`mitogen.service`
        must already have been imported and
        :func:`mitogen.service.get_or_create_pool` must already have run. The
        service pool is therefore already active, and the duplicate
        initialization was not needed anyway.

        #547: A temporary import thread is deliberately avoided. It could
        deadlock when a custom import hook holding the importer lock on the
        main thread waits for data that a service must deliver. The import
        lock would not be released until the service runs, and the service
        could not run until the lock was released.
        """
        sentinel = Message(handle=STUB_CALL_SERVICE)
        self.recv._on_receive(sentinel)

    def _init_service_pool(self):
        """
        Import :mod:`mitogen.service` and ensure its pool exists.
        """
        import mitogen.service
        mitogen.service.get_or_create_pool(router=self.econtext.router)

    def _dispatch_calls(self):
        """
        Main-thread loop: consume :data:`CALL_FUNCTION` messages until the
        receiver is exhausted, replying to each call that requested a reply.
        """
        for msg in self.recv:
            if msg.handle == STUB_CALL_SERVICE:
                # Only sentinels originating from this context trigger the
                # service pool import (presumably those enqueued by
                # _on_call_service()).
                if msg.src_id == mitogen.context_id:
                    self._init_service_pool()
            else:
                chain_id, ret = self._dispatch_one(msg)
                _v and LOG.debug('%r: %r -> %r', self, msg, ret)
                if msg.reply_to:
                    msg.reply(ret)
                elif isinstance(ret, CallError) and chain_id is None:
                    LOG.error('No-reply function call failed: %s', ret)

    def run(self):
        """
        Run the optional ``on_start`` config hook, then dispatch calls under
        the profiling hook.
        """
        config = self.econtext.config
        if config.get('on_start'):
            config['on_start'](self.econtext)

        _profile_hook('mitogen.child_main', self._dispatch_calls)
3727
3728
class ExternalContext(object):
    """
    External context implementation.

    This class contains the main program implementation for new children. It is
    responsible for setting up everything about the process environment, import
    hooks, standard IO redirection, logging, configuring a :class:`Router` and
    :class:`Broker`, and finally arranging for :class:`Dispatcher` to take over
    the main thread after initialization is complete.

    .. attribute:: broker

        The :class:`mitogen.core.Broker` instance.

    .. attribute:: router

        The :class:`mitogen.core.Router` instance.

    .. attribute:: master

        The :class:`mitogen.core.Context` representing the master (context
        ID 0).

    .. attribute:: parent

        The :class:`mitogen.core.Context` representing the immediate parent.
        This is the same object as :attr:`master` when the parent is the
        master.

    .. attribute:: stream

        The stream connected to the parent.

    .. attribute:: dispatcher

        The :class:`Dispatcher` through which :data:`CALL_FUNCTION` requests
        are received.

    .. attribute:: importer

        The :class:`mitogen.core.Importer` instance.

    .. attribute:: loggers

        When stdio setup is enabled, a list of the IoLogger streams connected
        to FD 1 (stdout) and FD 2 (stderr).
    """
    detached = False

    def __init__(self, config):
        self.config = config

    def _on_broker_exit(self):
        """
        Broker ``exit`` handler: terminate this process with SIGTERM unless
        profiling is enabled.
        """
        if not self.config['profiling']:
            os.kill(os.getpid(), signal.SIGTERM)

    def _on_shutdown_msg(self, msg):
        """
        :data:`SHUTDOWN` handler: begin broker shutdown for any message that is
        not a dead message.
        """
        if not msg.is_dead:
            _v and LOG.debug('shutdown request from context %d', msg.src_id)
            self.broker.shutdown()

    def _on_parent_disconnect(self):
        """
        Parent stream ``disconnect`` handler. After :meth:`detach`, forget the
        parent IDs and keep running; otherwise shut down the broker.
        """
        if self.detached:
            mitogen.parent_ids = []
            mitogen.parent_id = None
            LOG.info('Detachment complete')
        else:
            _v and LOG.debug('parent stream is gone, dying.')
            self.broker.shutdown()

    def detach(self):
        """
        Detach this context from its parent. The process becomes a session
        leader via :func:`os.setsid`, notifies the parent with
        :data:`DETACHING`, waits briefly for queued output to drain, and then
        disconnects the parent stream. Calling this again after the stream is
        gone is a no-op beyond setting :attr:`detached`.
        """
        self.detached = True
        stream = self.router.stream_by_id(mitogen.parent_id)
        if stream:  # not double-detach()'d
            os.setsid()
            self.parent.send_await(Message(handle=DETACHING))
            LOG.info('Detaching from %r; parent is %s', stream, self.parent)
            # Poll up to 20 times at 50ms intervals (~1000ms in total) for the
            # parent stream's pending output to drain.
            for x in range(20):
                pending = self.broker.defer_sync(stream.protocol.pending_bytes)
                if not pending:
                    break
                time.sleep(0.05)
            if pending:
                LOG.error('Stream had %d bytes after 1000ms', pending)
            self.broker.defer(stream.on_disconnect, self.broker)

    def _setup_master(self):
        """
        Create the broker, router, master/parent contexts and the stream
        connected to the parent, and install the shutdown, disconnect and exit
        handlers.
        """
        Router.max_message_size = self.config['max_message_size']
        if self.config['profiling']:
            enable_profiling()
        self.broker = Broker(activate_compat=False)
        self.router = Router(self.broker)
        self.router.debug = self.config.get('debug', False)
        self.router.unidirectional = self.config['unidirectional']
        self.router.add_handler(
            fn=self._on_shutdown_msg,
            handle=SHUTDOWN,
            policy=has_parent_authority,
        )
        self.master = Context(self.router, 0, 'master')
        parent_id = self.config['parent_ids'][0]
        if parent_id == 0:
            self.parent = self.master
        else:
            self.parent = Context(self.router, parent_id, 'parent')

        in_fd = self.config.get('in_fd', 100)
        in_fp = os.fdopen(os.dup(in_fd), 'rb', 0)
        os.close(in_fd)

        out_fp = os.fdopen(os.dup(self.config.get('out_fd', 1)), 'wb', 0)
        self.stream = MitogenProtocol.build_stream(
            self.router,
            parent_id,
            local_id=self.config['context_id'],
            parent_ids=self.config['parent_ids']
        )
        self.stream.accept(in_fp, out_fp)
        self.stream.name = 'parent'
        self.stream.receive_side.keep_alive = False

        listen(self.stream, 'disconnect', self._on_parent_disconnect)
        listen(self.broker, 'exit', self._on_broker_exit)

    def _reap_first_stage(self):
        """
        Reap one exited child, ignoring the error raised when there is none.
        """
        try:
            os.wait()  # Reap first stage.
        except OSError:
            pass  # No first stage exists (e.g. fakessh)

    def _setup_logging(self):
        """
        Route all root logger output through a :class:`LogHandler` targeting
        the master.
        """
        self.log_handler = LogHandler(self.master)
        root = logging.getLogger()
        root.setLevel(self.config['log_level'])
        root.handlers = [self.log_handler]
        if self.config['debug']:
            enable_debug_logging()

    def _setup_importer(self):
        """
        Install the importer: either one passed via config, or a new
        :class:`Importer` seeded with this module's own source read from
        ``core_src_fd``.
        """
        importer = self.config.get('importer')
        if importer:
            importer._install_handler(self.router)
            importer._context = self.parent
        else:
            core_src_fd = self.config.get('core_src_fd', 101)
            if core_src_fd:
                # Unbuffered: the whole stream is read at once, and
                # buffering=1 (line buffering) is invalid for binary files
                # on Python 3, where it triggers a RuntimeWarning.
                fp = os.fdopen(core_src_fd, 'rb', 0)
                try:
                    core_src = fp.read()
                    # Strip "ExternalContext.main()" call from last line.
                    core_src = b('\n').join(core_src.splitlines()[:-1])
                finally:
                    fp.close()
            else:
                core_src = None

            importer = Importer(
                self.router,
                self.parent,
                core_src,
                self.config.get('whitelist', ()),
                self.config.get('blacklist', ()),
            )

        self.importer = importer
        self.router.importer = importer
        sys.meta_path.insert(0, self.importer)

    def _setup_package(self):
        """
        Synthesize the ``mitogen`` package and register the running
        ``__main__`` module as ``mitogen.core``.
        """
        global mitogen
        mitogen = imp.new_module('mitogen')
        mitogen.__package__ = 'mitogen'
        mitogen.__path__ = []
        mitogen.__loader__ = self.importer
        mitogen.main = lambda *args, **kwargs: (lambda func: None)
        mitogen.core = sys.modules['__main__']
        mitogen.core.__file__ = 'x/mitogen/core.py'  # For inspect.getsource()
        mitogen.core.__loader__ = self.importer
        sys.modules['mitogen'] = mitogen
        sys.modules['mitogen.core'] = mitogen.core
        del sys.modules['__main__']

    def _setup_globals(self):
        """
        Populate the ``mitogen`` package globals from config.
        """
        mitogen.is_master = False
        mitogen.__version__ = self.config['version']
        mitogen.context_id = self.config['context_id']
        mitogen.parent_ids = self.config['parent_ids'][:]
        mitogen.parent_id = mitogen.parent_ids[0]

    def _nullify_stdio(self):
        """
        Open /dev/null to replace stdio temporarily. In case of odd startup,
        assume we may be allocated a standard handle.
        """
        for stdfd, mode in ((0, os.O_RDONLY), (1, os.O_RDWR), (2, os.O_RDWR)):
            fd = os.open('/dev/null', mode)
            if fd != stdfd:
                os.dup2(fd, stdfd)
                os.close(fd)

    def _preserve_tty_fp(self):
        """
        #481: when stderr is a TTY due to being started via tty_create_child()
        or hybrid_tty_create_child(), and some privilege escalation tool like
        prehistoric versions of sudo exec this process over the top of itself,
        there is nothing left to keep the slave PTY open after we replace our
        stdio. Therefore if stderr is a TTY, keep around a permanent dup() to
        avoid receiving SIGHUP.
        """
        try:
            if os.isatty(2):
                self.reserve_tty_fp = os.fdopen(os.dup(2), 'r+b', 0)
                set_cloexec(self.reserve_tty_fp.fileno())
        except OSError:
            pass

    def _setup_stdio(self):
        """
        Replace stdio with /dev/null, then attach IoLogger streams to FD 1 and
        FD 2 and reopen :data:`sys.stdout` with line buffering.
        """
        self._preserve_tty_fp()
        # When sys.stdout was opened by the runtime, overwriting it will not
        # close FD 1. However when forking from a child that previously used
        # fdopen(), overwriting it /will/ close FD 1. So we must swallow the
        # close before IoLogger overwrites FD 1, otherwise its new FD 1 will be
        # clobbered. Additionally, stdout must be replaced with /dev/null prior
        # to stdout.close(), since if block buffering was active in the parent,
        # any pre-fork buffered data will be flushed on close(), corrupting the
        # connection to the parent.
        self._nullify_stdio()
        sys.stdout.close()
        self._nullify_stdio()

        self.loggers = []
        for name, fd in (('stdout', 1), ('stderr', 2)):
            log = IoLoggerProtocol.build_stream(name, fd)
            self.broker.start_receive(log)
            self.loggers.append(log)

        # Reopen with line buffering.
        sys.stdout = os.fdopen(1, 'w', 1)

    def main(self):
        """
        Child entry point: perform all setup steps, then hand the main thread
        to the :class:`Dispatcher`. The broker is always asked to shut down
        on exit, including on error.
        """
        self._setup_master()
        try:
            try:
                self._setup_logging()
                self._setup_importer()
                self._reap_first_stage()
                if self.config.get('setup_package', True):
                    self._setup_package()
                self._setup_globals()
                if self.config.get('setup_stdio', True):
                    self._setup_stdio()

                self.dispatcher = Dispatcher(self)
                self.router.register(self.parent, self.stream)
                self.router._setup_logging()

                _v and LOG.debug('Python version is %s', sys.version)
                _v and LOG.debug('Parent is context %r (%s); my ID is %r',
                                 self.parent.context_id, self.parent.name,
                                 mitogen.context_id)
                _v and LOG.debug('pid:%r ppid:%r uid:%r/%r, gid:%r/%r host:%r',
                                 os.getpid(), os.getppid(), os.geteuid(),
                                 os.getuid(), os.getegid(), os.getgid(),
                                 socket.gethostname())

                sys.executable = os.environ.pop('ARGV0', sys.executable)
                _v and LOG.debug('Recovered sys.executable: %r', sys.executable)

                if self.config.get('send_ec2', True):
                    self.stream.transmit_side.write(b('MITO002\n'))
                self.broker._py24_25_compat()
                self.log_handler.uncork()
                self.dispatcher.run()
                _v and LOG.debug('ExternalContext.main() normal exit')
            except KeyboardInterrupt:
                LOG.debug('KeyboardInterrupt received, exiting gracefully.')
            except BaseException:
                LOG.exception('ExternalContext.main() crashed')
                raise
        finally:
            self.broker.shutdown()
3998