1# 2# Module for starting a process object using os.fork() or CreateProcess() 3# 4# multiprocessing/forking.py 5# 6# Copyright (c) 2006-2008, R Oudkerk 7# All rights reserved. 8# 9# Redistribution and use in source and binary forms, with or without 10# modification, are permitted provided that the following conditions 11# are met: 12# 13# 1. Redistributions of source code must retain the above copyright 14# notice, this list of conditions and the following disclaimer. 15# 2. Redistributions in binary form must reproduce the above copyright 16# notice, this list of conditions and the following disclaimer in the 17# documentation and/or other materials provided with the distribution. 18# 3. Neither the name of author nor the names of any contributors may be 19# used to endorse or promote products derived from this software 20# without specific prior written permission. 21# 22# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND 23# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 24# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 25# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 26# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 27# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 28# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 29# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 30# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 31# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32# SUCH DAMAGE. 33# 34 35import os 36import sys 37import signal 38import errno 39 40from multiprocess import util, process 41 42__all__ = ['Popen', 'assert_spawning', 'exit', 'duplicate', 'close', 'ForkingPickler'] 43 44# 45# Check that the current thread is spawning a child process 46# 47 48def assert_spawning(self): 49 if not Popen.thread_is_spawning(): 50 raise RuntimeError( 51 '%s objects should only be shared between processes' 52 ' through inheritance' % type(self).__name__ 53 ) 54 55# 56# Try making some callable types picklable 57# 58 59try: 60 from dill import Pickler 61except ImportError: 62 from pickle import Pickler 63class ForkingPickler(Pickler): 64 dispatch = Pickler.dispatch.copy() 65 66 def __init__(self, *args, **kwds): 67 Pickler.__init__(self, *args, **kwds) 68 69 @classmethod 70 def register(cls, type, reduce): 71 def dispatcher(self, obj): 72 rv = reduce(obj) 73 self.save_reduce(obj=obj, *rv) 74 cls.dispatch[type] = dispatcher 75 76def _reduce_method(m): 77 if m.im_self is None: 78 return getattr, (m.im_class, m.im_func.func_name) 79 else: 80 return getattr, (m.im_self, m.im_func.func_name) 81ForkingPickler.register(type(ForkingPickler.save), _reduce_method) 82 83if type(list.append) is not type(ForkingPickler.save): 84 # Some python implementations have unbound methods even for builtin types 85 def _reduce_method_descriptor(m): 86 return getattr, (m.__objclass__, m.__name__) 87 ForkingPickler.register(type(list.append), _reduce_method_descriptor) 88 ForkingPickler.register(type(int.__add__), _reduce_method_descriptor) 89 90try: 91 from functools import partial 92except ImportError: 93 pass 94else: 95 def _reduce_partial(p): 96 return _rebuild_partial, (p.func, p.args, p.keywords or {}) 97 def _rebuild_partial(func, args, keywords): 98 return partial(func, *args, **keywords) 99 ForkingPickler.register(partial, _reduce_partial) 100 101# 102# Unix 103# 104 105if sys.platform != 'win32': 106 import time 107 108 exit = os._exit 109 duplicate = os.dup 110 close = os.close 111 112 # 113 # We define a Popen class similar to the one from subprocess, but 114 # whose constructor takes a process object as its argument. 115 # 116 117 class Popen(object): 118 119 def __init__(self, process_obj): 120 sys.stdout.flush() 121 sys.stderr.flush() 122 self.returncode = None 123 124 self.pid = os.fork() 125 if self.pid == 0: 126 if 'random' in sys.modules: 127 import random 128 random.seed() 129 code = process_obj._bootstrap() 130 sys.stdout.flush() 131 sys.stderr.flush() 132 os._exit(code) 133 134 def poll(self, flag=os.WNOHANG): 135 if self.returncode is None: 136 while True: 137 try: 138 pid, sts = os.waitpid(self.pid, flag) 139 except os.error as e: 140 if e.errno == errno.EINTR: 141 continue 142 # Child process not yet created. See #1731717 143 # e.errno == errno.ECHILD == 10 144 return None 145 else: 146 break 147 if pid == self.pid: 148 if os.WIFSIGNALED(sts): 149 self.returncode = -os.WTERMSIG(sts) 150 else: 151 assert os.WIFEXITED(sts) 152 self.returncode = os.WEXITSTATUS(sts) 153 return self.returncode 154 155 def wait(self, timeout=None): 156 if timeout is None: 157 return self.poll(0) 158 deadline = time.time() + timeout 159 delay = 0.0005 160 while 1: 161 res = self.poll() 162 if res is not None: 163 break 164 remaining = deadline - time.time() 165 if remaining <= 0: 166 break 167 delay = min(delay * 2, remaining, 0.05) 168 time.sleep(delay) 169 return res 170 171 def terminate(self): 172 if self.returncode is None: 173 try: 174 os.kill(self.pid, signal.SIGTERM) 175 except OSError, e: 176 if self.wait(timeout=0.1) is None: 177 raise 178 179 @staticmethod 180 def thread_is_spawning(): 181 return False 182 183# 184# Windows 185# 186 187else: 188 import thread 189 import msvcrt 190 import _subprocess 191 import time 192 193 try: 194 from _multiprocess import win32, Connection, PipeConnection 195 except ImportError: 196 from _multiprocessing import win32, Connection, PipeConnection 197 from .util import Finalize 198 199 try: 200 # from cPickle import dump, load, HIGHEST_PROTOCOL 201 from dill import load, DEFAULT_PROTOCOL as HIGHEST_PROTOCOL 202 except ImportError: 203 from pickle import load, HIGHEST_PROTOCOL 204 205 def dump(obj, file, protocol=None, *args, **kwds): 206 ForkingPickler(file, protocol, *args, **kwds).dump(obj) 207 208 # 209 # 210 # 211 212 TERMINATE = 0x10000 213 WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False)) 214 WINSERVICE = sys.executable.lower().endswith("pythonservice.exe") 215 216 exit = win32.ExitProcess 217 close = win32.CloseHandle 218 219 # 220 # _python_exe is the assumed path to the python executable. 221 # People embedding Python want to modify it. 222 # 223 224 if WINSERVICE: 225 _python_exe = os.path.join(sys.exec_prefix, 'python.exe') 226 else: 227 _python_exe = sys.executable 228 229 def set_executable(exe): 230 global _python_exe 231 _python_exe = exe 232 233 # 234 # 235 # 236 237 def duplicate(handle, target_process=None, inheritable=False): 238 if target_process is None: 239 target_process = _subprocess.GetCurrentProcess() 240 return _subprocess.DuplicateHandle( 241 _subprocess.GetCurrentProcess(), handle, target_process, 242 0, inheritable, _subprocess.DUPLICATE_SAME_ACCESS 243 ).Detach() 244 245 # 246 # We define a Popen class similar to the one from subprocess, but 247 # whose constructor takes a process object as its argument. 248 # 249 250 class Popen(object): 251 ''' 252 Start a subprocess to run the code of a process object 253 ''' 254 _tls = thread._local() 255 256 def __init__(self, process_obj): 257 # create pipe for communication with child 258 rfd, wfd = os.pipe() 259 260 # get handle for read end of the pipe and make it inheritable 261 rhandle = duplicate(msvcrt.get_osfhandle(rfd), inheritable=True) 262 os.close(rfd) 263 264 # start process 265 cmd = get_command_line() + [rhandle] 266 cmd = ' '.join('"%s"' % x for x in cmd) 267 hp, ht, pid, tid = _subprocess.CreateProcess( 268 _python_exe, cmd, None, None, 1, 0, None, None, None 269 ) 270 ht.Close() 271 close(rhandle) 272 273 # set attributes of self 274 self.pid = pid 275 self.returncode = None 276 self._handle = hp 277 278 # send information to child 279 prep_data = get_preparation_data(process_obj._name) 280 to_child = os.fdopen(wfd, 'wb') 281 Popen._tls.process_handle = int(hp) 282 try: 283 dump(prep_data, to_child, HIGHEST_PROTOCOL) 284 dump(process_obj, to_child, HIGHEST_PROTOCOL) 285 finally: 286 del Popen._tls.process_handle 287 to_child.close() 288 289 @staticmethod 290 def thread_is_spawning(): 291 return getattr(Popen._tls, 'process_handle', None) is not None 292 293 @staticmethod 294 def duplicate_for_child(handle): 295 return duplicate(handle, Popen._tls.process_handle) 296 297 def wait(self, timeout=None): 298 if self.returncode is None: 299 if timeout is None: 300 msecs = _subprocess.INFINITE 301 else: 302 msecs = max(0, int(timeout * 1000 + 0.5)) 303 304 res = _subprocess.WaitForSingleObject(int(self._handle), msecs) 305 if res == _subprocess.WAIT_OBJECT_0: 306 code = _subprocess.GetExitCodeProcess(self._handle) 307 if code == TERMINATE: 308 code = -signal.SIGTERM 309 self.returncode = code 310 311 return self.returncode 312 313 def poll(self): 314 return self.wait(timeout=0) 315 316 def terminate(self): 317 if self.returncode is None: 318 try: 319 _subprocess.TerminateProcess(int(self._handle), TERMINATE) 320 except WindowsError: 321 if self.wait(timeout=0.1) is None: 322 raise 323 324 # 325 # 326 # 327 328 def is_forking(argv): 329 ''' 330 Return whether commandline indicates we are forking 331 ''' 332 if len(argv) >= 2 and argv[1] == '--multiprocessing-fork': 333 assert len(argv) == 3 334 return True 335 else: 336 return False 337 338 339 def freeze_support(): 340 ''' 341 Run code for process object if this in not the main process 342 ''' 343 if is_forking(sys.argv): 344 main() 345 sys.exit() 346 347 348 def get_command_line(): 349 ''' 350 Returns prefix of command line used for spawning a child process 351 ''' 352 if getattr(process.current_process(), '_inheriting', False): 353 raise RuntimeError(''' 354 Attempt to start a new process before the current process 355 has finished its bootstrapping phase. 356 357 This probably means that you are on Windows and you have 358 forgotten to use the proper idiom in the main module: 359 360 if __name__ == '__main__': 361 freeze_support() 362 ... 363 364 The "freeze_support()" line can be omitted if the program 365 is not going to be frozen to produce a Windows executable.''') 366 367 if getattr(sys, 'frozen', False): 368 return [sys.executable, '--multiprocessing-fork'] 369 else: 370 prog = 'from multiprocess.forking import main; main()' 371 opts = util._args_from_interpreter_flags() 372 return [_python_exe] + opts + ['-c', prog, '--multiprocessing-fork'] 373 374 375 def main(): 376 ''' 377 Run code specified by data received over pipe 378 ''' 379 assert is_forking(sys.argv) 380 381 handle = int(sys.argv[-1]) 382 fd = msvcrt.open_osfhandle(handle, os.O_RDONLY) 383 from_parent = os.fdopen(fd, 'rb') 384 385 process.current_process()._inheriting = True 386 preparation_data = load(from_parent) 387 prepare(preparation_data) 388 self = load(from_parent) 389 process.current_process()._inheriting = False 390 391 from_parent.close() 392 393 exitcode = self._bootstrap() 394 exit(exitcode) 395 396 397 def get_preparation_data(name): 398 ''' 399 Return info about parent needed by child to unpickle process object 400 ''' 401 from .util import _logger, _log_to_stderr 402 403 d = dict( 404 name=name, 405 sys_path=sys.path, 406 sys_argv=sys.argv, 407 log_to_stderr=_log_to_stderr, 408 orig_dir=process.ORIGINAL_DIR, 409 authkey=process.current_process().authkey, 410 ) 411 412 if _logger is not None: 413 d['log_level'] = _logger.getEffectiveLevel() 414 415 if not WINEXE and not WINSERVICE and \ 416 not d['sys_argv'][0].lower().endswith('pythonservice.exe'): 417 main_path = getattr(sys.modules['__main__'], '__file__', None) 418 if not main_path and sys.argv[0] not in ('', '-c'): 419 main_path = sys.argv[0] 420 if main_path is not None: 421 if not os.path.isabs(main_path) and \ 422 process.ORIGINAL_DIR is not None: 423 main_path = os.path.join(process.ORIGINAL_DIR, main_path) 424 d['main_path'] = os.path.normpath(main_path) 425 426 return d 427 428 # 429 # Make (Pipe)Connection picklable 430 # 431 432 def reduce_connection(conn): 433 if not Popen.thread_is_spawning(): 434 raise RuntimeError( 435 'By default %s objects can only be shared between processes\n' 436 'using inheritance' % type(conn).__name__ 437 ) 438 return type(conn), (Popen.duplicate_for_child(conn.fileno()), 439 conn.readable, conn.writable) 440 441 ForkingPickler.register(Connection, reduce_connection) 442 ForkingPickler.register(PipeConnection, reduce_connection) 443 444# 445# Prepare current process 446# 447 448old_main_modules = [] 449 450def prepare(data): 451 ''' 452 Try to get current process ready to unpickle process object 453 ''' 454 old_main_modules.append(sys.modules['__main__']) 455 456 if 'name' in data: 457 process.current_process().name = data['name'] 458 459 if 'authkey' in data: 460 process.current_process()._authkey = data['authkey'] 461 462 if 'log_to_stderr' in data and data['log_to_stderr']: 463 util.log_to_stderr() 464 465 if 'log_level' in data: 466 util.get_logger().setLevel(data['log_level']) 467 468 if 'sys_path' in data: 469 sys.path = data['sys_path'] 470 471 if 'sys_argv' in data: 472 sys.argv = data['sys_argv'] 473 474 if 'dir' in data: 475 os.chdir(data['dir']) 476 477 if 'orig_dir' in data: 478 process.ORIGINAL_DIR = data['orig_dir'] 479 480 if 'main_path' in data: 481 # XXX (ncoghlan): The following code makes several bogus 482 # assumptions regarding the relationship between __file__ 483 # and a module's real name. See PEP 302 and issue #10845 484 # The problem is resolved properly in Python 3.4+, as 485 # described in issue #19946 486 487 main_path = data['main_path'] 488 main_name = os.path.splitext(os.path.basename(main_path))[0] 489 if main_name == '__init__': 490 main_name = os.path.basename(os.path.dirname(main_path)) 491 492 if main_name == '__main__': 493 # For directory and zipfile execution, we assume an implicit 494 # "if __name__ == '__main__':" around the module, and don't 495 # rerun the main module code in spawned processes 496 main_module = sys.modules['__main__'] 497 main_module.__file__ = main_path 498 elif main_name != 'ipython': 499 # Main modules not actually called __main__.py may 500 # contain additional code that should still be executed 501 import imp 502 503 if main_path is None: 504 dirs = None 505 elif os.path.basename(main_path).startswith('__init__.py'): 506 dirs = [os.path.dirname(os.path.dirname(main_path))] 507 else: 508 dirs = [os.path.dirname(main_path)] 509 510 assert main_name not in sys.modules, main_name 511 file, path_name, etc = imp.find_module(main_name, dirs) 512 try: 513 # We would like to do "imp.load_module('__main__', ...)" 514 # here. However, that would cause 'if __name__ == 515 # "__main__"' clauses to be executed. 516 main_module = imp.load_module( 517 '__parents_main__', file, path_name, etc 518 ) 519 finally: 520 if file: 521 file.close() 522 523 sys.modules['__main__'] = main_module 524 main_module.__name__ = '__main__' 525 526 # Try to make the potentially picklable objects in 527 # sys.modules['__main__'] realize they are in the main 528 # module -- somewhat ugly. 529 for obj in main_module.__dict__.values(): 530 try: 531 if obj.__module__ == '__parents_main__': 532 obj.__module__ = '__main__' 533 except Exception: 534 pass 535