1# 2# Module to allow connection and socket objects to be transferred 3# between processes 4# 5# multiprocessing/reduction.py 6# 7# Copyright (c) 2006-2008, R Oudkerk 8# All rights reserved. 9# 10# Redistribution and use in source and binary forms, with or without 11# modification, are permitted provided that the following conditions 12# are met: 13# 14# 1. Redistributions of source code must retain the above copyright 15# notice, this list of conditions and the following disclaimer. 16# 2. Redistributions in binary form must reproduce the above copyright 17# notice, this list of conditions and the following disclaimer in the 18# documentation and/or other materials provided with the distribution. 19# 3. Neither the name of author nor the names of any contributors may be 20# used to endorse or promote products derived from this software 21# without specific prior written permission. 22# 23# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND 24# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 25# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 26# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 27# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 28# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 29# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 30# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 31# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 32# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 33# SUCH DAMAGE. 34# 35 36__all__ = [] 37 38import os 39import sys 40import socket 41import threading 42 43try: 44 import _multiprocess as _multiprocessing 45except ImportError: 46 import _multiprocessing 47from multiprocess import current_process 48from multiprocess.forking import Popen, duplicate, close, ForkingPickler 49from multiprocess.util import register_after_fork, debug, sub_debug 50from multiprocess.connection import Client, Listener 51 52 53# 54# 55# 56 57if not(sys.platform == 'win32' or hasattr(_multiprocessing, 'recvfd')): 58 raise ImportError('pickling of connections not supported') 59 60# 61# Platform specific definitions 62# 63 64if sys.platform == 'win32': 65 import _subprocess 66 try: 67 from _multiprocess import win32 68 except ImportError: 69 from _multiprocessing import win32 70 71 def send_handle(conn, handle, destination_pid): 72 process_handle = win32.OpenProcess( 73 win32.PROCESS_ALL_ACCESS, False, destination_pid 74 ) 75 try: 76 new_handle = duplicate(handle, process_handle) 77 conn.send(new_handle) 78 finally: 79 close(process_handle) 80 81 def recv_handle(conn): 82 return conn.recv() 83 84else: 85 def send_handle(conn, handle, destination_pid): 86 _multiprocessing.sendfd(conn.fileno(), handle) 87 88 def recv_handle(conn): 89 return _multiprocessing.recvfd(conn.fileno()) 90 91# 92# Support for a per-process server thread which caches pickled handles 93# 94 95_cache = set() 96 97def _reset(obj): 98 global _lock, _listener, _cache 99 for h in _cache: 100 close(h) 101 _cache.clear() 102 _lock = threading.Lock() 103 _listener = None 104 105_reset(None) 106register_after_fork(_reset, _reset) 107 108def _get_listener(): 109 global _listener 110 111 if _listener is None: 112 _lock.acquire() 113 try: 114 if _listener is None: 115 debug('starting listener and thread for sending handles') 116 _listener = Listener(authkey=current_process().authkey) 117 t = threading.Thread(target=_serve) 118 t.daemon = True 119 t.start() 120 finally: 121 _lock.release() 122 123 return _listener 124 125def _serve(): 126 from .util import is_exiting, sub_warning 127 128 while 1: 129 try: 130 conn = _listener.accept() 131 handle_wanted, destination_pid = conn.recv() 132 _cache.remove(handle_wanted) 133 send_handle(conn, handle_wanted, destination_pid) 134 close(handle_wanted) 135 conn.close() 136 except: 137 if not is_exiting(): 138 import traceback 139 sub_warning( 140 'thread for sharing handles raised exception :\n' + 141 '-'*79 + '\n' + traceback.format_exc() + '-'*79 142 ) 143 144# 145# Functions to be used for pickling/unpickling objects with handles 146# 147 148def reduce_handle(handle): 149 if Popen.thread_is_spawning(): 150 return (None, Popen.duplicate_for_child(handle), True) 151 dup_handle = duplicate(handle) 152 _cache.add(dup_handle) 153 sub_debug('reducing handle %d', handle) 154 return (_get_listener().address, dup_handle, False) 155 156def rebuild_handle(pickled_data): 157 address, handle, inherited = pickled_data 158 if inherited: 159 return handle 160 sub_debug('rebuilding handle %d', handle) 161 conn = Client(address, authkey=current_process().authkey) 162 conn.send((handle, os.getpid())) 163 new_handle = recv_handle(conn) 164 conn.close() 165 return new_handle 166 167# 168# Register `_multiprocessing.Connection` with `ForkingPickler` 169# 170 171def reduce_connection(conn): 172 rh = reduce_handle(conn.fileno()) 173 return rebuild_connection, (rh, conn.readable, conn.writable) 174 175def rebuild_connection(reduced_handle, readable, writable): 176 handle = rebuild_handle(reduced_handle) 177 return _multiprocessing.Connection( 178 handle, readable=readable, writable=writable 179 ) 180 181ForkingPickler.register(_multiprocessing.Connection, reduce_connection) 182 183# 184# Register `socket.socket` with `ForkingPickler` 185# 186 187def fromfd(fd, family, type_, proto=0): 188 s = socket.fromfd(fd, family, type_, proto) 189 if s.__class__ is not socket.socket: 190 s = socket.socket(_sock=s) 191 return s 192 193def reduce_socket(s): 194 reduced_handle = reduce_handle(s.fileno()) 195 return rebuild_socket, (reduced_handle, s.family, s.type, s.proto) 196 197def rebuild_socket(reduced_handle, family, type_, proto): 198 fd = rebuild_handle(reduced_handle) 199 _sock = fromfd(fd, family, type_, proto) 200 close(fd) 201 return _sock 202 203ForkingPickler.register(socket.socket, reduce_socket) 204 205# 206# Register `_multiprocessing.PipeConnection` with `ForkingPickler` 207# 208 209if sys.platform == 'win32': 210 211 def reduce_pipe_connection(conn): 212 rh = reduce_handle(conn.fileno()) 213 return rebuild_pipe_connection, (rh, conn.readable, conn.writable) 214 215 def rebuild_pipe_connection(reduced_handle, readable, writable): 216 handle = rebuild_handle(reduced_handle) 217 return _multiprocessing.PipeConnection( 218 handle, readable=readable, writable=writable 219 ) 220 221 ForkingPickler.register(_multiprocessing.PipeConnection, reduce_pipe_connection) 222