1# Copyright 2009 Brian Quinlan. All Rights Reserved.
2# Licensed to PSF under a Contributor Agreement.
3
4__author__ = 'Brian Quinlan (brian@sweetapp.com)'
5
6import collections
7import logging
8import threading
9import time
10import types
11
# Values accepted by the return_when argument of wait().
FIRST_COMPLETED = 'FIRST_COMPLETED'
FIRST_EXCEPTION = 'FIRST_EXCEPTION'
ALL_COMPLETED = 'ALL_COMPLETED'
# Internal-only condition: as_completed() passes it to
# _create_and_install_waiters() to get an _AsCompletedWaiter.
_AS_COMPLETED = '_AS_COMPLETED'

# Possible future states (for internal use by the futures package).
PENDING = 'PENDING'
RUNNING = 'RUNNING'
# The future was cancelled by the user...
CANCELLED = 'CANCELLED'
# ...and _Waiter.add_cancelled() was called by a worker.
CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
FINISHED = 'FINISHED'

# Every state a Future can be in.
_FUTURE_STATES = [
    PENDING,
    RUNNING,
    CANCELLED,
    CANCELLED_AND_NOTIFIED,
    FINISHED
]

# Human-readable state names used by Future.__repr__(); both cancelled
# states are rendered as "cancelled".
_STATE_TO_DESCRIPTION_MAP = {
    PENDING: "pending",
    RUNNING: "running",
    CANCELLED: "cancelled",
    CANCELLED_AND_NOTIFIED: "cancelled",
    FINISHED: "finished"
}

# Logger for internal use by the futures package.
LOGGER = logging.getLogger("concurrent.futures")
44
class Error(Exception):
    """Root of the exception hierarchy used for future-related failures."""
48
class CancelledError(Error):
    """Raised when an operation is attempted on a cancelled Future."""
52
class TimeoutError(Error):
    """Raised when an operation did not complete before its deadline."""
56
class InvalidStateError(Error):
    """Raised when an operation is not permitted in the current state."""
60
61class _Waiter(object):
62    """Provides the event that wait() and as_completed() block on."""
63    def __init__(self):
64        self.event = threading.Event()
65        self.finished_futures = []
66
67    def add_result(self, future):
68        self.finished_futures.append(future)
69
70    def add_exception(self, future):
71        self.finished_futures.append(future)
72
73    def add_cancelled(self, future):
74        self.finished_futures.append(future)
75
class _AsCompletedWaiter(_Waiter):
    """Waiter for as_completed(): every completion sets the event."""

    def __init__(self):
        super().__init__()
        # Serialises updates to finished_futures and the event; the
        # as_completed() consumer takes the same lock while it swaps out
        # the list and clears the event.
        self.lock = threading.Lock()

    def add_result(self, future):
        """Record a finished future and signal the event, under the lock."""
        with self.lock:
            super().add_result(future)
            self.event.set()

    def add_exception(self, future):
        """Record a failed future and signal the event, under the lock."""
        with self.lock:
            super().add_exception(future)
            self.event.set()

    def add_cancelled(self, future):
        """Record a cancelled future and signal the event, under the lock."""
        with self.lock:
            super().add_cancelled(future)
            self.event.set()
97
class _FirstCompletedWaiter(_Waiter):
    """Waiter for wait(return_when=FIRST_COMPLETED).

    Any completion, whether a result, an exception or a cancellation,
    sets the event immediately.
    """

    def add_result(self, future):
        """Record a finished future and set the event."""
        super().add_result(future)
        self.event.set()

    def add_exception(self, future):
        """Record a failed future and set the event."""
        super().add_exception(future)
        self.event.set()

    def add_cancelled(self, future):
        """Record a cancelled future and set the event."""
        super().add_cancelled(future)
        self.event.set()
112
class _AllCompletedWaiter(_Waiter):
    """Waiter for wait() with FIRST_EXCEPTION or ALL_COMPLETED.

    The event is set once ``num_pending_calls`` completions have been
    counted, or, when ``stop_on_exception`` is true, as soon as any
    future reports an exception.
    """

    def __init__(self, num_pending_calls, stop_on_exception):
        super().__init__()
        self.lock = threading.Lock()
        self.stop_on_exception = stop_on_exception
        self.num_pending_calls = num_pending_calls

    def _decrement_pending_calls(self):
        """Count one completion; set the event when none remain."""
        with self.lock:
            remaining = self.num_pending_calls - 1
            self.num_pending_calls = remaining
            if remaining == 0:
                self.event.set()

    def add_result(self, future):
        """Record a finished future and count it as completed."""
        super().add_result(future)
        self._decrement_pending_calls()

    def add_exception(self, future):
        """Record a failed future; wake at once if stopping on exceptions."""
        super().add_exception(future)
        if not self.stop_on_exception:
            self._decrement_pending_calls()
            return
        self.event.set()

    def add_cancelled(self, future):
        """Record a cancelled future and count it as completed."""
        super().add_cancelled(future)
        self._decrement_pending_calls()
142
143class _AcquireFutures(object):
144    """A context manager that does an ordered acquire of Future conditions."""
145
146    def __init__(self, futures):
147        self.futures = sorted(futures, key=id)
148
149    def __enter__(self):
150        for future in self.futures:
151            future._condition.acquire()
152
153    def __exit__(self, *args):
154        for future in self.futures:
155            future._condition.release()
156
def _create_and_install_waiters(fs, return_when):
    """Build the waiter matching *return_when* and attach it to each future.

    Args:
        fs: The futures to attach the waiter to.
        return_when: One of _AS_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION
            or ALL_COMPLETED.

    Returns:
        The newly created waiter, already appended to every future's
        ``_waiters`` list.

    Raises:
        ValueError: If *return_when* is not one of the values above. No
            waiter is installed in that case.
    """
    if return_when == _AS_COMPLETED:
        waiter = _AsCompletedWaiter()
    elif return_when == FIRST_COMPLETED:
        waiter = _FirstCompletedWaiter()
    else:
        # Only futures that are not yet done will report to the waiter.
        done_states = (CANCELLED_AND_NOTIFIED, FINISHED)
        pending_count = 0
        for future in fs:
            if future._state not in done_states:
                pending_count += 1

        if return_when == FIRST_EXCEPTION:
            stop_on_exception = True
        elif return_when == ALL_COMPLETED:
            stop_on_exception = False
        else:
            raise ValueError("Invalid return condition: %r" % return_when)
        waiter = _AllCompletedWaiter(pending_count,
                                     stop_on_exception=stop_on_exception)

    for future in fs:
        future._waiters.append(waiter)

    return waiter
177
178
def _yield_finished_futures(fs, waiter, ref_collect):
    """
    Iterate on the list *fs*, yielding finished futures one by one in
    reverse order.
    Before yielding a future, *waiter* is removed from its waiters
    and the future is removed from each set in the collection of sets
    *ref_collect*.

    The aim of this function is to avoid keeping stale references after
    the future is yielded and before the iterator resumes.

    Args:
        fs: The finished futures. The list is consumed: every yielded
            future is popped from its end.
        waiter: The waiter to detach from each future.
        ref_collect: A collection of sets that each contain the futures
            in *fs*.
    """
    while fs:
        # Peek at the last element; it is only popped at the yield below.
        f = fs[-1]
        for futures_set in ref_collect:
            futures_set.remove(f)
        # The waiter list is only touched while holding the future's
        # condition.
        with f._condition:
            f._waiters.remove(waiter)
        # Drop the local name so this frame holds no reference to the
        # future while suspended at the yield.
        del f
        # Careful not to keep a reference to the popped value
        yield fs.pop()
199
200
def as_completed(fs, timeout=None):
    """An iterator over the given futures that yields each as it completes.

    Args:
        fs: The sequence of Futures (possibly created by different Executors) to
            iterate over.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.

    Returns:
        An iterator that yields the given Futures as they complete (finished or
        cancelled). If any given Futures are duplicated, they will be returned
        once.

    Raises:
        TimeoutError: If the entire result iterator could not be generated
            before the given timeout. The deadline is measured from the
            first time the iterator is advanced and is checked before
            each wait.
    """
    if timeout is not None:
        end_time = timeout + time.monotonic()

    # De-duplicate the input; the set also serves as the record of futures
    # that have not been yielded yet.
    fs = set(fs)
    total_futures = len(fs)
    # Take the snapshot and install the waiter while every future's
    # condition is held, so no future can change state in between.
    with _AcquireFutures(fs):
        finished = set(
                f for f in fs
                if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
        pending = fs - finished
        waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
    finished = list(finished)
    try:
        # First hand out everything that was already done on entry.
        yield from _yield_finished_futures(finished, waiter,
                                           ref_collect=(fs,))

        while pending:
            if timeout is None:
                wait_timeout = None
            else:
                wait_timeout = end_time - time.monotonic()
                if wait_timeout < 0:
                    raise TimeoutError(
                            '%d (of %d) futures unfinished' % (
                            len(pending), total_futures))

            waiter.event.wait(wait_timeout)

            # Take the batch of newly finished futures and reset the waiter
            # under its lock, so completions arriving meanwhile are not lost.
            with waiter.lock:
                finished = waiter.finished_futures
                waiter.finished_futures = []
                waiter.event.clear()

            # reverse to keep finishing order
            finished.reverse()
            yield from _yield_finished_futures(finished, waiter,
                                               ref_collect=(fs, pending))

    finally:
        # Remove waiter from unfinished futures
        # (futures already yielded were removed from fs and detached from
        # the waiter by _yield_finished_futures).
        for f in fs:
            with f._condition:
                f._waiters.remove(waiter)
262
# Result type of wait(): a named 2-tuple with fields 'done' and 'not_done'.
DoneAndNotDoneFutures = collections.namedtuple(
        'DoneAndNotDoneFutures', 'done not_done')
def wait(fs, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the futures in the given sequence to complete.

    Args:
        fs: The sequence of Futures (possibly created by different Executors) to
            wait upon. Duplicate futures are collapsed and reported once;
            any iterable, including a one-shot iterator, is accepted.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.
        return_when: Indicates when this function should return. The options
            are:

            FIRST_COMPLETED - Return when any future finishes or is
                              cancelled.
            FIRST_EXCEPTION - Return when any future finishes by raising an
                              exception. If no future raises an exception
                              then it is equivalent to ALL_COMPLETED.
            ALL_COMPLETED -   Return when all futures finish or are cancelled.

    Returns:
        A named 2-tuple of sets. The first set, named 'done', contains the
        futures that completed (is finished or cancelled) before the wait
        completed. The second set, named 'not_done', contains uncompleted
        futures.

    Raises:
        ValueError: If return_when is not one of the options above and a
            waiter has to be created.
    """
    # Materialise the input exactly once. Without this, duplicates make
    # len(done) != len(fs) even when everything is finished, which installs
    # a waiter that is never signalled, and a one-shot iterator would
    # already be exhausted by the first pass over it.
    fs = set(fs)
    with _AcquireFutures(fs):
        done = {f for f in fs
                if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]}
        not_done = fs - done

        if (return_when == FIRST_COMPLETED) and done:
            return DoneAndNotDoneFutures(done, not_done)
        elif (return_when == FIRST_EXCEPTION) and done:
            if any(f for f in done
                   if not f.cancelled() and f.exception() is not None):
                return DoneAndNotDoneFutures(done, not_done)

        if len(done) == len(fs):
            return DoneAndNotDoneFutures(done, not_done)

        # Installed while all conditions are still held, so no completion
        # can slip in between the snapshot above and the waiter.
        waiter = _create_and_install_waiters(fs, return_when)

    waiter.event.wait(timeout)
    for f in fs:
        with f._condition:
            f._waiters.remove(waiter)

    done.update(waiter.finished_futures)
    return DoneAndNotDoneFutures(done, fs - done)
313
class Future(object):
    """Represents the result of an asynchronous computation."""

    def __init__(self):
        """Initializes the future. Should not be called by clients."""
        # Held while the state, result, exception and waiter list are read
        # or updated; also used to wake threads blocked in result() and
        # exception().
        self._condition = threading.Condition()
        self._state = PENDING
        self._result = None
        self._exception = None
        self._waiters = []
        self._done_callbacks = []

    def _invoke_callbacks(self):
        """Call every registered done-callback with this future.

        Exceptions derived from Exception are logged and do not stop the
        remaining callbacks from running.
        """
        for callback in self._done_callbacks:
            try:
                callback(self)
            except Exception:
                LOGGER.exception('exception calling callback for %r', self)

    def __repr__(self):
        with self._condition:
            if self._state == FINISHED:
                # Compare with None rather than by truthiness: an exception
                # instance may define __bool__/__len__ and be falsy.
                if self._exception is not None:
                    return '<%s at %#x state=%s raised %s>' % (
                        self.__class__.__name__,
                        id(self),
                        _STATE_TO_DESCRIPTION_MAP[self._state],
                        self._exception.__class__.__name__)
                else:
                    return '<%s at %#x state=%s returned %s>' % (
                        self.__class__.__name__,
                        id(self),
                        _STATE_TO_DESCRIPTION_MAP[self._state],
                        self._result.__class__.__name__)
            return '<%s at %#x state=%s>' % (
                    self.__class__.__name__,
                    id(self),
                    _STATE_TO_DESCRIPTION_MAP[self._state])

    def cancel(self):
        """Cancel the future if possible.

        Returns True if the future was cancelled, False otherwise. A future
        cannot be cancelled if it is running or has already completed.
        """
        with self._condition:
            if self._state in [RUNNING, FINISHED]:
                return False

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                return True

            self._state = CANCELLED
            self._condition.notify_all()

        # Callbacks run outside the lock, and only for the call that
        # actually performed the PENDING -> CANCELLED transition.
        self._invoke_callbacks()
        return True

    def cancelled(self):
        """Return True if the future was cancelled."""
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]

    def running(self):
        """Return True if the future is currently executing."""
        with self._condition:
            return self._state == RUNNING

    def done(self):
        """Return True if the future was cancelled or finished executing."""
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]

    def __get_result(self):
        """Return the stored result, or raise the stored exception."""
        # "is not None" instead of a truthiness test: a falsy exception
        # instance must still be raised, not silently replaced by the
        # (unset) result.
        if self._exception is not None:
            try:
                raise self._exception
            finally:
                # Break a reference cycle with the exception in self._exception
                self = None
        else:
            return self._result

    def add_done_callback(self, fn):
        """Attaches a callable that will be called when the future finishes.

        Args:
            fn: A callable that will be called with this future as its only
                argument when the future completes or is cancelled. The callable
                will always be called by a thread in the same process in which
                it was added. If the future has already completed or been
                cancelled then the callable will be called immediately. These
                callables are called in the order that they were added.
        """
        with self._condition:
            if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
                self._done_callbacks.append(fn)
                return
        # Already done: call immediately, outside the lock, with the same
        # log-and-continue policy as _invoke_callbacks().
        try:
            fn(self)
        except Exception:
            LOGGER.exception('exception calling callback for %r', self)

    def result(self, timeout=None):
        """Return the result of the call that the future represents.

        Args:
            timeout: The number of seconds to wait for the result if the future
                isn't done. If None, then there is no limit on the wait time.

        Returns:
            The result of the call that the future represents.

        Raises:
            CancelledError: If the future was cancelled.
            TimeoutError: If the future didn't finish executing before the given
                timeout.
            Exception: If the call raised then that exception will be raised.
        """
        try:
            with self._condition:
                if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                    raise CancelledError()
                elif self._state == FINISHED:
                    return self.__get_result()

                self._condition.wait(timeout)

                if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                    raise CancelledError()
                elif self._state == FINISHED:
                    return self.__get_result()
                else:
                    raise TimeoutError()
        finally:
            # Break a reference cycle with the exception in self._exception
            self = None

    def exception(self, timeout=None):
        """Return the exception raised by the call that the future represents.

        Args:
            timeout: The number of seconds to wait for the exception if the
                future isn't done. If None, then there is no limit on the wait
                time.

        Returns:
            The exception raised by the call that the future represents or None
            if the call completed without raising.

        Raises:
            CancelledError: If the future was cancelled.
            TimeoutError: If the future didn't finish executing before the given
                timeout.
        """

        with self._condition:
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self._exception

            self._condition.wait(timeout)

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self._exception
            else:
                raise TimeoutError()

    # The following methods should only be used by Executors and in tests.
    def set_running_or_notify_cancel(self):
        """Mark the future as running or process any cancel notifications.

        Should only be used by Executor implementations and unit tests.

        If the future has been cancelled (cancel() was called and returned
        True) then any threads waiting on the future completing (through
        calls to as_completed() or wait()) are notified and False is returned.

        If the future was not cancelled then it is put in the running state
        (future calls to running() will return True) and True is returned.

        This method should be called by Executor implementations before
        executing the work associated with this future. If this method returns
        False then the work should not be executed.

        Returns:
            False if the Future was cancelled, True otherwise.

        Raises:
            RuntimeError: if this method was already called or if set_result()
                or set_exception() was called.
        """
        with self._condition:
            if self._state == CANCELLED:
                self._state = CANCELLED_AND_NOTIFIED
                for waiter in self._waiters:
                    waiter.add_cancelled(self)
                # self._condition.notify_all() is not necessary because
                # self.cancel() triggers a notification.
                return False
            elif self._state == PENDING:
                self._state = RUNNING
                return True
            else:
                LOGGER.critical('Future %s in unexpected state: %s',
                                id(self),
                                self._state)
                raise RuntimeError('Future in unexpected state')

    def set_result(self, result):
        """Sets the return value of work associated with the future.

        Should only be used by Executor implementations and unit tests.

        Raises:
            InvalidStateError: If the future is already cancelled or finished.
        """
        with self._condition:
            if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}:
                raise InvalidStateError('{}: {!r}'.format(self._state, self))
            self._result = result
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_result(self)
            self._condition.notify_all()
        # Callbacks run after the lock has been released.
        self._invoke_callbacks()

    def set_exception(self, exception):
        """Sets the result of the future as being the given exception.

        Should only be used by Executor implementations and unit tests.

        Raises:
            InvalidStateError: If the future is already cancelled or finished.
        """
        with self._condition:
            if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}:
                raise InvalidStateError('{}: {!r}'.format(self._state, self))
            self._exception = exception
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_exception(self)
            self._condition.notify_all()
        # Callbacks run after the lock has been released.
        self._invoke_callbacks()

    # Allows subscripting the class itself, e.g. Future[int].
    __class_getitem__ = classmethod(types.GenericAlias)
557
def _result_or_cancel(fut, timeout=None):
    """Return fut.result(timeout), then call fut.cancel() in every case.

    The cancel() call runs whether result() returned or raised (for example
    on a timeout), so a future abandoned by Executor.map() does not stay
    scheduled.
    """
    try:
        try:
            return fut.result(timeout)
        finally:
            fut.cancel()
    finally:
        # Break a reference cycle with the exception stored on the future.
        del fut


class Executor(object):
    """This is an abstract base class for concrete asynchronous executors."""

    def submit(self, fn, /, *args, **kwargs):
        """Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and returns
        a Future instance representing the execution of the callable.

        Returns:
            A Future representing the given call.
        """
        raise NotImplementedError()

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, iter).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: The size of the chunks the iterable will be broken into
                before being passed to a child process. This argument is only
                used by ProcessPoolExecutor; it is ignored by
                ThreadPoolExecutor.

        Returns:
            An iterator equivalent to: map(fn, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
        if timeout is not None:
            end_time = timeout + time.monotonic()

        fs = [self.submit(fn, *args) for args in zip(*iterables)]

        # Yield must be hidden in closure so that the futures are submitted
        # before the first iterator value is required.
        def result_iterator():
            try:
                # Reverse so that pop() from the end hands the futures out
                # in submission order.
                fs.reverse()
                while fs:
                    # Careful not to keep a reference to the popped future.
                    # The popped future is no longer in fs, so the cleanup
                    # below cannot reach it; _result_or_cancel() cancels it
                    # itself if result() raises.
                    if timeout is None:
                        yield _result_or_cancel(fs.pop())
                    else:
                        yield _result_or_cancel(
                            fs.pop(), end_time - time.monotonic())
            finally:
                # On error or early close, cancel whatever was not consumed.
                for future in fs:
                    future.cancel()
        return result_iterator()

    def shutdown(self, wait=True, *, cancel_futures=False):
        """Clean-up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        Args:
            wait: If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                executor have been reclaimed.
            cancel_futures: If True then shutdown will cancel all pending
                futures. Futures that are completed or running will not be
                cancelled.
        """
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Always waits for the executor to finish; exceptions raised in the
        # with-body are not suppressed.
        self.shutdown(wait=True)
        return False
638
639
class BrokenExecutor(RuntimeError):
    """Signals that an executor is no longer usable after a severe failure."""
644