from __future__ import absolute_import

import os
import sys
import threading
import warnings

from . import process

__all__ = []            # things are copied from here to __init__.py


# Warning text used by forking_enable() below: exec-based (spawn) start
# was requested but the C extension required to support it is missing.
W_NO_EXECV = """\
force_execv is not supported as the billiard C extension \
is not installed\
"""


#
# Exceptions
#

# Re-imported here so they can be attached to BaseContext below and thereby
# exposed on every context object (and copied to the package __init__).
from .exceptions import (  # noqa
    ProcessError,
    BufferTooShort,
    TimeoutError,
    AuthenticationError,
    TimeLimitExceeded,
    SoftTimeLimitExceeded,
    WorkerLostError,
)


#
# Base type for contexts
#

class BaseContext(object):
    # Common API shared by every start-method context.  Exception types are
    # mirrored as class attributes so callers can write e.g.
    # ``ctx.TimeoutError`` without importing the exceptions module.

    ProcessError = ProcessError
    BufferTooShort = BufferTooShort
    TimeoutError = TimeoutError
    AuthenticationError = AuthenticationError
    TimeLimitExceeded = TimeLimitExceeded
    SoftTimeLimitExceeded = SoftTimeLimitExceeded
    WorkerLostError = WorkerLostError

    current_process = staticmethod(process.current_process)
    active_children = staticmethod(process.active_children)

    # NOTE: the method imports throughout this class are deliberately lazy --
    # they avoid circular imports between the submodules and keep import of
    # this module cheap.

    if hasattr(os, 'cpu_count'):
        # Python 3.4+: os.cpu_count() exists; just wrap it.
        def cpu_count(self):
            '''Returns the number of CPUs in the system'''
            num = os.cpu_count()
            if num is None:
                raise NotImplementedError('cannot determine number of cpus')
            else:
                return num
    else:
        # Older Pythons: platform-specific fallbacks.
        def cpu_count(self):  # noqa
            if sys.platform == 'win32':
                try:
                    num = int(os.environ['NUMBER_OF_PROCESSORS'])
                except (ValueError, KeyError):
                    num = 0
            elif 'bsd' in sys.platform or sys.platform == 'darwin':
                # BSDs ship sysctl in /sbin, macOS in /usr/sbin.
                comm = '/sbin/sysctl -n hw.ncpu'
                if sys.platform == 'darwin':
                    comm = '/usr' + comm
                try:
                    with os.popen(comm) as p:
                        num = int(p.read())
                except ValueError:
                    num = 0
            else:
                # POSIX: number of processors currently online.
                try:
                    num = os.sysconf('SC_NPROCESSORS_ONLN')
                except (ValueError, OSError, AttributeError):
                    num = 0

            if num >= 1:
                return num
            else:
                raise NotImplementedError('cannot determine number of cpus')

    def Manager(self):
        '''Returns a manager associated with a running server process

        The managers methods such as `Lock()`, `Condition()` and `Queue()`
        can be used to create shared objects.
        '''
        from .managers import SyncManager
        m = SyncManager(ctx=self.get_context())
        m.start()
        return m

    def Pipe(self, duplex=True, rnonblock=False, wnonblock=False):
        '''Returns two connection object connected by a pipe'''
        from .connection import Pipe
        return Pipe(duplex, rnonblock, wnonblock)

    def Lock(self):
        '''Returns a non-recursive lock object'''
        from .synchronize import Lock
        return Lock(ctx=self.get_context())

    def RLock(self):
        '''Returns a recursive lock object'''
        from .synchronize import RLock
        return RLock(ctx=self.get_context())

    def Condition(self, lock=None):
        '''Returns a condition object'''
        from .synchronize import Condition
        return Condition(lock, ctx=self.get_context())

    def Semaphore(self, value=1):
        '''Returns a semaphore object'''
        from .synchronize import Semaphore
        return Semaphore(value, ctx=self.get_context())

    def BoundedSemaphore(self, value=1):
        '''Returns a bounded semaphore object'''
        from .synchronize import BoundedSemaphore
        return BoundedSemaphore(value, ctx=self.get_context())

    def Event(self):
        '''Returns an event object'''
        from .synchronize import Event
        return Event(ctx=self.get_context())

    def Barrier(self, parties, action=None, timeout=None):
        '''Returns a barrier object'''
        from .synchronize import Barrier
        return Barrier(parties, action, timeout, ctx=self.get_context())

    def Queue(self, maxsize=0):
        '''Returns a queue object'''
        from .queues import Queue
        return Queue(maxsize, ctx=self.get_context())

    def JoinableQueue(self, maxsize=0):
        '''Returns a queue object'''
        from .queues import JoinableQueue
        return JoinableQueue(maxsize, ctx=self.get_context())

    def SimpleQueue(self):
        '''Returns a queue object'''
        from .queues import SimpleQueue
        return SimpleQueue(ctx=self.get_context())

    # Billiard extends the stdlib Pool signature with timeout/soft_timeout,
    # restart control and process lifecycle callbacks (used by Celery).
    def Pool(self, processes=None, initializer=None, initargs=(),
             maxtasksperchild=None, timeout=None, soft_timeout=None,
             lost_worker_timeout=None, max_restarts=None,
             max_restart_freq=1, on_process_up=None, on_process_down=None,
             on_timeout_set=None, on_timeout_cancel=None, threads=True,
             semaphore=None, putlocks=False, allow_restart=False):
        '''Returns a process pool object'''
        from .pool import Pool
        return Pool(processes, initializer, initargs, maxtasksperchild,
                    timeout, soft_timeout, lost_worker_timeout,
                    max_restarts, max_restart_freq, on_process_up,
                    on_process_down, on_timeout_set, on_timeout_cancel,
                    threads, semaphore, putlocks, allow_restart,
                    context=self.get_context())

    def RawValue(self, typecode_or_type, *args):
        '''Returns a shared object'''
        from .sharedctypes import RawValue
        return RawValue(typecode_or_type, *args)

    def RawArray(self, typecode_or_type, size_or_initializer):
        '''Returns a shared array'''
        from .sharedctypes import RawArray
        return RawArray(typecode_or_type, size_or_initializer)

    def Value(self, typecode_or_type, *args, **kwargs):
        '''Returns a synchronized shared object'''
        from .sharedctypes import Value
        # lock may be True (create one), False (no locking) or a lock object.
        lock = kwargs.get('lock', True)
        return Value(typecode_or_type, *args, lock=lock,
                     ctx=self.get_context())

    def Array(self, typecode_or_type, size_or_initializer, *args, **kwargs):
        '''Returns a synchronized shared array'''
        from .sharedctypes import Array
        lock = kwargs.get('lock', True)
        return Array(typecode_or_type, size_or_initializer, lock=lock,
                     ctx=self.get_context())

    def freeze_support(self):
        '''Check whether this is a fake forked process in a frozen executable.
        If so then run code specified by commandline and exit.
        '''
        # Only relevant for frozen (e.g. py2exe/PyInstaller) Windows builds.
        if sys.platform == 'win32' and getattr(sys, 'frozen', False):
            from .spawn import freeze_support
            freeze_support()

    def get_logger(self):
        '''Return package logger -- if it does not already exist then
        it is created.
        '''
        from .util import get_logger
        return get_logger()

    def log_to_stderr(self, level=None):
        '''Turn on logging and add a handler which prints to stderr'''
        from .util import log_to_stderr
        return log_to_stderr(level)

    def allow_connection_pickling(self):
        '''Install support for sending connections and sockets
        between processes
        '''
        # This is undocumented.  In previous versions of multiprocessing
        # its only effect was to make socket objects inheritable on Windows.
        from . import connection  # noqa

    def set_executable(self, executable):
        '''Sets the path to a python.exe or pythonw.exe binary used to run
        child processes instead of sys.executable when using the 'spawn'
        start method.  Useful for people embedding Python.
        '''
        from .spawn import set_executable
        set_executable(executable)

    def set_forkserver_preload(self, module_names):
        '''Set list of module names to try to load in forkserver process.
        This is really just a hint.
        '''
        from .forkserver import set_forkserver_preload
        set_forkserver_preload(module_names)

    def get_context(self, method=None):
        # A concrete context returns itself when no method is requested;
        # otherwise look up the named context in the module-level registry
        # (built at import time, below) and verify it is usable here.
        if method is None:
            return self
        try:
            ctx = _concrete_contexts[method]
        except KeyError:
            raise ValueError('cannot find context for %r' % method)
        ctx._check_available()
        return ctx

    def get_start_method(self, allow_none=False):
        # Concrete contexts always have a fixed _name; allow_none only
        # matters for DefaultContext (overridden there).
        return self._name

    def set_start_method(self, method=None):
        # Only the DefaultContext allows choosing a start method.
        raise ValueError('cannot set start method of concrete context')

    def forking_is_enabled(self):
        # XXX for compatibility with billiard <3.4
        return (self.get_start_method() or 'fork') == 'fork'

    def forking_enable(self, value):
        # XXX for compatibility with billiard <3.4
        # forking_enable(False) historically forced exec-based spawning,
        # which needs the C extension; warn instead of failing without it.
        if not value:
            from ._ext import supports_exec
            if supports_exec:
                self.set_start_method('spawn', force=True)
            else:
                warnings.warn(RuntimeWarning(W_NO_EXECV))

    def _check_available(self):
        # Overridden by contexts that may be unavailable (e.g. forkserver).
        pass

#
# Type of default context -- underlying context can be set at most once
#


class Process(process.BaseProcess):
    # Process type used by the DefaultContext: Popen creation is delegated
    # to whichever concrete context is currently selected.
    _start_method = None

    @staticmethod
    def _Popen(process_obj):
        return _default_context.get_context().Process._Popen(process_obj)


class DefaultContext(BaseContext):
    # Mutable front for the concrete contexts: delegates to a default
    # context until/unless set_start_method() picks another one.
    Process = Process

    def __init__(self, context):
        self._default_context = context
        self._actual_context = None    # fixed lazily on first use

    def get_context(self, method=None):
        if method is None:
            # First parameterless call pins the default as the actual context.
            if self._actual_context is None:
                self._actual_context = self._default_context
            return self._actual_context
        else:
            return super(DefaultContext, self).get_context(method)

    def set_start_method(self, method, force=False):
        # May only be called once unless force=True; force with method=None
        # resets to the unpinned state.
        if self._actual_context is not None and not force:
            raise RuntimeError('context has already been set')
        if method is None and force:
            self._actual_context = None
            return
        self._actual_context = self.get_context(method)

    def get_start_method(self, allow_none=False):
        if self._actual_context is None:
            if allow_none:
                return None
            self._actual_context = self._default_context
        return self._actual_context._name

    def get_all_start_methods(self):
        # 'fork' and 'forkserver' are POSIX-only; forkserver additionally
        # needs fd-passing support in the reduction module.
        if sys.platform == 'win32':
            return ['spawn']
        else:
            from . import reduction
            if reduction.HAVE_SEND_HANDLE:
                return ['fork', 'spawn', 'forkserver']
            else:
                return ['fork', 'spawn']

# Public API of a context object (copied onto the package namespace).
DefaultContext.__all__ = list(x for x in dir(DefaultContext) if x[0] != '_')

#
# Context types for fixed start method
#

if sys.platform != 'win32':

    class ForkProcess(process.BaseProcess):
        _start_method = 'fork'

        @staticmethod
        def _Popen(process_obj):
            from .popen_fork import Popen
            return Popen(process_obj)

    class SpawnProcess(process.BaseProcess):
        _start_method = 'spawn'

        @staticmethod
        def _Popen(process_obj):
            from .popen_spawn_posix import Popen
            return Popen(process_obj)

    class ForkServerProcess(process.BaseProcess):
        _start_method = 'forkserver'

        @staticmethod
        def _Popen(process_obj):
            from .popen_forkserver import Popen
            return Popen(process_obj)

    class ForkContext(BaseContext):
        _name = 'fork'
        Process = ForkProcess

    class SpawnContext(BaseContext):
        _name = 'spawn'
        Process = SpawnProcess

    class ForkServerContext(BaseContext):
        _name = 'forkserver'
        Process = ForkServerProcess

        def _check_available(self):
            # forkserver requires the ability to pass fds over a socket.
            from . import reduction
            if not reduction.HAVE_SEND_HANDLE:
                raise ValueError('forkserver start method not available')

    # Registry consulted by BaseContext.get_context(); POSIX defaults to fork.
    _concrete_contexts = {
        'fork': ForkContext(),
        'spawn': SpawnContext(),
        'forkserver': ForkServerContext(),
    }
    _default_context = DefaultContext(_concrete_contexts['fork'])

else:

    class SpawnProcess(process.BaseProcess):
        _start_method = 'spawn'

        @staticmethod
        def _Popen(process_obj):
            from .popen_spawn_win32 import Popen
            return Popen(process_obj)

    class SpawnContext(BaseContext):
        _name = 'spawn'
        Process = SpawnProcess

    # Windows supports spawn only.
    _concrete_contexts = {
        'spawn': SpawnContext(),
    }
    _default_context = DefaultContext(_concrete_contexts['spawn'])

#
# Force the start method
#


def _force_start_method(method):
    # Internal: bypasses the set-at-most-once check in set_start_method().
    _default_context._actual_context = _concrete_contexts[method]

#
# Check that the current thread is spawning a child process
#

# Thread-local storage: the Popen instance currently pickling a child's
# state, if any.  Used to restrict pickling of process-private objects.
_tls = threading.local()


def get_spawning_popen():
    return getattr(_tls, 'spawning_popen', None)


def set_spawning_popen(popen):
    _tls.spawning_popen = popen


def assert_spawning(obj):
    # Guard used by __reduce__ of locks, queues etc.: they may only be
    # pickled while a child process is being spawned.
    if get_spawning_popen() is None:
        raise RuntimeError(
            '%s objects should only be shared between processes'
            ' through inheritance' % type(obj).__name__
        )