# Copyright 2016-2018 Julien Danjou
# Copyright 2017 Elisey Zanko
# Copyright 2016 Étienne Bersac
# Copyright 2016 Joshua Harlow
# Copyright 2013-2014 Ray Holder
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import functools
import sys
import threading
import time
import typing as t
import warnings
from abc import ABC, abstractmethod
from concurrent import futures
from inspect import iscoroutinefunction

# Import all built-in retry strategies for easier usage.
from .retry import retry_base  # noqa
from .retry import retry_all  # noqa
from .retry import retry_always  # noqa
from .retry import retry_any  # noqa
from .retry import retry_if_exception  # noqa
from .retry import retry_if_exception_type  # noqa
from .retry import retry_if_not_exception_type  # noqa
from .retry import retry_if_not_result  # noqa
from .retry import retry_if_result  # noqa
from .retry import retry_never  # noqa
from .retry import retry_unless_exception_type  # noqa
from .retry import retry_if_exception_message  # noqa
from .retry import retry_if_not_exception_message  # noqa

# Import all nap strategies for easier usage.
from .nap import sleep  # noqa
from .nap import sleep_using_event  # noqa

# Import all built-in stop strategies for easier usage.
from .stop import stop_after_attempt  # noqa
from .stop import stop_after_delay  # noqa
from .stop import stop_all  # noqa
from .stop import stop_any  # noqa
from .stop import stop_never  # noqa
from .stop import stop_when_event_set  # noqa

# Import all built-in wait strategies for easier usage.
from .wait import wait_chain  # noqa
from .wait import wait_combine  # noqa
from .wait import wait_exponential  # noqa
from .wait import wait_fixed  # noqa
from .wait import wait_incrementing  # noqa
from .wait import wait_none  # noqa
from .wait import wait_random  # noqa
from .wait import wait_random_exponential  # noqa
from .wait import wait_random_exponential as wait_full_jitter  # noqa

# Import all built-in before strategies for easier usage.
from .before import before_log  # noqa
from .before import before_nothing  # noqa

# Import all built-in after strategies for easier usage.
from .after import after_log  # noqa
from .after import after_nothing  # noqa

# Import all built-in before_sleep strategies for easier usage.
from .before_sleep import before_sleep_log  # noqa
from .before_sleep import before_sleep_nothing  # noqa

# tornado is an optional dependency: when it cannot be imported the name is
# bound to None so the rest of the module can simply test `if tornado`.
try:
    import tornado  # type: ignore
except ImportError:
    tornado = None  # type: ignore

# Names needed only by annotations; never imported at runtime.
if t.TYPE_CHECKING:
    import types

    from .wait import wait_base
    from .stop import stop_base


# Type variable for a decorated callable (the decorator returns the same type).
WrappedFn = t.TypeVar("WrappedFn", bound=t.Callable)
# Type variable for the return value of the callable passed to `__call__`.
_RetValT = t.TypeVar("_RetValT")


@t.overload
def retry(fn: WrappedFn) -> WrappedFn:
    pass


@t.overload
def retry(*dargs: t.Any, **dkw: t.Any) -> t.Callable[[WrappedFn], WrappedFn]:  # noqa
    pass


def retry(*dargs: t.Any, **dkw: t.Any) -> t.Union[WrappedFn, t.Callable[[WrappedFn], WrappedFn]]:  # noqa
    """Wrap a function with a new `Retrying` object.

    Usable both bare (``@retry``) and with arguments (``@retry(...)``).

    When called with exactly one positional argument that is callable, that
    argument is treated as the function to decorate and is wrapped using a
    retry object built with default settings. Otherwise a decorator is
    returned which builds the retry object from ``dargs``/``dkw``.

    The retry class is chosen per decorated function:

    - `AsyncRetrying` if ``inspect.iscoroutinefunction(f)`` is true,
    - `TornadoRetrying` if tornado was imported and
      ``tornado.gen.is_coroutine_function(f)`` is true,
    - `Retrying` otherwise.

    :param dargs: positional arguments passed to Retrying object
    :param dkw: keyword arguments passed to the Retrying object
    :return: the wrapped function (bare form) or a decorator (call form)
    """
    # support both @retry and @retry() as valid syntax
    if len(dargs) == 1 and callable(dargs[0]):
        return retry()(dargs[0])
    else:

        def wrap(f: WrappedFn) -> WrappedFn:
            # A retry strategy instance is itself callable, so passing one
            # positionally lands here as the "function" to decorate; warn
            # because the user almost certainly meant the `retry=` keyword.
            if isinstance(f, retry_base):
                warnings.warn(
                    f"Got retry_base instance ({f.__class__.__name__}) as callable argument, "
                    f"this will probably hang indefinitely (did you mean retry={f.__class__.__name__}(...)?)"
                )
            if iscoroutinefunction(f):
                r: "BaseRetrying" = AsyncRetrying(*dargs, **dkw)
            elif tornado and hasattr(tornado.gen, "is_coroutine_function") and tornado.gen.is_coroutine_function(f):
                r = TornadoRetrying(*dargs, **dkw)
            else:
                r = Retrying(*dargs, **dkw)

            return r.wraps(f)

        return wrap


class TryAgain(Exception):
    """Always retry the executed function when raised."""


# Module-level sentinel object. It is not referenced anywhere else in this
# module; presumably kept for external users - verify before removing.
NO_RESULT = object()


class DoAttempt:
    """Marker returned by `BaseRetrying.iter` meaning "run the attempt now"."""

    pass


class DoSleep(float):
    """Float returned by `BaseRetrying.iter` meaning "sleep, then try again".

    The float value is the duration handed to the retry object's ``sleep``
    callable.
    """

    pass


class BaseAction:
    """Base class for representing actions to take by retry object.

    Concrete implementations must define:
    - __init__: to initialize all necessary fields
    - REPR_FIELDS: class variable specifying attributes to include in repr(self)
    - NAME: for identification in retry object methods and callbacks
    """

    REPR_FIELDS: t.Sequence[str] = ()
    NAME: t.Optional[str] = None

    def __repr__(self) -> str:
        """Return ``ClassName(field=value, ...)`` for every name in REPR_FIELDS."""
        state_str = ", ".join(f"{field}={getattr(self, field)!r}" for field in self.REPR_FIELDS)
        return f"{self.__class__.__name__}({state_str})"

    def __str__(self) -> str:
        """Return the same text as `__repr__`."""
        return repr(self)


class RetryAction(BaseAction):
    """Action stored on the call state when another attempt will follow.

    :param sleep: duration to wait before the next attempt; stored as a float
    """

    REPR_FIELDS = ("sleep",)
    NAME = "retry"

    def __init__(self, sleep: t.SupportsFloat) -> None:
        self.sleep = float(sleep)


# Sentinel for "argument not supplied" in `BaseRetrying.copy`. A dedicated
# object is needed because None is a meaningful value for some parameters
# (e.g. before_sleep).
_unset = object()


def _first_set(first: t.Union[t.Any, object], second: t.Any) -> t.Any:
    """Return ``first`` unless it is the `_unset` sentinel, else ``second``."""
    return second if first is _unset else first


class RetryError(Exception):
    """Encapsulates the last attempt instance right before giving up."""

    def __init__(self, last_attempt: "Future") -> None:
        """Store the final attempt's future and pass it on as the exception argument."""
        self.last_attempt = last_attempt
        super().__init__(last_attempt)

    def reraise(self) -> t.NoReturn:
        """Raise the last attempt's exception if it failed, otherwise raise self."""
        if self.last_attempt.failed:
            # For a future holding an exception, concurrent.futures'
            # result() raises that exception itself, so the original error
            # propagates from this call.
            raise self.last_attempt.result()
        raise self

    def __str__(self) -> str:
        """Return ``ClassName[<last attempt>]``."""
        return f"{self.__class__.__name__}[{self.last_attempt}]"


class AttemptManager:
    """Manage attempt context.

    Instances are yielded by `BaseRetrying.__iter__`; the body of the
    ``with`` block is one attempt, and its outcome is recorded on the shared
    `RetryCallState` when the block exits.
    """

    def __init__(self, retry_state: "RetryCallState") -> None:
        self.retry_state = retry_state

    def __enter__(self) -> None:
        """Do nothing on entry; all bookkeeping happens in `__exit__`."""
        pass

    def __exit__(
        self,
        exc_type: t.Optional[t.Type[BaseException]],
        exc_value: t.Optional[BaseException],
        traceback: t.Optional["types.TracebackType"],
    ) -> t.Optional[bool]:
        """Record the outcome of the ``with`` block on the retry state.

        :return: True (suppressing the exception) when the block raised,
            None when it completed normally.
        """
        if isinstance(exc_value, BaseException):
            self.retry_state.set_exception((exc_type, exc_value, traceback))
            return True  # Swallow exception.
        else:
            # We don't have the result, actually.
            self.retry_state.set_result(None)
            return None


class BaseRetrying(ABC):
    """Abstract retry controller holding the strategies and the decision logic.

    Subclasses implement `__call__` to actually run the wrapped function;
    `iter` tells them after each step whether to attempt, sleep, or finish.
    """

    def __init__(
        self,
        sleep: t.Callable[[t.Union[int, float]], None] = sleep,
        stop: "stop_base" = stop_never,
        wait: "wait_base" = wait_none(),
        retry: retry_base = retry_if_exception_type(),
        before: t.Callable[["RetryCallState"], None] = before_nothing,
        after: t.Callable[["RetryCallState"], None] = after_nothing,
        before_sleep: t.Optional[t.Callable[["RetryCallState"], None]] = None,
        reraise: bool = False,
        retry_error_cls: t.Type[RetryError] = RetryError,
        retry_error_callback: t.Optional[t.Callable[["RetryCallState"], t.Any]] = None,
    ) -> None:
        """Store the retry configuration.

        Note that the ``wait`` and ``retry`` defaults are instances created
        once, when this function is defined, and so are shared by every
        retry object that relies on them.

        :param sleep: called with the wait duration between attempts
        :param stop: called with ``retry_state=``; a true result means give up
        :param wait: called with ``retry_state=``; returns the duration to
            wait before the next attempt
        :param retry: called with ``retry_state=``; a true result means the
            outcome is not acceptable and another attempt is wanted
        :param before: called with the retry state before each attempt
        :param after: called with the retry state after an attempt whose
            outcome calls for a retry
        :param before_sleep: called with the retry state just before sleeping
        :param reraise: when giving up, re-raise the last attempt's exception
            (if it failed) instead of raising ``retry_error_cls``
        :param retry_error_cls: exception class raised when giving up
        :param retry_error_callback: when set, called with the retry state
            on giving up and its return value is returned instead of raising
        """
        self.sleep = sleep
        self.stop = stop
        self.wait = wait
        self.retry = retry
        self.before = before
        self.after = after
        self.before_sleep = before_sleep
        self.reraise = reraise
        # Thread-local storage backing the `statistics` property.
        self._local = threading.local()
        self.retry_error_cls = retry_error_cls
        self.retry_error_callback = retry_error_callback

    def copy(
        self,
        sleep: t.Union[t.Callable[[t.Union[int, float]], None], object] = _unset,
        stop: t.Union["stop_base", object] = _unset,
        wait: t.Union["wait_base", object] = _unset,
        retry: t.Union[retry_base, object] = _unset,
        before: t.Union[t.Callable[["RetryCallState"], None], object] = _unset,
        after: t.Union[t.Callable[["RetryCallState"], None], object] = _unset,
        before_sleep: t.Union[t.Optional[t.Callable[["RetryCallState"], None]], object] = _unset,
        reraise: t.Union[bool, object] = _unset,
        retry_error_cls: t.Union[t.Type[RetryError], object] = _unset,
        retry_error_callback: t.Union[t.Optional[t.Callable[["RetryCallState"], t.Any]], object] = _unset,
    ) -> "BaseRetrying":
        """Copy this object with some parameters changed if needed.

        Every parameter left at the `_unset` sentinel takes this object's
        current value. The copy is a new instance of the same class, so it
        gets its own (empty) thread-local statistics.
        """
        return self.__class__(
            sleep=_first_set(sleep, self.sleep),
            stop=_first_set(stop, self.stop),
            wait=_first_set(wait, self.wait),
            retry=_first_set(retry, self.retry),
            before=_first_set(before, self.before),
            after=_first_set(after, self.after),
            before_sleep=_first_set(before_sleep, self.before_sleep),
            reraise=_first_set(reraise, self.reraise),
            retry_error_cls=_first_set(retry_error_cls, self.retry_error_cls),
            retry_error_callback=_first_set(retry_error_callback, self.retry_error_callback),
        )

    def __repr__(self) -> str:
        """Return a description listing the class, object id and main strategies."""
        return (
            f"<{self.__class__.__name__} object at 0x{id(self):x} ("
            f"stop={self.stop}, "
            f"wait={self.wait}, "
            f"sleep={self.sleep}, "
            f"retry={self.retry}, "
            f"before={self.before}, "
            f"after={self.after})>"
        )

    @property
    def statistics(self) -> t.Dict[str, t.Any]:
        """Return a dictionary of runtime statistics.

        This dictionary will be empty when the controller has never been
        run. While it is running, or after it has run, it should (but may
        not) contain useful and/or informational keys and values. This
        module writes the keys ``start_time``, ``attempt_number``,
        ``idle_for`` and ``delay_since_first_attempt``.

        .. warning:: The keys in this dictionary **should** be somewhat
                     stable (not changing), but their existence **may**
                     change between major releases as new statistics are
                     gathered or removed, so before accessing keys ensure
                     that they actually exist and handle the case where
                     they do not.

        .. note:: The values in this dictionary are local to the thread
                  running call (so if multiple threads share the same
                  retrying object - either directly or indirectly) they
                  will each have their own view of the statistics they have
                  collected (in the future we may provide a way to
                  aggregate the various statistics from each thread).
        """
        try:
            return self._local.statistics
        except AttributeError:
            # First access from this thread: create its dictionary lazily.
            self._local.statistics = {}
            return self._local.statistics

    def wraps(self, f: WrappedFn) -> WrappedFn:
        """Wrap a function for retrying.

        The returned wrapper calls this retry object with ``f`` and the
        caller's arguments. Two attributes are attached to it:

        - ``retry``: this retry object,
        - ``retry_with``: a function that wraps ``f`` again using a copy of
          this retry object, with its arguments forwarded to `copy`.

        :param f: A function to wrap for retrying.
        :return: the wrapping function
        """

        @functools.wraps(f)
        def wrapped_f(*args: t.Any, **kw: t.Any) -> t.Any:
            return self(f, *args, **kw)

        def retry_with(*args: t.Any, **kwargs: t.Any) -> WrappedFn:
            return self.copy(*args, **kwargs).wraps(f)

        wrapped_f.retry = self
        wrapped_f.retry_with = retry_with

        return wrapped_f

    def begin(self) -> None:
        """Reset the calling thread's statistics at the start of a call."""
        self.statistics.clear()
        self.statistics["start_time"] = time.monotonic()
        self.statistics["attempt_number"] = 1
        self.statistics["idle_for"] = 0

    def iter(self, retry_state: "RetryCallState") -> t.Union[DoAttempt, DoSleep, t.Any]:  # noqa
        """Decide what the driving loop must do next.

        :param retry_state: state of the call being retried
        :return: a `DoAttempt` when an attempt must be run, a `DoSleep`
            carrying the wait duration when a retry is scheduled, or the
            final value to hand back to the caller (the last attempt's
            result, or the return value of ``retry_error_callback``)
        :raises: the last attempt's exception when it failed and no retry is
            wanted; ``retry_error_cls`` (or, with ``reraise``, whatever
            `RetryError.reraise` raises) when the stop strategy fires and no
            ``retry_error_callback`` is set
        """
        fut = retry_state.outcome
        if fut is None:
            # No outcome recorded yet for this attempt: run it.
            if self.before is not None:
                self.before(retry_state)
            return DoAttempt()

        # Raising TryAgain forces a retry regardless of the retry strategy.
        is_explicit_retry = retry_state.outcome.failed and isinstance(retry_state.outcome.exception(), TryAgain)
        if not (is_explicit_retry or self.retry(retry_state=retry_state)):
            # Outcome accepted: result() returns the value, or raises the
            # stored exception if the attempt failed.
            return fut.result()

        if self.after is not None:
            self.after(retry_state)

        self.statistics["delay_since_first_attempt"] = retry_state.seconds_since_start
        if self.stop(retry_state=retry_state):
            # Giving up. A configured callback takes precedence over raising.
            if self.retry_error_callback:
                return self.retry_error_callback(retry_state)
            retry_exc = self.retry_error_cls(fut)
            if self.reraise:
                # RetryError.reraise() as defined in this module always
                # raises, so the outer `raise` never receives a value.
                raise retry_exc.reraise()
            # Chain the last attempt's exception (if any) as the cause.
            raise retry_exc from fut.exception()

        if self.wait:
            sleep = self.wait(retry_state=retry_state)
        else:
            sleep = 0.0
        retry_state.next_action = RetryAction(sleep)
        retry_state.idle_for += sleep
        self.statistics["idle_for"] += sleep
        self.statistics["attempt_number"] += 1

        if self.before_sleep is not None:
            self.before_sleep(retry_state)

        return DoSleep(sleep)

    def __iter__(self) -> t.Generator[AttemptManager, None, None]:
        """Yield one `AttemptManager` per attempt.

        Each yielded manager is meant to be used as a ``with`` block around
        one attempt. Iteration ends when `iter` returns something other than
        a `DoAttempt` or `DoSleep`; exceptions raised by `iter` propagate to
        the code driving the generator.
        """
        self.begin()

        # There is no wrapped function in this mode, hence fn=None.
        retry_state = RetryCallState(self, fn=None, args=(), kwargs={})
        while True:
            do = self.iter(retry_state=retry_state)
            if isinstance(do, DoAttempt):
                yield AttemptManager(retry_state=retry_state)
            elif isinstance(do, DoSleep):
                retry_state.prepare_for_next_attempt()
                self.sleep(do)
            else:
                break

    @abstractmethod
    def __call__(self, fn: t.Callable[..., _RetValT], *args: t.Any, **kwargs: t.Any) -> _RetValT:
        """Call ``fn(*args, **kwargs)`` under this retry policy (subclass hook)."""
        pass


class Retrying(BaseRetrying):
    """Retrying controller."""

    def __call__(self, fn: t.Callable[..., _RetValT], *args: t.Any, **kwargs: t.Any) -> _RetValT:
        """Call ``fn(*args, **kwargs)``, retrying as directed by `iter`.

        :return: whatever `iter` finally returns that is neither a
            `DoAttempt` nor a `DoSleep`
        """
        self.begin()

        retry_state = RetryCallState(retry_object=self, fn=fn, args=args, kwargs=kwargs)
        while True:
            do = self.iter(retry_state=retry_state)
            if isinstance(do, DoAttempt):
                try:
                    result = fn(*args, **kwargs)
                # Everything is captured here; `iter` decides afterwards
                # whether the exception is retried or re-raised.
                except BaseException:  # noqa: B902
                    retry_state.set_exception(sys.exc_info())
                else:
                    retry_state.set_result(result)
            elif isinstance(do, DoSleep):
                retry_state.prepare_for_next_attempt()
                self.sleep(do)
            else:
                return do


class Future(futures.Future):
    """Encapsulates a (future or past) attempted call to a target function."""

    def __init__(self, attempt_number: int) -> None:
        """Create a pending future tagged with the attempt number it belongs to."""
        super().__init__()
        self.attempt_number = attempt_number

    @property
    def failed(self) -> bool:
        """Return whether an exception is being held in this future."""
        return self.exception() is not None

    @classmethod
    def construct(cls, attempt_number: int, value: t.Any, has_exception: bool) -> "Future":
        """Construct a new Future object.

        :param attempt_number: attempt number to tag the future with
        :param value: the exception when ``has_exception`` is true,
            otherwise the result
        :param has_exception: whether ``value`` is stored as an exception
        :return: a future already completed with ``value``
        """
        fut = cls(attempt_number)
        if has_exception:
            fut.set_exception(value)
        else:
            fut.set_result(value)
        return fut


class RetryCallState:
    """State related to a single call wrapped with Retrying."""

    def __init__(
        self,
        retry_object: BaseRetrying,
        fn: t.Optional[WrappedFn],
        args: t.Any,
        kwargs: t.Any,
    ) -> None:
        #: Retry call start timestamp
        self.start_time = time.monotonic()
        #: Retry manager object
        self.retry_object = retry_object
        #: Function wrapped by this retry call
        self.fn = fn
        #: Arguments of the function wrapped by this retry call
        self.args = args
        #: Keyword arguments of the function wrapped by this retry call
        self.kwargs = kwargs

        #: The number of the current attempt
        self.attempt_number: int = 1
        #: Last outcome (result or exception) produced by the function
        self.outcome: t.Optional[Future] = None
        #: Timestamp of the last outcome
        self.outcome_timestamp: t.Optional[float] = None
        #: Time spent sleeping in retries
        self.idle_for: float = 0.0
        #: Next action as decided by the retry manager
        self.next_action: t.Optional[RetryAction] = None

    @property
    def seconds_since_start(self) -> t.Optional[float]:
        """Return the time from call start to the last outcome, or None if there is none."""
        if self.outcome_timestamp is None:
            return None
        return self.outcome_timestamp - self.start_time

    def prepare_for_next_attempt(self) -> None:
        """Clear the last outcome and pending action, and bump the attempt number."""
        self.outcome = None
        self.outcome_timestamp = None
        self.attempt_number += 1
        self.next_action = None

    def set_result(self, val: t.Any) -> None:
        """Record ``val`` as the successful outcome of the current attempt."""
        ts = time.monotonic()
        fut = Future(self.attempt_number)
        fut.set_result(val)
        self.outcome, self.outcome_timestamp = fut, ts

    def set_exception(self, exc_info: t.Tuple[t.Type[BaseException], BaseException, "types.TracebackType"]) -> None:
        """Record a failed outcome for the current attempt.

        :param exc_info: ``(type, value, traceback)`` triple; only the
            exception value (index 1) is stored on the future
        """
        ts = time.monotonic()
        fut = Future(self.attempt_number)
        fut.set_exception(exc_info[1])
        self.outcome, self.outcome_timestamp = fut, ts

    def __repr__(self) -> str:
        """Return a summary: attempt number, time slept, and the last outcome."""
        if self.outcome is None:
            result = "none yet"
        elif self.outcome.failed:
            exception = self.outcome.exception()
            result = f"failed ({exception.__class__.__name__} {exception})"
        else:
            result = f"returned {self.outcome.result()}"

        slept = float(round(self.idle_for, 2))
        clsname = self.__class__.__name__
        return f"<{clsname} {id(self)}: attempt #{self.attempt_number}; slept for {slept}; last result: {result}>"


# NOTE(review): these imports sit at the bottom of the module, presumably
# because the imported modules depend on names defined above (circular
# import) - confirm before moving them.
from tenacity._asyncio import AsyncRetrying  # noqa:E402,I100

if tornado:
    from tenacity.tornadoweb import TornadoRetrying