:mod:`concurrent.futures` --- Launching parallel tasks
======================================================

.. module:: concurrent.futures
   :synopsis: Execute computations concurrently using threads or processes.

.. versionadded:: 3.2

**Source code:** :source:`Lib/concurrent/futures/thread.py`
and :source:`Lib/concurrent/futures/process.py`

--------------

The :mod:`concurrent.futures` module provides a high-level interface for
asynchronously executing callables.

The asynchronous execution can be performed with threads, using
:class:`ThreadPoolExecutor`, or separate processes, using
:class:`ProcessPoolExecutor`. Both implement the same interface, which is
defined by the abstract :class:`Executor` class.


Executor Objects
----------------

.. class:: Executor

   An abstract class that provides methods to execute calls asynchronously. It
   should not be used directly, but through its concrete subclasses.

   .. method:: submit(fn, *args, **kwargs)

      Schedules the callable, *fn*, to be executed as ``fn(*args, **kwargs)``
      and returns a :class:`Future` object representing the execution of the
      callable. ::

         from concurrent.futures import ThreadPoolExecutor

         with ThreadPoolExecutor(max_workers=1) as executor:
             future = executor.submit(pow, 323, 1235)
             print(future.result())

   .. method:: map(func, *iterables, timeout=None, chunksize=1)

      Similar to :func:`map(func, *iterables) <map>` except:

      * the *iterables* are collected immediately rather than lazily;

      * *func* is executed asynchronously and several calls to
        *func* may be made concurrently.

      The returned iterator raises a :exc:`concurrent.futures.TimeoutError`
      if :meth:`~iterator.__next__` is called and the result isn't available
      after *timeout* seconds from the original call to :meth:`Executor.map`.
      *timeout* can be an int or a float. If *timeout* is not specified or
      ``None``, there is no limit to the wait time.

      If a *func* call raises an exception, then that exception will be
      raised when its value is retrieved from the iterator.

      When using :class:`ProcessPoolExecutor`, this method chops *iterables*
      into a number of chunks which it submits to the pool as separate
      tasks. The (approximate) size of these chunks can be specified by
      setting *chunksize* to a positive integer. For very long iterables,
      using a large value for *chunksize* can significantly improve
      performance compared to the default size of 1. With
      :class:`ThreadPoolExecutor`, *chunksize* has no effect.

      .. versionchanged:: 3.5
         Added the *chunksize* argument.

   .. method:: shutdown(wait=True)

      Signal the executor that it should free any resources that it is using
      when the currently pending futures are done executing. Calls to
      :meth:`Executor.submit` and :meth:`Executor.map` made after shutdown will
      raise :exc:`RuntimeError`.

      If *wait* is ``True`` then this method will not return until all the
      pending futures are done executing and the resources associated with the
      executor have been freed. If *wait* is ``False`` then this method will
      return immediately and the resources associated with the executor will be
      freed when all pending futures are done executing. Regardless of the
      value of *wait*, the entire Python program will not exit until all
      pending futures are done executing.

      You can avoid having to call this method explicitly if you use the
      :keyword:`with` statement, which will shut down the :class:`Executor`
      (waiting as if :meth:`Executor.shutdown` were called with *wait* set to
      ``True``)::

         import shutil
         from concurrent.futures import ThreadPoolExecutor

         with ThreadPoolExecutor(max_workers=4) as e:
             e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
             e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
             e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
             e.submit(shutil.copy, 'src4.txt', 'dest4.txt')


ThreadPoolExecutor
------------------

:class:`ThreadPoolExecutor` is an :class:`Executor` subclass that uses a pool of
threads to execute calls asynchronously.

Deadlocks can occur when the callable associated with a :class:`Future` waits on
the results of another :class:`Future`. For example::

   import time
   from concurrent.futures import ThreadPoolExecutor

   def wait_on_b():
       time.sleep(5)
       print(b.result())  # b will never complete because it is waiting on a.
       return 5

   def wait_on_a():
       time.sleep(5)
       print(a.result())  # a will never complete because it is waiting on b.
       return 6


   executor = ThreadPoolExecutor(max_workers=2)
   a = executor.submit(wait_on_b)
   b = executor.submit(wait_on_a)

And::

   def wait_on_future():
       f = executor.submit(pow, 5, 2)
       # This will never complete because there is only one worker thread and
       # it is executing this function.
       print(f.result())

   executor = ThreadPoolExecutor(max_workers=1)
   executor.submit(wait_on_future)


.. class:: ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())

   An :class:`Executor` subclass that uses a pool of at most *max_workers*
   threads to execute calls asynchronously.

   *initializer* is an optional callable that is called at the start of
   each worker thread; *initargs* is a tuple of arguments passed to the
   initializer.
   Should *initializer* raise an exception, all currently
   pending jobs will raise a :exc:`~concurrent.futures.thread.BrokenThreadPool`,
   as well as any attempt to submit more jobs to the pool.

   .. versionchanged:: 3.5
      If *max_workers* is ``None`` or
      not given, it will default to the number of processors on the machine,
      multiplied by ``5``, assuming that :class:`ThreadPoolExecutor` is often
      used to overlap I/O instead of CPU work and the number of workers
      should be higher than the number of workers
      for :class:`ProcessPoolExecutor`.

   .. versionadded:: 3.6
      The *thread_name_prefix* argument was added to allow users to
      control the :class:`threading.Thread` names for worker threads created by
      the pool for easier debugging.

   .. versionchanged:: 3.7
      Added the *initializer* and *initargs* arguments.

   .. versionchanged:: 3.8
      Default value of *max_workers* is changed to ``min(32, os.cpu_count() + 4)``.
      This default value preserves at least 5 workers for I/O bound tasks.
      It utilizes at most 32 CPU cores for CPU bound tasks which release the GIL.
      And it avoids using very large resources implicitly on many-core machines.

      ThreadPoolExecutor now reuses idle worker threads before starting
      *max_workers* worker threads too.


.. _threadpoolexecutor-example:

ThreadPoolExecutor Example
~~~~~~~~~~~~~~~~~~~~~~~~~~
::

   import concurrent.futures
   import urllib.request

   URLS = ['http://www.foxnews.com/',
           'http://www.cnn.com/',
           'http://europe.wsj.com/',
           'http://www.bbc.co.uk/',
           'http://some-made-up-domain.com/']

   # Retrieve a single page and report the URL and contents
   def load_url(url, timeout):
       with urllib.request.urlopen(url, timeout=timeout) as conn:
           return conn.read()

   # We can use a with statement to ensure threads are cleaned up promptly
   with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
       # Start the load operations and mark each future with its URL
       future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
       for future in concurrent.futures.as_completed(future_to_url):
           url = future_to_url[future]
           try:
               data = future.result()
           except Exception as exc:
               print('%r generated an exception: %s' % (url, exc))
           else:
               print('%r page is %d bytes' % (url, len(data)))


ProcessPoolExecutor
-------------------

The :class:`ProcessPoolExecutor` class is an :class:`Executor` subclass that
uses a pool of processes to execute calls asynchronously.
:class:`ProcessPoolExecutor` uses the :mod:`multiprocessing` module, which
allows it to side-step the :term:`Global Interpreter Lock
<global interpreter lock>` but also means that
only picklable objects can be executed and returned.

The ``__main__`` module must be importable by worker subprocesses. This means
that :class:`ProcessPoolExecutor` will not work in the interactive interpreter.

Calling :class:`Executor` or :class:`Future` methods from a callable submitted
to a :class:`ProcessPoolExecutor` will result in deadlock.

.. class:: ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())

   An :class:`Executor` subclass that executes calls asynchronously using a pool
   of at most *max_workers* processes. If *max_workers* is ``None`` or not
   given, it will default to the number of processors on the machine.
   If *max_workers* is less than or equal to ``0``, then a :exc:`ValueError`
   will be raised.
   On Windows, *max_workers* must be less than or equal to ``61``. If it is not
   then :exc:`ValueError` will be raised. If *max_workers* is ``None``, then
   the default chosen will be at most ``61``, even if more processors are
   available.
   *mp_context* can be a multiprocessing context or ``None``. It will be used to
   launch the workers. If *mp_context* is ``None`` or not given, the default
   multiprocessing context is used.

   *initializer* is an optional callable that is called at the start of
   each worker process; *initargs* is a tuple of arguments passed to the
   initializer. Should *initializer* raise an exception, all currently
   pending jobs will raise a :exc:`~concurrent.futures.process.BrokenProcessPool`,
   as well as any attempt to submit more jobs to the pool.

   .. versionchanged:: 3.3
      When one of the worker processes terminates abruptly, a
      :exc:`BrokenProcessPool` error is now raised. Previously, behaviour
      was undefined but operations on the executor or its futures would often
      freeze or deadlock.

   .. versionchanged:: 3.7
      The *mp_context* argument was added to allow users to control the
      start_method for worker processes created by the pool.

      Added the *initializer* and *initargs* arguments.


.. _processpoolexecutor-example:

ProcessPoolExecutor Example
~~~~~~~~~~~~~~~~~~~~~~~~~~~
::

   import concurrent.futures
   import math

   PRIMES = [
       112272535095293,
       112582705942171,
       112272535095293,
       115280095190773,
       115797848077099,
       1099726899285419]

   def is_prime(n):
       if n < 2:
           return False
       if n == 2:
           return True
       if n % 2 == 0:
           return False

       sqrt_n = int(math.floor(math.sqrt(n)))
       for i in range(3, sqrt_n + 1, 2):
           if n % i == 0:
               return False
       return True

   def main():
       with concurrent.futures.ProcessPoolExecutor() as executor:
           for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
               print('%d is prime: %s' % (number, prime))

   if __name__ == '__main__':
       main()


Future Objects
--------------

The :class:`Future` class encapsulates the asynchronous execution of a callable.
:class:`Future` instances are created by :meth:`Executor.submit`.

.. class:: Future

   Encapsulates the asynchronous execution of a callable. :class:`Future`
   instances are created by :meth:`Executor.submit` and should not be created
   directly except for testing.

   .. method:: cancel()

      Attempt to cancel the call. If the call is currently being executed or
      finished running and cannot be cancelled then the method will return
      ``False``, otherwise the call will be cancelled and the method will
      return ``True``.

   .. method:: cancelled()

      Return ``True`` if the call was successfully cancelled.

   .. method:: running()

      Return ``True`` if the call is currently being executed and cannot be
      cancelled.

   .. method:: done()

      Return ``True`` if the call was successfully cancelled or finished
      running.

   .. method:: result(timeout=None)

      Return the value returned by the call.
      If the call hasn't yet completed
      then this method will wait up to *timeout* seconds. If the call hasn't
      completed in *timeout* seconds, then a
      :exc:`concurrent.futures.TimeoutError` will be raised. *timeout* can be
      an int or float. If *timeout* is not specified or ``None``, there is no
      limit to the wait time.

      If the future is cancelled before completing then :exc:`.CancelledError`
      will be raised.

      If the call raised, this method will raise the same exception.

   .. method:: exception(timeout=None)

      Return the exception raised by the call. If the call hasn't yet
      completed then this method will wait up to *timeout* seconds. If the
      call hasn't completed in *timeout* seconds, then a
      :exc:`concurrent.futures.TimeoutError` will be raised. *timeout* can be
      an int or float. If *timeout* is not specified or ``None``, there is no
      limit to the wait time.

      If the future is cancelled before completing then :exc:`.CancelledError`
      will be raised.

      If the call completed without raising, ``None`` is returned.

   .. method:: add_done_callback(fn)

      Attaches the callable *fn* to the future. *fn* will be called, with the
      future as its only argument, when the future is cancelled or finishes
      running.

      Added callables are called in the order that they were added and are
      always called in a thread belonging to the process that added them. If
      the callable raises an :exc:`Exception` subclass, it will be logged and
      ignored. If the callable raises a :exc:`BaseException` subclass, the
      behavior is undefined.

      If the future has already completed or been cancelled, *fn* will be
      called immediately.

   The following :class:`Future` methods are meant for use in unit tests and
   :class:`Executor` implementations.

   .. method:: set_running_or_notify_cancel()

      This method should only be called by :class:`Executor` implementations
      before executing the work associated with the :class:`Future` and by unit
      tests.

      If the method returns ``False`` then the :class:`Future` was cancelled,
      i.e. :meth:`Future.cancel` was called and returned ``True``. Any threads
      waiting on the :class:`Future` completing (i.e. through
      :func:`as_completed` or :func:`wait`) will be woken up.

      If the method returns ``True`` then the :class:`Future` was not cancelled
      and has been put in the running state, i.e. calls to
      :meth:`Future.running` will return ``True``.

      This method can only be called once and cannot be called after
      :meth:`Future.set_result` or :meth:`Future.set_exception` have been
      called.

   .. method:: set_result(result)

      Sets the result of the work associated with the :class:`Future` to
      *result*.

      This method should only be used by :class:`Executor` implementations and
      unit tests.

      .. versionchanged:: 3.8
         This method raises
         :exc:`concurrent.futures.InvalidStateError` if the :class:`Future` is
         already done.

   .. method:: set_exception(exception)

      Sets the result of the work associated with the :class:`Future` to the
      :class:`Exception` *exception*.

      This method should only be used by :class:`Executor` implementations and
      unit tests.

      .. versionchanged:: 3.8
         This method raises
         :exc:`concurrent.futures.InvalidStateError` if the :class:`Future` is
         already done.

Module Functions
----------------

.. function:: wait(fs, timeout=None, return_when=ALL_COMPLETED)

   Wait for the :class:`Future` instances (possibly created by different
   :class:`Executor` instances) given by *fs* to complete. Returns a named
   2-tuple of sets.
   The first set, named ``done``, contains the futures that
   completed (finished or cancelled futures) before the wait completed. The
   second set, named ``not_done``, contains the futures that did not complete
   (pending or running futures).

   *timeout* can be used to control the maximum number of seconds to wait before
   returning. *timeout* can be an int or float. If *timeout* is not specified
   or ``None``, there is no limit to the wait time.

   *return_when* indicates when this function should return. It must be one of
   the following constants:

   .. tabularcolumns:: |l|L|

   +-----------------------------+----------------------------------------+
   | Constant                    | Description                            |
   +=============================+========================================+
   | :const:`FIRST_COMPLETED`    | The function will return when any      |
   |                             | future finishes or is cancelled.       |
   +-----------------------------+----------------------------------------+
   | :const:`FIRST_EXCEPTION`    | The function will return when any      |
   |                             | future finishes by raising an          |
   |                             | exception. If no future raises an      |
   |                             | exception then it is equivalent to     |
   |                             | :const:`ALL_COMPLETED`.                |
   +-----------------------------+----------------------------------------+
   | :const:`ALL_COMPLETED`      | The function will return when all      |
   |                             | futures finish or are cancelled.       |
   +-----------------------------+----------------------------------------+

.. function:: as_completed(fs, timeout=None)

   Returns an iterator over the :class:`Future` instances (possibly created by
   different :class:`Executor` instances) given by *fs* that yields futures as
   they complete (finished or cancelled futures). Any futures given by *fs* that
   are duplicated will be returned once. Any futures that completed before
   :func:`as_completed` is called will be yielded first.
   The returned iterator
   raises a :exc:`concurrent.futures.TimeoutError` if :meth:`~iterator.__next__`
   is called and the result isn't available after *timeout* seconds from the
   original call to :func:`as_completed`. *timeout* can be an int or float. If
   *timeout* is not specified or ``None``, there is no limit to the wait time.


.. seealso::

   :pep:`3148` -- futures - execute computations asynchronously
      The proposal which described this feature for inclusion in the Python
      standard library.


Exception classes
-----------------

.. currentmodule:: concurrent.futures

.. exception:: CancelledError

   Raised when a future is cancelled.

.. exception:: TimeoutError

   Raised when a future operation exceeds the given timeout.

.. exception:: BrokenExecutor

   Derived from :exc:`RuntimeError`, this exception class is raised
   when an executor is broken for some reason, and cannot be used
   to submit or execute new tasks.

   .. versionadded:: 3.7

.. exception:: InvalidStateError

   Raised when an operation is performed on a future that is not allowed
   in the current state.

   .. versionadded:: 3.8

.. currentmodule:: concurrent.futures.thread

.. exception:: BrokenThreadPool

   Derived from :exc:`~concurrent.futures.BrokenExecutor`, this exception
   class is raised when one of the workers of a :class:`ThreadPoolExecutor`
   has failed initializing.

   .. versionadded:: 3.7

.. currentmodule:: concurrent.futures.process

.. exception:: BrokenProcessPool

   Derived from :exc:`~concurrent.futures.BrokenExecutor` (formerly
   :exc:`RuntimeError`), this exception class is raised when one of the
   workers of a :class:`ProcessPoolExecutor` has terminated in a non-clean
   fashion (for example, if it was killed from the outside).

   .. versionadded:: 3.3
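
The exception classes above can be told apart when a result is retrieved with
``Future.result()``. The following is a minimal sketch under stated
assumptions, not a recommended pattern: ``describe`` and ``demo`` are
hypothetical helper names introduced only for this illustration, and the
0.1 second timeout is an arbitrary choice. A single worker thread is used so
that the second task is guaranteed to still be pending when it is cancelled.

```python
import concurrent.futures
import threading


def describe(future, timeout=None):
    """Return a short string describing how *future* ended (hypothetical helper)."""
    try:
        value = future.result(timeout=timeout)
    except concurrent.futures.CancelledError:
        # The future was cancelled before it started running.
        return "cancelled"
    except concurrent.futures.TimeoutError:
        # The call did not complete within *timeout* seconds.
        return "timed out"
    except Exception as exc:
        # The call itself raised; result() re-raises the same exception.
        return "raised %s" % type(exc).__name__
    return "returned %r" % (value,)


def demo():
    """Produce one timeout, one cancellation, one result and one error."""
    release = threading.Event()
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        # The only worker blocks here until release.set() is called.
        blocked = executor.submit(release.wait)
        # These two stay pending while the worker is busy.
        queued = executor.submit(pow, 2, 10)
        failing = executor.submit(int, "not a number")

        outcomes = [describe(blocked, timeout=0.1)]
        queued.cancel()  # succeeds because the call has not started
        outcomes.append(describe(queued))
        release.set()  # let the blocked call finish
        outcomes.append(describe(blocked))
        outcomes.append(describe(failing))
    return outcomes


if __name__ == "__main__":
    print(demo())
```

Both ``CancelledError`` and ``TimeoutError`` are handled before the generic
``Exception`` clause in this sketch, because they are themselves ``Exception``
subclasses and would otherwise be reported as errors raised by the call.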