1from __future__ import absolute_import
2
3import os
4import sys
5import threading
6import warnings
7
8from . import process
9
10__all__ = []            # things are copied from here to __init__.py
11
12
13W_NO_EXECV = """\
14force_execv is not supported as the billiard C extension \
15is not installed\
16"""
17
18
19#
20# Exceptions
21#
22
23from .exceptions import (  # noqa
24    ProcessError,
25    BufferTooShort,
26    TimeoutError,
27    AuthenticationError,
28    TimeLimitExceeded,
29    SoftTimeLimitExceeded,
30    WorkerLostError,
31)
32
33
34#
35# Base type for contexts
36#
37
38class BaseContext(object):
39
40    ProcessError = ProcessError
41    BufferTooShort = BufferTooShort
42    TimeoutError = TimeoutError
43    AuthenticationError = AuthenticationError
44    TimeLimitExceeded = TimeLimitExceeded
45    SoftTimeLimitExceeded = SoftTimeLimitExceeded
46    WorkerLostError = WorkerLostError
47
48    current_process = staticmethod(process.current_process)
49    active_children = staticmethod(process.active_children)
50
51    if hasattr(os, 'cpu_count'):
52        def cpu_count(self):
53            '''Returns the number of CPUs in the system'''
54            num = os.cpu_count()
55            if num is None:
56                raise NotImplementedError('cannot determine number of cpus')
57            else:
58                return num
59    else:
60        def cpu_count(self):  # noqa
61            if sys.platform == 'win32':
62                try:
63                    num = int(os.environ['NUMBER_OF_PROCESSORS'])
64                except (ValueError, KeyError):
65                    num = 0
66            elif 'bsd' in sys.platform or sys.platform == 'darwin':
67                comm = '/sbin/sysctl -n hw.ncpu'
68                if sys.platform == 'darwin':
69                    comm = '/usr' + comm
70                try:
71                    with os.popen(comm) as p:
72                        num = int(p.read())
73                except ValueError:
74                    num = 0
75            else:
76                try:
77                    num = os.sysconf('SC_NPROCESSORS_ONLN')
78                except (ValueError, OSError, AttributeError):
79                    num = 0
80
81            if num >= 1:
82                return num
83            else:
84                raise NotImplementedError('cannot determine number of cpus')
85
86    def Manager(self):
87        '''Returns a manager associated with a running server process
88
89        The managers methods such as `Lock()`, `Condition()` and `Queue()`
90        can be used to create shared objects.
91        '''
92        from .managers import SyncManager
93        m = SyncManager(ctx=self.get_context())
94        m.start()
95        return m
96
97    def Pipe(self, duplex=True, rnonblock=False, wnonblock=False):
98        '''Returns two connection object connected by a pipe'''
99        from .connection import Pipe
100        return Pipe(duplex, rnonblock, wnonblock)
101
102    def Lock(self):
103        '''Returns a non-recursive lock object'''
104        from .synchronize import Lock
105        return Lock(ctx=self.get_context())
106
107    def RLock(self):
108        '''Returns a recursive lock object'''
109        from .synchronize import RLock
110        return RLock(ctx=self.get_context())
111
112    def Condition(self, lock=None):
113        '''Returns a condition object'''
114        from .synchronize import Condition
115        return Condition(lock, ctx=self.get_context())
116
117    def Semaphore(self, value=1):
118        '''Returns a semaphore object'''
119        from .synchronize import Semaphore
120        return Semaphore(value, ctx=self.get_context())
121
122    def BoundedSemaphore(self, value=1):
123        '''Returns a bounded semaphore object'''
124        from .synchronize import BoundedSemaphore
125        return BoundedSemaphore(value, ctx=self.get_context())
126
127    def Event(self):
128        '''Returns an event object'''
129        from .synchronize import Event
130        return Event(ctx=self.get_context())
131
132    def Barrier(self, parties, action=None, timeout=None):
133        '''Returns a barrier object'''
134        from .synchronize import Barrier
135        return Barrier(parties, action, timeout, ctx=self.get_context())
136
137    def Queue(self, maxsize=0):
138        '''Returns a queue object'''
139        from .queues import Queue
140        return Queue(maxsize, ctx=self.get_context())
141
142    def JoinableQueue(self, maxsize=0):
143        '''Returns a queue object'''
144        from .queues import JoinableQueue
145        return JoinableQueue(maxsize, ctx=self.get_context())
146
147    def SimpleQueue(self):
148        '''Returns a queue object'''
149        from .queues import SimpleQueue
150        return SimpleQueue(ctx=self.get_context())
151
152    def Pool(self, processes=None, initializer=None, initargs=(),
153             maxtasksperchild=None, timeout=None, soft_timeout=None,
154             lost_worker_timeout=None, max_restarts=None,
155             max_restart_freq=1, on_process_up=None, on_process_down=None,
156             on_timeout_set=None, on_timeout_cancel=None, threads=True,
157             semaphore=None, putlocks=False, allow_restart=False):
158        '''Returns a process pool object'''
159        from .pool import Pool
160        return Pool(processes, initializer, initargs, maxtasksperchild,
161                    timeout, soft_timeout, lost_worker_timeout,
162                    max_restarts, max_restart_freq, on_process_up,
163                    on_process_down, on_timeout_set, on_timeout_cancel,
164                    threads, semaphore, putlocks, allow_restart,
165                    context=self.get_context())
166
167    def RawValue(self, typecode_or_type, *args):
168        '''Returns a shared object'''
169        from .sharedctypes import RawValue
170        return RawValue(typecode_or_type, *args)
171
172    def RawArray(self, typecode_or_type, size_or_initializer):
173        '''Returns a shared array'''
174        from .sharedctypes import RawArray
175        return RawArray(typecode_or_type, size_or_initializer)
176
177    def Value(self, typecode_or_type, *args, **kwargs):
178        '''Returns a synchronized shared object'''
179        from .sharedctypes import Value
180        lock = kwargs.get('lock', True)
181        return Value(typecode_or_type, *args, lock=lock,
182                     ctx=self.get_context())
183
184    def Array(self, typecode_or_type, size_or_initializer, *args, **kwargs):
185        '''Returns a synchronized shared array'''
186        from .sharedctypes import Array
187        lock = kwargs.get('lock', True)
188        return Array(typecode_or_type, size_or_initializer, lock=lock,
189                     ctx=self.get_context())
190
191    def freeze_support(self):
192        '''Check whether this is a fake forked process in a frozen executable.
193        If so then run code specified by commandline and exit.
194        '''
195        if sys.platform == 'win32' and getattr(sys, 'frozen', False):
196            from .spawn import freeze_support
197            freeze_support()
198
199    def get_logger(self):
200        '''Return package logger -- if it does not already exist then
201        it is created.
202        '''
203        from .util import get_logger
204        return get_logger()
205
206    def log_to_stderr(self, level=None):
207        '''Turn on logging and add a handler which prints to stderr'''
208        from .util import log_to_stderr
209        return log_to_stderr(level)
210
211    def allow_connection_pickling(self):
212        '''Install support for sending connections and sockets
213        between processes
214        '''
215        # This is undocumented.  In previous versions of multiprocessing
216        # its only effect was to make socket objects inheritable on Windows.
217        from . import connection  # noqa
218
219    def set_executable(self, executable):
220        '''Sets the path to a python.exe or pythonw.exe binary used to run
221        child processes instead of sys.executable when using the 'spawn'
222        start method.  Useful for people embedding Python.
223        '''
224        from .spawn import set_executable
225        set_executable(executable)
226
227    def set_forkserver_preload(self, module_names):
228        '''Set list of module names to try to load in forkserver process.
229        This is really just a hint.
230        '''
231        from .forkserver import set_forkserver_preload
232        set_forkserver_preload(module_names)
233
234    def get_context(self, method=None):
235        if method is None:
236            return self
237        try:
238            ctx = _concrete_contexts[method]
239        except KeyError:
240            raise ValueError('cannot find context for %r' % method)
241        ctx._check_available()
242        return ctx
243
244    def get_start_method(self, allow_none=False):
245        return self._name
246
247    def set_start_method(self, method=None):
248        raise ValueError('cannot set start method of concrete context')
249
250    def forking_is_enabled(self):
251        # XXX for compatibility with billiard <3.4
252        return (self.get_start_method() or 'fork') == 'fork'
253
254    def forking_enable(self, value):
255        # XXX for compatibility with billiard <3.4
256        if not value:
257            from ._ext import supports_exec
258            if supports_exec:
259                self.set_start_method('spawn', force=True)
260            else:
261                warnings.warn(RuntimeWarning(W_NO_EXECV))
262
263    def _check_available(self):
264        pass
265
266#
267# Type of default context -- underlying context can be set at most once
268#
269
270
271class Process(process.BaseProcess):
272    _start_method = None
273
274    @staticmethod
275    def _Popen(process_obj):
276        return _default_context.get_context().Process._Popen(process_obj)
277
278
279class DefaultContext(BaseContext):
280    Process = Process
281
282    def __init__(self, context):
283        self._default_context = context
284        self._actual_context = None
285
286    def get_context(self, method=None):
287        if method is None:
288            if self._actual_context is None:
289                self._actual_context = self._default_context
290            return self._actual_context
291        else:
292            return super(DefaultContext, self).get_context(method)
293
294    def set_start_method(self, method, force=False):
295        if self._actual_context is not None and not force:
296            raise RuntimeError('context has already been set')
297        if method is None and force:
298            self._actual_context = None
299            return
300        self._actual_context = self.get_context(method)
301
302    def get_start_method(self, allow_none=False):
303        if self._actual_context is None:
304            if allow_none:
305                return None
306            self._actual_context = self._default_context
307        return self._actual_context._name
308
309    def get_all_start_methods(self):
310        if sys.platform == 'win32':
311            return ['spawn']
312        else:
313            from . import reduction
314            if reduction.HAVE_SEND_HANDLE:
315                return ['fork', 'spawn', 'forkserver']
316            else:
317                return ['fork', 'spawn']
318
319DefaultContext.__all__ = list(x for x in dir(DefaultContext) if x[0] != '_')
320
321#
322# Context types for fixed start method
323#
324
325if sys.platform != 'win32':
326
327    class ForkProcess(process.BaseProcess):
328        _start_method = 'fork'
329
330        @staticmethod
331        def _Popen(process_obj):
332            from .popen_fork import Popen
333            return Popen(process_obj)
334
335    class SpawnProcess(process.BaseProcess):
336        _start_method = 'spawn'
337
338        @staticmethod
339        def _Popen(process_obj):
340            from .popen_spawn_posix import Popen
341            return Popen(process_obj)
342
343    class ForkServerProcess(process.BaseProcess):
344        _start_method = 'forkserver'
345
346        @staticmethod
347        def _Popen(process_obj):
348            from .popen_forkserver import Popen
349            return Popen(process_obj)
350
351    class ForkContext(BaseContext):
352        _name = 'fork'
353        Process = ForkProcess
354
355    class SpawnContext(BaseContext):
356        _name = 'spawn'
357        Process = SpawnProcess
358
359    class ForkServerContext(BaseContext):
360        _name = 'forkserver'
361        Process = ForkServerProcess
362
363        def _check_available(self):
364            from . import reduction
365            if not reduction.HAVE_SEND_HANDLE:
366                raise ValueError('forkserver start method not available')
367
368    _concrete_contexts = {
369        'fork': ForkContext(),
370        'spawn': SpawnContext(),
371        'forkserver': ForkServerContext(),
372    }
373    _default_context = DefaultContext(_concrete_contexts['fork'])
374
375else:
376
377    class SpawnProcess(process.BaseProcess):
378        _start_method = 'spawn'
379
380        @staticmethod
381        def _Popen(process_obj):
382            from .popen_spawn_win32 import Popen
383            return Popen(process_obj)
384
385    class SpawnContext(BaseContext):
386        _name = 'spawn'
387        Process = SpawnProcess
388
389    _concrete_contexts = {
390        'spawn': SpawnContext(),
391    }
392    _default_context = DefaultContext(_concrete_contexts['spawn'])
393
394#
395# Force the start method
396#
397
398
399def _force_start_method(method):
400    _default_context._actual_context = _concrete_contexts[method]
401
402#
403# Check that the current thread is spawning a child process
404#
405
406_tls = threading.local()
407
408
409def get_spawning_popen():
410    return getattr(_tls, 'spawning_popen', None)
411
412
413def set_spawning_popen(popen):
414    _tls.spawning_popen = popen
415
416
417def assert_spawning(obj):
418    if get_spawning_popen() is None:
419        raise RuntimeError(
420            '%s objects should only be shared between processes'
421            ' through inheritance' % type(obj).__name__
422        )
423