1#
2# Module for starting a process object using os.fork() or CreateProcess()
3#
4# multiprocessing/forking.py
5#
6# Copyright (c) 2006-2008, R Oudkerk
7# All rights reserved.
8#
9# Redistribution and use in source and binary forms, with or without
10# modification, are permitted provided that the following conditions
11# are met:
12#
13# 1. Redistributions of source code must retain the above copyright
14#    notice, this list of conditions and the following disclaimer.
15# 2. Redistributions in binary form must reproduce the above copyright
16#    notice, this list of conditions and the following disclaimer in the
17#    documentation and/or other materials provided with the distribution.
18# 3. Neither the name of author nor the names of any contributors may be
19#    used to endorse or promote products derived from this software
20#    without specific prior written permission.
21#
22# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
23# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
26# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
27# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
28# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
29# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
31# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32# SUCH DAMAGE.
33#
34
35import os
36import sys
37import signal
38import errno
39
40from multiprocess import util, process
41
42__all__ = ['Popen', 'assert_spawning', 'exit', 'duplicate', 'close', 'ForkingPickler']
43
44#
45# Check that the current thread is spawning a child process
46#
47
def assert_spawning(self):
    '''
    Raise RuntimeError unless the current thread is spawning a child

    Only the type name of `self` is used, to build the error message.
    '''
    if Popen.thread_is_spawning():
        return
    type_name = type(self).__name__
    raise RuntimeError(
        '%s objects should only be shared between processes'
        ' through inheritance' % type_name
        )
54
55#
56# Try making some callable types picklable
57#
58
59try:
60    from dill import Pickler
61except ImportError:
62    from pickle import Pickler
class ForkingPickler(Pickler):
    '''
    Pickler subclass on which extra reduction functions can be registered
    '''
    # Private copy, so register() never modifies Pickler.dispatch itself.
    dispatch = Pickler.dispatch.copy()

    def __init__(self, *args, **kwds):
        Pickler.__init__(self, *args, **kwds)

    @classmethod
    def register(cls, type, reduce):
        '''
        Use `reduce` to pickle objects whose type is exactly `type`

        `reduce(obj)` must return a tuple of positional arguments for
        save_reduce().
        '''
        def save_with_reducer(pickler, obj):
            reduced = reduce(obj)
            pickler.save_reduce(*reduced, obj=obj)
        cls.dispatch[type] = save_with_reducer
75
def _reduce_method(m):
    '''
    Reduce a method object to a getattr() call

    An unbound method (im_self is None) is looked up by name on its
    im_class; a bound method is looked up on the object it is bound to.
    '''
    owner = m.im_class if m.im_self is None else m.im_self
    return getattr, (owner, m.im_func.func_name)
# Objects of the same type as ForkingPickler.save (looked up on the class)
# are pickled through _reduce_method.
ForkingPickler.register(type(ForkingPickler.save), _reduce_method)
82
if type(ForkingPickler.save) is not type(list.append):
    # Methods of builtin types have their own descriptor types here, so
    # they need a dedicated reducer.  (Some python implementations have
    # unbound methods even for builtin types, in which case the method
    # reducer already covers them and this branch is skipped.)
    def _reduce_method_descriptor(descr):
        '''
        Reduce a builtin method descriptor to getattr(owner type, name)
        '''
        return getattr, (descr.__objclass__, descr.__name__)

    for _builtin_method in (list.append, int.__add__):
        ForkingPickler.register(
            type(_builtin_method), _reduce_method_descriptor
            )
    del _builtin_method
89
try:
    from functools import partial
except ImportError:
    # functools.partial is unavailable, so there is nothing to register.
    pass
else:
    def _rebuild_partial(func, args, keywords):
        '''
        Recreate a partial object from its function and arguments
        '''
        return partial(func, *args, **keywords)

    def _reduce_partial(p):
        '''
        Reduce a partial object to a call of _rebuild_partial()
        '''
        # A falsy keywords attribute is replaced by an empty dict so that
        # _rebuild_partial() can always unpack it with **.
        keywords = p.keywords or {}
        return _rebuild_partial, (p.func, p.args, keywords)

    ForkingPickler.register(partial, _reduce_partial)
100
101#
102# Unix
103#
104
105if sys.platform != 'win32':
106    import time
107
    # Unix implementations of the 'exit', 'duplicate' and 'close' names
    # listed in __all__.
    exit = os._exit
    duplicate = os.dup
    close = os.close
111
112    #
113    # We define a Popen class similar to the one from subprocess, but
114    # whose constructor takes a process object as its argument.
115    #
116
117    class Popen(object):
118
119        def __init__(self, process_obj):
120            sys.stdout.flush()
121            sys.stderr.flush()
122            self.returncode = None
123
124            self.pid = os.fork()
125            if self.pid == 0:
126                if 'random' in sys.modules:
127                    import random
128                    random.seed()
129                code = process_obj._bootstrap()
130                sys.stdout.flush()
131                sys.stderr.flush()
132                os._exit(code)
133
134        def poll(self, flag=os.WNOHANG):
135            if self.returncode is None:
136                while True:
137                    try:
138                        pid, sts = os.waitpid(self.pid, flag)
139                    except os.error as e:
140                        if e.errno == errno.EINTR:
141                            continue
142                        # Child process not yet created. See #1731717
143                        # e.errno == errno.ECHILD == 10
144                        return None
145                    else:
146                        break
147                if pid == self.pid:
148                    if os.WIFSIGNALED(sts):
149                        self.returncode = -os.WTERMSIG(sts)
150                    else:
151                        assert os.WIFEXITED(sts)
152                        self.returncode = os.WEXITSTATUS(sts)
153            return self.returncode
154
155        def wait(self, timeout=None):
156            if timeout is None:
157                return self.poll(0)
158            deadline = time.time() + timeout
159            delay = 0.0005
160            while 1:
161                res = self.poll()
162                if res is not None:
163                    break
164                remaining = deadline - time.time()
165                if remaining <= 0:
166                    break
167                delay = min(delay * 2, remaining, 0.05)
168                time.sleep(delay)
169            return res
170
171        def terminate(self):
172            if self.returncode is None:
173                try:
174                    os.kill(self.pid, signal.SIGTERM)
175                except OSError, e:
176                    if self.wait(timeout=0.1) is None:
177                        raise
178
179        @staticmethod
180        def thread_is_spawning():
181            return False
182
183#
184# Windows
185#
186
187else:
188    import thread
189    import msvcrt
190    import _subprocess
191    import time
192
193    try:
194        from _multiprocess import win32, Connection, PipeConnection
195    except ImportError:
196        from _multiprocessing import win32, Connection, PipeConnection
197    from .util import Finalize
198
199    try:
200    #   from cPickle import dump, load, HIGHEST_PROTOCOL
201        from dill import load, DEFAULT_PROTOCOL as HIGHEST_PROTOCOL
202    except ImportError:
203        from pickle import load, HIGHEST_PROTOCOL
204
205    def dump(obj, file, protocol=None, *args, **kwds):
206        ForkingPickler(file, protocol, *args, **kwds).dump(obj)
207
208    #
209    #
210    #
211
    # Exit code given to TerminateProcess() by Popen.terminate();
    # Popen.wait() reports it as -signal.SIGTERM.
    TERMINATE = 0x10000
    # Truthy when running on win32 with sys.frozen set.
    WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False))
    # True when sys.executable is pythonservice.exe.
    WINSERVICE = sys.executable.lower().endswith("pythonservice.exe")

    # Windows implementations of the 'exit' and 'close' names in __all__.
    exit = win32.ExitProcess
    close = win32.CloseHandle
218
219    #
220    # _python_exe is the assumed path to the python executable.
221    # People embedding Python want to modify it.
222    #
223
224    if WINSERVICE:
225        _python_exe = os.path.join(sys.exec_prefix, 'python.exe')
226    else:
227        _python_exe = sys.executable
228
229    def set_executable(exe):
230        global _python_exe
231        _python_exe = exe
232
233    #
234    #
235    #
236
237    def duplicate(handle, target_process=None, inheritable=False):
238        if target_process is None:
239            target_process = _subprocess.GetCurrentProcess()
240        return _subprocess.DuplicateHandle(
241            _subprocess.GetCurrentProcess(), handle, target_process,
242            0, inheritable, _subprocess.DUPLICATE_SAME_ACCESS
243            ).Detach()
244
245    #
246    # We define a Popen class similar to the one from subprocess, but
247    # whose constructor takes a process object as its argument.
248    #
249
250    class Popen(object):
251        '''
252        Start a subprocess to run the code of a process object
253        '''
254        _tls = thread._local()
255
256        def __init__(self, process_obj):
257            # create pipe for communication with child
258            rfd, wfd = os.pipe()
259
260            # get handle for read end of the pipe and make it inheritable
261            rhandle = duplicate(msvcrt.get_osfhandle(rfd), inheritable=True)
262            os.close(rfd)
263
264            # start process
265            cmd = get_command_line() + [rhandle]
266            cmd = ' '.join('"%s"' % x for x in cmd)
267            hp, ht, pid, tid = _subprocess.CreateProcess(
268                _python_exe, cmd, None, None, 1, 0, None, None, None
269                )
270            ht.Close()
271            close(rhandle)
272
273            # set attributes of self
274            self.pid = pid
275            self.returncode = None
276            self._handle = hp
277
278            # send information to child
279            prep_data = get_preparation_data(process_obj._name)
280            to_child = os.fdopen(wfd, 'wb')
281            Popen._tls.process_handle = int(hp)
282            try:
283                dump(prep_data, to_child, HIGHEST_PROTOCOL)
284                dump(process_obj, to_child, HIGHEST_PROTOCOL)
285            finally:
286                del Popen._tls.process_handle
287                to_child.close()
288
289        @staticmethod
290        def thread_is_spawning():
291            return getattr(Popen._tls, 'process_handle', None) is not None
292
293        @staticmethod
294        def duplicate_for_child(handle):
295            return duplicate(handle, Popen._tls.process_handle)
296
297        def wait(self, timeout=None):
298            if self.returncode is None:
299                if timeout is None:
300                    msecs = _subprocess.INFINITE
301                else:
302                    msecs = max(0, int(timeout * 1000 + 0.5))
303
304                res = _subprocess.WaitForSingleObject(int(self._handle), msecs)
305                if res == _subprocess.WAIT_OBJECT_0:
306                    code = _subprocess.GetExitCodeProcess(self._handle)
307                    if code == TERMINATE:
308                        code = -signal.SIGTERM
309                    self.returncode = code
310
311            return self.returncode
312
313        def poll(self):
314            return self.wait(timeout=0)
315
316        def terminate(self):
317            if self.returncode is None:
318                try:
319                    _subprocess.TerminateProcess(int(self._handle), TERMINATE)
320                except WindowsError:
321                    if self.wait(timeout=0.1) is None:
322                        raise
323
324    #
325    #
326    #
327
328    def is_forking(argv):
329        '''
330        Return whether commandline indicates we are forking
331        '''
332        if len(argv) >= 2 and argv[1] == '--multiprocessing-fork':
333            assert len(argv) == 3
334            return True
335        else:
336            return False
337
338
339    def freeze_support():
340        '''
341        Run code for process object if this in not the main process
342        '''
343        if is_forking(sys.argv):
344            main()
345            sys.exit()
346
347
348    def get_command_line():
349        '''
350        Returns prefix of command line used for spawning a child process
351        '''
352        if getattr(process.current_process(), '_inheriting', False):
353            raise RuntimeError('''
354            Attempt to start a new process before the current process
355            has finished its bootstrapping phase.
356
357            This probably means that you are on Windows and you have
358            forgotten to use the proper idiom in the main module:
359
360                if __name__ == '__main__':
361                    freeze_support()
362                    ...
363
364            The "freeze_support()" line can be omitted if the program
365            is not going to be frozen to produce a Windows executable.''')
366
367        if getattr(sys, 'frozen', False):
368            return [sys.executable, '--multiprocessing-fork']
369        else:
370            prog = 'from multiprocess.forking import main; main()'
371            opts = util._args_from_interpreter_flags()
372            return [_python_exe] + opts + ['-c', prog, '--multiprocessing-fork']
373
374
375    def main():
376        '''
377        Run code specified by data received over pipe
378        '''
379        assert is_forking(sys.argv)
380
381        handle = int(sys.argv[-1])
382        fd = msvcrt.open_osfhandle(handle, os.O_RDONLY)
383        from_parent = os.fdopen(fd, 'rb')
384
385        process.current_process()._inheriting = True
386        preparation_data = load(from_parent)
387        prepare(preparation_data)
388        self = load(from_parent)
389        process.current_process()._inheriting = False
390
391        from_parent.close()
392
393        exitcode = self._bootstrap()
394        exit(exitcode)
395
396
397    def get_preparation_data(name):
398        '''
399        Return info about parent needed by child to unpickle process object
400        '''
401        from .util import _logger, _log_to_stderr
402
403        d = dict(
404            name=name,
405            sys_path=sys.path,
406            sys_argv=sys.argv,
407            log_to_stderr=_log_to_stderr,
408            orig_dir=process.ORIGINAL_DIR,
409            authkey=process.current_process().authkey,
410            )
411
412        if _logger is not None:
413            d['log_level'] = _logger.getEffectiveLevel()
414
415        if not WINEXE and not WINSERVICE and \
416           not d['sys_argv'][0].lower().endswith('pythonservice.exe'):
417            main_path = getattr(sys.modules['__main__'], '__file__', None)
418            if not main_path and sys.argv[0] not in ('', '-c'):
419                main_path = sys.argv[0]
420            if main_path is not None:
421                if not os.path.isabs(main_path) and \
422                                          process.ORIGINAL_DIR is not None:
423                    main_path = os.path.join(process.ORIGINAL_DIR, main_path)
424                d['main_path'] = os.path.normpath(main_path)
425
426        return d
427
428    #
429    # Make (Pipe)Connection picklable
430    #
431
432    def reduce_connection(conn):
433        if not Popen.thread_is_spawning():
434            raise RuntimeError(
435                'By default %s objects can only be shared between processes\n'
436                'using inheritance' % type(conn).__name__
437                )
438        return type(conn), (Popen.duplicate_for_child(conn.fileno()),
439                            conn.readable, conn.writable)
440
441    ForkingPickler.register(Connection, reduce_connection)
442    ForkingPickler.register(PipeConnection, reduce_connection)
443
444#
445# Prepare current process
446#
447
# Every module that was sys.modules['__main__'] when prepare() was called.
old_main_modules = []

def prepare(data):
    '''
    Try to get current process ready to unpickle process object

    `data` is a dict; each key that is present is applied to the current
    process.  With 'main_path', the parent's main module is also loaded
    and installed as sys.modules['__main__'].
    '''
    old_main_modules.append(sys.modules['__main__'])

    if 'name' in data:
        process.current_process().name = data['name']

    if 'authkey' in data:
        process.current_process()._authkey = data['authkey']

    if data.get('log_to_stderr'):
        util.log_to_stderr()

    if 'log_level' in data:
        util.get_logger().setLevel(data['log_level'])

    if 'sys_path' in data:
        sys.path = data['sys_path']

    if 'sys_argv' in data:
        sys.argv = data['sys_argv']

    if 'dir' in data:
        os.chdir(data['dir'])

    if 'orig_dir' in data:
        process.ORIGINAL_DIR = data['orig_dir']

    if 'main_path' not in data:
        return

    # XXX (ncoghlan): The following code makes several bogus
    # assumptions regarding the relationship between __file__
    # and a module's real name. See PEP 302 and issue #10845
    # The problem is resolved properly in Python 3.4+, as
    # described in issue #19946

    main_path = data['main_path']
    main_name = os.path.splitext(os.path.basename(main_path))[0]
    if main_name == '__init__':
        # a package: its name is that of the containing directory
        main_name = os.path.basename(os.path.dirname(main_path))

    if main_name == '__main__':
        # For directory and zipfile execution, we assume an implicit
        # "if __name__ == '__main__':" around the module, and don't
        # rerun the main module code in spawned processes
        sys.modules['__main__'].__file__ = main_path
        return

    if main_name == 'ipython':
        return

    # Main modules not actually called __main__.py may
    # contain additional code that should still be executed
    import imp

    if main_path is None:
        dirs = None
    elif os.path.basename(main_path).startswith('__init__.py'):
        dirs = [os.path.dirname(os.path.dirname(main_path))]
    else:
        dirs = [os.path.dirname(main_path)]

    assert main_name not in sys.modules, main_name
    file, path_name, etc = imp.find_module(main_name, dirs)
    try:
        # We would like to do "imp.load_module('__main__', ...)"
        # here.  However, that would cause 'if __name__ ==
        # "__main__"' clauses to be executed.
        main_module = imp.load_module(
            '__parents_main__', file, path_name, etc
            )
    finally:
        if file:
            file.close()

    sys.modules['__main__'] = main_module
    main_module.__name__ = '__main__'

    # Try to make the potentially picklable objects in
    # sys.modules['__main__'] realize they are in the main
    # module -- somewhat ugly.
    for obj in main_module.__dict__.values():
        try:
            if obj.__module__ == '__parents_main__':
                obj.__module__ = '__main__'
        except Exception:
            # best effort: objects without a writable __module__ are
            # left untouched
            pass
535