1"""A Future class similar to the one in PEP 3148."""
2
# Names exported by `from <this module> import *`.
__all__ = (
    'Future', 'wrap_future', 'isfuture',
)
6
7import concurrent.futures
8import contextvars
9import logging
10import sys
11
12from . import base_futures
13from . import events
14from . import exceptions
15from . import format_helpers
16
17
# Re-export the future-detection helper implemented in base_futures.
isfuture = base_futures.isfuture


# Local aliases for the future state constants defined in base_futures.
_PENDING = base_futures._PENDING
_CANCELLED = base_futures._CANCELLED
_FINISHED = base_futures._FINISHED


# Custom logging level one step below logging.DEBUG.
STACK_DEBUG = logging.DEBUG - 1  # heavy-duty debugging
27
28
class Future:
    """This class is *almost* compatible with concurrent.futures.Future.

    Differences:

    - This class is not thread-safe.

    - result() and exception() do not take a timeout argument and
      raise an exception when the future isn't done yet.

    - Callbacks registered with add_done_callback() are always called
      via the event loop's call_soon().

    - This class is not compatible with the wait() and as_completed()
      methods in the concurrent.futures package.

    (In Python 3.4 or later we may be able to unify the implementations.)
    """

    # Class variables serving as defaults for instance variables.
    _state = _PENDING
    _result = None
    _exception = None
    _loop = None
    _source_traceback = None

    # This field is used for a dual purpose:
    # - Its presence is a marker to declare that a class implements
    #   the Future protocol (i.e. is intended to be duck-type compatible).
    #   The value must also be not-None, to enable a subclass to declare
    #   that it is not compatible by setting this to None.
    # - It is set by __await__() (also exposed as __iter__()) below so that
    #   Task._step() can tell the difference between
    #   `await Future()` or `yield from Future()` (correct) vs.
    #   `yield Future()` (incorrect).
    _asyncio_future_blocking = False

    # Name-mangled flag.  set_exception() turns it on; result(), exception(),
    # cancel() and the _log_traceback setter turn it off.  __del__() reports
    # the stored exception only while it is still on.
    __log_traceback = False

    def __init__(self, *, loop=None):
        """Initialize the future.

        The optional loop argument allows explicitly setting the event
        loop object used by the future. If it's not provided, the future uses
        the loop returned by events.get_event_loop().

        If the loop reports debug mode, a stack extracted starting at the
        caller's frame is stored in _source_traceback.
        """
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        # List of (callback, context) pairs; see add_done_callback().
        self._callbacks = []
        if self._loop.get_debug():
            self._source_traceback = format_helpers.extract_stack(
                sys._getframe(1))

    # Produces the list of strings joined together by __repr__().
    _repr_info = base_futures._future_repr_info

    def __repr__(self):
        # Format: <ClassName info1 info2 ...>
        return '<{} {}>'.format(self.__class__.__name__,
                                ' '.join(self._repr_info()))

    def __del__(self):
        """Report an exception that was set but never retrieved.

        Does nothing unless the traceback-logging flag is still on.
        Otherwise the loop's call_exception_handler() is called with a
        context dict holding a message, the exception, this future and,
        when available, the source traceback.
        """
        if not self.__log_traceback:
            # set_exception() was not called, or result() or exception()
            # has consumed the exception
            return
        exc = self._exception
        context = {
            'message':
                f'{self.__class__.__name__} exception was never retrieved',
            'exception': exc,
            'future': self,
        }
        if self._source_traceback:
            context['source_traceback'] = self._source_traceback
        self._loop.call_exception_handler(context)

    @property
    def _log_traceback(self):
        # Read-only view of the name-mangled flag for code outside the class.
        return self.__log_traceback

    @_log_traceback.setter
    def _log_traceback(self, val):
        # The flag may only be switched off from outside; any truthy value
        # is rejected with ValueError.
        if bool(val):
            raise ValueError('_log_traceback can only be set to False')
        self.__log_traceback = False

    def get_loop(self):
        """Return the event loop the Future is bound to.

        Raises RuntimeError if no loop has been set (i.e. _loop is still
        the class-level default of None).
        """
        loop = self._loop
        if loop is None:
            raise RuntimeError("Future object is not initialized.")
        return loop

    def cancel(self):
        """Cancel the future and schedule callbacks.

        If the future is already done or cancelled, return False.  Otherwise,
        change the future's state to cancelled, schedule the callbacks and
        return True.
        """
        # Cleared unconditionally, i.e. even when the future is already done.
        self.__log_traceback = False
        if self._state != _PENDING:
            return False
        self._state = _CANCELLED
        self.__schedule_callbacks()
        return True

    def __schedule_callbacks(self):
        """Internal: Ask the event loop to call all callbacks.

        The callbacks are scheduled to be called as soon as possible. Also
        clears the callback list.
        """
        # Work on a copy so the list can be emptied before scheduling.
        callbacks = self._callbacks[:]
        if not callbacks:
            return

        # Cleared in place, so the same list object is kept.
        self._callbacks[:] = []
        for callback, ctx in callbacks:
            # Each callback receives this future and runs in the context
            # that was stored alongside it.
            self._loop.call_soon(callback, self, context=ctx)

    def cancelled(self):
        """Return True if the future was cancelled."""
        return self._state == _CANCELLED

    # Don't implement running(); see http://bugs.python.org/issue18699

    def done(self):
        """Return True if the future is done.

        Done means either that a result / exception are available, or that the
        future was cancelled.
        """
        return self._state != _PENDING

    def result(self):
        """Return the result this future represents.

        If the future has been cancelled, raises CancelledError.  If the
        future's result isn't yet available, raises InvalidStateError.  If
        the future is done and has an exception set, this exception is raised.
        """
        if self._state == _CANCELLED:
            raise exceptions.CancelledError
        if self._state != _FINISHED:
            raise exceptions.InvalidStateError('Result is not ready.')
        # The outcome is being retrieved, so __del__() must not report it.
        self.__log_traceback = False
        if self._exception is not None:
            raise self._exception
        return self._result

    def exception(self):
        """Return the exception that was set on this future.

        The exception (or None if no exception was set) is returned only if
        the future is done.  If the future has been cancelled, raises
        CancelledError.  If the future isn't done yet, raises
        InvalidStateError.
        """
        if self._state == _CANCELLED:
            raise exceptions.CancelledError
        if self._state != _FINISHED:
            raise exceptions.InvalidStateError('Exception is not set.')
        # The exception is being retrieved, so __del__() must not report it.
        self.__log_traceback = False
        return self._exception

    def add_done_callback(self, fn, *, context=None):
        """Add a callback to be run when the future becomes done.

        The callback is called with a single argument - the future object. If
        the future is already done when this is called, the callback is
        scheduled with call_soon.

        The optional keyword-only context argument is the context the
        callback is run in.  For a pending future, a context of None is
        replaced by a copy of the current context; for a done future it is
        passed to call_soon() unchanged.
        """
        if self._state != _PENDING:
            self._loop.call_soon(fn, self, context=context)
        else:
            if context is None:
                context = contextvars.copy_context()
            self._callbacks.append((fn, context))

    # New method not in PEP 3148.

    def remove_done_callback(self, fn):
        """Remove all instances of a callback from the "call when done" list.

        Returns the number of callbacks removed.
        """
        # Callbacks are compared with != (equality, not identity).
        filtered_callbacks = [(f, ctx)
                              for (f, ctx) in self._callbacks
                              if f != fn]
        removed_count = len(self._callbacks) - len(filtered_callbacks)
        if removed_count:
            # Replaced in place, so the same list object is kept.
            self._callbacks[:] = filtered_callbacks
        return removed_count

    # So-called internal methods (note: no set_running_or_notify_cancel()).

    def set_result(self, result):
        """Mark the future done and set its result.

        If the future is already done when this method is called, raises
        InvalidStateError.
        """
        if self._state != _PENDING:
            raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
        self._result = result
        self._state = _FINISHED
        self.__schedule_callbacks()

    def set_exception(self, exception):
        """Mark the future done and set an exception.

        If the future is already done when this method is called, raises
        InvalidStateError.

        An exception class is instantiated with no arguments before being
        stored.  An exception whose exact type is StopIteration is rejected
        with TypeError.
        """
        if self._state != _PENDING:
            raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
        if isinstance(exception, type):
            exception = exception()
        # Exact type check: subclasses of StopIteration are not rejected.
        if type(exception) is StopIteration:
            raise TypeError("StopIteration interacts badly with generators "
                            "and cannot be raised into a Future")
        self._exception = exception
        self._state = _FINISHED
        self.__schedule_callbacks()
        # From now on __del__() reports the exception unless it is retrieved.
        self.__log_traceback = True

    def __await__(self):
        """Yield this future once if it is pending, then return its result.

        Raises RuntimeError if the future is still not done after being
        resumed; otherwise returns (or raises) whatever result() does.
        """
        if not self.done():
            self._asyncio_future_blocking = True
            yield self  # This tells Task to wait for completion.
        if not self.done():
            raise RuntimeError("await wasn't used with future")
        return self.result()  # May raise too.

    __iter__ = __await__  # make compatible with 'yield from'.
266
267
# Keep a handle on the pure-Python class: the name `Future` may be rebound
# at the end of this module.  Needed for testing purposes.
_PyFuture = Future
270
271
def _get_loop(fut):
    """Return the event loop associated with *fut*.

    Calls fut.get_loop() when that attribute exists; otherwise falls back
    to reading the old '_loop' attribute.
    """
    missing = object()
    getter = getattr(fut, 'get_loop', missing)
    if getter is missing:
        # No get_loop attribute available: use the legacy attribute.
        return fut._loop
    # Called outside any exception handling so that errors raised by
    # get_loop() itself propagate unchanged.
    return getter()
282
283
def _set_result_unless_cancelled(fut, result):
    """Store *result* on *fut*, doing nothing if *fut* was cancelled."""
    if not fut.cancelled():
        fut.set_result(result)
289
290
def _convert_future_exc(exc):
    """Translate a concurrent.futures exception into its local counterpart.

    An exception whose exact type is concurrent.futures.CancelledError,
    TimeoutError or InvalidStateError is rebuilt, with the same args, as the
    same-named class from the ``exceptions`` module.  Any other exception
    (including instances of subclasses of those three) is returned as is.
    """
    actual_type = type(exc)
    # Checked in this order, each pair looked up only when it is reached.
    for name in ('CancelledError', 'TimeoutError', 'InvalidStateError'):
        if actual_type is getattr(concurrent.futures, name):
            return getattr(exceptions, name)(*exc.args)
    return exc
301
302
def _set_concurrent_future_state(concurrent, source):
    """Copy state from a future to a concurrent.futures.Future.

    *source* must already be done.  A cancelled source cancels
    *concurrent*; otherwise its exception (converted through
    _convert_future_exc) or its result is transferred, provided
    set_running_or_notify_cancel() on *concurrent* returns true.
    """
    assert source.done()
    if source.cancelled():
        concurrent.cancel()
    if concurrent.set_running_or_notify_cancel():
        error = source.exception()
        if error is None:
            value = source.result()
            concurrent.set_result(value)
        else:
            concurrent.set_exception(_convert_future_exc(error))
316
317
def _copy_future_state(source, dest):
    """Internal helper to copy state from another Future.

    The other Future may be a concurrent.futures.Future.

    *source* must be done.  Nothing happens when *dest* is already
    cancelled; otherwise *dest* must still be pending and receives the
    cancellation, the (converted) exception or the result of *source*.
    """
    assert source.done()
    if dest.cancelled():
        return
    assert not dest.done()

    if source.cancelled():
        dest.cancel()
        return

    error = source.exception()
    if error is None:
        value = source.result()
        dest.set_result(value)
    else:
        dest.set_exception(_convert_future_exc(error))
336
337
def _chain_future(source, destination):
    """Chain two futures so that when one completes, so does the other.

    The result (or exception) of source will be copied to destination.
    If destination is cancelled, source gets cancelled too.
    Compatible with both asyncio.Future and concurrent.futures.Future.

    Raises TypeError if either argument is neither recognised by isfuture()
    nor an instance of concurrent.futures.Future.
    """
    if not isfuture(source) and not isinstance(source,
                                               concurrent.futures.Future):
        raise TypeError('A future is required for source argument')
    if not isfuture(destination) and not isinstance(destination,
                                                    concurrent.futures.Future):
        raise TypeError('A future is required for destination argument')
    # A loop of None marks a future that isfuture() does not recognise,
    # which after the checks above means a concurrent.futures.Future.
    source_loop = _get_loop(source) if isfuture(source) else None
    dest_loop = _get_loop(destination) if isfuture(destination) else None

    def _set_state(future, other):
        # Copy the state of `other` into `future`, using the helper that
        # matches the kind of future being written to.
        if isfuture(future):
            _copy_future_state(other, future)
        else:
            _set_concurrent_future_state(future, other)

    def _call_check_cancel(destination):
        # Done-callback of the destination: propagate a cancellation back
        # to the source.  The parameter shadows the outer `destination`.
        if destination.cancelled():
            if source_loop is None or source_loop is dest_loop:
                source.cancel()
            else:
                # Source is bound to a different loop: have that loop
                # perform the cancellation.
                source_loop.call_soon_threadsafe(source.cancel)

    def _call_set_state(source):
        # Done-callback of the source: push its outcome to the destination.
        # The parameter shadows the outer `source`.
        if (destination.cancelled() and
                dest_loop is not None and dest_loop.is_closed()):
            # Destination is cancelled and its loop is closed: nothing to do.
            return
        if dest_loop is None or dest_loop is source_loop:
            _set_state(destination, source)
        else:
            # Destination is bound to a different loop: have that loop
            # apply the state.
            dest_loop.call_soon_threadsafe(_set_state, destination, source)

    destination.add_done_callback(_call_check_cancel)
    source.add_done_callback(_call_set_state)
378
379
def wrap_future(future, *, loop=None):
    """Wrap concurrent.futures.Future object.

    An object already recognised by isfuture() is returned unchanged.
    Otherwise a new future is created on *loop* (or on the loop returned by
    events.get_event_loop() when *loop* is None), chained to *future* with
    _chain_future(), and returned.
    """
    if isfuture(future):
        return future
    assert isinstance(future, concurrent.futures.Future), \
        f'concurrent.futures.Future is expected, got {future!r}'
    target_loop = events.get_event_loop() if loop is None else loop
    wrapped = target_loop.create_future()
    _chain_future(future, wrapped)
    return wrapped
391
392
# Prefer the Future provided by the optional _asyncio module when it can be
# imported; otherwise the pure-Python Future defined above stays in place.
try:
    import _asyncio
except ImportError:
    pass
else:
    # _CFuture is needed for tests.
    Future = _CFuture = _asyncio.Future
400