1import os
2import sys
3import threading
4
5from . import process
6from . import reduction
7
8__all__ = ()
9
10#
11# Exceptions
12#
13
14class ProcessError(Exception):
15    pass
16
17class BufferTooShort(ProcessError):
18    pass
19
20class TimeoutError(ProcessError):
21    pass
22
23class AuthenticationError(ProcessError):
24    pass
25
26#
27# Base type for contexts. Bound methods of an instance of this type are included in __all__ of __init__.py
28#
29
30class BaseContext(object):
31
32    ProcessError = ProcessError
33    BufferTooShort = BufferTooShort
34    TimeoutError = TimeoutError
35    AuthenticationError = AuthenticationError
36
37    current_process = staticmethod(process.current_process)
38    parent_process = staticmethod(process.parent_process)
39    active_children = staticmethod(process.active_children)
40
41    def cpu_count(self):
42        '''Returns the number of CPUs in the system'''
43        num = os.cpu_count()
44        if num is None:
45            raise NotImplementedError('cannot determine number of cpus')
46        else:
47            return num
48
49    def Manager(self):
50        '''Returns a manager associated with a running server process
51
52        The managers methods such as `Lock()`, `Condition()` and `Queue()`
53        can be used to create shared objects.
54        '''
55        from .managers import SyncManager
56        m = SyncManager(ctx=self.get_context())
57        m.start()
58        return m
59
60    def Pipe(self, duplex=True):
61        '''Returns two connection object connected by a pipe'''
62        from .connection import Pipe
63        return Pipe(duplex)
64
65    def Lock(self):
66        '''Returns a non-recursive lock object'''
67        from .synchronize import Lock
68        return Lock(ctx=self.get_context())
69
70    def RLock(self):
71        '''Returns a recursive lock object'''
72        from .synchronize import RLock
73        return RLock(ctx=self.get_context())
74
75    def Condition(self, lock=None):
76        '''Returns a condition object'''
77        from .synchronize import Condition
78        return Condition(lock, ctx=self.get_context())
79
80    def Semaphore(self, value=1):
81        '''Returns a semaphore object'''
82        from .synchronize import Semaphore
83        return Semaphore(value, ctx=self.get_context())
84
85    def BoundedSemaphore(self, value=1):
86        '''Returns a bounded semaphore object'''
87        from .synchronize import BoundedSemaphore
88        return BoundedSemaphore(value, ctx=self.get_context())
89
90    def Event(self):
91        '''Returns an event object'''
92        from .synchronize import Event
93        return Event(ctx=self.get_context())
94
95    def Barrier(self, parties, action=None, timeout=None):
96        '''Returns a barrier object'''
97        from .synchronize import Barrier
98        return Barrier(parties, action, timeout, ctx=self.get_context())
99
100    def Queue(self, maxsize=0):
101        '''Returns a queue object'''
102        from .queues import Queue
103        return Queue(maxsize, ctx=self.get_context())
104
105    def JoinableQueue(self, maxsize=0):
106        '''Returns a queue object'''
107        from .queues import JoinableQueue
108        return JoinableQueue(maxsize, ctx=self.get_context())
109
110    def SimpleQueue(self):
111        '''Returns a queue object'''
112        from .queues import SimpleQueue
113        return SimpleQueue(ctx=self.get_context())
114
115    def Pool(self, processes=None, initializer=None, initargs=(),
116             maxtasksperchild=None):
117        '''Returns a process pool object'''
118        from .pool import Pool
119        return Pool(processes, initializer, initargs, maxtasksperchild,
120                    context=self.get_context())
121
122    def RawValue(self, typecode_or_type, *args):
123        '''Returns a shared object'''
124        from .sharedctypes import RawValue
125        return RawValue(typecode_or_type, *args)
126
127    def RawArray(self, typecode_or_type, size_or_initializer):
128        '''Returns a shared array'''
129        from .sharedctypes import RawArray
130        return RawArray(typecode_or_type, size_or_initializer)
131
132    def Value(self, typecode_or_type, *args, lock=True):
133        '''Returns a synchronized shared object'''
134        from .sharedctypes import Value
135        return Value(typecode_or_type, *args, lock=lock,
136                     ctx=self.get_context())
137
138    def Array(self, typecode_or_type, size_or_initializer, *, lock=True):
139        '''Returns a synchronized shared array'''
140        from .sharedctypes import Array
141        return Array(typecode_or_type, size_or_initializer, lock=lock,
142                     ctx=self.get_context())
143
144    def freeze_support(self):
145        '''Check whether this is a fake forked process in a frozen executable.
146        If so then run code specified by commandline and exit.
147        '''
148        if sys.platform == 'win32' and getattr(sys, 'frozen', False):
149            from .spawn import freeze_support
150            freeze_support()
151
152    def get_logger(self):
153        '''Return package logger -- if it does not already exist then
154        it is created.
155        '''
156        from .util import get_logger
157        return get_logger()
158
159    def log_to_stderr(self, level=None):
160        '''Turn on logging and add a handler which prints to stderr'''
161        from .util import log_to_stderr
162        return log_to_stderr(level)
163
164    def allow_connection_pickling(self):
165        '''Install support for sending connections and sockets
166        between processes
167        '''
168        # This is undocumented.  In previous versions of multiprocessing
169        # its only effect was to make socket objects inheritable on Windows.
170        from . import connection
171
172    def set_executable(self, executable):
173        '''Sets the path to a python.exe or pythonw.exe binary used to run
174        child processes instead of sys.executable when using the 'spawn'
175        start method.  Useful for people embedding Python.
176        '''
177        from .spawn import set_executable
178        set_executable(executable)
179
180    def set_forkserver_preload(self, module_names):
181        '''Set list of module names to try to load in forkserver process.
182        This is really just a hint.
183        '''
184        from .forkserver import set_forkserver_preload
185        set_forkserver_preload(module_names)
186
187    def get_context(self, method=None):
188        if method is None:
189            return self
190        try:
191            ctx = _concrete_contexts[method]
192        except KeyError:
193            raise ValueError('cannot find context for %r' % method) from None
194        ctx._check_available()
195        return ctx
196
197    def get_start_method(self, allow_none=False):
198        return self._name
199
200    def set_start_method(self, method, force=False):
201        raise ValueError('cannot set start method of concrete context')
202
203    @property
204    def reducer(self):
205        '''Controls how objects will be reduced to a form that can be
206        shared with other processes.'''
207        return globals().get('reduction')
208
209    @reducer.setter
210    def reducer(self, reduction):
211        globals()['reduction'] = reduction
212
213    def _check_available(self):
214        pass
215
216#
217# Type of default context -- underlying context can be set at most once
218#
219
220class Process(process.BaseProcess):
221    _start_method = None
222    @staticmethod
223    def _Popen(process_obj):
224        return _default_context.get_context().Process._Popen(process_obj)
225
226class DefaultContext(BaseContext):
227    Process = Process
228
229    def __init__(self, context):
230        self._default_context = context
231        self._actual_context = None
232
233    def get_context(self, method=None):
234        if method is None:
235            if self._actual_context is None:
236                self._actual_context = self._default_context
237            return self._actual_context
238        else:
239            return super().get_context(method)
240
241    def set_start_method(self, method, force=False):
242        if self._actual_context is not None and not force:
243            raise RuntimeError('context has already been set')
244        if method is None and force:
245            self._actual_context = None
246            return
247        self._actual_context = self.get_context(method)
248
249    def get_start_method(self, allow_none=False):
250        if self._actual_context is None:
251            if allow_none:
252                return None
253            self._actual_context = self._default_context
254        return self._actual_context._name
255
256    def get_all_start_methods(self):
257        if sys.platform == 'win32':
258            return ['spawn']
259        else:
260            methods = ['spawn', 'fork'] if sys.platform == 'darwin' else ['fork', 'spawn']
261            if reduction.HAVE_SEND_HANDLE:
262                methods.append('forkserver')
263            return methods
264
265
266#
267# Context types for fixed start method
268#
269
270if sys.platform != 'win32':
271
272    class ForkProcess(process.BaseProcess):
273        _start_method = 'fork'
274        @staticmethod
275        def _Popen(process_obj):
276            from .popen_fork import Popen
277            return Popen(process_obj)
278
279    class SpawnProcess(process.BaseProcess):
280        _start_method = 'spawn'
281        @staticmethod
282        def _Popen(process_obj):
283            from .popen_spawn_posix import Popen
284            return Popen(process_obj)
285
286    class ForkServerProcess(process.BaseProcess):
287        _start_method = 'forkserver'
288        @staticmethod
289        def _Popen(process_obj):
290            from .popen_forkserver import Popen
291            return Popen(process_obj)
292
293    class ForkContext(BaseContext):
294        _name = 'fork'
295        Process = ForkProcess
296
297    class SpawnContext(BaseContext):
298        _name = 'spawn'
299        Process = SpawnProcess
300
301    class ForkServerContext(BaseContext):
302        _name = 'forkserver'
303        Process = ForkServerProcess
304        def _check_available(self):
305            if not reduction.HAVE_SEND_HANDLE:
306                raise ValueError('forkserver start method not available')
307
308    _concrete_contexts = {
309        'fork': ForkContext(),
310        'spawn': SpawnContext(),
311        'forkserver': ForkServerContext(),
312    }
313    if sys.platform == 'darwin':
314        # bpo-33725: running arbitrary code after fork() is no longer reliable
315        # on macOS since macOS 10.14 (Mojave). Use spawn by default instead.
316        _default_context = DefaultContext(_concrete_contexts['spawn'])
317    else:
318        _default_context = DefaultContext(_concrete_contexts['fork'])
319
320else:
321
322    class SpawnProcess(process.BaseProcess):
323        _start_method = 'spawn'
324        @staticmethod
325        def _Popen(process_obj):
326            from .popen_spawn_win32 import Popen
327            return Popen(process_obj)
328
329    class SpawnContext(BaseContext):
330        _name = 'spawn'
331        Process = SpawnProcess
332
333    _concrete_contexts = {
334        'spawn': SpawnContext(),
335    }
336    _default_context = DefaultContext(_concrete_contexts['spawn'])
337
338#
339# Force the start method
340#
341
342def _force_start_method(method):
343    _default_context._actual_context = _concrete_contexts[method]
344
345#
346# Check that the current thread is spawning a child process
347#
348
349_tls = threading.local()
350
351def get_spawning_popen():
352    return getattr(_tls, 'spawning_popen', None)
353
354def set_spawning_popen(popen):
355    _tls.spawning_popen = popen
356
357def assert_spawning(obj):
358    if get_spawning_popen() is None:
359        raise RuntimeError(
360            '%s objects should only be shared between processes'
361            ' through inheritance' % type(obj).__name__
362            )
363