# Copyright 2019, David Wilson
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors
# may be used to endorse or promote products derived from this software without
# specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

# !mitogen: minify_safe

"""
This module implements most package functionality, but remains separate from
non-essential code in order to reduce its size, since it also serves as the
bootstrap implementation sent to every new slave context.
"""

import binascii
import collections
import encodings.latin_1
import encodings.utf_8
import errno
import fcntl
import itertools
import linecache
import logging
import os
import pickle as py_pickle
import pstats
import signal
import socket
import struct
import sys
import syslog
import threading
import time
import traceback
import warnings
import weakref
import zlib

# Python >3.7 deprecated the imp module.
warnings.filterwarnings('ignore', message='the imp module is deprecated')
import imp

# Absolute imports for <2.5.
select = __import__('select')

try:
    import cProfile
except ImportError:
    cProfile = None

try:
    import thread
except ImportError:
    import threading as thread

try:
    import cPickle as pickle
except ImportError:
    import pickle

try:
    from cStringIO import StringIO as BytesIO
except ImportError:
    from io import BytesIO

try:
    BaseException
except NameError:
    BaseException = Exception

try:
    ModuleNotFoundError
except NameError:
    ModuleNotFoundError = ImportError

# TODO: usage of 'import' after setting __name__, but before fixing up
# sys.modules generates a warning. This happens when profiling = True.
warnings.filterwarnings('ignore',
    "Parent module 'mitogen' not found while handling absolute import")

LOG = logging.getLogger('mitogen')
IOLOG = logging.getLogger('mitogen.io')
IOLOG.setLevel(logging.INFO)

# str.encode() may take import lock. Deadlock possible if broker calls
# .encode() on behalf of thread currently waiting for module.
LATIN1_CODEC = encodings.latin_1.Codec()

_v = False
_vv = False

GET_MODULE = 100
CALL_FUNCTION = 101
FORWARD_LOG = 102
ADD_ROUTE = 103
DEL_ROUTE = 104
ALLOCATE_ID = 105
SHUTDOWN = 106
LOAD_MODULE = 107
FORWARD_MODULE = 108
DETACHING = 109
CALL_SERVICE = 110
STUB_CALL_SERVICE = 111

#: Special value used to signal disconnection or the inability to route a
#: message, when it appears in the `reply_to` field. Usually causes
#: :class:`mitogen.core.ChannelError` to be raised when it is received.
#:
#: It indicates the sender did not know how to process the message, or wishes
#: no further messages to be delivered to it. It is used when:
#:
#: * a remote receiver is disconnected or explicitly closed.
#: * a related message could not be delivered due to no route existing for it.
#: * a router is being torn down, as a sentinel value to notify
#:   :meth:`mitogen.core.Router.add_handler` callbacks to clean up.
IS_DEAD = 999

try:
    BaseException
except NameError:
    BaseException = Exception

PY24 = sys.version_info < (2, 5)
PY3 = sys.version_info > (3,)
if PY3:
    b = str.encode
    BytesType = bytes
    UnicodeType = str
    FsPathTypes = (str,)
    BufferType = lambda buf, start: memoryview(buf)[start:]
    long = int
else:
    b = str
    BytesType = str
    FsPathTypes = (str, unicode)
    BufferType = buffer
    UnicodeType = unicode

AnyTextType = (BytesType, UnicodeType)

try:
    next
except NameError:
    next = lambda it: it.next()

# #550: prehistoric WSL did not advertise itself in uname output.
try:
    fp = open('/proc/sys/kernel/osrelease')
    IS_WSL = 'Microsoft' in fp.read()
    fp.close()
except IOError:
    IS_WSL = False


#: Default size for calls to :meth:`Side.read` or :meth:`Side.write`, and the
#: size of buffers configured by :func:`mitogen.parent.create_socketpair`. This
#: value has many performance implications; 128KiB seems to be a sweet spot.
#:
#: * When set low, large messages cause many :class:`Broker` IO loop
#:   iterations, burning CPU and reducing throughput.
#: * When set high, excessive RAM is reserved by the OS for socket buffers (2x
#:   per child), and an identically sized temporary userspace buffer is
#:   allocated on each read that requires zeroing, and over a particular size
#:   may require two system calls to allocate/deallocate.
#:
#: Care must be taken to ensure the underlying kernel object and receiving
#: program support the desired size. For example,
#:
#: * Most UNIXes have TTYs with fixed 2KiB-4KiB buffers, making them unsuitable
#:   for efficient IO.
#: * Different UNIXes have varying presets for pipes, which may not be
#:   configurable. On recent Linux the default pipe buffer size is 64KiB, but
#:   under memory pressure may be as low as 4KiB for unprivileged processes.
#: * When communication is via an intermediary process, its internal buffers
#:   affect the speed OS buffers will drain. For example OpenSSH uses 64KiB
#:   reads.
#:
#: An ideal :class:`Message` has a size that is a multiple of
#: :data:`CHUNK_SIZE` inclusive of headers, to avoid wasting IO loop iterations
#: writing small trailer chunks.
CHUNK_SIZE = 131072

_tls = threading.local()


if __name__ == 'mitogen.core':
    # When loaded using import mechanism, ExternalContext.main() will not have
    # a chance to set the synthetic mitogen global, so just import it here.
    import mitogen
else:
    # When loaded as __main__, ensure classes and functions gain a __module__
    # attribute consistent with the host process, so that pickling succeeds.
    __name__ = 'mitogen.core'


class Error(Exception):
    """
    Base for all exceptions raised by Mitogen.

    :param str fmt:
        Exception text, or format string if `args` is non-empty.
    :param tuple args:
        Format string arguments.
    """
    def __init__(self, fmt=None, *args):
        if args:
            fmt %= args
        if fmt and not isinstance(fmt, UnicodeType):
            fmt = fmt.decode('utf-8')
        Exception.__init__(self, fmt)


class LatchError(Error):
    """
    Raised when an attempt is made to use a :class:`mitogen.core.Latch` that
    has been marked closed.
    """
    pass


class Blob(BytesType):
    """
    A serializable bytes subclass whose content is summarized in repr() output,
    making it suitable for logging binary data.
    """
    def __repr__(self):
        return '[blob: %d bytes]' % len(self)

    def __reduce__(self):
        return (Blob, (BytesType(self),))


class Secret(UnicodeType):
    """
    A serializable unicode subclass whose content is masked in repr() output,
    making it suitable for logging passwords.
    """
    def __repr__(self):
        return '[secret]'

    if not PY3:
        # TODO: what is this needed for in 2.x?
        def __str__(self):
            return UnicodeType(self)

    def __reduce__(self):
        return (Secret, (UnicodeType(self),))


class Kwargs(dict):
    """
    A serializable dict subclass that indicates its keys should be coerced to
    Unicode on Python 3 and bytes on Python<2.6.5.

    Python 2 produces keyword argument dicts whose keys are bytes, requiring a
    helper to ensure compatibility with Python 3 where Unicode is required,
    whereas Python 3 produces keyword argument dicts whose keys are Unicode,
    requiring a helper for Python 2.4/2.5, where bytes are required.
    """
    if PY3:
        def __init__(self, dct):
            for k, v in dct.items():
                if type(k) is bytes:
                    self[k.decode()] = v
                else:
                    self[k] = v
    elif sys.version_info < (2, 6, 5):
        def __init__(self, dct):
            for k, v in dct.iteritems():
                if type(k) is unicode:
                    k, _ = encodings.utf_8.encode(k)
                self[k] = v

    def __repr__(self):
        return 'Kwargs(%s)' % (dict.__repr__(self),)

    def __reduce__(self):
        return (Kwargs, (dict(self),))

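
# Illustrative sketch (not part of the module): Blob and Secret exist so that
# binary payloads and passwords embedded in call arguments render safely when
# logged, as '[blob: N bytes]' and '[secret]' respectively. The variable names
# below are hypothetical.
#
#   payload = Blob(b('\x00\x01\x02\x03'))
#   password = Secret(u'hunter2')
#   LOG.debug('args: %r %r', payload, password)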
308 """ 309 def __init__(self, fmt=None, *args): 310 if not isinstance(fmt, BaseException): 311 Error.__init__(self, fmt, *args) 312 else: 313 e = fmt 314 cls = e.__class__ 315 fmt = '%s.%s: %s' % (cls.__module__, cls.__name__, e) 316 tb = sys.exc_info()[2] 317 if tb: 318 fmt += '\n' 319 fmt += ''.join(traceback.format_tb(tb)) 320 Error.__init__(self, fmt) 321 322 def __reduce__(self): 323 return (_unpickle_call_error, (self.args[0],)) 324 325 326def _unpickle_call_error(s): 327 if not (type(s) is UnicodeType and len(s) < 10000): 328 raise TypeError('cannot unpickle CallError: bad input') 329 return CallError(s) 330 331 332class ChannelError(Error): 333 """ 334 Raised when a channel dies or has been closed. 335 """ 336 remote_msg = 'Channel closed by remote end.' 337 local_msg = 'Channel closed by local end.' 338 339 340class StreamError(Error): 341 """ 342 Raised when a stream cannot be established. 343 """ 344 pass 345 346 347class TimeoutError(Error): 348 """ 349 Raised when a timeout occurs on a stream. 350 """ 351 pass 352 353 354def to_text(o): 355 """ 356 Coerce `o` to Unicode by decoding it from UTF-8 if it is an instance of 357 :class:`bytes`, otherwise pass it to the :class:`str` constructor. The 358 returned object is always a plain :class:`str`, any subclass is removed. 359 """ 360 if isinstance(o, BytesType): 361 return o.decode('utf-8') 362 return UnicodeType(o) 363 364 365# Documented in api.rst to work around Sphinx limitation. 366now = getattr(time, 'monotonic', time.time) 367 368 369# Python 2.4 370try: 371 any 372except NameError: 373 def any(it): 374 for elem in it: 375 if elem: 376 return True 377 378 379def _partition(s, sep, find): 380 """ 381 (str|unicode).(partition|rpartition) for Python 2.4/2.5. 382 """ 383 idx = find(sep) 384 if idx != -1: 385 left = s[0:idx] 386 return left, sep, s[len(left)+len(sep):] 387 388 389if hasattr(UnicodeType, 'rpartition'): 390 str_partition = UnicodeType.partition 391 str_rpartition = UnicodeType.rpartition 392 bytes_partition = BytesType.partition 393else: 394 def str_partition(s, sep): 395 return _partition(s, sep, s.find) or (s, u'', u'') 396 def str_rpartition(s, sep): 397 return _partition(s, sep, s.rfind) or (u'', u'', s) 398 def bytes_partition(s, sep): 399 return _partition(s, sep, s.find) or (s, '', '') 400 401 402def _has_parent_authority(context_id): 403 return ( 404 (context_id == mitogen.context_id) or 405 (context_id in mitogen.parent_ids) 406 ) 407 408def has_parent_authority(msg, _stream=None): 409 """ 410 Policy function for use with :class:`Receiver` and 411 :meth:`Router.add_handler` that requires incoming messages to originate 412 from a parent context, or on a :class:`Stream` whose :attr:`auth_id 413 <Stream.auth_id>` has been set to that of a parent context or the current 414 context. 415 """ 416 return _has_parent_authority(msg.auth_id) 417 418 419def _signals(obj, signal): 420 return ( 421 obj.__dict__ 422 .setdefault('_signals', {}) 423 .setdefault(signal, []) 424 ) 425 426 427def listen(obj, name, func): 428 """ 429 Arrange for `func()` to be invoked when signal `name` is fired on `obj`. 430 """ 431 _signals(obj, name).append(func) 432 433 434def unlisten(obj, name, func): 435 """ 436 Remove `func()` from the list of functions invoked when signal `name` is 437 fired by `obj`. 438 439 :raises ValueError: 440 `func()` was not on the list. 
441 """ 442 _signals(obj, name).remove(func) 443 444 445def fire(obj, name, *args, **kwargs): 446 """ 447 Arrange for `func(*args, **kwargs)` to be invoked for every function 448 registered for signal `name` on `obj`. 449 """ 450 for func in _signals(obj, name): 451 func(*args, **kwargs) 452 453 454def takes_econtext(func): 455 """ 456 Decorator that marks a function or class method to automatically receive a 457 kwarg named `econtext`, referencing the 458 :class:`mitogen.core.ExternalContext` active in the context in which the 459 function is being invoked in. The decorator is only meaningful when the 460 function is invoked via :data:`CALL_FUNCTION <mitogen.core.CALL_FUNCTION>`. 461 462 When the function is invoked directly, `econtext` must still be passed to 463 it explicitly. 464 """ 465 func.mitogen_takes_econtext = True 466 return func 467 468 469def takes_router(func): 470 """ 471 Decorator that marks a function or class method to automatically receive a 472 kwarg named `router`, referencing the :class:`mitogen.core.Router` active 473 in the context in which the function is being invoked in. The decorator is 474 only meaningful when the function is invoked via :data:`CALL_FUNCTION 475 <mitogen.core.CALL_FUNCTION>`. 476 477 When the function is invoked directly, `router` must still be passed to it 478 explicitly. 479 """ 480 func.mitogen_takes_router = True 481 return func 482 483 484def is_blacklisted_import(importer, fullname): 485 """ 486 Return :data:`True` if `fullname` is part of a blacklisted package, or if 487 any packages have been whitelisted and `fullname` is not part of one. 488 489 NB: 490 - If a package is on both lists, then it is treated as blacklisted. 491 - If any package is whitelisted, then all non-whitelisted packages are 492 treated as blacklisted. 493 """ 494 return ((not any(fullname.startswith(s) for s in importer.whitelist)) or 495 (any(fullname.startswith(s) for s in importer.blacklist))) 496 497 498def set_cloexec(fd): 499 """ 500 Set the file descriptor `fd` to automatically close on :func:`os.execve`. 501 This has no effect on file descriptors inherited across :func:`os.fork`, 502 they must be explicitly closed through some other means, such as 503 :func:`mitogen.fork.on_fork`. 504 """ 505 flags = fcntl.fcntl(fd, fcntl.F_GETFD) 506 assert fd > 2, 'fd %r <= 2' % (fd,) 507 fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC) 508 509 510def set_nonblock(fd): 511 """ 512 Set the file descriptor `fd` to non-blocking mode. For most underlying file 513 types, this causes :func:`os.read` or :func:`os.write` to raise 514 :class:`OSError` with :data:`errno.EAGAIN` rather than block the thread 515 when the underlying kernel buffer is exhausted. 516 """ 517 flags = fcntl.fcntl(fd, fcntl.F_GETFL) 518 fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) 519 520 521def set_block(fd): 522 """ 523 Inverse of :func:`set_nonblock`, i.e. cause `fd` to block the thread when 524 the underlying kernel buffer is exhausted. 525 """ 526 flags = fcntl.fcntl(fd, fcntl.F_GETFL) 527 fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK) 528 529 530def io_op(func, *args): 531 """ 532 Wrap `func(*args)` that may raise :class:`select.error`, :class:`IOError`, 533 or :class:`OSError`, trapping UNIX error codes relating to disconnection 534 and retry events in various subsystems: 535 536 * When a signal is delivered to the process on Python 2, system call retry 537 is signalled through :data:`errno.EINTR`. The invocation is automatically 538 restarted. 


class PidfulStreamHandler(logging.StreamHandler):
    """
    A :class:`logging.StreamHandler` subclass used when
    :meth:`Router.enable_debug() <mitogen.master.Router.enable_debug>` has been
    called, or the `debug` parameter was specified during context construction.
    Verifies the process ID has not changed on each call to :meth:`emit`,
    reopening the associated log file when a change is detected.

    This ensures logging to the per-process output files happens correctly even
    when uncooperative third party components call :func:`os.fork`.
    """
    #: PID that last opened the log file.
    open_pid = None

    #: Output path template.
    template = '/tmp/mitogen.%s.%s.log'

    def _reopen(self):
        self.acquire()
        try:
            if self.open_pid == os.getpid():
                return
            ts = time.strftime('%Y%m%d_%H%M%S')
            path = self.template % (os.getpid(), ts)
            self.stream = open(path, 'w', 1)
            set_cloexec(self.stream.fileno())
            self.stream.write('Parent PID: %s\n' % (os.getppid(),))
            self.stream.write('Created by:\n\n%s\n' % (
                ''.join(traceback.format_stack()),
            ))
            self.open_pid = os.getpid()
        finally:
            self.release()

    def emit(self, record):
        if self.open_pid != os.getpid():
            self._reopen()
        logging.StreamHandler.emit(self, record)


def enable_debug_logging():
    global _v, _vv
    _v = True
    _vv = True
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    IOLOG.setLevel(logging.DEBUG)
    handler = PidfulStreamHandler()
    handler.formatter = logging.Formatter(
        '%(asctime)s %(levelname).1s %(name)s: %(message)s',
        '%H:%M:%S'
    )
    root.handlers.insert(0, handler)


_profile_hook = lambda name, func, *args: func(*args)
_profile_fmt = os.environ.get(
    'MITOGEN_PROFILE_FMT',
    '/tmp/mitogen.stats.%(pid)s.%(identity)s.%(now)s.%(ext)s',
)


def _profile_hook(name, func, *args):
    """
    Call `func(*args)` and return its result. This function is replaced by
    :func:`_real_profile_hook` when :func:`enable_profiling` is called. This
    interface is obsolete and will be replaced by a signals-based integration
    later on.
    """
    return func(*args)


def _real_profile_hook(name, func, *args):
    profiler = cProfile.Profile()
    profiler.enable()
    try:
        return func(*args)
    finally:
        path = _profile_fmt % {
            'now': int(1e6 * now()),
            'identity': name,
            'pid': os.getpid(),
            'ext': '%s'
        }
        profiler.dump_stats(path % ('pstats',))
        profiler.create_stats()
        fp = open(path % ('log',), 'w')
        try:
            stats = pstats.Stats(profiler, stream=fp)
            stats.sort_stats('cumulative')
            stats.print_stats()
        finally:
            fp.close()


def enable_profiling(econtext=None):
    global _profile_hook
    _profile_hook = _real_profile_hook


def import_module(modname):
    """
    Import the module named by `modname` and return the resulting module
    object.
    """
    return __import__(modname, None, None, [''])


def pipe():
    """
    Create a UNIX pipe pair using :func:`os.pipe`, wrapping the returned
    descriptors in Python file objects in order to manage their lifetime and
    ensure they are closed when their last reference is discarded and they have
    not been closed explicitly.
    """
    rfd, wfd = os.pipe()
    return (
        os.fdopen(rfd, 'rb', 0),
        os.fdopen(wfd, 'wb', 0)
    )


def iter_split(buf, delim, func):
    """
    Invoke `func(s)` for each `delim`-delimited chunk in the potentially large
    `buf`, avoiding intermediate lists and quadratic string operations. Return
    the trailing undelimited portion of `buf`, or any unprocessed portion of
    `buf` after `func(s)` returned :data:`False`.

    :returns:
        `(trailer, cont)`, where `cont` is :data:`False` if the last call to
        `func(s)` returned :data:`False`.
    """
    dlen = len(delim)
    start = 0
    cont = True
    while cont:
        nl = buf.find(delim, start)
        if nl == -1:
            break
        cont = not func(buf[start:nl]) is False
        start = nl + dlen
    return buf[start:], cont
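
# Illustrative sketch (not part of the API surface): feeding a buffer through
# iter_split(). Each complete chunk is passed to the callback; the
# unterminated remainder is returned so it can be prepended to the next read,
# which is how DelimitedProtocol (below) uses it.
#
#   lines = []
#   trailer, cont = iter_split(b('one\ntwo\npart'), b('\n'), lines.append)
#   # lines == [b('one'), b('two')], trailer == b('part'), cont is True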
632 """ 633 return func(*args) 634 635 636def _real_profile_hook(name, func, *args): 637 profiler = cProfile.Profile() 638 profiler.enable() 639 try: 640 return func(*args) 641 finally: 642 path = _profile_fmt % { 643 'now': int(1e6 * now()), 644 'identity': name, 645 'pid': os.getpid(), 646 'ext': '%s' 647 } 648 profiler.dump_stats(path % ('pstats',)) 649 profiler.create_stats() 650 fp = open(path % ('log',), 'w') 651 try: 652 stats = pstats.Stats(profiler, stream=fp) 653 stats.sort_stats('cumulative') 654 stats.print_stats() 655 finally: 656 fp.close() 657 658 659def enable_profiling(econtext=None): 660 global _profile_hook 661 _profile_hook = _real_profile_hook 662 663 664def import_module(modname): 665 """ 666 Import `module` and return the attribute named `attr`. 667 """ 668 return __import__(modname, None, None, ['']) 669 670 671def pipe(): 672 """ 673 Create a UNIX pipe pair using :func:`os.pipe`, wrapping the returned 674 descriptors in Python file objects in order to manage their lifetime and 675 ensure they are closed when their last reference is discarded and they have 676 not been closed explicitly. 677 """ 678 rfd, wfd = os.pipe() 679 return ( 680 os.fdopen(rfd, 'rb', 0), 681 os.fdopen(wfd, 'wb', 0) 682 ) 683 684 685def iter_split(buf, delim, func): 686 """ 687 Invoke `func(s)` for each `delim`-delimited chunk in the potentially large 688 `buf`, avoiding intermediate lists and quadratic string operations. Return 689 the trailing undelimited portion of `buf`, or any unprocessed portion of 690 `buf` after `func(s)` returned :data:`False`. 691 692 :returns: 693 `(trailer, cont)`, where `cont` is :data:`False` if the last call to 694 `func(s)` returned :data:`False`. 695 """ 696 dlen = len(delim) 697 start = 0 698 cont = True 699 while cont: 700 nl = buf.find(delim, start) 701 if nl == -1: 702 break 703 cont = not func(buf[start:nl]) is False 704 start = nl + dlen 705 return buf[start:], cont 706 707 708class Py24Pickler(py_pickle.Pickler): 709 """ 710 Exceptions were classic classes until Python 2.5. Sadly for 2.4, cPickle 711 offers little control over how a classic instance is pickled. Therefore 2.4 712 uses a pure-Python pickler, so CallError can be made to look as it does on 713 newer Pythons. 714 715 This mess will go away once proper serialization exists. 716 """ 717 @classmethod 718 def dumps(cls, obj, protocol): 719 bio = BytesIO() 720 self = cls(bio, protocol=protocol) 721 self.dump(obj) 722 return bio.getvalue() 723 724 def save_exc_inst(self, obj): 725 if isinstance(obj, CallError): 726 func, args = obj.__reduce__() 727 self.save(func) 728 self.save(args) 729 self.write(py_pickle.REDUCE) 730 else: 731 py_pickle.Pickler.save_inst(self, obj) 732 733 if PY24: 734 dispatch = py_pickle.Pickler.dispatch.copy() 735 dispatch[py_pickle.InstanceType] = save_exc_inst 736 737 738if PY3: 739 # In 3.x Unpickler is a class exposing find_class as an overridable, but it 740 # cannot be overridden without subclassing. 741 class _Unpickler(pickle.Unpickler): 742 def find_class(self, module, func): 743 return self.find_global(module, func) 744 pickle__dumps = pickle.dumps 745elif PY24: 746 # On Python 2.4, we must use a pure-Python pickler. 747 pickle__dumps = Py24Pickler.dumps 748 _Unpickler = pickle.Unpickler 749else: 750 pickle__dumps = pickle.dumps 751 # In 2.x Unpickler is a function exposing a writeable find_global 752 # attribute. 


class Message(object):
    """
    Messages are the fundamental unit of communication, comprising fields from
    the :ref:`stream-protocol` header, an optional reference to the receiving
    :class:`mitogen.core.Router` for ingress messages, and helper methods for
    deserialization and generating replies.
    """
    #: Integer target context ID. :class:`Router` delivers messages locally
    #: when their :attr:`dst_id` matches :data:`mitogen.context_id`, otherwise
    #: they are routed up or downstream.
    dst_id = None

    #: Integer source context ID. Used as the target of replies if any are
    #: generated.
    src_id = None

    #: Context ID under whose authority the message is acting. See
    #: :ref:`source-verification`.
    auth_id = None

    #: Integer target handle in the destination context. This is one of the
    #: :ref:`standard-handles`, or a dynamically generated handle used to
    #: receive a one-time reply, such as the return value of a function call.
    handle = None

    #: Integer target handle to direct any reply to this message. Used to
    #: receive a one-time reply, such as the return value of a function call.
    #: :data:`IS_DEAD` has a special meaning when it appears in this field.
    reply_to = None

    #: Raw message data bytes.
    data = b('')

    _unpickled = object()

    #: The :class:`Router` responsible for routing the message. This is
    #: :data:`None` for locally originated messages.
    router = None

    #: The :class:`Receiver` over which the message was last received. Part of
    #: the :class:`mitogen.select.Select` interface. Defaults to :data:`None`.
    receiver = None

    HEADER_FMT = '>hLLLLLL'
    HEADER_LEN = struct.calcsize(HEADER_FMT)
    HEADER_MAGIC = 0x4d49  # 'MI'

    def __init__(self, **kwargs):
        """
        Construct a message from the supplied `kwargs`. :attr:`src_id` and
        :attr:`auth_id` are always set to :data:`mitogen.context_id`.
        """
        self.src_id = mitogen.context_id
        self.auth_id = mitogen.context_id
        vars(self).update(kwargs)
        assert isinstance(self.data, BytesType), 'Message data is not Bytes'

    def pack(self):
        return (
            struct.pack(self.HEADER_FMT, self.HEADER_MAGIC, self.dst_id,
                        self.src_id, self.auth_id, self.handle,
                        self.reply_to or 0, len(self.data))
            + self.data
        )

    def _unpickle_context(self, context_id, name):
        return _unpickle_context(context_id, name, router=self.router)

    def _unpickle_sender(self, context_id, dst_handle):
        return _unpickle_sender(self.router, context_id, dst_handle)

    def _unpickle_bytes(self, s, encoding):
        s, n = LATIN1_CODEC.encode(s)
        return s

    def _find_global(self, module, func):
        """
        Return the class implementing `module_name.class_name` or raise
        `StreamError` if the module is not whitelisted.
835 """ 836 if module == __name__: 837 if func == '_unpickle_call_error' or func == 'CallError': 838 return _unpickle_call_error 839 elif func == '_unpickle_sender': 840 return self._unpickle_sender 841 elif func == '_unpickle_context': 842 return self._unpickle_context 843 elif func == 'Blob': 844 return Blob 845 elif func == 'Secret': 846 return Secret 847 elif func == 'Kwargs': 848 return Kwargs 849 elif module == '_codecs' and func == 'encode': 850 return self._unpickle_bytes 851 elif module == '__builtin__' and func == 'bytes': 852 return BytesType 853 raise StreamError('cannot unpickle %r/%r', module, func) 854 855 @property 856 def is_dead(self): 857 """ 858 :data:`True` if :attr:`reply_to` is set to the magic value 859 :data:`IS_DEAD`, indicating the sender considers the channel dead. Dead 860 messages can be raised in a variety of circumstances, see 861 :data:`IS_DEAD` for more information. 862 """ 863 return self.reply_to == IS_DEAD 864 865 @classmethod 866 def dead(cls, reason=None, **kwargs): 867 """ 868 Syntax helper to construct a dead message. 869 """ 870 kwargs['data'], _ = encodings.utf_8.encode(reason or u'') 871 return cls(reply_to=IS_DEAD, **kwargs) 872 873 @classmethod 874 def pickled(cls, obj, **kwargs): 875 """ 876 Construct a pickled message, setting :attr:`data` to the serialization 877 of `obj`, and setting remaining fields using `kwargs`. 878 879 :returns: 880 The new message. 881 """ 882 self = cls(**kwargs) 883 try: 884 self.data = pickle__dumps(obj, protocol=2) 885 except pickle.PicklingError: 886 e = sys.exc_info()[1] 887 self.data = pickle__dumps(CallError(e), protocol=2) 888 return self 889 890 def reply(self, msg, router=None, **kwargs): 891 """ 892 Compose a reply to this message and send it using :attr:`router`, or 893 `router` is :attr:`router` is :data:`None`. 894 895 :param obj: 896 Either a :class:`Message`, or an object to be serialized in order 897 to construct a new message. 898 :param router: 899 Optional router to use if :attr:`router` is :data:`None`. 900 :param kwargs: 901 Optional keyword parameters overriding message fields in the reply. 902 """ 903 if not isinstance(msg, Message): 904 msg = Message.pickled(msg) 905 msg.dst_id = self.src_id 906 msg.handle = self.reply_to 907 vars(msg).update(kwargs) 908 if msg.handle: 909 (self.router or router).route(msg) 910 else: 911 LOG.debug('dropping reply to message with no return address: %r', 912 msg) 913 914 if PY3: 915 UNPICKLER_KWARGS = {'encoding': 'bytes'} 916 else: 917 UNPICKLER_KWARGS = {} 918 919 def _throw_dead(self): 920 if len(self.data): 921 raise ChannelError(self.data.decode('utf-8', 'replace')) 922 elif self.src_id == mitogen.context_id: 923 raise ChannelError(ChannelError.local_msg) 924 else: 925 raise ChannelError(ChannelError.remote_msg) 926 927 def unpickle(self, throw=True, throw_dead=True): 928 """ 929 Unpickle :attr:`data`, optionally raising any exceptions present. 930 931 :param bool throw_dead: 932 If :data:`True`, raise exceptions, otherwise it is the caller's 933 responsibility. 934 935 :raises CallError: 936 The serialized data contained CallError exception. 937 :raises ChannelError: 938 The `is_dead` field was set. 939 """ 940 _vv and IOLOG.debug('%r.unpickle()', self) 941 if throw_dead and self.is_dead: 942 self._throw_dead() 943 944 obj = self._unpickled 945 if obj is Message._unpickled: 946 fp = BytesIO(self.data) 947 unpickler = _Unpickler(fp, **self.UNPICKLER_KWARGS) 948 unpickler.find_global = self._find_global 949 try: 950 # Must occur off the broker thread. 


class Sender(object):
    """
    Senders are used to send pickled messages to a handle in another context;
    it is the inverse of :class:`mitogen.core.Receiver`.

    Senders may be serialized, making them convenient to wire up data flows.
    See :meth:`mitogen.core.Receiver.to_sender` for more information.

    :param mitogen.core.Context context:
        Context to send messages to.
    :param int dst_handle:
        Destination handle to send messages to.
    """
    def __init__(self, context, dst_handle):
        self.context = context
        self.dst_handle = dst_handle

    def send(self, data):
        """
        Send `data` to the remote end.
        """
        _vv and IOLOG.debug('%r.send(%r..)', self, repr(data)[:100])
        self.context.send(Message.pickled(data, handle=self.dst_handle))

    explicit_close_msg = 'Sender was explicitly closed'

    def close(self):
        """
        Send a dead message to the remote, causing :class:`ChannelError` to be
        raised in any waiting thread.
        """
        _vv and IOLOG.debug('%r.close()', self)
        self.context.send(
            Message.dead(
                reason=self.explicit_close_msg,
                handle=self.dst_handle
            )
        )

    def __repr__(self):
        return 'Sender(%r, %r)' % (self.context, self.dst_handle)

    def __reduce__(self):
        return _unpickle_sender, (self.context.context_id, self.dst_handle)


def _unpickle_sender(router, context_id, dst_handle):
    if not (isinstance(router, Router) and
            isinstance(context_id, (int, long)) and context_id >= 0 and
            isinstance(dst_handle, (int, long)) and dst_handle > 0):
        raise TypeError('cannot unpickle Sender: bad input or missing router')
    return Sender(Context(router, context_id), dst_handle)


class Receiver(object):
    """
    Receivers maintain a thread-safe queue of messages sent to a handle of this
    context from another context.

    :param mitogen.core.Router router:
        Router to register the handler on.

    :param int handle:
        If not :data:`None`, an explicit handle to register, otherwise an
        unused handle is chosen.

    :param bool persist:
        If :data:`False`, unregister the handler after one message is received.
        Single-message receivers are intended for RPC-like transactions, such
        as in the case of :meth:`mitogen.parent.Context.call_async`.

    :param mitogen.core.Context respondent:
        Context this receiver is receiving from. If not :data:`None`, arranges
        for the receiver to receive a dead message if messages can no longer be
        routed to the context due to disconnection, and ignores messages that
        did not originate from the respondent context.
    """
    #: If not :data:`None`, a function invoked as `notify(receiver)` after a
    #: message has been received. The function is invoked on the
    #: :class:`Broker` thread, therefore it must not block. Used by
    #: :class:`mitogen.select.Select` to efficiently implement waiting on
    #: multiple event sources.
    notify = None

    raise_channelerror = True

    def __init__(self, router, handle=None, persist=True,
                 respondent=None, policy=None, overwrite=False):
        self.router = router
        #: The handle.
        self.handle = handle  # Avoid __repr__ crash in add_handler()
        self._latch = Latch()  # Must exist prior to .add_handler()
        self.handle = router.add_handler(
            fn=self._on_receive,
            handle=handle,
            policy=policy,
            persist=persist,
            respondent=respondent,
            overwrite=overwrite,
        )

    def __repr__(self):
        return 'Receiver(%r, %r)' % (self.router, self.handle)

    def __enter__(self):
        return self

    def __exit__(self, _1, _2, _3):
        self.close()

    def to_sender(self):
        """
        Return a :class:`Sender` configured to deliver messages to this
        receiver. As senders are serializable, this makes it convenient to pass
        `(context_id, handle)` pairs around::

            def deliver_monthly_report(sender):
                for line in open('monthly_report.txt'):
                    sender.send(line)
                sender.close()

            @mitogen.main()
            def main(router):
                remote = router.ssh(hostname='mainframe')
                recv = mitogen.core.Receiver(router)
                remote.call(deliver_monthly_report, recv.to_sender())
                for msg in recv:
                    print(msg)
        """
        return Sender(self.router.myself(), self.handle)

    def _on_receive(self, msg):
        """
        Callback registered for the handle with :class:`Router`; appends data
        to the internal queue.
        """
        _vv and IOLOG.debug('%r._on_receive(%r)', self, msg)
        self._latch.put(msg)
        if self.notify:
            self.notify(self)

    closed_msg = 'the Receiver has been closed'

    def close(self):
        """
        Unregister the receiver's handle from its associated router, and cause
        :class:`ChannelError` to be raised in any thread waiting in :meth:`get`
        on this receiver.
        """
        if self.handle:
            self.router.del_handler(self.handle)
            self.handle = None
        self._latch.close()

    def size(self):
        """
        Return the number of items currently buffered.

        As with :class:`Queue.Queue`, `0` may be returned even though a
        subsequent call to :meth:`get` will succeed, since a message may be
        posted at any moment between :meth:`size` and :meth:`get`.

        As with :class:`Queue.Queue`, `>0` may be returned even though a
        subsequent call to :meth:`get` will block, since another waiting thread
        may be woken at any moment between :meth:`size` and :meth:`get`.

        :raises LatchError:
            The underlying latch has already been marked closed.
        """
        return self._latch.size()

    def empty(self):
        """
        Return `size() == 0`.

        .. deprecated:: 0.2.8
           Use :meth:`size` instead.

        :raises LatchError:
            The latch has already been marked closed.
        """
        return self._latch.empty()

    def get(self, timeout=None, block=True, throw_dead=True):
        """
        Sleep waiting for a message to arrive on this receiver.

        :param float timeout:
            If not :data:`None`, specifies a timeout in seconds.

        :raises mitogen.core.ChannelError:
            The remote end indicated the channel should be closed,
            communication with it was lost, or :meth:`close` was called in the
            local process.

        :raises mitogen.core.TimeoutError:
            Timeout was reached.

        :returns:
            :class:`Message` that was received.
        """
        _vv and IOLOG.debug('%r.get(timeout=%r, block=%r)', self, timeout, block)
        try:
            msg = self._latch.get(timeout=timeout, block=block)
        except LatchError:
            raise ChannelError(self.closed_msg)
        if msg.is_dead and throw_dead:
            msg._throw_dead()
        return msg

    def __iter__(self):
        """
        Yield consecutive :class:`Message` instances delivered to this receiver
        until :class:`ChannelError` is raised.
        """
        while True:
            try:
                msg = self.get()
            except ChannelError:
                return
            yield msg


class Channel(Sender, Receiver):
    """
    A channel inherits from :class:`mitogen.core.Sender` and
    :class:`mitogen.core.Receiver` to provide bidirectional functionality.

    .. deprecated:: 0.2.0
       This class is incomplete and obsolete; it will be removed in Mitogen
       0.3.

    Channels were an early attempt at syntax sugar. It is always easier to pass
    around unidirectional pairs of senders/receivers, even though the syntax is
    baroque:

    .. literalinclude:: ../examples/ping_pong.py

    Since all handles aren't known until after both ends are constructed, for
    both ends to communicate through a channel, it is necessary for one end to
    retrieve the handle allocated to the other and reconfigure its own channel
    to match. Currently this is a manual task.
    """
    def __init__(self, router, context, dst_handle, handle=None):
        Sender.__init__(self, context, dst_handle)
        Receiver.__init__(self, router, handle)

    def close(self):
        Receiver.close(self)
        Sender.close(self)

    def __repr__(self):
        return 'Channel(%s, %s)' % (
            Sender.__repr__(self),
            Receiver.__repr__(self)
        )


class Importer(object):
    """
    Import protocol implementation that fetches modules from the parent
    process.

    :param context: Context to communicate via.
    """
    # The Mitogen package is handled specially, since the child context must
    # construct it manually during startup.
    MITOGEN_PKG_CONTENT = [
        'buildah',
        'compat',
        'debug',
        'doas',
        'docker',
        'kubectl',
        'fakessh',
        'fork',
        'jail',
        'lxc',
        'lxd',
        'master',
        'minify',
        'os_fork',
        'parent',
        'select',
        'service',
        'setns',
        'ssh',
        'su',
        'sudo',
        'utils',
    ]

    ALWAYS_BLACKLIST = [
        # 2.x generates needless imports for 'builtins', while 3.x does the
        # same for '__builtin__'. The correct one is built-in, the other always
        # a negative round-trip.
        'builtins',
        '__builtin__',
        'thread',

        # org.python.core imported by copy, pickle, xml.sax; breaks Jython, but
        # very unlikely to trigger a bug report.
        'org',
    ]

    if PY3:
        ALWAYS_BLACKLIST += ['cStringIO']

    def __init__(self, router, context, core_src, whitelist=(), blacklist=()):
        self._log = logging.getLogger('mitogen.importer')
        self._context = context
        self._present = {'mitogen': self.MITOGEN_PKG_CONTENT}
        self._lock = threading.Lock()
        self.whitelist = list(whitelist) or ['']
        self.blacklist = list(blacklist) + self.ALWAYS_BLACKLIST

        # Preserve copies of the original server-supplied whitelist/blacklist
        # for later use by children.
        self.master_whitelist = self.whitelist[:]
        self.master_blacklist = self.blacklist[:]

        # Presence of an entry in this map indicates in-flight GET_MODULE.
        self._callbacks = {}
        self._cache = {}
        if core_src:
            self._update_linecache('x/mitogen/core.py', core_src)
            self._cache['mitogen.core'] = (
                'mitogen.core',
                None,
                'x/mitogen/core.py',
                zlib.compress(core_src, 9),
                [],
            )
        self._install_handler(router)

    def _update_linecache(self, path, data):
        """
        The Python 2.4 linecache module, used to fetch source code for
        tracebacks and :func:`inspect.getsource`, does not support PEP-302,
        meaning it needs extra help for Mitogen-loaded modules. Directly
        populate its cache if a loaded module belongs to the Mitogen package.
        """
        if PY24 and 'mitogen' in path:
            linecache.cache[path] = (
                len(data),
                0.0,
                [line+'\n' for line in data.splitlines()],
                path,
            )

    def _install_handler(self, router):
        router.add_handler(
            fn=self._on_load_module,
            handle=LOAD_MODULE,
            policy=has_parent_authority,
        )

    def __repr__(self):
        return 'Importer'

    def builtin_find_module(self, fullname):
        # imp.find_module() will always succeed for __main__, because it is a
        # built-in module. That means it exists on a special linked list deep
        # within the bowels of the interpreter. We must special case it.
        if fullname == '__main__':
            raise ModuleNotFoundError()

        parent, _, modname = str_rpartition(fullname, '.')
        if parent:
            path = sys.modules[parent].__path__
        else:
            path = None

        fp, pathname, description = imp.find_module(modname, path)
        if fp:
            fp.close()

    def find_module(self, fullname, path=None):
        if hasattr(_tls, 'running'):
            return None

        _tls.running = True
        try:
            #_v and self._log.debug('Python requested %r', fullname)
            fullname = to_text(fullname)
            pkgname, dot, _ = str_rpartition(fullname, '.')
            pkg = sys.modules.get(pkgname)
            if pkgname and getattr(pkg, '__loader__', None) is not self:
                self._log.debug('%s is submodule of a locally loaded package',
                                fullname)
                return None

            suffix = fullname[len(pkgname+dot):]
            if pkgname and suffix not in self._present.get(pkgname, ()):
                self._log.debug('%s has no submodule %s', pkgname, suffix)
                return None

            # #114: explicitly whitelisted prefixes override any
            # system-installed package.
            if self.whitelist != ['']:
                if any(fullname.startswith(s) for s in self.whitelist):
                    return self

            try:
                self.builtin_find_module(fullname)
                _vv and self._log.debug('%r is available locally', fullname)
            except ImportError:
                _vv and self._log.debug('we will try to load %r', fullname)
                return self
        finally:
            del _tls.running

    blacklisted_msg = (
        '%r is present in the Mitogen importer blacklist, therefore this '
        'context will not attempt to request it from the master, as the '
        'request will always be refused.'
    )
    pkg_resources_msg = (
        'pkg_resources is prohibited from importing __main__, as it causes '
        'problems in applications whose main module is not designed to be '
        're-imported by children.'
    )
    absent_msg = (
        'The Mitogen master process was unable to serve %r. It may be a '
        'native Python extension, or it may be missing entirely. Check the '
        'importer debug logs on the master for more information.'
    )

    def _refuse_imports(self, fullname):
        if is_blacklisted_import(self, fullname):
            raise ModuleNotFoundError(self.blacklisted_msg % (fullname,))

        f = sys._getframe(2)
        requestee = f.f_globals['__name__']

        if fullname == '__main__' and requestee == 'pkg_resources':
            # Anything that imports pkg_resources will eventually cause
            # pkg_resources to try and scan __main__ for its __requires__
            # attribute (pkg_resources/__init__.py::_build_master()). This
            # breaks any app that is not expecting its __main__ to suddenly be
            # sucked over a network and injected into a remote process, like
            # py.test.
            raise ModuleNotFoundError(self.pkg_resources_msg)

        if fullname == 'pbr':
            # It claims to use pkg_resources to read version information, which
            # would result in PEP-302 being used, but it actually does direct
            # filesystem access. So instead smodge the environment to override
            # any version that was defined. This will probably break something
            # later.
            os.environ['PBR_VERSION'] = '0.0.0'

    def _on_load_module(self, msg):
        if msg.is_dead:
            return

        tup = msg.unpickle()
        fullname = tup[0]
        _v and self._log.debug('received %s', fullname)

        self._lock.acquire()
        try:
            self._cache[fullname] = tup
            if tup[2] is not None and PY24:
                self._update_linecache(
                    path='master:' + tup[2],
                    data=zlib.decompress(tup[3])
                )
            callbacks = self._callbacks.pop(fullname, [])
        finally:
            self._lock.release()

        for callback in callbacks:
            callback()

    def _request_module(self, fullname, callback):
        self._lock.acquire()
        try:
            present = fullname in self._cache
            if not present:
                funcs = self._callbacks.get(fullname)
                if funcs is not None:
                    _v and self._log.debug('existing request for %s in flight',
                                           fullname)
                    funcs.append(callback)
                else:
                    _v and self._log.debug('sending new %s request to parent',
                                           fullname)
                    self._callbacks[fullname] = [callback]
                    self._context.send(
                        Message(data=b(fullname), handle=GET_MODULE)
                    )
        finally:
            self._lock.release()

        if present:
            callback()

    def load_module(self, fullname):
        fullname = to_text(fullname)
        _v and self._log.debug('requesting %s', fullname)
        self._refuse_imports(fullname)

        event = threading.Event()
        self._request_module(fullname, event.set)
        event.wait()

        ret = self._cache[fullname]
        if ret[2] is None:
            raise ModuleNotFoundError(self.absent_msg % (fullname,))

        pkg_present = ret[1]
        mod = sys.modules.setdefault(fullname, imp.new_module(fullname))
        mod.__file__ = self.get_filename(fullname)
        mod.__loader__ = self
        if pkg_present is not None:  # it's a package.
            mod.__path__ = []
            mod.__package__ = fullname
            self._present[fullname] = pkg_present
        else:
            mod.__package__ = str_rpartition(fullname, '.')[0] or None

        if mod.__package__ and not PY3:
            # 2.x requires __package__ to be exactly a string.
            mod.__package__, _ = encodings.utf_8.encode(mod.__package__)

        source = self.get_source(fullname)
        try:
            code = compile(source, mod.__file__, 'exec', 0, 1)
        except SyntaxError:
            LOG.exception('while importing %r', fullname)
            raise

        if PY3:
            exec(code, vars(mod))
        else:
            exec('exec code in vars(mod)')

        # #590: if a module replaces itself in sys.modules during import, below
        # is necessary. This matches PyImport_ExecCodeModuleEx()
        return sys.modules.get(fullname, mod)

    def get_filename(self, fullname):
        if fullname in self._cache:
            path = self._cache[fullname][2]
            if path is None:
                # If find_loader() returns self but a subsequent master RPC
                # reveals the module can't be loaded, and so load_module()
                # throws ImportError, on Python 3.x it is still possible for
                # the loader to be called to fetch metadata.
                raise ModuleNotFoundError(self.absent_msg % (fullname,))
            return u'master:' + self._cache[fullname][2]

    def get_source(self, fullname):
        if fullname in self._cache:
            compressed = self._cache[fullname][3]
            if compressed is None:
                raise ModuleNotFoundError(self.absent_msg % (fullname,))

            source = zlib.decompress(self._cache[fullname][3])
            if PY3:
                return to_text(source)
            return source


class LogHandler(logging.Handler):
    """
    A :class:`logging.Handler` subclass that arranges for :data:`FORWARD_LOG`
    messages to be sent to a parent context in response to logging messages
    generated by the current context. This is installed by default in child
    contexts during bootstrap, so that :mod:`logging` events can be viewed and
    managed centrally in the master process.

    The handler is initially *corked* after construction, such that it buffers
    messages until :meth:`uncork` is called. This allows logging to be
    installed prior to communication with the target being available, and
    avoids any possible race where early log messages might be dropped.

    :param mitogen.core.Context context:
        The context to send log messages towards. At present this is always
        the master process.
    """
    def __init__(self, context):
        logging.Handler.__init__(self)
        self.context = context
        self.local = threading.local()
        self._buffer = []
        # Private synchronization is needed while corked, to ensure no
        # concurrent call to _send() exists during uncork().
        self._buffer_lock = threading.Lock()

    def uncork(self):
        """
        #305: during startup :class:`LogHandler` may be installed before it is
        possible to route messages, therefore messages are buffered until
        :meth:`uncork` is called by :class:`ExternalContext`.
        """
        self._buffer_lock.acquire()
        try:
            self._send = self.context.send
            for msg in self._buffer:
                self._send(msg)
            self._buffer = None
        finally:
            self._buffer_lock.release()

    def _send(self, msg):
        self._buffer_lock.acquire()
        try:
            if self._buffer is None:
                # uncork() may run concurrent to _send()
                self._send(msg)
            else:
                self._buffer.append(msg)
        finally:
            self._buffer_lock.release()

    def emit(self, rec):
        """
        Send a :data:`FORWARD_LOG` message towards the target context.
1594 """ 1595 if rec.name == 'mitogen.io' or \ 1596 getattr(self.local, 'in_emit', False): 1597 return 1598 1599 self.local.in_emit = True 1600 try: 1601 msg = self.format(rec) 1602 encoded = '%s\x00%s\x00%s' % (rec.name, rec.levelno, msg) 1603 if isinstance(encoded, UnicodeType): 1604 # Logging package emits both :( 1605 encoded = encoded.encode('utf-8') 1606 self._send(Message(data=encoded, handle=FORWARD_LOG)) 1607 finally: 1608 self.local.in_emit = False 1609 1610 1611class Stream(object): 1612 """ 1613 A :class:`Stream` is one readable and optionally one writeable file 1614 descriptor (represented by :class:`Side`) aggregated alongside an 1615 associated :class:`Protocol` that knows how to respond to IO readiness 1616 events for those descriptors. 1617 1618 Streams are registered with :class:`Broker`, and callbacks are invoked on 1619 the broker thread in response to IO activity. When registered using 1620 :meth:`Broker.start_receive` or :meth:`Broker._start_transmit`, the broker 1621 may call any of :meth:`on_receive`, :meth:`on_transmit`, 1622 :meth:`on_shutdown` or :meth:`on_disconnect`. 1623 1624 It is expected that the :class:`Protocol` associated with a stream will 1625 change over its life. For example during connection setup, the initial 1626 protocol may be :class:`mitogen.parent.BootstrapProtocol` that knows how to 1627 enter SSH and sudo passwords and transmit the :mod:`mitogen.core` source to 1628 the target, before handing off to :class:`MitogenProtocol` when the target 1629 process is initialized. 1630 1631 Streams connecting to children are in turn aggregated by 1632 :class:`mitogen.parent.Connection`, which contains additional logic for 1633 managing any child process, and a reference to any separate ``stderr`` 1634 :class:`Stream` connected to that process. 1635 """ 1636 #: A :class:`Side` representing the stream's receive file descriptor. 1637 receive_side = None 1638 1639 #: A :class:`Side` representing the stream's transmit file descriptor. 1640 transmit_side = None 1641 1642 #: A :class:`Protocol` representing the protocol active on the stream. 1643 protocol = None 1644 1645 #: In parents, the :class:`mitogen.parent.Connection` instance. 1646 conn = None 1647 1648 #: The stream name. This is used in the :meth:`__repr__` output in any log 1649 #: messages, it may be any descriptive string. 1650 name = u'default' 1651 1652 def set_protocol(self, protocol): 1653 """ 1654 Bind a :class:`Protocol` to this stream, by updating 1655 :attr:`Protocol.stream` to refer to this stream, and updating this 1656 stream's :attr:`Stream.protocol` to the refer to the protocol. Any 1657 prior protocol's :attr:`Protocol.stream` is set to :data:`None`. 1658 """ 1659 if self.protocol: 1660 self.protocol.stream = None 1661 self.protocol = protocol 1662 self.protocol.stream = self 1663 1664 def accept(self, rfp, wfp): 1665 """ 1666 Attach a pair of file objects to :attr:`receive_side` and 1667 :attr:`transmit_side`, after wrapping them in :class:`Side` instances. 1668 :class:`Side` will call :func:`set_nonblock` and :func:`set_cloexec` 1669 on the underlying file descriptors during construction. 1670 1671 The same file object may be used for both sides. The default 1672 :meth:`on_disconnect` is handles the possibility that only one 1673 descriptor may need to be closed. 1674 1675 :param file rfp: 1676 The file object to receive from. 1677 :param file wfp: 1678 The file object to transmit to. 
1679 """ 1680 self.receive_side = Side(self, rfp) 1681 self.transmit_side = Side(self, wfp) 1682 1683 def __repr__(self): 1684 return "<Stream %s #%04x>" % (self.name, id(self) & 0xffff,) 1685 1686 def on_receive(self, broker): 1687 """ 1688 Invoked by :class:`Broker` when the stream's :attr:`receive_side` has 1689 been marked readable using :meth:`Broker.start_receive` and the broker 1690 has detected the associated file descriptor is ready for reading. 1691 1692 Subclasses must implement this if they are registered using 1693 :meth:`Broker.start_receive`, and the method must invoke 1694 :meth:`on_disconnect` if reading produces an empty string. 1695 1696 The default implementation reads :attr:`Protocol.read_size` bytes and 1697 passes the resulting bytestring to :meth:`Protocol.on_receive`. If the 1698 bytestring is 0 bytes, invokes :meth:`on_disconnect` instead. 1699 """ 1700 buf = self.receive_side.read(self.protocol.read_size) 1701 if not buf: 1702 LOG.debug('%r: empty read, disconnecting', self.receive_side) 1703 return self.on_disconnect(broker) 1704 1705 self.protocol.on_receive(broker, buf) 1706 1707 def on_transmit(self, broker): 1708 """ 1709 Invoked by :class:`Broker` when the stream's :attr:`transmit_side` has 1710 been marked writeable using :meth:`Broker._start_transmit` and the 1711 broker has detected the associated file descriptor is ready for 1712 writing. 1713 1714 Subclasses must implement they are ever registerd with 1715 :meth:`Broker._start_transmit`. 1716 1717 The default implementation invokes :meth:`Protocol.on_transmit`. 1718 """ 1719 self.protocol.on_transmit(broker) 1720 1721 def on_shutdown(self, broker): 1722 """ 1723 Invoked by :meth:`Broker.shutdown` to allow the stream time to 1724 gracefully shutdown. 1725 1726 The default implementation emits a ``shutdown`` signal before 1727 invoking :meth:`on_disconnect`. 1728 """ 1729 fire(self, 'shutdown') 1730 self.protocol.on_shutdown(broker) 1731 1732 def on_disconnect(self, broker): 1733 """ 1734 Invoked by :class:`Broker` to force disconnect the stream during 1735 shutdown, invoked by the default :meth:`on_shutdown` implementation, 1736 and usually invoked by any subclass :meth:`on_receive` implementation 1737 in response to a 0-byte read. 1738 1739 The base implementation fires a ``disconnect`` event, then closes 1740 :attr:`receive_side` and :attr:`transmit_side` after unregistering the 1741 stream from the broker. 1742 """ 1743 fire(self, 'disconnect') 1744 self.protocol.on_disconnect(broker) 1745 1746 1747class Protocol(object): 1748 """ 1749 Implement the program behaviour associated with activity on a 1750 :class:`Stream`. The protocol in use may vary over a stream's life, for 1751 example to allow :class:`mitogen.parent.BootstrapProtocol` to initialize 1752 the connected child before handing it off to :class:`MitogenProtocol`. A 1753 stream's active protocol is tracked in the :attr:`Stream.protocol` 1754 attribute, and modified via :meth:`Stream.set_protocol`. 1755 1756 Protocols do not handle IO, they are entirely reliant on the interface 1757 provided by :class:`Stream` and :class:`Side`, allowing the underlying IO 1758 implementation to be replaced without modifying behavioural logic. 1759 """ 1760 stream_class = Stream 1761 1762 #: The :class:`Stream` this protocol is currently bound to, or 1763 #: :data:`None`. 1764 stream = None 1765 1766 #: The size of the read buffer used by :class:`Stream` when this is the 1767 #: active protocol for the stream. 
1768 read_size = CHUNK_SIZE 1769 1770 @classmethod 1771 def build_stream(cls, *args, **kwargs): 1772 stream = cls.stream_class() 1773 stream.set_protocol(cls(*args, **kwargs)) 1774 return stream 1775 1776 def __repr__(self): 1777 return '%s(%s)' % ( 1778 self.__class__.__name__, 1779 self.stream and self.stream.name, 1780 ) 1781 1782 def on_shutdown(self, broker): 1783 _v and LOG.debug('%r: shutting down', self) 1784 self.stream.on_disconnect(broker) 1785 1786 def on_disconnect(self, broker): 1787 # Normally both sides an FD, so it is important that tranmit_side is 1788 # deregistered from Poller before closing the receive side, as pollers 1789 # like epoll and kqueue unregister all events on FD close, causing 1790 # subsequent attempt to unregister the transmit side to fail. 1791 LOG.debug('%r: disconnecting', self) 1792 broker.stop_receive(self.stream) 1793 if self.stream.transmit_side: 1794 broker._stop_transmit(self.stream) 1795 1796 self.stream.receive_side.close() 1797 if self.stream.transmit_side: 1798 self.stream.transmit_side.close() 1799 1800 1801class DelimitedProtocol(Protocol): 1802 """ 1803 Provide a :meth:`Protocol.on_receive` implementation for protocols that are 1804 delimited by a fixed string, like text based protocols. Each message is 1805 passed to :meth:`on_line_received` as it arrives, with incomplete messages 1806 passed to :meth:`on_partial_line_received`. 1807 1808 When emulating user input it is often necessary to respond to incomplete 1809 lines, such as when a "Password: " prompt is sent. 1810 :meth:`on_partial_line_received` may be called repeatedly with an 1811 increasingly complete message. When a complete message is finally received, 1812 :meth:`on_line_received` will be called once for it before the buffer is 1813 discarded. 1814 1815 If :func:`on_line_received` returns :data:`False`, remaining data is passed 1816 unprocessed to the stream's current protocol's :meth:`on_receive`. This 1817 allows switching from line-oriented to binary while the input buffer 1818 contains both kinds of data. 1819 """ 1820 #: The delimiter. Defaults to newline. 1821 delimiter = b('\n') 1822 _trailer = b('') 1823 1824 def on_receive(self, broker, buf): 1825 _vv and IOLOG.debug('%r.on_receive()', self) 1826 stream = self.stream 1827 self._trailer, cont = mitogen.core.iter_split( 1828 buf=self._trailer + buf, 1829 delim=self.delimiter, 1830 func=self.on_line_received, 1831 ) 1832 1833 if self._trailer: 1834 if cont: 1835 self.on_partial_line_received(self._trailer) 1836 else: 1837 assert stream.protocol is not self, \ 1838 'stream protocol is no longer %r' % (self,) 1839 stream.protocol.on_receive(broker, self._trailer) 1840 1841 def on_line_received(self, line): 1842 """ 1843 Receive a line from the stream. 1844 1845 :param bytes line: 1846 The encoded line, excluding the delimiter. 1847 :returns: 1848 :data:`False` to indicate this invocation modified the stream's 1849 active protocol, and any remaining buffered data should be passed 1850 to the new protocol's :meth:`on_receive` method. 1851 1852 Any other return value is ignored. 1853 """ 1854 pass 1855 1856 def on_partial_line_received(self, line): 1857 """ 1858 Receive a trailing unterminated partial line from the stream. 1859 1860 :param bytes line: 1861 The encoded partial line. 1862 """ 1863 pass 1864 1865 1866class BufferedWriter(object): 1867 """ 1868 Implement buffered output while avoiding quadratic string operations. 
This 1869 is currently constructed by each protocol, in future it may become fixed 1870 for each stream instead. 1871 """ 1872 def __init__(self, broker, protocol): 1873 self._broker = broker 1874 self._protocol = protocol 1875 self._buf = collections.deque() 1876 self._len = 0 1877 1878 def write(self, s): 1879 """ 1880 Transmit `s` immediately, falling back to enqueuing it and marking the 1881 stream writeable if no OS buffer space is available. 1882 """ 1883 if not self._len: 1884 # Modifying epoll/Kqueue state is expensive, as are needless broker 1885 # loops. Rather than wait for writeability, just write immediately, 1886 # and fall back to the broker loop on error or full buffer. 1887 try: 1888 n = self._protocol.stream.transmit_side.write(s) 1889 if n: 1890 if n == len(s): 1891 return 1892 s = s[n:] 1893 except OSError: 1894 pass 1895 1896 self._broker._start_transmit(self._protocol.stream) 1897 self._buf.append(s) 1898 self._len += len(s) 1899 1900 def on_transmit(self, broker): 1901 """ 1902 Respond to stream writeability by retrying previously buffered 1903 :meth:`write` calls. 1904 """ 1905 if self._buf: 1906 buf = self._buf.popleft() 1907 written = self._protocol.stream.transmit_side.write(buf) 1908 if not written: 1909 _v and LOG.debug('disconnected during write to %r', self) 1910 self._protocol.stream.on_disconnect(broker) 1911 return 1912 elif written != len(buf): 1913 self._buf.appendleft(BufferType(buf, written)) 1914 1915 _vv and IOLOG.debug('transmitted %d bytes to %r', written, self) 1916 self._len -= written 1917 1918 if not self._buf: 1919 broker._stop_transmit(self._protocol.stream) 1920 1921 1922class Side(object): 1923 """ 1924 Represent one side of a :class:`Stream`. This allows unidirectional (e.g. 1925 pipe) and bidirectional (e.g. socket) streams to operate identically. 1926 1927 Sides are also responsible for tracking the open/closed state of the 1928 underlying FD, preventing erroneous duplicate calls to :func:`os.close` due 1929 to duplicate :meth:`Stream.on_disconnect` calls, which would otherwise risk 1930 silently succeeding by closing an unrelated descriptor. For this reason, it 1931 is crucial only one file object exists per unique descriptor. 1932 1933 :param mitogen.core.Stream stream: 1934 The stream this side is associated with. 1935 :param object fp: 1936 The file or socket object managing the underlying file descriptor. Any 1937 object may be used that supports `fileno()` and `close()` methods. 1938 :param bool cloexec: 1939 If :data:`True`, the descriptor has its :data:`fcntl.FD_CLOEXEC` flag 1940 enabled using :func:`fcntl.fcntl`. 1941 :param bool keep_alive: 1942 If :data:`True`, the continued existence of this side will extend the 1943 shutdown grace period until it has been unregistered from the broker. 1944 :param bool blocking: 1945 If :data:`False`, the descriptor has its :data:`os.O_NONBLOCK` flag 1946 enabled using :func:`fcntl.fcntl`. 1947 """ 1948 _fork_refs = weakref.WeakValueDictionary() 1949 closed = False 1950 1951 def __init__(self, stream, fp, cloexec=True, keep_alive=True, blocking=False): 1952 #: The :class:`Stream` for which this is a read or write side. 1953 self.stream = stream 1954 # File or socket object responsible for the lifetime of its underlying 1955 # file descriptor. 1956 self.fp = fp 1957 #: Integer file descriptor to perform IO on, or :data:`None` if 1958 #: :meth:`close` has been called. 
This is saved separately from the 1959 #: file object, since :meth:`file.fileno` cannot be called on it after 1960 #: it has been closed. 1961 self.fd = fp.fileno() 1962 #: If :data:`True`, causes presence of this side in 1963 #: :class:`Broker`'s active reader set to defer shutdown until the 1964 #: side is disconnected. 1965 self.keep_alive = keep_alive 1966 self._fork_refs[id(self)] = self 1967 if cloexec: 1968 set_cloexec(self.fd) 1969 if not blocking: 1970 set_nonblock(self.fd) 1971 1972 def __repr__(self): 1973 return '<Side of %s fd %s>' % ( 1974 self.stream.name or repr(self.stream), 1975 self.fd 1976 ) 1977 1978 @classmethod 1979 def _on_fork(cls): 1980 while cls._fork_refs: 1981 _, side = cls._fork_refs.popitem() 1982 _vv and IOLOG.debug('Side._on_fork() closing %r', side) 1983 side.close() 1984 1985 def close(self): 1986 """ 1987 Call :meth:`file.close` on :attr:`fp` if it is not :data:`None`, 1988 then set it to :data:`None`. 1989 """ 1990 _vv and IOLOG.debug('%r.close()', self) 1991 if not self.closed: 1992 self.closed = True 1993 self.fp.close() 1994 1995 def read(self, n=CHUNK_SIZE): 1996 """ 1997 Read up to `n` bytes from the file descriptor, wrapping the underlying 1998 :func:`os.read` call with :func:`io_op` to trap common disconnection 1999 conditions. 2000 2001 :meth:`read` always behaves as if it is reading from a regular UNIX 2002 file; socket, pipe, and TTY disconnection errors are masked and result 2003 in a 0-sized read like a regular file. 2004 2005 :returns: 2006 Bytes read, or the empty string to indicate disconnection was 2007 detected. 2008 """ 2009 if self.closed: 2010 # Refuse to touch the handle after closed, it may have been reused 2011 # by another thread. TODO: synchronize read()/write()/close(). 2012 return b('') 2013 s, disconnected = io_op(os.read, self.fd, n) 2014 if disconnected: 2015 LOG.debug('%r: disconnected during read: %s', self, disconnected) 2016 return b('') 2017 return s 2018 2019 def write(self, s): 2020 """ 2021 Write as much of the bytes from `s` as possible to the file descriptor, 2022 wrapping the underlying :func:`os.write` call with :func:`io_op` to 2023 trap common disconnection conditions. 2024 2025 :returns: 2026 Number of bytes written, or :data:`None` if disconnection was 2027 detected. 2028 """ 2029 if self.closed: 2030 # Don't touch the handle after close, it may be reused elsewhere. 2031 return None 2032 2033 written, disconnected = io_op(os.write, self.fd, s) 2034 if disconnected: 2035 LOG.debug('%r: disconnected during write: %s', self, disconnected) 2036 return None 2037 return written 2038 2039 2040class MitogenProtocol(Protocol): 2041 """ 2042 :class:`Protocol` implementing mitogen's :ref:`stream protocol 2043 <stream-protocol>`. 2044 """ 2045 #: If not :data:`False`, indicates the stream has :attr:`auth_id` set and 2046 #: its value is the same as :data:`mitogen.context_id` or appears in 2047 #: :data:`mitogen.parent_ids`. 2048 is_privileged = False 2049 2050 #: Invoked as `on_message(stream, msg)` each message received from the 2051 #: peer. 2052 on_message = None 2053 2054 def __init__(self, router, remote_id, auth_id=None, 2055 local_id=None, parent_ids=None): 2056 self._router = router 2057 self.remote_id = remote_id 2058 #: If not :data:`None`, :class:`Router` stamps this into 2059 #: :attr:`Message.auth_id` of every message received on this stream. 
2060 self.auth_id = auth_id 2061 2062 if parent_ids is None: 2063 parent_ids = mitogen.parent_ids 2064 if local_id is None: 2065 local_id = mitogen.context_id 2066 2067 self.is_privileged = ( 2068 (remote_id in parent_ids) or 2069 auth_id in ([local_id] + parent_ids) 2070 ) 2071 self.sent_modules = set(['mitogen', 'mitogen.core']) 2072 self._input_buf = collections.deque() 2073 self._input_buf_len = 0 2074 self._writer = BufferedWriter(router.broker, self) 2075 2076 #: Routing records the dst_id of every message arriving from this 2077 #: stream. Any arriving DEL_ROUTE is rebroadcast for any such ID. 2078 self.egress_ids = set() 2079 2080 def on_receive(self, broker, buf): 2081 """ 2082 Handle the next complete message on the stream. Raise 2083 :class:`StreamError` on failure. 2084 """ 2085 _vv and IOLOG.debug('%r.on_receive()', self) 2086 if self._input_buf and self._input_buf_len < 128: 2087 self._input_buf[0] += buf 2088 else: 2089 self._input_buf.append(buf) 2090 2091 self._input_buf_len += len(buf) 2092 while self._receive_one(broker): 2093 pass 2094 2095 corrupt_msg = ( 2096 '%s: Corruption detected: frame signature incorrect. This likely means' 2097 ' some external process is interfering with the connection. Received:' 2098 '\n\n' 2099 '%r' 2100 ) 2101 2102 def _receive_one(self, broker): 2103 if self._input_buf_len < Message.HEADER_LEN: 2104 return False 2105 2106 msg = Message() 2107 msg.router = self._router 2108 (magic, msg.dst_id, msg.src_id, msg.auth_id, 2109 msg.handle, msg.reply_to, msg_len) = struct.unpack( 2110 Message.HEADER_FMT, 2111 self._input_buf[0][:Message.HEADER_LEN], 2112 ) 2113 2114 if magic != Message.HEADER_MAGIC: 2115 LOG.error(self.corrupt_msg, self.stream.name, self._input_buf[0][:2048]) 2116 self.stream.on_disconnect(broker) 2117 return False 2118 2119 if msg_len > self._router.max_message_size: 2120 LOG.error('%r: Maximum message size exceeded (got %d, max %d)', 2121 self, msg_len, self._router.max_message_size) 2122 self.stream.on_disconnect(broker) 2123 return False 2124 2125 total_len = msg_len + Message.HEADER_LEN 2126 if self._input_buf_len < total_len: 2127 _vv and IOLOG.debug( 2128 '%r: Input too short (want %d, got %d)', 2129 self, msg_len, self._input_buf_len - Message.HEADER_LEN 2130 ) 2131 return False 2132 2133 start = Message.HEADER_LEN 2134 prev_start = start 2135 remain = total_len 2136 bits = [] 2137 while remain: 2138 buf = self._input_buf.popleft() 2139 bit = buf[start:remain] 2140 bits.append(bit) 2141 remain -= len(bit) + start 2142 prev_start = start 2143 start = 0 2144 2145 msg.data = b('').join(bits) 2146 self._input_buf.appendleft(buf[prev_start+len(bit):]) 2147 self._input_buf_len -= total_len 2148 self._router._async_route(msg, self.stream) 2149 return True 2150 2151 def pending_bytes(self): 2152 """ 2153 Return the number of bytes queued for transmission on this stream. This 2154 can be used to limit the amount of data buffered in RAM by an otherwise 2155 unlimited consumer. 2156 2157 For an accurate result, this method should be called from the Broker 2158 thread, for example by using :meth:`Broker.defer_sync`. 2159 """ 2160 return self._writer._len 2161 2162 def on_transmit(self, broker): 2163 """ 2164 Transmit buffered messages. 
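        This simply delegates to the :class:`BufferedWriter` created during
        construction, which retries any writes that previously could not
        complete without blocking.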
2165 """ 2166 _vv and IOLOG.debug('%r.on_transmit()', self) 2167 self._writer.on_transmit(broker) 2168 2169 def _send(self, msg): 2170 _vv and IOLOG.debug('%r._send(%r)', self, msg) 2171 self._writer.write(msg.pack()) 2172 2173 def send(self, msg): 2174 """ 2175 Send `data` to `handle`, and tell the broker we have output. May be 2176 called from any thread. 2177 """ 2178 self._router.broker.defer(self._send, msg) 2179 2180 def on_shutdown(self, broker): 2181 """ 2182 Disable :class:`Protocol` immediate disconnect behaviour. 2183 """ 2184 _v and LOG.debug('%r: shutting down', self) 2185 2186 2187class Context(object): 2188 """ 2189 Represent a remote context regardless of the underlying connection method. 2190 Context objects are simple facades that emit messages through an 2191 associated router, and have :ref:`signals` raised against them in response 2192 to various events relating to the context. 2193 2194 **Note:** This is the somewhat limited core version, used by child 2195 contexts. The master subclass is documented below this one. 2196 2197 Contexts maintain no internal state and are thread-safe. 2198 2199 Prefer :meth:`Router.context_by_id` over constructing context objects 2200 explicitly, as that method is deduplicating, and returns the only context 2201 instance :ref:`signals` will be raised on. 2202 2203 :param mitogen.core.Router router: 2204 Router to emit messages through. 2205 :param int context_id: 2206 Context ID. 2207 :param str name: 2208 Context name. 2209 """ 2210 name = None 2211 remote_name = None 2212 2213 def __init__(self, router, context_id, name=None): 2214 self.router = router 2215 self.context_id = context_id 2216 if name: 2217 self.name = to_text(name) 2218 2219 def __reduce__(self): 2220 return _unpickle_context, (self.context_id, self.name) 2221 2222 def on_disconnect(self): 2223 _v and LOG.debug('%r: disconnecting', self) 2224 fire(self, 'disconnect') 2225 2226 def send_async(self, msg, persist=False): 2227 """ 2228 Arrange for `msg` to be delivered to this context, with replies 2229 directed to a newly constructed receiver. :attr:`dst_id 2230 <Message.dst_id>` is set to the target context ID, and :attr:`reply_to 2231 <Message.reply_to>` is set to the newly constructed receiver's handle. 2232 2233 :param bool persist: 2234 If :data:`False`, the handler will be unregistered after a single 2235 message has been received. 2236 2237 :param mitogen.core.Message msg: 2238 The message. 2239 2240 :returns: 2241 :class:`Receiver` configured to receive any replies sent to the 2242 message's `reply_to` handle. 2243 """ 2244 receiver = Receiver(self.router, persist=persist, respondent=self) 2245 msg.dst_id = self.context_id 2246 msg.reply_to = receiver.handle 2247 2248 _v and LOG.debug('sending message to %r: %r', self, msg) 2249 self.send(msg) 2250 return receiver 2251 2252 def call_service_async(self, service_name, method_name, **kwargs): 2253 if isinstance(service_name, BytesType): 2254 service_name = service_name.encode('utf-8') 2255 elif not isinstance(service_name, UnicodeType): 2256 service_name = service_name.name() # Service.name() 2257 _v and LOG.debug('calling service %s.%s of %r, args: %r', 2258 service_name, method_name, self, kwargs) 2259 tup = (service_name, to_text(method_name), Kwargs(kwargs)) 2260 msg = Message.pickled(tup, handle=CALL_SERVICE) 2261 return self.send_async(msg) 2262 2263 def send(self, msg): 2264 """ 2265 Arrange for `msg` to be delivered to this context. :attr:`dst_id 2266 <Message.dst_id>` is set to the target context ID. 
2267 2268 :param Message msg: 2269 Message. 2270 """ 2271 msg.dst_id = self.context_id 2272 self.router.route(msg) 2273 2274 def call_service(self, service_name, method_name, **kwargs): 2275 recv = self.call_service_async(service_name, method_name, **kwargs) 2276 return recv.get().unpickle() 2277 2278 def send_await(self, msg, deadline=None): 2279 """ 2280 Like :meth:`send_async`, but expect a single reply (`persist=False`) 2281 delivered within `deadline` seconds. 2282 2283 :param mitogen.core.Message msg: 2284 The message. 2285 :param float deadline: 2286 If not :data:`None`, seconds before timing out waiting for a reply. 2287 :returns: 2288 Deserialized reply. 2289 :raises TimeoutError: 2290 No message was received and `deadline` passed. 2291 """ 2292 receiver = self.send_async(msg) 2293 response = receiver.get(deadline) 2294 data = response.unpickle() 2295 _vv and IOLOG.debug('%r._send_await() -> %r', self, data) 2296 return data 2297 2298 def __repr__(self): 2299 return 'Context(%s, %r)' % (self.context_id, self.name) 2300 2301 2302def _unpickle_context(context_id, name, router=None): 2303 if not (isinstance(context_id, (int, long)) and context_id >= 0 and ( 2304 (name is None) or 2305 (isinstance(name, UnicodeType) and len(name) < 100)) 2306 ): 2307 raise TypeError('cannot unpickle Context: bad input') 2308 2309 if isinstance(router, Router): 2310 return router.context_by_id(context_id, name=name) 2311 return Context(None, context_id, name) # For plain Jane pickle. 2312 2313 2314class Poller(object): 2315 """ 2316 A poller manages OS file descriptors the user is waiting to become 2317 available for IO. The :meth:`poll` method blocks the calling thread 2318 until one or more become ready. The default implementation is based on 2319 :func:`select.poll`. 2320 2321 Each descriptor has an associated `data` element, which is unique for each 2322 readiness type, and defaults to being the same as the file descriptor. The 2323 :meth:`poll` method yields the data associated with a descriptor, rather 2324 than the descriptor itself, allowing concise loops like:: 2325 2326 p = Poller() 2327 p.start_receive(conn.fd, data=conn.on_read) 2328 p.start_transmit(conn.fd, data=conn.on_write) 2329 2330 for callback in p.poll(): 2331 callback() # invoke appropriate bound instance method 2332 2333 Pollers may be modified while :meth:`poll` is yielding results. Removals 2334 are processed immediately, causing pending events for the descriptor to be 2335 discarded. 2336 2337 The :meth:`close` method must be called when a poller is discarded to avoid 2338 a resource leak. 2339 2340 Pollers may only be used by one thread at a time. 2341 """ 2342 SUPPORTED = True 2343 2344 # This changed from select() to poll() in Mitogen 0.2.4. Since poll() has 2345 # no upper FD limit, it is suitable for use with Latch, which must handle 2346 # FDs larger than select's limit during many-host runs. We want this 2347 # because poll() requires no setup and teardown: just a single system call, 2348 # which is important because Latch.get() creates a Poller on each 2349 # invocation. In a microbenchmark, poll() vs. epoll_ctl() is 30% faster in 2350 # this scenario. If select() must return in future, it is important 2351 # Latch.poller_class is set from parent.py to point to the industrial 2352 # strength poller for the OS, otherwise Latch will fail randomly. 2353 2354 #: Increments on every poll(). Used to version _rfds and _wfds. 
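    #: Descriptors registered while :meth:`poll` is still yielding results are
    #: stamped with the current generation, and are therefore skipped until
    #: the next :meth:`poll` call.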
2355 _generation = 1 2356 2357 def __init__(self): 2358 self._rfds = {} 2359 self._wfds = {} 2360 2361 def __repr__(self): 2362 return '%s' % (type(self).__name__,) 2363 2364 def _update(self, fd): 2365 """ 2366 Required by PollPoller subclass. 2367 """ 2368 pass 2369 2370 @property 2371 def readers(self): 2372 """ 2373 Return a list of `(fd, data)` tuples for every FD registered for 2374 receive readiness. 2375 """ 2376 return list((fd, data) for fd, (data, gen) in self._rfds.items()) 2377 2378 @property 2379 def writers(self): 2380 """ 2381 Return a list of `(fd, data)` tuples for every FD registered for 2382 transmit readiness. 2383 """ 2384 return list((fd, data) for fd, (data, gen) in self._wfds.items()) 2385 2386 def close(self): 2387 """ 2388 Close any underlying OS resource used by the poller. 2389 """ 2390 pass 2391 2392 def start_receive(self, fd, data=None): 2393 """ 2394 Cause :meth:`poll` to yield `data` when `fd` is readable. 2395 """ 2396 self._rfds[fd] = (data or fd, self._generation) 2397 self._update(fd) 2398 2399 def stop_receive(self, fd): 2400 """ 2401 Stop yielding readability events for `fd`. 2402 2403 Redundant calls to :meth:`stop_receive` are silently ignored, this may 2404 change in future. 2405 """ 2406 self._rfds.pop(fd, None) 2407 self._update(fd) 2408 2409 def start_transmit(self, fd, data=None): 2410 """ 2411 Cause :meth:`poll` to yield `data` when `fd` is writeable. 2412 """ 2413 self._wfds[fd] = (data or fd, self._generation) 2414 self._update(fd) 2415 2416 def stop_transmit(self, fd): 2417 """ 2418 Stop yielding writeability events for `fd`. 2419 2420 Redundant calls to :meth:`stop_transmit` are silently ignored, this may 2421 change in future. 2422 """ 2423 self._wfds.pop(fd, None) 2424 self._update(fd) 2425 2426 def _poll(self, timeout): 2427 (rfds, wfds, _), _ = io_op(select.select, 2428 self._rfds, 2429 self._wfds, 2430 (), timeout 2431 ) 2432 2433 for fd in rfds: 2434 _vv and IOLOG.debug('%r: POLLIN for %r', self, fd) 2435 data, gen = self._rfds.get(fd, (None, None)) 2436 if gen and gen < self._generation: 2437 yield data 2438 2439 for fd in wfds: 2440 _vv and IOLOG.debug('%r: POLLOUT for %r', self, fd) 2441 data, gen = self._wfds.get(fd, (None, None)) 2442 if gen and gen < self._generation: 2443 yield data 2444 2445 def poll(self, timeout=None): 2446 """ 2447 Block the calling thread until one or more FDs are ready for IO. 2448 2449 :param float timeout: 2450 If not :data:`None`, seconds to wait without an event before 2451 returning an empty iterable. 2452 :returns: 2453 Iterable of `data` elements associated with ready FDs. 2454 """ 2455 _vv and IOLOG.debug('%r.poll(%r)', self, timeout) 2456 self._generation += 1 2457 return self._poll(timeout) 2458 2459 2460class Latch(object): 2461 """ 2462 A latch is a :class:`Queue.Queue`-like object that supports mutation and 2463 waiting from multiple threads, however unlike :class:`Queue.Queue`, 2464 waiting threads always remain interruptible, so CTRL+C always succeeds, and 2465 waits where a timeout is set experience no wake up latency. These 2466 properties are not possible in combination using the built-in threading 2467 primitives available in Python 2.x. 2468 2469 Latches implement queues using the UNIX self-pipe trick, and a per-thread 2470 :func:`socket.socketpair` that is lazily created the first time any 2471 latch attempts to sleep on a thread, and dynamically associated with the 2472 waiting Latch only for duration of the wait. 2473 2474 See :ref:`waking-sleeping-threads` for further discussion. 
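    A minimal synchronization example (sketch)::

        import threading

        latch = Latch()

        def worker():
            # Runs on another thread; wakes any thread sleeping in get().
            latch.put('finished')

        threading.Thread(target=worker).start()
        assert latch.get(timeout=30.0) == 'finished'
        latch.close()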
2475 """ 2476 #: The :class:`Poller` implementation to use for waiting. Since the poller 2477 #: will be very short-lived, we prefer :class:`mitogen.parent.PollPoller` 2478 #: if it is available, or :class:`mitogen.core.Poller` otherwise, since 2479 #: these implementations require no system calls to create, configure or 2480 #: destroy. 2481 poller_class = Poller 2482 2483 #: If not :data:`None`, a function invoked as `notify(latch)` after a 2484 #: successful call to :meth:`put`. The function is invoked on the 2485 #: :meth:`put` caller's thread, which may be the :class:`Broker` thread, 2486 #: therefore it must not block. Used by :class:`mitogen.select.Select` to 2487 #: efficiently implement waiting on multiple event sources. 2488 notify = None 2489 2490 # The _cls_ prefixes here are to make it crystal clear in the code which 2491 # state mutation isn't covered by :attr:`_lock`. 2492 2493 #: List of reusable :func:`socket.socketpair` tuples. The list is mutated 2494 #: from multiple threads, the only safe operations are `append()` and 2495 #: `pop()`. 2496 _cls_idle_socketpairs = [] 2497 2498 #: List of every socket object that must be closed by :meth:`_on_fork`. 2499 #: Inherited descriptors cannot be reused, as the duplicated handles 2500 #: reference the same underlying kernel object in use by the parent. 2501 _cls_all_sockets = [] 2502 2503 def __init__(self): 2504 self.closed = False 2505 self._lock = threading.Lock() 2506 #: List of unconsumed enqueued items. 2507 self._queue = [] 2508 #: List of `(wsock, cookie)` awaiting an element, where `wsock` is the 2509 #: socketpair's write side, and `cookie` is the string to write. 2510 self._sleeping = [] 2511 #: Number of elements of :attr:`_sleeping` that have already been 2512 #: woken, and have a corresponding element index from :attr:`_queue` 2513 #: assigned to them. 2514 self._waking = 0 2515 2516 @classmethod 2517 def _on_fork(cls): 2518 """ 2519 Clean up any files belonging to the parent process after a fork. 2520 """ 2521 cls._cls_idle_socketpairs = [] 2522 while cls._cls_all_sockets: 2523 cls._cls_all_sockets.pop().close() 2524 2525 def close(self): 2526 """ 2527 Mark the latch as closed, and cause every sleeping thread to be woken, 2528 with :class:`mitogen.core.LatchError` raised in each thread. 2529 """ 2530 self._lock.acquire() 2531 try: 2532 self.closed = True 2533 while self._waking < len(self._sleeping): 2534 wsock, cookie = self._sleeping[self._waking] 2535 self._wake(wsock, cookie) 2536 self._waking += 1 2537 finally: 2538 self._lock.release() 2539 2540 def size(self): 2541 """ 2542 Return the number of items currently buffered. 2543 2544 As with :class:`Queue.Queue`, `0` may be returned even though a 2545 subsequent call to :meth:`get` will succeed, since a message may be 2546 posted at any moment between :meth:`size` and :meth:`get`. 2547 2548 As with :class:`Queue.Queue`, `>0` may be returned even though a 2549 subsequent call to :meth:`get` will block, since another waiting thread 2550 may be woken at any moment between :meth:`size` and :meth:`get`. 2551 2552 :raises LatchError: 2553 The latch has already been marked closed. 2554 """ 2555 self._lock.acquire() 2556 try: 2557 if self.closed: 2558 raise LatchError() 2559 return len(self._queue) 2560 finally: 2561 self._lock.release() 2562 2563 def empty(self): 2564 """ 2565 Return `size() == 0`. 2566 2567 .. deprecated:: 0.2.8 2568 Use :meth:`size` instead. 2569 2570 :raises LatchError: 2571 The latch has already been marked closed. 
2572 """ 2573 return self.size() == 0 2574 2575 def _get_socketpair(self): 2576 """ 2577 Return an unused socketpair, creating one if none exist. 2578 """ 2579 try: 2580 return self._cls_idle_socketpairs.pop() # pop() must be atomic 2581 except IndexError: 2582 rsock, wsock = socket.socketpair() 2583 rsock.setblocking(False) 2584 set_cloexec(rsock.fileno()) 2585 set_cloexec(wsock.fileno()) 2586 self._cls_all_sockets.extend((rsock, wsock)) 2587 return rsock, wsock 2588 2589 COOKIE_MAGIC, = struct.unpack('L', b('LTCH') * (struct.calcsize('L')//4)) 2590 COOKIE_FMT = '>Qqqq' # #545: id() and get_ident() may exceed long on armhfp. 2591 COOKIE_SIZE = struct.calcsize(COOKIE_FMT) 2592 2593 def _make_cookie(self): 2594 """ 2595 Return a string encoding the ID of the process, instance and thread. 2596 This disambiguates legitimate wake-ups, accidental writes to the FD, 2597 and buggy internal FD sharing. 2598 """ 2599 return struct.pack(self.COOKIE_FMT, self.COOKIE_MAGIC, 2600 os.getpid(), id(self), thread.get_ident()) 2601 2602 def get(self, timeout=None, block=True): 2603 """ 2604 Return the next enqueued object, or sleep waiting for one. 2605 2606 :param float timeout: 2607 If not :data:`None`, specifies a timeout in seconds. 2608 2609 :param bool block: 2610 If :data:`False`, immediately raise 2611 :class:`mitogen.core.TimeoutError` if the latch is empty. 2612 2613 :raises mitogen.core.LatchError: 2614 :meth:`close` has been called, and the object is no longer valid. 2615 2616 :raises mitogen.core.TimeoutError: 2617 Timeout was reached. 2618 2619 :returns: 2620 The de-queued object. 2621 """ 2622 _vv and IOLOG.debug('%r.get(timeout=%r, block=%r)', 2623 self, timeout, block) 2624 self._lock.acquire() 2625 try: 2626 if self.closed: 2627 raise LatchError() 2628 i = len(self._sleeping) 2629 if len(self._queue) > i: 2630 _vv and IOLOG.debug('%r.get() -> %r', self, self._queue[i]) 2631 return self._queue.pop(i) 2632 if not block: 2633 raise TimeoutError() 2634 rsock, wsock = self._get_socketpair() 2635 cookie = self._make_cookie() 2636 self._sleeping.append((wsock, cookie)) 2637 finally: 2638 self._lock.release() 2639 2640 poller = self.poller_class() 2641 poller.start_receive(rsock.fileno()) 2642 try: 2643 return self._get_sleep(poller, timeout, block, rsock, wsock, cookie) 2644 finally: 2645 poller.close() 2646 2647 def _get_sleep(self, poller, timeout, block, rsock, wsock, cookie): 2648 """ 2649 When a result is not immediately available, sleep waiting for 2650 :meth:`put` to write a byte to our socket pair. 2651 """ 2652 _vv and IOLOG.debug( 2653 '%r._get_sleep(timeout=%r, block=%r, fd=%d/%d)', 2654 self, timeout, block, rsock.fileno(), wsock.fileno() 2655 ) 2656 2657 e = None 2658 try: 2659 list(poller.poll(timeout)) 2660 except Exception: 2661 e = sys.exc_info()[1] 2662 2663 self._lock.acquire() 2664 try: 2665 i = self._sleeping.index((wsock, cookie)) 2666 del self._sleeping[i] 2667 2668 try: 2669 got_cookie = rsock.recv(self.COOKIE_SIZE) 2670 except socket.error: 2671 e2 = sys.exc_info()[1] 2672 if e2.args[0] == errno.EAGAIN: 2673 e = TimeoutError() 2674 else: 2675 e = e2 2676 2677 self._cls_idle_socketpairs.append((rsock, wsock)) 2678 if e: 2679 raise e 2680 2681 assert cookie == got_cookie, ( 2682 "Cookie incorrect; got %r, expected %r" \ 2683 % (binascii.hexlify(got_cookie), 2684 binascii.hexlify(cookie)) 2685 ) 2686 assert i < self._waking, ( 2687 "Cookie correct, but no queue element assigned." 
2688 ) 2689 self._waking -= 1 2690 if self.closed: 2691 raise LatchError() 2692 _vv and IOLOG.debug('%r.get() wake -> %r', self, self._queue[i]) 2693 return self._queue.pop(i) 2694 finally: 2695 self._lock.release() 2696 2697 def put(self, obj=None): 2698 """ 2699 Enqueue an object, waking the first thread waiting for a result, if one 2700 exists. 2701 2702 :param obj: 2703 Object to enqueue. Defaults to :data:`None` as a convenience when 2704 using :class:`Latch` only for synchronization. 2705 :raises mitogen.core.LatchError: 2706 :meth:`close` has been called, and the object is no longer valid. 2707 """ 2708 _vv and IOLOG.debug('%r.put(%r)', self, obj) 2709 self._lock.acquire() 2710 try: 2711 if self.closed: 2712 raise LatchError() 2713 self._queue.append(obj) 2714 2715 wsock = None 2716 if self._waking < len(self._sleeping): 2717 wsock, cookie = self._sleeping[self._waking] 2718 self._waking += 1 2719 _vv and IOLOG.debug('%r.put() -> waking wfd=%r', 2720 self, wsock.fileno()) 2721 elif self.notify: 2722 self.notify(self) 2723 finally: 2724 self._lock.release() 2725 2726 if wsock: 2727 self._wake(wsock, cookie) 2728 2729 def _wake(self, wsock, cookie): 2730 written, disconnected = io_op(os.write, wsock.fileno(), cookie) 2731 assert written == len(cookie) and not disconnected 2732 2733 def __repr__(self): 2734 return 'Latch(%#x, size=%d, t=%r)' % ( 2735 id(self), 2736 len(self._queue), 2737 threading.currentThread().getName(), 2738 ) 2739 2740 2741class Waker(Protocol): 2742 """ 2743 :class:`Protocol` implementing the `UNIX self-pipe trick`_. Used to wake 2744 :class:`Broker` when another thread needs to modify its state, by enqueing 2745 a function call to run on the :class:`Broker` thread. 2746 2747 .. _UNIX self-pipe trick: https://cr.yp.to/docs/selfpipe.html 2748 """ 2749 read_size = 1 2750 broker_ident = None 2751 2752 @classmethod 2753 def build_stream(cls, broker): 2754 stream = super(Waker, cls).build_stream(broker) 2755 stream.accept(*pipe()) 2756 return stream 2757 2758 def __init__(self, broker): 2759 self._broker = broker 2760 self._deferred = collections.deque() 2761 2762 def __repr__(self): 2763 return 'Waker(fd=%r/%r)' % ( 2764 self.stream.receive_side and self.stream.receive_side.fd, 2765 self.stream.transmit_side and self.stream.transmit_side.fd, 2766 ) 2767 2768 @property 2769 def keep_alive(self): 2770 """ 2771 Prevent immediate Broker shutdown while deferred functions remain. 2772 """ 2773 return len(self._deferred) 2774 2775 def on_receive(self, broker, buf): 2776 """ 2777 Drain the pipe and fire callbacks. Since :attr:`_deferred` is 2778 synchronized, :meth:`defer` and :meth:`on_receive` can conspire to 2779 ensure only one byte needs to be pending regardless of queue length. 2780 """ 2781 _vv and IOLOG.debug('%r.on_receive()', self) 2782 while True: 2783 try: 2784 func, args, kwargs = self._deferred.popleft() 2785 except IndexError: 2786 return 2787 2788 try: 2789 func(*args, **kwargs) 2790 except Exception: 2791 LOG.exception('defer() crashed: %r(*%r, **%r)', 2792 func, args, kwargs) 2793 broker.shutdown() 2794 2795 def _wake(self): 2796 """ 2797 Wake the multiplexer by writing a byte. If Broker is midway through 2798 teardown, the FD may already be closed, so ignore EBADF. 
2799 """ 2800 try: 2801 self.stream.transmit_side.write(b(' ')) 2802 except OSError: 2803 e = sys.exc_info()[1] 2804 if e.args[0] in (errno.EBADF, errno.EWOULDBLOCK): 2805 raise 2806 2807 broker_shutdown_msg = ( 2808 "An attempt was made to enqueue a message with a Broker that has " 2809 "already exitted. It is likely your program called Broker.shutdown() " 2810 "too early." 2811 ) 2812 2813 def defer(self, func, *args, **kwargs): 2814 """ 2815 Arrange for `func()` to execute on the broker thread. This function 2816 returns immediately without waiting the result of `func()`. Use 2817 :meth:`defer_sync` to block until a result is available. 2818 2819 :raises mitogen.core.Error: 2820 :meth:`defer` was called after :class:`Broker` has begun shutdown. 2821 """ 2822 if thread.get_ident() == self.broker_ident: 2823 _vv and IOLOG.debug('%r.defer() [immediate]', self) 2824 return func(*args, **kwargs) 2825 if self._broker._exitted: 2826 raise Error(self.broker_shutdown_msg) 2827 2828 _vv and IOLOG.debug('%r.defer() [fd=%r]', self, 2829 self.stream.transmit_side.fd) 2830 self._deferred.append((func, args, kwargs)) 2831 self._wake() 2832 2833 2834class IoLoggerProtocol(DelimitedProtocol): 2835 """ 2836 Attached to one end of a socket pair whose other end overwrites one of the 2837 standard ``stdout`` or ``stderr`` file descriptors in a child context. 2838 Received data is split up into lines, decoded as UTF-8 and logged to the 2839 :mod:`logging` package as either the ``stdout`` or ``stderr`` logger. 2840 2841 Logging in child contexts is in turn forwarded to the master process using 2842 :class:`LogHandler`. 2843 """ 2844 @classmethod 2845 def build_stream(cls, name, dest_fd): 2846 """ 2847 Even though the file descriptor `dest_fd` will hold the opposite end of 2848 the socket open, we must keep a separate dup() of it (i.e. wsock) in 2849 case some code decides to overwrite `dest_fd` later, which would 2850 prevent break :meth:`on_shutdown` from calling :meth:`shutdown() 2851 <socket.socket.shutdown>` on it. 2852 """ 2853 rsock, wsock = socket.socketpair() 2854 os.dup2(wsock.fileno(), dest_fd) 2855 stream = super(IoLoggerProtocol, cls).build_stream(name) 2856 stream.name = name 2857 stream.accept(rsock, wsock) 2858 return stream 2859 2860 def __init__(self, name): 2861 self._log = logging.getLogger(name) 2862 # #453: prevent accidental log initialization in a child creating a 2863 # feedback loop. 2864 self._log.propagate = False 2865 self._log.handlers = logging.getLogger().handlers[:] 2866 2867 def on_shutdown(self, broker): 2868 """ 2869 Shut down the write end of the socket, preventing any further writes to 2870 it by this process, or subprocess that inherited it. This allows any 2871 remaining kernel-buffered data to be drained during graceful shutdown 2872 without the buffer continuously refilling due to some out of control 2873 child process. 2874 """ 2875 _v and LOG.debug('%r: shutting down', self) 2876 if not IS_WSL: 2877 # #333: WSL generates invalid readiness indication on shutdown(). 2878 # This modifies the *kernel object* inherited by children, causing 2879 # EPIPE on subsequent writes to any dupped FD in any process. The 2880 # read side can then drain completely of prior buffered data. 2881 self.stream.transmit_side.fp.shutdown(socket.SHUT_WR) 2882 self.stream.transmit_side.close() 2883 2884 def on_line_received(self, line): 2885 """ 2886 Decode the received line as UTF-8 and pass it to the logging framework. 
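        Undecodable bytes are substituted with the Unicode replacement
        character rather than raising, since the data originates from an
        arbitrary child process.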
2887 """ 2888 self._log.info('%s', line.decode('utf-8', 'replace')) 2889 2890 2891class Router(object): 2892 """ 2893 Route messages between contexts, and invoke local handlers for messages 2894 addressed to this context. :meth:`Router.route() <route>` straddles the 2895 :class:`Broker` thread and user threads, it is safe to call anywhere. 2896 2897 **Note:** This is the somewhat limited core version of the Router class 2898 used by child contexts. The master subclass is documented below this one. 2899 """ 2900 #: The :class:`mitogen.core.Context` subclass to use when constructing new 2901 #: :class:`Context` objects in :meth:`myself` and :meth:`context_by_id`. 2902 #: Permits :class:`Router` subclasses to extend the :class:`Context` 2903 #: interface, as done in :class:`mitogen.parent.Router`. 2904 context_class = Context 2905 2906 max_message_size = 128 * 1048576 2907 2908 #: When :data:`True`, permit children to only communicate with the current 2909 #: context or a parent of the current context. Routing between siblings or 2910 #: children of parents is prohibited, ensuring no communication is possible 2911 #: between intentionally partitioned networks, such as when a program 2912 #: simultaneously manipulates hosts spread across a corporate and a 2913 #: production network, or production networks that are otherwise 2914 #: air-gapped. 2915 #: 2916 #: Sending a prohibited message causes an error to be logged and a dead 2917 #: message to be sent in reply to the errant message, if that message has 2918 #: ``reply_to`` set. 2919 #: 2920 #: The value of :data:`unidirectional` becomes the default for the 2921 #: :meth:`local() <mitogen.master.Router.local>` `unidirectional` 2922 #: parameter. 2923 unidirectional = False 2924 2925 duplicate_handle_msg = 'cannot register a handle that already exists' 2926 refused_msg = 'refused by policy' 2927 invalid_handle_msg = 'invalid handle' 2928 too_large_msg = 'message too large (max %d bytes)' 2929 respondent_disconnect_msg = 'the respondent Context has disconnected' 2930 broker_exit_msg = 'Broker has exitted' 2931 no_route_msg = 'no route to %r, my ID is %r' 2932 unidirectional_msg = ( 2933 'routing mode prevents forward of message from context %d to ' 2934 'context %d via context %d' 2935 ) 2936 2937 def __init__(self, broker): 2938 self.broker = broker 2939 listen(broker, 'exit', self._on_broker_exit) 2940 self._setup_logging() 2941 2942 self._write_lock = threading.Lock() 2943 #: context ID -> Stream; must hold _write_lock to edit or iterate 2944 self._stream_by_id = {} 2945 #: List of contexts to notify of shutdown; must hold _write_lock 2946 self._context_by_id = {} 2947 self._last_handle = itertools.count(1000) 2948 #: handle -> (persistent?, func(msg)) 2949 self._handle_map = {} 2950 #: Context -> set { handle, .. } 2951 self._handles_by_respondent = {} 2952 self.add_handler(self._on_del_route, DEL_ROUTE) 2953 2954 def __repr__(self): 2955 return 'Router(%r)' % (self.broker,) 2956 2957 def _setup_logging(self): 2958 """ 2959 This is done in the :class:`Router` constructor for historical reasons. 2960 It must be called before ExternalContext logs its first messages, but 2961 after logging has been setup. It must also be called when any router is 2962 constructed for a consumer app. 2963 """ 2964 # Here seems as good a place as any. 
2965 global _v, _vv 2966 _v = logging.getLogger().level <= logging.DEBUG 2967 _vv = IOLOG.level <= logging.DEBUG 2968 2969 def _on_del_route(self, msg): 2970 """ 2971 Stub :data:`DEL_ROUTE` handler; fires 'disconnect' events on the 2972 corresponding :attr:`_context_by_id` member. This is replaced by 2973 :class:`mitogen.parent.RouteMonitor` in an upgraded context. 2974 """ 2975 if msg.is_dead: 2976 return 2977 2978 target_id_s, _, name = bytes_partition(msg.data, b(':')) 2979 target_id = int(target_id_s, 10) 2980 LOG.error('%r: deleting route to %s (%d)', 2981 self, to_text(name), target_id) 2982 context = self._context_by_id.get(target_id) 2983 if context: 2984 fire(context, 'disconnect') 2985 else: 2986 LOG.debug('DEL_ROUTE for unknown ID %r: %r', target_id, msg) 2987 2988 def _on_stream_disconnect(self, stream): 2989 notify = [] 2990 self._write_lock.acquire() 2991 try: 2992 for context in list(self._context_by_id.values()): 2993 stream_ = self._stream_by_id.get(context.context_id) 2994 if stream_ is stream: 2995 del self._stream_by_id[context.context_id] 2996 notify.append(context) 2997 finally: 2998 self._write_lock.release() 2999 3000 # Happens outside lock as e.g. RouteMonitor wants the same lock. 3001 for context in notify: 3002 context.on_disconnect() 3003 3004 def _on_broker_exit(self): 3005 """ 3006 Called prior to broker exit, informs callbacks registered with 3007 :meth:`add_handler` the connection is dead. 3008 """ 3009 _v and LOG.debug('%r: broker has exitted', self) 3010 while self._handle_map: 3011 _, (_, func, _, _) = self._handle_map.popitem() 3012 func(Message.dead(self.broker_exit_msg)) 3013 3014 def myself(self): 3015 """ 3016 Return a :class:`Context` referring to the current process. Since 3017 :class:`Context` is serializable, this is convenient to use in remote 3018 function call parameter lists. 3019 """ 3020 return self.context_class( 3021 router=self, 3022 context_id=mitogen.context_id, 3023 name='self', 3024 ) 3025 3026 def context_by_id(self, context_id, via_id=None, create=True, name=None): 3027 """ 3028 Return or construct a :class:`Context` given its ID. An internal 3029 mapping of ID to the canonical :class:`Context` representing that ID, 3030 so that :ref:`signals` can be raised. 3031 3032 This may be called from any thread, lookup and construction are atomic. 3033 3034 :param int context_id: 3035 The context ID to look up. 3036 :param int via_id: 3037 If the :class:`Context` does not already exist, set its 3038 :attr:`Context.via` to the :class:`Context` matching this ID. 3039 :param bool create: 3040 If the :class:`Context` does not already exist, create it. 3041 :param str name: 3042 If the :class:`Context` does not already exist, set its name. 3043 3044 :returns: 3045 :class:`Context`, or return :data:`None` if `create` is 3046 :data:`False` and no :class:`Context` previously existed. 
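        For example, to obtain the canonical :class:`Context` for ID 1234
        (sketch; the ID is illustrative)::

            context = router.context_by_id(1234)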
3047 """ 3048 context = self._context_by_id.get(context_id) 3049 if context: 3050 return context 3051 3052 if create and via_id is not None: 3053 via = self.context_by_id(via_id) 3054 else: 3055 via = None 3056 3057 self._write_lock.acquire() 3058 try: 3059 context = self._context_by_id.get(context_id) 3060 if create and not context: 3061 context = self.context_class(self, context_id, name=name) 3062 context.via = via 3063 self._context_by_id[context_id] = context 3064 finally: 3065 self._write_lock.release() 3066 3067 return context 3068 3069 def register(self, context, stream): 3070 """ 3071 Register a newly constructed context and its associated stream, and add 3072 the stream's receive side to the I/O multiplexer. This method remains 3073 public while the design has not yet settled. 3074 """ 3075 _v and LOG.debug('%s: registering %r to stream %r', 3076 self, context, stream) 3077 self._write_lock.acquire() 3078 try: 3079 self._stream_by_id[context.context_id] = stream 3080 self._context_by_id[context.context_id] = context 3081 finally: 3082 self._write_lock.release() 3083 3084 self.broker.start_receive(stream) 3085 listen(stream, 'disconnect', lambda: self._on_stream_disconnect(stream)) 3086 3087 def stream_by_id(self, dst_id): 3088 """ 3089 Return the :class:`Stream` that should be used to communicate with 3090 `dst_id`. If a specific route for `dst_id` is not known, a reference to 3091 the parent context's stream is returned. If the parent is disconnected, 3092 or when running in the master context, return :data:`None` instead. 3093 3094 This can be used from any thread, but its output is only meaningful 3095 from the context of the :class:`Broker` thread, as disconnection or 3096 replacement could happen in parallel on the broker thread at any 3097 moment. 3098 """ 3099 return ( 3100 self._stream_by_id.get(dst_id) or 3101 self._stream_by_id.get(mitogen.parent_id) 3102 ) 3103 3104 def del_handler(self, handle): 3105 """ 3106 Remove the handle registered for `handle` 3107 3108 :raises KeyError: 3109 The handle wasn't registered. 3110 """ 3111 _, _, _, respondent = self._handle_map.pop(handle) 3112 if respondent: 3113 self._handles_by_respondent[respondent].discard(handle) 3114 3115 def add_handler(self, fn, handle=None, persist=True, 3116 policy=None, respondent=None, 3117 overwrite=False): 3118 """ 3119 Invoke `fn(msg)` on the :class:`Broker` thread for each Message sent to 3120 `handle` from this context. Unregister after one invocation if 3121 `persist` is :data:`False`. If `handle` is :data:`None`, a new handle 3122 is allocated and returned. 3123 3124 :param int handle: 3125 If not :data:`None`, an explicit handle to register, usually one of 3126 the ``mitogen.core.*`` constants. If unspecified, a new unused 3127 handle will be allocated. 3128 3129 :param bool persist: 3130 If :data:`False`, the handler will be unregistered after a single 3131 message has been received. 3132 3133 :param mitogen.core.Context respondent: 3134 Context that messages to this handle are expected to be sent from. 3135 If specified, arranges for a dead message to be delivered to `fn` 3136 when disconnection of the context is detected. 3137 3138 In future `respondent` will likely also be used to prevent other 3139 contexts from sending messages to the handle. 3140 3141 :param function policy: 3142 Function invoked as `policy(msg, stream)` where `msg` is a 3143 :class:`mitogen.core.Message` about to be delivered, and `stream` 3144 is the :class:`mitogen.core.Stream` on which it was received. 
The 3145 function must return :data:`True`, otherwise an error is logged and 3146 delivery is refused. 3147 3148 Two built-in policy functions exist: 3149 3150 * :func:`has_parent_authority`: requires the message arrived from a 3151 parent context, or a context acting with a parent context's 3152 authority (``auth_id``). 3153 3154 * :func:`mitogen.parent.is_immediate_child`: requires the 3155 message arrived from an immediately connected child, for use in 3156 messaging patterns where either something becomes buggy or 3157 insecure by permitting indirect upstream communication. 3158 3159 In case of refusal, and the message's ``reply_to`` field is 3160 nonzero, a :class:`mitogen.core.CallError` is delivered to the 3161 sender indicating refusal occurred. 3162 3163 :param bool overwrite: 3164 If :data:`True`, allow existing handles to be silently overwritten. 3165 3166 :return: 3167 `handle`, or if `handle` was :data:`None`, the newly allocated 3168 handle. 3169 :raises Error: 3170 Attemp to register handle that was already registered. 3171 """ 3172 handle = handle or next(self._last_handle) 3173 _vv and IOLOG.debug('%r.add_handler(%r, %r, %r)', self, fn, handle, persist) 3174 if handle in self._handle_map and not overwrite: 3175 raise Error(self.duplicate_handle_msg) 3176 3177 self._handle_map[handle] = persist, fn, policy, respondent 3178 if respondent: 3179 if respondent not in self._handles_by_respondent: 3180 self._handles_by_respondent[respondent] = set() 3181 listen(respondent, 'disconnect', 3182 lambda: self._on_respondent_disconnect(respondent)) 3183 self._handles_by_respondent[respondent].add(handle) 3184 3185 return handle 3186 3187 def _on_respondent_disconnect(self, context): 3188 for handle in self._handles_by_respondent.pop(context, ()): 3189 _, fn, _, _ = self._handle_map[handle] 3190 fn(Message.dead(self.respondent_disconnect_msg)) 3191 del self._handle_map[handle] 3192 3193 def _maybe_send_dead(self, unreachable, msg, reason, *args): 3194 """ 3195 Send a dead message to either the original sender or the intended 3196 recipient of `msg`, if the original sender was expecting a reply 3197 (because its `reply_to` was set), otherwise assume the message is a 3198 reply of some sort, and send the dead message to the original 3199 destination. 3200 3201 :param bool unreachable: 3202 If :data:`True`, the recipient is known to be dead or routing 3203 failed due to a security precaution, so don't attempt to fallback 3204 to sending the dead message to the recipient if the original sender 3205 did not include a reply address. 3206 :param mitogen.core.Message msg: 3207 Message that triggered the dead message. 3208 :param str reason: 3209 Human-readable error reason. 3210 :param tuple args: 3211 Elements to interpolate with `reason`. 
3212 """ 3213 if args: 3214 reason %= args 3215 LOG.debug('%r: %r is dead: %r', self, msg, reason) 3216 if msg.reply_to and not msg.is_dead: 3217 msg.reply(Message.dead(reason=reason), router=self) 3218 elif not unreachable: 3219 self._async_route( 3220 Message.dead( 3221 dst_id=msg.dst_id, 3222 handle=msg.handle, 3223 reason=reason, 3224 ) 3225 ) 3226 3227 def _invoke(self, msg, stream): 3228 # IOLOG.debug('%r._invoke(%r)', self, msg) 3229 try: 3230 persist, fn, policy, respondent = self._handle_map[msg.handle] 3231 except KeyError: 3232 self._maybe_send_dead(True, msg, reason=self.invalid_handle_msg) 3233 return 3234 3235 if respondent and not (msg.is_dead or 3236 msg.src_id == respondent.context_id): 3237 self._maybe_send_dead(True, msg, 'reply from unexpected context') 3238 return 3239 3240 if policy and not policy(msg, stream): 3241 self._maybe_send_dead(True, msg, self.refused_msg) 3242 return 3243 3244 if not persist: 3245 self.del_handler(msg.handle) 3246 3247 try: 3248 fn(msg) 3249 except Exception: 3250 LOG.exception('%r._invoke(%r): %r crashed', self, msg, fn) 3251 3252 def _async_route(self, msg, in_stream=None): 3253 """ 3254 Arrange for `msg` to be forwarded towards its destination. If its 3255 destination is the local context, then arrange for it to be dispatched 3256 using the local handlers. 3257 3258 This is a lower overhead version of :meth:`route` that may only be 3259 called from the :class:`Broker` thread. 3260 3261 :param Stream in_stream: 3262 If not :data:`None`, the stream the message arrived on. Used for 3263 performing source route verification, to ensure sensitive messages 3264 such as ``CALL_FUNCTION`` arrive only from trusted contexts. 3265 """ 3266 _vv and IOLOG.debug('%r._async_route(%r, %r)', self, msg, in_stream) 3267 3268 if len(msg.data) > self.max_message_size: 3269 self._maybe_send_dead(False, msg, self.too_large_msg % ( 3270 self.max_message_size, 3271 )) 3272 return 3273 3274 parent_stream = self._stream_by_id.get(mitogen.parent_id) 3275 src_stream = self._stream_by_id.get(msg.src_id, parent_stream) 3276 3277 # When the ingress stream is known, verify the message was received on 3278 # the same as the stream we would expect to receive messages from the 3279 # src_id and auth_id. This is like Reverse Path Filtering in IP, and 3280 # ensures messages from a privileged context cannot be spoofed by a 3281 # child. 3282 if in_stream: 3283 auth_stream = self._stream_by_id.get(msg.auth_id, parent_stream) 3284 if in_stream != auth_stream: 3285 LOG.error('%r: bad auth_id: got %r via %r, not %r: %r', 3286 self, msg.auth_id, in_stream, auth_stream, msg) 3287 return 3288 3289 if msg.src_id != msg.auth_id and in_stream != src_stream: 3290 LOG.error('%r: bad src_id: got %r via %r, not %r: %r', 3291 self, msg.src_id, in_stream, src_stream, msg) 3292 return 3293 3294 # If the stream's MitogenProtocol has auth_id set, copy it to the 3295 # message. This allows subtrees to become privileged by stamping a 3296 # parent's context ID. It is used by mitogen.unix to mark client 3297 # streams (like Ansible WorkerProcess) as having the same rights as 3298 # the parent. 3299 if in_stream.protocol.auth_id is not None: 3300 msg.auth_id = in_stream.protocol.auth_id 3301 if in_stream.protocol.on_message is not None: 3302 in_stream.protocol.on_message(in_stream, msg) 3303 3304 # Record the IDs the source ever communicated with. 
3305 in_stream.protocol.egress_ids.add(msg.dst_id) 3306 3307 if msg.dst_id == mitogen.context_id: 3308 return self._invoke(msg, in_stream) 3309 3310 out_stream = self._stream_by_id.get(msg.dst_id) 3311 if (not out_stream) and (parent_stream != src_stream or not in_stream): 3312 # No downstream route exists. The message could be from a child or 3313 # ourselves for a parent, in which case we must forward it 3314 # upstream, or it could be from a parent for a dead child, in which 3315 # case its src_id/auth_id would fail verification if returned to 3316 # the parent, so in that case reply with a dead message instead. 3317 out_stream = parent_stream 3318 3319 if out_stream is None: 3320 self._maybe_send_dead(True, msg, self.no_route_msg, 3321 msg.dst_id, mitogen.context_id) 3322 return 3323 3324 if in_stream and self.unidirectional and not \ 3325 (in_stream.protocol.is_privileged or 3326 out_stream.protocol.is_privileged): 3327 self._maybe_send_dead(True, msg, self.unidirectional_msg, 3328 in_stream.protocol.remote_id, 3329 out_stream.protocol.remote_id, 3330 mitogen.context_id) 3331 return 3332 3333 out_stream.protocol._send(msg) 3334 3335 def route(self, msg): 3336 """ 3337 Arrange for the :class:`Message` `msg` to be delivered to its 3338 destination using any relevant downstream context, or if none is found, 3339 by forwarding the message upstream towards the master context. If `msg` 3340 is destined for the local context, it is dispatched using the handles 3341 registered with :meth:`add_handler`. 3342 3343 This may be called from any thread. 3344 """ 3345 self.broker.defer(self._async_route, msg) 3346 3347 3348class NullTimerList(object): 3349 def get_timeout(self): 3350 return None 3351 3352 3353class Broker(object): 3354 """ 3355 Responsible for handling I/O multiplexing in a private thread. 3356 3357 **Note:** This somewhat limited core version is used by children. The 3358 master subclass is documented below. 3359 """ 3360 poller_class = Poller 3361 _waker = None 3362 _thread = None 3363 3364 # :func:`mitogen.parent._upgrade_broker` replaces this with 3365 # :class:`mitogen.parent.TimerList` during upgrade. 3366 timers = NullTimerList() 3367 3368 #: Seconds grace to allow :class:`streams <Stream>` to shutdown gracefully 3369 #: before force-disconnecting them during :meth:`shutdown`. 3370 shutdown_timeout = 3.0 3371 3372 def __init__(self, poller_class=None, activate_compat=True): 3373 self._alive = True 3374 self._exitted = False 3375 self._waker = Waker.build_stream(self) 3376 #: Arrange for `func(\*args, \**kwargs)` to be executed on the broker 3377 #: thread, or immediately if the current thread is the broker thread. 3378 #: Safe to call from any thread. 3379 self.defer = self._waker.protocol.defer 3380 self.poller = self.poller_class() 3381 self.poller.start_receive( 3382 self._waker.receive_side.fd, 3383 (self._waker.receive_side, self._waker.on_receive) 3384 ) 3385 self._thread = threading.Thread( 3386 target=self._broker_main, 3387 name='mitogen.broker' 3388 ) 3389 self._thread.start() 3390 if activate_compat: 3391 self._py24_25_compat() 3392 3393 def _py24_25_compat(self): 3394 """ 3395 Python 2.4/2.5 have grave difficulties with threads/fork. We 3396 mandatorily quiesce all running threads during fork using a 3397 monkey-patch there. 3398 """ 3399 if sys.version_info < (2, 6): 3400 # import_module() is used to avoid dep scanner. 
            os_fork = import_module('mitogen.os_fork')
            os_fork._notice_broker_or_pool(self)

    def start_receive(self, stream):
        """
        Mark the :attr:`receive_side <Stream.receive_side>` on `stream` as
        ready for reading. Safe to call from any thread. When the associated
        file descriptor becomes ready for reading,
        :meth:`BasicStream.on_receive` will be called.
        """
        _vv and IOLOG.debug('%r.start_receive(%r)', self, stream)
        side = stream.receive_side
        assert side and not side.closed
        self.defer(self.poller.start_receive,
                   side.fd, (side, stream.on_receive))

    def stop_receive(self, stream):
        """
        Mark the :attr:`receive_side <Stream.receive_side>` on `stream` as not
        ready for reading. Safe to call from any thread.
        """
        _vv and IOLOG.debug('%r.stop_receive(%r)', self, stream)
        self.defer(self.poller.stop_receive, stream.receive_side.fd)

    def _start_transmit(self, stream):
        """
        Mark the :attr:`transmit_side <Stream.transmit_side>` on `stream` as
        ready for writing. Must only be called from the Broker thread. When the
        associated file descriptor becomes ready for writing,
        :meth:`BasicStream.on_transmit` will be called.
        """
        _vv and IOLOG.debug('%r._start_transmit(%r)', self, stream)
        side = stream.transmit_side
        assert side and not side.closed
        self.poller.start_transmit(side.fd, (side, stream.on_transmit))

    def _stop_transmit(self, stream):
        """
        Mark the :attr:`transmit_side <Stream.transmit_side>` on `stream` as
        not ready for writing.
        """
        _vv and IOLOG.debug('%r._stop_transmit(%r)', self, stream)
        self.poller.stop_transmit(stream.transmit_side.fd)

    def keep_alive(self):
        """
        Return :data:`True` if any reader's :attr:`Side.keep_alive` attribute
        is :data:`True`, or any timer is still pending expiry. Used to delay
        shutdown while some important work is in progress (e.g. log draining).
        """
        it = (side.keep_alive for (_, (side, _)) in self.poller.readers)
        return sum(it, 0) > 0 or self.timers.get_timeout() is not None

    def defer_sync(self, func):
        """
        Arrange for `func()` to execute on the :class:`Broker` thread, blocking
        the current thread until a result or exception is available.

        :returns:
            Return value of `func()`.
        """
        latch = Latch()
        def wrapper():
            try:
                latch.put(func())
            except Exception:
                latch.put(sys.exc_info()[1])
        self.defer(wrapper)
        res = latch.get()
        if isinstance(res, Exception):
            raise res
        return res

    def _call(self, stream, func):
        """
        Call `func(self)`, catching any exception that might occur, logging it,
        and force-disconnecting the related `stream`.
        """
        try:
            func(self)
        except Exception:
            LOG.exception('%r crashed', stream)
            stream.on_disconnect(self)

    def _loop_once(self, timeout=None):
        """
        Execute a single :class:`Poller` wait, dispatching any IO events that
        caused the wait to complete.

        :param float timeout:
            If not :data:`None`, maximum time in seconds to wait for events.
        """
        _vv and IOLOG.debug('%r._loop_once(%r, %r)',
                            self, timeout, self.poller)

        timer_to = self.timers.get_timeout()
        if timeout is None:
            timeout = timer_to
        elif timer_to is not None and timer_to < timeout:
            timeout = timer_to

        #IOLOG.debug('readers =\n%s', pformat(self.poller.readers))
        #IOLOG.debug('writers =\n%s', pformat(self.poller.writers))
        for side, func in self.poller.poll(timeout):
            self._call(side.stream, func)
        if timer_to is not None:
            self.timers.expire()

    def _broker_exit(self):
        """
        Forcefully call :meth:`Stream.on_disconnect` on any streams that failed
        to shut down gracefully, then discard the :class:`Poller`.
        """
        for _, (side, _) in self.poller.readers + self.poller.writers:
            LOG.debug('%r: force disconnecting %r', self, side)
            side.stream.on_disconnect(self)

        self.poller.close()

    def _broker_shutdown(self):
        """
        Invoke :meth:`Stream.on_shutdown` for every active stream, then allow
        up to :attr:`shutdown_timeout` seconds for the streams to unregister
        themselves, logging an error if any did not unregister during the grace
        period.
        """
        for _, (side, _) in self.poller.readers + self.poller.writers:
            self._call(side.stream, side.stream.on_shutdown)

        deadline = now() + self.shutdown_timeout
        while self.keep_alive() and now() < deadline:
            self._loop_once(max(0, deadline - now()))

        if self.keep_alive():
            LOG.error('%r: pending work still existed %d seconds after '
                      'shutdown began. This may be due to a timer that is yet '
                      'to expire, or a child connection that did not fully '
                      'shut down.', self, self.shutdown_timeout)

    def _do_broker_main(self):
        """
        Broker thread main function. Dispatches IO events until
        :meth:`shutdown` is called.
        """
        # Python 2.4: ident can only be retrieved from within the thread.
        self._waker.protocol.broker_ident = thread.get_ident()
        try:
            while self._alive:
                self._loop_once()

            fire(self, 'before_shutdown')
            fire(self, 'shutdown')
            self._broker_shutdown()
        except Exception:
            e = sys.exc_info()[1]
            LOG.exception('broker crashed')
            syslog.syslog(syslog.LOG_ERR, 'broker crashed: %s' % (e,))
            syslog.closelog()  # prevent test 'fd leak'.

        self._alive = False  # Ensure _alive is consistent on crash.
        self._exitted = True
        self._broker_exit()

    def _broker_main(self):
        try:
            _profile_hook('mitogen.broker', self._do_broker_main)
        finally:
            # 'finally' to ensure _on_broker_exit() can always SIGTERM.
            fire(self, 'exit')

    def shutdown(self):
        """
        Request the broker gracefully disconnect streams and stop. Safe to
        call from any thread.
        """
        _v and LOG.debug('%r: shutting down', self)
        def _shutdown():
            self._alive = False
        if self._alive and not self._exitted:
            self.defer(_shutdown)

    def join(self):
        """
        Wait for the broker to stop, expected to be called after
        :meth:`shutdown`.
        """
        self._thread.join()

    def __repr__(self):
        return 'Broker(%04x)' % (id(self) & 0xffff,)


class Dispatcher(object):
    """
    Implementation of the :data:`CALL_FUNCTION` handle for a child context.
    Listens on the child's main thread for messages sent by
    :class:`mitogen.parent.CallChain` and dispatches the function calls they
    describe.
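
    Each request unpickles to a 6-tuple of the form ``(chain_id, modname,
    klass, func, args, kwargs)``. As a purely illustrative example, a
    stand-alone call of :func:`os.getpid` would arrive roughly as
    ``(None, 'os', None, 'getpid', (), {})``, and its return value would be
    sent back via :meth:`Message.reply` when the request carries a `reply_to`
    handle.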

    If a :class:`mitogen.parent.CallChain` sending a message is in pipelined
    mode, any exception that occurs is recorded, and causes all subsequent
    calls with the same `chain_id` to fail with the same exception.
    """
    _service_recv = None

    def __repr__(self):
        return 'Dispatcher'

    def __init__(self, econtext):
        self.econtext = econtext
        #: Chain ID -> CallError if prior call failed.
        self._error_by_chain_id = {}
        self.recv = Receiver(
            router=econtext.router,
            handle=CALL_FUNCTION,
            policy=has_parent_authority,
        )
        #: The :data:`CALL_SERVICE` :class:`Receiver` that will eventually be
        #: reused by :class:`mitogen.service.Pool`, should it ever be loaded.
        #: This is necessary for race-free reception of all service requests
        #: delivered regardless of whether the stub or real service pool is
        #: loaded. See #547 for related sorrows.
        Dispatcher._service_recv = Receiver(
            router=econtext.router,
            handle=CALL_SERVICE,
            policy=has_parent_authority,
        )
        self._service_recv.notify = self._on_call_service
        listen(econtext.broker, 'shutdown', self._on_broker_shutdown)

    def _on_broker_shutdown(self):
        if self._service_recv.notify == self._on_call_service:
            self._service_recv.notify = None
        self.recv.close()


    @classmethod
    @takes_econtext
    def forget_chain(cls, chain_id, econtext):
        econtext.dispatcher._error_by_chain_id.pop(chain_id, None)

    def _parse_request(self, msg):
        data = msg.unpickle(throw=False)
        _v and LOG.debug('%r: dispatching %r', self, data)

        chain_id, modname, klass, func, args, kwargs = data
        obj = import_module(modname)
        if klass:
            obj = getattr(obj, klass)
        fn = getattr(obj, func)
        if getattr(fn, 'mitogen_takes_econtext', None):
            kwargs.setdefault('econtext', self.econtext)
        if getattr(fn, 'mitogen_takes_router', None):
            kwargs.setdefault('router', self.econtext.router)

        return chain_id, fn, args, kwargs

    def _dispatch_one(self, msg):
        try:
            chain_id, fn, args, kwargs = self._parse_request(msg)
        except Exception:
            return None, CallError(sys.exc_info()[1])

        if chain_id in self._error_by_chain_id:
            return chain_id, self._error_by_chain_id[chain_id]

        try:
            return chain_id, fn(*args, **kwargs)
        except Exception:
            e = CallError(sys.exc_info()[1])
            if chain_id is not None:
                self._error_by_chain_id[chain_id] = e
            return chain_id, e

    def _on_call_service(self, recv):
        """
        Notifier for the :data:`CALL_SERVICE` receiver. This is called on the
        :class:`Broker` thread for any service messages arriving at this
        context, for as long as no real service pool implementation is loaded.

        To safely bootstrap the service pool implementation, a sentinel
        message is enqueued on the :data:`CALL_FUNCTION` receiver to wake the
        main thread, where the importer can run without any possibility of
        deadlock due to concurrent use of the importer.

        Should the main thread be blocked indefinitely waiting on a service
        call, preventing the import from ever running, then it follows that
        :mod:`mitogen.service` has already been imported and
        :func:`mitogen.service.get_or_create_pool` has already run, meaning
        the service pool is already active and the duplicate initialization
        was not needed anyway.

        #547: This trickery is needed to avoid the alternate option of spinning
        a temporary thread to import the service pool, which could deadlock if
        a custom import hook executing on the main thread (under the importer
        lock) would block waiting for some data that was in turn received by a
        service. The main thread's import lock cannot be released until the
        service is running, and the service cannot satisfy the request until
        the import lock is released.
        """
        self.recv._on_receive(Message(handle=STUB_CALL_SERVICE))

    def _init_service_pool(self):
        import mitogen.service
        mitogen.service.get_or_create_pool(router=self.econtext.router)

    def _dispatch_calls(self):
        for msg in self.recv:
            if msg.handle == STUB_CALL_SERVICE:
                if msg.src_id == mitogen.context_id:
                    self._init_service_pool()
                continue

            chain_id, ret = self._dispatch_one(msg)
            _v and LOG.debug('%r: %r -> %r', self, msg, ret)
            if msg.reply_to:
                msg.reply(ret)
            elif isinstance(ret, CallError) and chain_id is None:
                LOG.error('No-reply function call failed: %s', ret)

    def run(self):
        if self.econtext.config.get('on_start'):
            self.econtext.config['on_start'](self.econtext)

        _profile_hook('mitogen.child_main', self._dispatch_calls)


class ExternalContext(object):
    """
    External context implementation.

    This class contains the main program implementation for new children. It
    is responsible for setting up everything about the process environment,
    import hooks, standard IO redirection, logging, configuring a
    :class:`Router` and :class:`Broker`, and finally arranging for
    :class:`Dispatcher` to take over the main thread after initialization is
    complete.

    .. attribute:: broker

        The :class:`mitogen.core.Broker` instance.

    .. attribute:: context

        The :class:`mitogen.core.Context` instance.

    .. attribute:: channel

        The :class:`mitogen.core.Channel` over which :data:`CALL_FUNCTION`
        requests are received.

    .. attribute:: importer

        The :class:`mitogen.core.Importer` instance.

    .. attribute:: stdout_log

        The :class:`IoLogger` connected to :data:`sys.stdout`.

    .. attribute:: stderr_log

        The :class:`IoLogger` connected to :data:`sys.stderr`.
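
    The class is not instantiated directly by user code: the bootstrap ends
    with a call like ``ExternalContext(config).main()`` appended to this
    module's source. An abbreviated sketch of that configuration (the real
    dictionary is assembled by the parent-side connection code and contains
    every key consumed by the ``_setup_*`` methods below)::

        ExternalContext({
            'context_id': 1,
            'parent_ids': [0],
            'log_level': logging.INFO,
            'max_message_size': 134217728,   # illustrative value
            'profiling': False,
            'unidirectional': False,
            'debug': False,
            'version': mitogen.__version__,
            'in_fd': 100,
            'out_fd': 1,
            'core_src_fd': 101,
            'whitelist': (),
            'blacklist': (),
            'importer': None,
            # ...
        }).main()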
    """
    detached = False

    def __init__(self, config):
        self.config = config

    def _on_broker_exit(self):
        if not self.config['profiling']:
            os.kill(os.getpid(), signal.SIGTERM)

    def _on_shutdown_msg(self, msg):
        if not msg.is_dead:
            _v and LOG.debug('shutdown request from context %d', msg.src_id)
            self.broker.shutdown()

    def _on_parent_disconnect(self):
        if self.detached:
            mitogen.parent_ids = []
            mitogen.parent_id = None
            LOG.info('Detachment complete')
        else:
            _v and LOG.debug('parent stream is gone, dying.')
            self.broker.shutdown()

    def detach(self):
        self.detached = True
        stream = self.router.stream_by_id(mitogen.parent_id)
        if stream:  # not double-detach()'d
            os.setsid()
            self.parent.send_await(Message(handle=DETACHING))
            LOG.info('Detaching from %r; parent is %s', stream, self.parent)
            for x in range(20):
                pending = self.broker.defer_sync(stream.protocol.pending_bytes)
                if not pending:
                    break
                time.sleep(0.05)
            if pending:
                LOG.error('Stream had %d bytes after 1000ms', pending)
            self.broker.defer(stream.on_disconnect, self.broker)

    def _setup_master(self):
        Router.max_message_size = self.config['max_message_size']
        if self.config['profiling']:
            enable_profiling()
        self.broker = Broker(activate_compat=False)
        self.router = Router(self.broker)
        self.router.debug = self.config.get('debug', False)
        self.router.unidirectional = self.config['unidirectional']
        self.router.add_handler(
            fn=self._on_shutdown_msg,
            handle=SHUTDOWN,
            policy=has_parent_authority,
        )
        self.master = Context(self.router, 0, 'master')
        parent_id = self.config['parent_ids'][0]
        if parent_id == 0:
            self.parent = self.master
        else:
            self.parent = Context(self.router, parent_id, 'parent')

        in_fd = self.config.get('in_fd', 100)
        in_fp = os.fdopen(os.dup(in_fd), 'rb', 0)
        os.close(in_fd)

        out_fp = os.fdopen(os.dup(self.config.get('out_fd', 1)), 'wb', 0)
        self.stream = MitogenProtocol.build_stream(
            self.router,
            parent_id,
            local_id=self.config['context_id'],
            parent_ids=self.config['parent_ids']
        )
        self.stream.accept(in_fp, out_fp)
        self.stream.name = 'parent'
        self.stream.receive_side.keep_alive = False

        listen(self.stream, 'disconnect', self._on_parent_disconnect)
        listen(self.broker, 'exit', self._on_broker_exit)

    def _reap_first_stage(self):
        try:
            os.wait()  # Reap first stage.
        except OSError:
            pass  # No first stage exists (e.g. fakessh)

    def _setup_logging(self):
        self.log_handler = LogHandler(self.master)
        root = logging.getLogger()
        root.setLevel(self.config['log_level'])
        root.handlers = [self.log_handler]
        if self.config['debug']:
            enable_debug_logging()

    def _setup_importer(self):
        importer = self.config.get('importer')
        if importer:
            importer._install_handler(self.router)
            importer._context = self.parent
        else:
            core_src_fd = self.config.get('core_src_fd', 101)
            if core_src_fd:
                fp = os.fdopen(core_src_fd, 'rb', 1)
                try:
                    core_src = fp.read()
                    # Strip "ExternalContext.main()" call from last line.
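                    # That call was appended by the bootstrap so the blob
                    # would start this context when executed; stripping it
                    # lets the Importer below serve plain module source to
                    # any children of this context.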
                    core_src = b('\n').join(core_src.splitlines()[:-1])
                finally:
                    fp.close()
            else:
                core_src = None

            importer = Importer(
                self.router,
                self.parent,
                core_src,
                self.config.get('whitelist', ()),
                self.config.get('blacklist', ()),
            )

        self.importer = importer
        self.router.importer = importer
        sys.meta_path.insert(0, self.importer)

    def _setup_package(self):
        global mitogen
        mitogen = imp.new_module('mitogen')
        mitogen.__package__ = 'mitogen'
        mitogen.__path__ = []
        mitogen.__loader__ = self.importer
        mitogen.main = lambda *args, **kwargs: (lambda func: None)
        mitogen.core = sys.modules['__main__']
        mitogen.core.__file__ = 'x/mitogen/core.py'  # For inspect.getsource()
        mitogen.core.__loader__ = self.importer
        sys.modules['mitogen'] = mitogen
        sys.modules['mitogen.core'] = mitogen.core
        del sys.modules['__main__']

    def _setup_globals(self):
        mitogen.is_master = False
        mitogen.__version__ = self.config['version']
        mitogen.context_id = self.config['context_id']
        mitogen.parent_ids = self.config['parent_ids'][:]
        mitogen.parent_id = mitogen.parent_ids[0]

    def _nullify_stdio(self):
        """
        Open /dev/null to replace stdio temporarily. In case of odd startup,
        assume we may be allocated a standard handle.
        """
        for stdfd, mode in ((0, os.O_RDONLY), (1, os.O_RDWR), (2, os.O_RDWR)):
            fd = os.open('/dev/null', mode)
            if fd != stdfd:
                os.dup2(fd, stdfd)
                os.close(fd)

    def _preserve_tty_fp(self):
        """
        #481: when stderr is a TTY due to being started via tty_create_child()
        or hybrid_tty_create_child(), and some privilege escalation tool like
        prehistoric versions of sudo execs this process over the top of
        itself, there is nothing left to keep the slave PTY open after we
        replace our stdio. Therefore if stderr is a TTY, keep around a
        permanent dup() to avoid receiving SIGHUP.
        """
        try:
            if os.isatty(2):
                self.reserve_tty_fp = os.fdopen(os.dup(2), 'r+b', 0)
                set_cloexec(self.reserve_tty_fp.fileno())
        except OSError:
            pass

    def _setup_stdio(self):
        self._preserve_tty_fp()
        # When sys.stdout was opened by the runtime, overwriting it will not
        # close FD 1. However when forking from a child that previously used
        # fdopen(), overwriting it /will/ close FD 1. So we must swallow the
        # close before IoLogger overwrites FD 1, otherwise its new FD 1 will be
        # clobbered. Additionally, stdout must be replaced with /dev/null prior
        # to stdout.close(), since if block buffering was active in the parent,
        # any pre-fork buffered data will be flushed on close(), corrupting the
        # connection to the parent.
        self._nullify_stdio()
        sys.stdout.close()
        self._nullify_stdio()

        self.loggers = []
        for name, fd in (('stdout', 1), ('stderr', 2)):
            log = IoLoggerProtocol.build_stream(name, fd)
            self.broker.start_receive(log)
            self.loggers.append(log)

        # Reopen with line buffering.
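        # FD 1 now refers to the IoLogger pipe installed just above, so
        # anything written to the reopened sys.stdout is captured and
        # forwarded to the parent as log records.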
        sys.stdout = os.fdopen(1, 'w', 1)

    def main(self):
        self._setup_master()
        try:
            try:
                self._setup_logging()
                self._setup_importer()
                self._reap_first_stage()
                if self.config.get('setup_package', True):
                    self._setup_package()
                self._setup_globals()
                if self.config.get('setup_stdio', True):
                    self._setup_stdio()

                self.dispatcher = Dispatcher(self)
                self.router.register(self.parent, self.stream)
                self.router._setup_logging()

                _v and LOG.debug('Python version is %s', sys.version)
                _v and LOG.debug('Parent is context %r (%s); my ID is %r',
                                 self.parent.context_id, self.parent.name,
                                 mitogen.context_id)
                _v and LOG.debug('pid:%r ppid:%r uid:%r/%r, gid:%r/%r host:%r',
                                 os.getpid(), os.getppid(), os.geteuid(),
                                 os.getuid(), os.getegid(), os.getgid(),
                                 socket.gethostname())

                sys.executable = os.environ.pop('ARGV0', sys.executable)
                _v and LOG.debug('Recovered sys.executable: %r', sys.executable)

                if self.config.get('send_ec2', True):
                    self.stream.transmit_side.write(b('MITO002\n'))
                self.broker._py24_25_compat()
                self.log_handler.uncork()
                self.dispatcher.run()
                _v and LOG.debug('ExternalContext.main() normal exit')
            except KeyboardInterrupt:
                LOG.debug('KeyboardInterrupt received, exiting gracefully.')
            except BaseException:
                LOG.exception('ExternalContext.main() crashed')
                raise
        finally:
            self.broker.shutdown()
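
# A minimal sketch of driving the Broker defined above directly; real
# programs normally obtain a broker and router via mitogen.main() or the
# mitogen.master subclasses, and the lambda/42 below are purely illustrative:
#
#   broker = Broker()
#   try:
#       assert broker.defer_sync(lambda: 42) == 42  # runs on the broker thread
#   finally:
#       broker.shutdown()
#       broker.join()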