# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

__author__ = 'Brian Quinlan (brian@sweetapp.com)'

import collections
import logging
import threading
import time
import types

# Values accepted by wait(return_when=...).
FIRST_COMPLETED = 'FIRST_COMPLETED'
FIRST_EXCEPTION = 'FIRST_EXCEPTION'
ALL_COMPLETED = 'ALL_COMPLETED'
# Internal-only condition used by as_completed().
_AS_COMPLETED = '_AS_COMPLETED'

# Possible future states (for internal use by the futures package).
PENDING = 'PENDING'
RUNNING = 'RUNNING'
# The future was cancelled by the user...
CANCELLED = 'CANCELLED'
# ...and _Waiter.add_cancelled() was called by a worker.
CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
FINISHED = 'FINISHED'

_FUTURE_STATES = [
    PENDING,
    RUNNING,
    CANCELLED,
    CANCELLED_AND_NOTIFIED,
    FINISHED
]

# Human-readable state names used by Future.__repr__().
_STATE_TO_DESCRIPTION_MAP = {
    PENDING: "pending",
    RUNNING: "running",
    CANCELLED: "cancelled",
    CANCELLED_AND_NOTIFIED: "cancelled",
    FINISHED: "finished"
}

# Logger for internal use by the futures package.
LOGGER = logging.getLogger("concurrent.futures")


class Error(Exception):
    """Base class for all future-related exceptions."""
    pass


class CancelledError(Error):
    """The Future was cancelled."""
    pass


class TimeoutError(Error):
    """The operation exceeded the given deadline."""
    pass


class InvalidStateError(Error):
    """The operation is not allowed in this state."""
    pass


class _Waiter(object):
    """Provides the event that wait() and as_completed() block on.

    A waiter is appended to the ``_waiters`` list of every future it
    observes.  The future calls one of the ``add_*`` methods when it
    finishes; this base class only records the future, subclasses decide
    when to set ``event``.
    """

    def __init__(self):
        self.event = threading.Event()
        self.finished_futures = []

    def add_result(self, future):
        """Record *future*, which finished with a result."""
        self.finished_futures.append(future)

    def add_exception(self, future):
        """Record *future*, which finished with an exception."""
        self.finished_futures.append(future)

    def add_cancelled(self, future):
        """Record *future*, which was cancelled."""
        self.finished_futures.append(future)


class _AsCompletedWaiter(_Waiter):
    """Used by as_completed().

    Every notification sets the event.  ``lock`` guards
    ``finished_futures`` because as_completed() swaps the list out while
    futures may still be appending to it.
    """

    def __init__(self):
        super(_AsCompletedWaiter, self).__init__()
        self.lock = threading.Lock()

    def add_result(self, future):
        with self.lock:
            super(_AsCompletedWaiter, self).add_result(future)
            self.event.set()

    def add_exception(self, future):
        with self.lock:
            super(_AsCompletedWaiter, self).add_exception(future)
            self.event.set()

    def add_cancelled(self, future):
        with self.lock:
            super(_AsCompletedWaiter, self).add_cancelled(future)
            self.event.set()


class _FirstCompletedWaiter(_Waiter):
    """Used by wait(return_when=FIRST_COMPLETED).

    The event is set as soon as any observed future notifies.
    """

    def add_result(self, future):
        super().add_result(future)
        self.event.set()

    def add_exception(self, future):
        super().add_exception(future)
        self.event.set()

    def add_cancelled(self, future):
        super().add_cancelled(future)
        self.event.set()


class _AllCompletedWaiter(_Waiter):
    """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED).

    Counts down ``num_pending_calls`` and sets the event when it reaches
    zero.  When ``stop_on_exception`` is true, the first future that
    finishes with an exception sets the event immediately.
    """

    def __init__(self, num_pending_calls, stop_on_exception):
        self.num_pending_calls = num_pending_calls
        self.stop_on_exception = stop_on_exception
        self.lock = threading.Lock()
        super().__init__()

    def _decrement_pending_calls(self):
        # The counter is shared by all observed futures, hence the lock.
        with self.lock:
            self.num_pending_calls -= 1
            if not self.num_pending_calls:
                self.event.set()

    def add_result(self, future):
        super().add_result(future)
        self._decrement_pending_calls()

    def add_exception(self, future):
        super().add_exception(future)
        if self.stop_on_exception:
            self.event.set()
        else:
            self._decrement_pending_calls()

    def add_cancelled(self, future):
        super().add_cancelled(future)
        self._decrement_pending_calls()


class _AcquireFutures(object):
    """A context manager that does an ordered acquire of Future conditions."""

    def __init__(self, futures):
        # Sorting by id() gives every caller the same acquisition order.
        self.futures = sorted(futures, key=id)

    def __enter__(self):
        for future in self.futures:
            future._condition.acquire()

    def __exit__(self, *args):
        for future in self.futures:
            future._condition.release()


def _create_and_install_waiters(fs, return_when):
    """Create the waiter matching *return_when* and attach it to each future.

    Args:
        fs: The futures to observe.  Callers in this module hold the
            futures' conditions (via _AcquireFutures) while calling this.
        return_when: One of _AS_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION
            or ALL_COMPLETED.

    Returns:
        The newly created waiter, already appended to every future's
        ``_waiters`` list.

    Raises:
        ValueError: If *return_when* is not one of the values above.
    """
    if return_when == _AS_COMPLETED:
        waiter = _AsCompletedWaiter()
    elif return_when == FIRST_COMPLETED:
        waiter = _FirstCompletedWaiter()
    else:
        # Only futures that have not yet notified their waiters count.
        pending_count = sum(
            f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs)

        if return_when == FIRST_EXCEPTION:
            waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True)
        elif return_when == ALL_COMPLETED:
            waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False)
        else:
            raise ValueError("Invalid return condition: %r" % return_when)

    for f in fs:
        f._waiters.append(waiter)

    return waiter


def _yield_finished_futures(fs, waiter, ref_collect):
    """
    Iterate on the list *fs*, yielding finished futures one by one in
    reverse order.
    Before yielding a future, *waiter* is removed from its waiters
    and the future is removed from each set in the collection of sets
    *ref_collect*.

    The aim of this function is to avoid keeping stale references after
    the future is yielded and before the iterator resumes.
    """
    while fs:
        f = fs[-1]
        for futures_set in ref_collect:
            futures_set.remove(f)
        with f._condition:
            f._waiters.remove(waiter)
        del f
        # Careful not to keep a reference to the popped value
        yield fs.pop()


def as_completed(fs, timeout=None):
    """An iterator over the given futures that yields each as it completes.

    Args:
        fs: The sequence of Futures (possibly created by different Executors) to
            iterate over.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.

    Returns:
        An iterator that yields the given Futures as they complete (finished or
        cancelled). If any given Futures are duplicated, they will be returned
        once.

    Raises:
        TimeoutError: If the entire result iterator could not be generated
            before the given timeout.
    """
    if timeout is not None:
        end_time = timeout + time.monotonic()

    fs = set(fs)
    total_futures = len(fs)
    with _AcquireFutures(fs):
        finished = set(
                f for f in fs
                if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
        pending = fs - finished
        waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
    finished = list(finished)
    try:
        yield from _yield_finished_futures(finished, waiter,
                                           ref_collect=(fs,))

        while pending:
            if timeout is None:
                wait_timeout = None
            else:
                wait_timeout = end_time - time.monotonic()
                if wait_timeout < 0:
                    raise TimeoutError(
                            '%d (of %d) futures unfinished' % (
                            len(pending), total_futures))

            waiter.event.wait(wait_timeout)

            # Swap the list out under the waiter's lock so notifications
            # arriving meanwhile land in the fresh list.
            with waiter.lock:
                finished = waiter.finished_futures
                waiter.finished_futures = []
                waiter.event.clear()

            # reverse to keep finishing order
            finished.reverse()
            yield from _yield_finished_futures(finished, waiter,
                                               ref_collect=(fs, pending))

    finally:
        # Remove waiter from unfinished futures
        for f in fs:
            with f._condition:
                f._waiters.remove(waiter)


DoneAndNotDoneFutures = collections.namedtuple(
        'DoneAndNotDoneFutures', 'done not_done')


def wait(fs, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the futures in the given sequence to complete.

    Args:
        fs: The sequence of Futures (possibly created by different Executors) to
            wait upon. Any iterable is accepted; duplicated futures are
            only considered once.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.
        return_when: Indicates when this function should return. The options
            are:

            FIRST_COMPLETED - Return when any future finishes or is
                              cancelled.
            FIRST_EXCEPTION - Return when any future finishes by raising an
                              exception. If no future raises an exception
                              then it is equivalent to ALL_COMPLETED.
            ALL_COMPLETED -   Return when all futures finish or are cancelled.

    Returns:
        A named 2-tuple of sets. The first set, named 'done', contains the
        futures that completed (is finished or cancelled) before the wait
        completed. The second set, named 'not_done', contains uncompleted
        futures.

    Raises:
        ValueError: If waiting is actually required and *return_when* is not
            one of the options above.
    """
    # Materialise once: this deduplicates the futures (otherwise the
    # len(done) == len(fs) check below can never succeed and the call blocks
    # until the timeout) and lets one-shot iterators be passed in.
    fs = set(fs)
    with _AcquireFutures(fs):
        done = {f for f in fs
                if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]}
        not_done = fs - done

        if (return_when == FIRST_COMPLETED) and done:
            return DoneAndNotDoneFutures(done, not_done)
        elif (return_when == FIRST_EXCEPTION) and done:
            if any(f for f in done
                   if not f.cancelled() and f.exception() is not None):
                return DoneAndNotDoneFutures(done, not_done)

        if len(done) == len(fs):
            return DoneAndNotDoneFutures(done, not_done)

        waiter = _create_and_install_waiters(fs, return_when)

    waiter.event.wait(timeout)
    for f in fs:
        with f._condition:
            f._waiters.remove(waiter)

    done.update(waiter.finished_futures)
    return DoneAndNotDoneFutures(done, fs - done)


def _result_or_cancel(fut, timeout=None):
    """Return fut.result(timeout), cancelling *fut* if that call raises.

    Calling cancel() on a future that already finished returns False and
    changes nothing, so the unconditional call is harmless on success.
    """
    try:
        try:
            return fut.result(timeout)
        finally:
            fut.cancel()
    finally:
        # Break a reference cycle with the exception in fut._exception
        del fut


class Future(object):
    """Represents the result of an asynchronous computation."""

    def __init__(self):
        """Initializes the future. Should not be called by clients."""
        self._condition = threading.Condition()
        self._state = PENDING
        self._result = None
        self._exception = None
        self._waiters = []
        self._done_callbacks = []

    def _invoke_callbacks(self):
        # A failing callback is logged and must not prevent the others
        # from running.
        for callback in self._done_callbacks:
            try:
                callback(self)
            except Exception:
                LOGGER.exception('exception calling callback for %r', self)

    def __repr__(self):
        with self._condition:
            if self._state == FINISHED:
                # Compare with None: an exception instance may be falsy.
                if self._exception is not None:
                    return '<%s at %#x state=%s raised %s>' % (
                        self.__class__.__name__,
                        id(self),
                        _STATE_TO_DESCRIPTION_MAP[self._state],
                        self._exception.__class__.__name__)
                else:
                    return '<%s at %#x state=%s returned %s>' % (
                        self.__class__.__name__,
                        id(self),
                        _STATE_TO_DESCRIPTION_MAP[self._state],
                        self._result.__class__.__name__)
            return '<%s at %#x state=%s>' % (
                    self.__class__.__name__,
                    id(self),
                   _STATE_TO_DESCRIPTION_MAP[self._state])

    def cancel(self):
        """Cancel the future if possible.

        Returns True if the future was cancelled, False otherwise. A future
        cannot be cancelled if it is running or has already completed.
        """
        with self._condition:
            if self._state in [RUNNING, FINISHED]:
                return False

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                return True

            self._state = CANCELLED
            self._condition.notify_all()

        # Callbacks run outside the condition's lock.
        self._invoke_callbacks()
        return True

    def cancelled(self):
        """Return True if the future was cancelled."""
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]

    def running(self):
        """Return True if the future is currently executing."""
        with self._condition:
            return self._state == RUNNING

    def done(self):
        """Return True if the future was cancelled or finished executing."""
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]

    def __get_result(self):
        # Compare with None: an exception instance may be falsy (for example
        # when its class defines __bool__ or __len__) and must still be
        # raised rather than mistaken for "no exception".
        if self._exception is not None:
            try:
                raise self._exception
            finally:
                # Break a reference cycle with the exception in self._exception
                self = None
        else:
            return self._result

    def add_done_callback(self, fn):
        """Attaches a callable that will be called when the future finishes.

        Args:
            fn: A callable that will be called with this future as its only
                argument when the future completes or is cancelled. The callable
                will always be called by a thread in the same process in which
                it was added. If the future has already completed or been
                cancelled then the callable will be called immediately. These
                callables are called in the order that they were added.
        """
        with self._condition:
            if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
                self._done_callbacks.append(fn)
                return
        # Already done: call immediately, outside the condition's lock.
        try:
            fn(self)
        except Exception:
            LOGGER.exception('exception calling callback for %r', self)

    def result(self, timeout=None):
        """Return the result of the call that the future represents.

        Args:
            timeout: The number of seconds to wait for the result if the future
                isn't done. If None, then there is no limit on the wait time.

        Returns:
            The result of the call that the future represents.

        Raises:
            CancelledError: If the future was cancelled.
            TimeoutError: If the future didn't finish executing before the given
                timeout.
            Exception: If the call raised then that exception will be raised.
        """
        try:
            with self._condition:
                if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                    raise CancelledError()
                elif self._state == FINISHED:
                    return self.__get_result()

                self._condition.wait(timeout)

                if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                    raise CancelledError()
                elif self._state == FINISHED:
                    return self.__get_result()
                else:
                    raise TimeoutError()
        finally:
            # Break a reference cycle with the exception in self._exception
            self = None

    def exception(self, timeout=None):
        """Return the exception raised by the call that the future represents.

        Args:
            timeout: The number of seconds to wait for the exception if the
                future isn't done. If None, then there is no limit on the wait
                time.

        Returns:
            The exception raised by the call that the future represents or None
            if the call completed without raising.

        Raises:
            CancelledError: If the future was cancelled.
            TimeoutError: If the future didn't finish executing before the given
                timeout.
        """

        with self._condition:
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self._exception

            self._condition.wait(timeout)

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self._exception
            else:
                raise TimeoutError()

    # The following methods should only be used by Executors and in tests.
    def set_running_or_notify_cancel(self):
        """Mark the future as running or process any cancel notifications.

        Should only be used by Executor implementations and unit tests.

        If the future has been cancelled (cancel() was called and returned
        True) then any threads waiting on the future completing (through calls
        to as_completed() or wait()) are notified and False is returned.

        If the future was not cancelled then it is put in the running state
        (future calls to running() will return True) and True is returned.

        This method should be called by Executor implementations before
        executing the work associated with this future. If this method returns
        False then the work should not be executed.

        Returns:
            False if the Future was cancelled, True otherwise.

        Raises:
            RuntimeError: if this method was already called or if set_result()
                or set_exception() was called.
        """
        with self._condition:
            if self._state == CANCELLED:
                self._state = CANCELLED_AND_NOTIFIED
                for waiter in self._waiters:
                    waiter.add_cancelled(self)
                # self._condition.notify_all() is not necessary because
                # self.cancel() triggers a notification.
                return False
            elif self._state == PENDING:
                self._state = RUNNING
                return True
            else:
                LOGGER.critical('Future %s in unexpected state: %s',
                                id(self),
                                self._state)
                raise RuntimeError('Future in unexpected state')

    def set_result(self, result):
        """Sets the return value of work associated with the future.

        Should only be used by Executor implementations and unit tests.

        Raises:
            InvalidStateError: If the future was already cancelled or
                finished.
        """
        with self._condition:
            if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}:
                raise InvalidStateError('{}: {!r}'.format(self._state, self))
            self._result = result
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_result(self)
            self._condition.notify_all()
        self._invoke_callbacks()

    def set_exception(self, exception):
        """Sets the result of the future as being the given exception.

        Should only be used by Executor implementations and unit tests.

        Raises:
            InvalidStateError: If the future was already cancelled or
                finished.
        """
        with self._condition:
            if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}:
                raise InvalidStateError('{}: {!r}'.format(self._state, self))
            self._exception = exception
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_exception(self)
            self._condition.notify_all()
        self._invoke_callbacks()

    __class_getitem__ = classmethod(types.GenericAlias)


class Executor(object):
    """This is an abstract base class for concrete asynchronous executors."""

    def submit(self, fn, /, *args, **kwargs):
        """Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and returns
        a Future instance representing the execution of the callable.

        Returns:
            A Future representing the given call.
        """
        raise NotImplementedError()

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, iter).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: The size of the chunks the iterable will be broken into
                before being passed to a child process. This argument is only
                used by ProcessPoolExecutor; it is ignored by
                ThreadPoolExecutor.

        Returns:
            An iterator equivalent to: map(func, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
        if timeout is not None:
            end_time = timeout + time.monotonic()

        fs = [self.submit(fn, *args) for args in zip(*iterables)]

        # Yield must be hidden in closure so that the futures are submitted
        # before the first iterator value is required.
        def result_iterator():
            try:
                # reverse to keep finishing order
                fs.reverse()
                while fs:
                    # Careful not to keep a reference to the popped future.
                    # _result_or_cancel() also cancels the future being
                    # waited on if result() raises (e.g. on timeout); the
                    # finally clause below only reaches the futures that
                    # have not been popped yet.
                    if timeout is None:
                        yield _result_or_cancel(fs.pop())
                    else:
                        yield _result_or_cancel(
                            fs.pop(), end_time - time.monotonic())
            finally:
                for future in fs:
                    future.cancel()
        return result_iterator()

    def shutdown(self, wait=True, *, cancel_futures=False):
        """Clean-up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        Args:
            wait: If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                executor have been reclaimed.
            cancel_futures: If True then shutdown will cancel all pending
                futures. Futures that are completed or running will not be
                cancelled.
        """
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown(wait=True)
        # Never suppress an exception raised inside the with-block.
        return False


class BrokenExecutor(RuntimeError):
    """
    Raised when an executor has become non-functional after a severe failure.
    """