1# Copyright 2016-2018 Julien Danjou
2# Copyright 2017 Elisey Zanko
3# Copyright 2016 Étienne Bersac
4# Copyright 2016 Joshua Harlow
5# Copyright 2013-2014 Ray Holder
6#
7# Licensed under the Apache License, Version 2.0 (the "License");
8# you may not use this file except in compliance with the License.
9# You may obtain a copy of the License at
10#
11# http://www.apache.org/licenses/LICENSE-2.0
12#
13# Unless required by applicable law or agreed to in writing, software
14# distributed under the License is distributed on an "AS IS" BASIS,
15# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16# See the License for the specific language governing permissions and
17# limitations under the License.
18
19import functools
20import sys
21import threading
22import time
23import typing as t
24import warnings
25from abc import ABC, abstractmethod
26from concurrent import futures
27from inspect import iscoroutinefunction
28
29# Import all built-in retry strategies for easier usage.
30from .retry import retry_base  # noqa
31from .retry import retry_all  # noqa
32from .retry import retry_always  # noqa
33from .retry import retry_any  # noqa
34from .retry import retry_if_exception  # noqa
35from .retry import retry_if_exception_type  # noqa
36from .retry import retry_if_not_exception_type  # noqa
37from .retry import retry_if_not_result  # noqa
38from .retry import retry_if_result  # noqa
39from .retry import retry_never  # noqa
40from .retry import retry_unless_exception_type  # noqa
41from .retry import retry_if_exception_message  # noqa
42from .retry import retry_if_not_exception_message  # noqa
43
44# Import all nap strategies for easier usage.
45from .nap import sleep  # noqa
46from .nap import sleep_using_event  # noqa
47
48# Import all built-in stop strategies for easier usage.
49from .stop import stop_after_attempt  # noqa
50from .stop import stop_after_delay  # noqa
51from .stop import stop_all  # noqa
52from .stop import stop_any  # noqa
53from .stop import stop_never  # noqa
54from .stop import stop_when_event_set  # noqa
55
56# Import all built-in wait strategies for easier usage.
57from .wait import wait_chain  # noqa
58from .wait import wait_combine  # noqa
59from .wait import wait_exponential  # noqa
60from .wait import wait_fixed  # noqa
61from .wait import wait_incrementing  # noqa
62from .wait import wait_none  # noqa
63from .wait import wait_random  # noqa
64from .wait import wait_random_exponential  # noqa
65from .wait import wait_random_exponential as wait_full_jitter  # noqa
66
67# Import all built-in before strategies for easier usage.
68from .before import before_log  # noqa
69from .before import before_nothing  # noqa
70
71# Import all built-in after strategies for easier usage.
72from .after import after_log  # noqa
73from .after import after_nothing  # noqa
74
75# Import all built-in after strategies for easier usage.
76from .before_sleep import before_sleep_log  # noqa
77from .before_sleep import before_sleep_nothing  # noqa
78
79try:
80    import tornado  # type: ignore
81except ImportError:
82    tornado = None  # type: ignore
83
84if t.TYPE_CHECKING:
85    import types
86
87    from .wait import wait_base
88    from .stop import stop_base
89
90
91WrappedFn = t.TypeVar("WrappedFn", bound=t.Callable)
92_RetValT = t.TypeVar("_RetValT")
93
94
95@t.overload
96def retry(fn: WrappedFn) -> WrappedFn:
97    pass
98
99
100@t.overload
101def retry(*dargs: t.Any, **dkw: t.Any) -> t.Callable[[WrappedFn], WrappedFn]:  # noqa
102    pass
103
104
105def retry(*dargs: t.Any, **dkw: t.Any) -> t.Union[WrappedFn, t.Callable[[WrappedFn], WrappedFn]]:  # noqa
106    """Wrap a function with a new `Retrying` object.
107
108    :param dargs: positional arguments passed to Retrying object
109    :param dkw: keyword arguments passed to the Retrying object
110    """
111    # support both @retry and @retry() as valid syntax
112    if len(dargs) == 1 and callable(dargs[0]):
113        return retry()(dargs[0])
114    else:
115
116        def wrap(f: WrappedFn) -> WrappedFn:
117            if isinstance(f, retry_base):
118                warnings.warn(
119                    f"Got retry_base instance ({f.__class__.__name__}) as callable argument, "
120                    f"this will probably hang indefinitely (did you mean retry={f.__class__.__name__}(...)?)"
121                )
122            if iscoroutinefunction(f):
123                r: "BaseRetrying" = AsyncRetrying(*dargs, **dkw)
124            elif tornado and hasattr(tornado.gen, "is_coroutine_function") and tornado.gen.is_coroutine_function(f):
125                r = TornadoRetrying(*dargs, **dkw)
126            else:
127                r = Retrying(*dargs, **dkw)
128
129            return r.wraps(f)
130
131        return wrap
132
133
134class TryAgain(Exception):
135    """Always retry the executed function when raised."""
136
137
138NO_RESULT = object()
139
140
141class DoAttempt:
142    pass
143
144
145class DoSleep(float):
146    pass
147
148
149class BaseAction:
150    """Base class for representing actions to take by retry object.
151
152    Concrete implementations must define:
153    - __init__: to initialize all necessary fields
154    - REPR_FIELDS: class variable specifying attributes to include in repr(self)
155    - NAME: for identification in retry object methods and callbacks
156    """
157
158    REPR_FIELDS: t.Sequence[str] = ()
159    NAME: t.Optional[str] = None
160
161    def __repr__(self) -> str:
162        state_str = ", ".join(f"{field}={getattr(self, field)!r}" for field in self.REPR_FIELDS)
163        return f"{self.__class__.__name__}({state_str})"
164
165    def __str__(self) -> str:
166        return repr(self)
167
168
169class RetryAction(BaseAction):
170    REPR_FIELDS = ("sleep",)
171    NAME = "retry"
172
173    def __init__(self, sleep: t.SupportsFloat) -> None:
174        self.sleep = float(sleep)
175
176
177_unset = object()
178
179
180def _first_set(first: t.Union[t.Any, object], second: t.Any) -> t.Any:
181    return second if first is _unset else first
182
183
184class RetryError(Exception):
185    """Encapsulates the last attempt instance right before giving up."""
186
187    def __init__(self, last_attempt: "Future") -> None:
188        self.last_attempt = last_attempt
189        super().__init__(last_attempt)
190
191    def reraise(self) -> t.NoReturn:
192        if self.last_attempt.failed:
193            raise self.last_attempt.result()
194        raise self
195
196    def __str__(self) -> str:
197        return f"{self.__class__.__name__}[{self.last_attempt}]"
198
199
200class AttemptManager:
201    """Manage attempt context."""
202
203    def __init__(self, retry_state: "RetryCallState"):
204        self.retry_state = retry_state
205
206    def __enter__(self) -> None:
207        pass
208
209    def __exit__(
210        self,
211        exc_type: t.Optional[t.Type[BaseException]],
212        exc_value: t.Optional[BaseException],
213        traceback: t.Optional["types.TracebackType"],
214    ) -> t.Optional[bool]:
215        if isinstance(exc_value, BaseException):
216            self.retry_state.set_exception((exc_type, exc_value, traceback))
217            return True  # Swallow exception.
218        else:
219            # We don't have the result, actually.
220            self.retry_state.set_result(None)
221            return None
222
223
224class BaseRetrying(ABC):
225    def __init__(
226        self,
227        sleep: t.Callable[[t.Union[int, float]], None] = sleep,
228        stop: "stop_base" = stop_never,
229        wait: "wait_base" = wait_none(),
230        retry: retry_base = retry_if_exception_type(),
231        before: t.Callable[["RetryCallState"], None] = before_nothing,
232        after: t.Callable[["RetryCallState"], None] = after_nothing,
233        before_sleep: t.Optional[t.Callable[["RetryCallState"], None]] = None,
234        reraise: bool = False,
235        retry_error_cls: t.Type[RetryError] = RetryError,
236        retry_error_callback: t.Optional[t.Callable[["RetryCallState"], t.Any]] = None,
237    ):
238        self.sleep = sleep
239        self.stop = stop
240        self.wait = wait
241        self.retry = retry
242        self.before = before
243        self.after = after
244        self.before_sleep = before_sleep
245        self.reraise = reraise
246        self._local = threading.local()
247        self.retry_error_cls = retry_error_cls
248        self.retry_error_callback = retry_error_callback
249
250    def copy(
251        self,
252        sleep: t.Union[t.Callable[[t.Union[int, float]], None], object] = _unset,
253        stop: t.Union["stop_base", object] = _unset,
254        wait: t.Union["wait_base", object] = _unset,
255        retry: t.Union[retry_base, object] = _unset,
256        before: t.Union[t.Callable[["RetryCallState"], None], object] = _unset,
257        after: t.Union[t.Callable[["RetryCallState"], None], object] = _unset,
258        before_sleep: t.Union[t.Optional[t.Callable[["RetryCallState"], None]], object] = _unset,
259        reraise: t.Union[bool, object] = _unset,
260        retry_error_cls: t.Union[t.Type[RetryError], object] = _unset,
261        retry_error_callback: t.Union[t.Optional[t.Callable[["RetryCallState"], t.Any]], object] = _unset,
262    ) -> "BaseRetrying":
263        """Copy this object with some parameters changed if needed."""
264        return self.__class__(
265            sleep=_first_set(sleep, self.sleep),
266            stop=_first_set(stop, self.stop),
267            wait=_first_set(wait, self.wait),
268            retry=_first_set(retry, self.retry),
269            before=_first_set(before, self.before),
270            after=_first_set(after, self.after),
271            before_sleep=_first_set(before_sleep, self.before_sleep),
272            reraise=_first_set(reraise, self.reraise),
273            retry_error_cls=_first_set(retry_error_cls, self.retry_error_cls),
274            retry_error_callback=_first_set(retry_error_callback, self.retry_error_callback),
275        )
276
277    def __repr__(self) -> str:
278        return (
279            f"<{self.__class__.__name__} object at 0x{id(self):x} ("
280            f"stop={self.stop}, "
281            f"wait={self.wait}, "
282            f"sleep={self.sleep}, "
283            f"retry={self.retry}, "
284            f"before={self.before}, "
285            f"after={self.after})>"
286        )
287
288    @property
289    def statistics(self) -> t.Dict[str, t.Any]:
290        """Return a dictionary of runtime statistics.
291
292        This dictionary will be empty when the controller has never been
293        ran. When it is running or has ran previously it should have (but
294        may not) have useful and/or informational keys and values when
295        running is underway and/or completed.
296
297        .. warning:: The keys in this dictionary **should** be some what
298                     stable (not changing), but there existence **may**
299                     change between major releases as new statistics are
300                     gathered or removed so before accessing keys ensure that
301                     they actually exist and handle when they do not.
302
303        .. note:: The values in this dictionary are local to the thread
304                  running call (so if multiple threads share the same retrying
305                  object - either directly or indirectly) they will each have
306                  there own view of statistics they have collected (in the
307                  future we may provide a way to aggregate the various
308                  statistics from each thread).
309        """
310        try:
311            return self._local.statistics
312        except AttributeError:
313            self._local.statistics = {}
314            return self._local.statistics
315
316    def wraps(self, f: WrappedFn) -> WrappedFn:
317        """Wrap a function for retrying.
318
319        :param f: A function to wraps for retrying.
320        """
321
322        @functools.wraps(f)
323        def wrapped_f(*args: t.Any, **kw: t.Any) -> t.Any:
324            return self(f, *args, **kw)
325
326        def retry_with(*args: t.Any, **kwargs: t.Any) -> WrappedFn:
327            return self.copy(*args, **kwargs).wraps(f)
328
329        wrapped_f.retry = self
330        wrapped_f.retry_with = retry_with
331
332        return wrapped_f
333
334    def begin(self) -> None:
335        self.statistics.clear()
336        self.statistics["start_time"] = time.monotonic()
337        self.statistics["attempt_number"] = 1
338        self.statistics["idle_for"] = 0
339
340    def iter(self, retry_state: "RetryCallState") -> t.Union[DoAttempt, DoSleep, t.Any]:  # noqa
341        fut = retry_state.outcome
342        if fut is None:
343            if self.before is not None:
344                self.before(retry_state)
345            return DoAttempt()
346
347        is_explicit_retry = retry_state.outcome.failed and isinstance(retry_state.outcome.exception(), TryAgain)
348        if not (is_explicit_retry or self.retry(retry_state=retry_state)):
349            return fut.result()
350
351        if self.after is not None:
352            self.after(retry_state)
353
354        self.statistics["delay_since_first_attempt"] = retry_state.seconds_since_start
355        if self.stop(retry_state=retry_state):
356            if self.retry_error_callback:
357                return self.retry_error_callback(retry_state)
358            retry_exc = self.retry_error_cls(fut)
359            if self.reraise:
360                raise retry_exc.reraise()
361            raise retry_exc from fut.exception()
362
363        if self.wait:
364            sleep = self.wait(retry_state=retry_state)
365        else:
366            sleep = 0.0
367        retry_state.next_action = RetryAction(sleep)
368        retry_state.idle_for += sleep
369        self.statistics["idle_for"] += sleep
370        self.statistics["attempt_number"] += 1
371
372        if self.before_sleep is not None:
373            self.before_sleep(retry_state)
374
375        return DoSleep(sleep)
376
377    def __iter__(self) -> t.Generator[AttemptManager, None, None]:
378        self.begin()
379
380        retry_state = RetryCallState(self, fn=None, args=(), kwargs={})
381        while True:
382            do = self.iter(retry_state=retry_state)
383            if isinstance(do, DoAttempt):
384                yield AttemptManager(retry_state=retry_state)
385            elif isinstance(do, DoSleep):
386                retry_state.prepare_for_next_attempt()
387                self.sleep(do)
388            else:
389                break
390
391    @abstractmethod
392    def __call__(self, fn: t.Callable[..., _RetValT], *args: t.Any, **kwargs: t.Any) -> _RetValT:
393        pass
394
395
396class Retrying(BaseRetrying):
397    """Retrying controller."""
398
399    def __call__(self, fn: t.Callable[..., _RetValT], *args: t.Any, **kwargs: t.Any) -> _RetValT:
400        self.begin()
401
402        retry_state = RetryCallState(retry_object=self, fn=fn, args=args, kwargs=kwargs)
403        while True:
404            do = self.iter(retry_state=retry_state)
405            if isinstance(do, DoAttempt):
406                try:
407                    result = fn(*args, **kwargs)
408                except BaseException:  # noqa: B902
409                    retry_state.set_exception(sys.exc_info())
410                else:
411                    retry_state.set_result(result)
412            elif isinstance(do, DoSleep):
413                retry_state.prepare_for_next_attempt()
414                self.sleep(do)
415            else:
416                return do
417
418
419class Future(futures.Future):
420    """Encapsulates a (future or past) attempted call to a target function."""
421
422    def __init__(self, attempt_number: int) -> None:
423        super().__init__()
424        self.attempt_number = attempt_number
425
426    @property
427    def failed(self) -> bool:
428        """Return whether a exception is being held in this future."""
429        return self.exception() is not None
430
431    @classmethod
432    def construct(cls, attempt_number: int, value: t.Any, has_exception: bool) -> "Future":
433        """Construct a new Future object."""
434        fut = cls(attempt_number)
435        if has_exception:
436            fut.set_exception(value)
437        else:
438            fut.set_result(value)
439        return fut
440
441
442class RetryCallState:
443    """State related to a single call wrapped with Retrying."""
444
445    def __init__(
446        self,
447        retry_object: BaseRetrying,
448        fn: t.Optional[WrappedFn],
449        args: t.Any,
450        kwargs: t.Any,
451    ) -> None:
452        #: Retry call start timestamp
453        self.start_time = time.monotonic()
454        #: Retry manager object
455        self.retry_object = retry_object
456        #: Function wrapped by this retry call
457        self.fn = fn
458        #: Arguments of the function wrapped by this retry call
459        self.args = args
460        #: Keyword arguments of the function wrapped by this retry call
461        self.kwargs = kwargs
462
463        #: The number of the current attempt
464        self.attempt_number: int = 1
465        #: Last outcome (result or exception) produced by the function
466        self.outcome: t.Optional[Future] = None
467        #: Timestamp of the last outcome
468        self.outcome_timestamp: t.Optional[float] = None
469        #: Time spent sleeping in retries
470        self.idle_for: float = 0.0
471        #: Next action as decided by the retry manager
472        self.next_action: t.Optional[RetryAction] = None
473
474    @property
475    def seconds_since_start(self) -> t.Optional[float]:
476        if self.outcome_timestamp is None:
477            return None
478        return self.outcome_timestamp - self.start_time
479
480    def prepare_for_next_attempt(self) -> None:
481        self.outcome = None
482        self.outcome_timestamp = None
483        self.attempt_number += 1
484        self.next_action = None
485
486    def set_result(self, val: t.Any) -> None:
487        ts = time.monotonic()
488        fut = Future(self.attempt_number)
489        fut.set_result(val)
490        self.outcome, self.outcome_timestamp = fut, ts
491
492    def set_exception(self, exc_info: t.Tuple[t.Type[BaseException], BaseException, "types.TracebackType"]) -> None:
493        ts = time.monotonic()
494        fut = Future(self.attempt_number)
495        fut.set_exception(exc_info[1])
496        self.outcome, self.outcome_timestamp = fut, ts
497
498    def __repr__(self):
499        if self.outcome is None:
500            result = "none yet"
501        elif self.outcome.failed:
502            exception = self.outcome.exception()
503            result = f"failed ({exception.__class__.__name__} {exception})"
504        else:
505            result = f"returned {self.outcome.result()}"
506
507        slept = float(round(self.idle_for, 2))
508        clsname = self.__class__.__name__
509        return f"<{clsname} {id(self)}: attempt #{self.attempt_number}; slept for {slept}; last result: {result}>"
510
511
512from tenacity._asyncio import AsyncRetrying  # noqa:E402,I100
513
514if tornado:
515    from tenacity.tornadoweb import TornadoRetrying
516