"""Standard retry behavior.

This contains the default standard retry behavior.
It provides consistent behavior with other AWS SDKs.

The key base classes used for retries:

    * ``BaseRetryableChecker`` - Use to check a specific condition that
      indicates a retry should happen.  This can include things like
      max attempts, HTTP status code checks, error code checks etc.
    * ``BaseRetryBackoff`` - Use to determine how long we should backoff
      until we retry a request.  This is the class that will implement
      delay such as exponential backoff.
    * ``RetryPolicy`` - Main class that determines if a retry should
      happen.  It can combine data from various ``BaseRetryableChecker``
      objects to make a final call as to whether or not a retry should
      happen.  It then uses a ``BaseRetryBackoff`` to determine how long
      to delay.
    * ``RetryHandler`` - The bridge between botocore's event system
      used by endpoint.py to manage retries and the interfaces defined
      in this module.

This allows us to define an API that has minimal coupling to the event
based API used by botocore.

"""
import random
import logging

from botocore.exceptions import ConnectionError, HTTPClientError
from botocore.exceptions import ReadTimeoutError, ConnectTimeoutError
from botocore.retries import quota
from botocore.retries import special
from botocore.retries.base import BaseRetryBackoff, BaseRetryableChecker

DEFAULT_MAX_ATTEMPTS = 3
logger = logging.getLogger(__name__)


def register_retry_handler(client, max_attempts=DEFAULT_MAX_ATTEMPTS):
    """Wire the standard retry behavior into a client's event system.

    Two handlers are registered on ``client.meta.events``:

    * ``after-call.<service>`` -> ``RetryQuotaChecker.release_retry_quota``
    * ``needs-retry.<service>`` -> ``RetryHandler.needs_retry`` (registered
      with a ``unique_id`` of ``retry-config-<service>``)

    where ``<service>`` is the hyphenized service id of the client.

    :param client: The client whose ``meta.events`` and
        ``meta.service_model`` are used for registration.
    :param max_attempts: Value handed to ``StandardRetryConditions``.
    :return: The ``RetryHandler`` that was registered.
    """
    retry_quota = RetryQuotaChecker(quota.RetryQuota())

    service_id = client.meta.service_model.service_id
    service_event_name = service_id.hyphenize()
    client.meta.events.register('after-call.%s' % service_event_name,
                                retry_quota.release_retry_quota)

    handler = RetryHandler(
        retry_policy=RetryPolicy(
            retry_checker=StandardRetryConditions(max_attempts=max_attempts),
            retry_backoff=ExponentialBackoff(),
        ),
        retry_event_adapter=RetryEventAdapter(),
        retry_quota=retry_quota,
    )

    unique_id = 'retry-config-%s' % service_event_name
    client.meta.events.register(
        'needs-retry.%s' % service_event_name, handler.needs_retry,
        unique_id=unique_id
    )
    return handler


class RetryHandler(object):
    """Bridge between botocore's event system and this module.

    This class is intended to be hooked to botocore's event system
    as an event handler.
    """
    def __init__(self, retry_policy, retry_event_adapter, retry_quota):
        """
        :param retry_policy: Object providing ``should_retry(context)`` and
            ``compute_retry_delay(context)`` (see ``RetryPolicy``).
        :param retry_event_adapter: Object that converts needs-retry kwargs
            to a ``RetryContext`` and back (see ``RetryEventAdapter``).
        :param retry_quota: Object providing
            ``acquire_retry_quota(context)`` (see ``RetryQuotaChecker``).
        """
        self._retry_policy = retry_policy
        self._retry_event_adapter = retry_event_adapter
        self._retry_quota = retry_quota

    def needs_retry(self, **kwargs):
        """Connect as a handler to the needs-retry event.

        :param kwargs: The keyword arguments emitted with the needs-retry
            event; they are passed straight to the event adapter.
        :return: The delay computed by the retry policy if the request
            should be retried and retry quota could be acquired,
            otherwise ``None``.
        """
        retry_delay = None
        context = self._retry_event_adapter.create_retry_context(**kwargs)
        if self._retry_policy.should_retry(context):
            # Before we can retry we need to ensure we have sufficient
            # capacity in our retry quota.
            if self._retry_quota.acquire_retry_quota(context):
                retry_delay = self._retry_policy.compute_retry_delay(context)
                logger.debug("Retry needed, retrying request after "
                             "delay of: %s", retry_delay)
            else:
                logger.debug("Retry needed but retry quota reached, "
                             "not retrying request.")
        else:
            logger.debug("Not retrying request.")
        # Always copy any retry metadata collected along the way back
        # onto the response, regardless of the retry decision.
        self._retry_event_adapter.adapt_retry_response_from_context(
            context)
        return retry_delay


class RetryEventAdapter(object):
    """Adapter to existing retry interface used in the endpoints layer.

    This existing interface for determining if a retry needs to happen
    is event based and used in ``botocore.endpoint``.  The interface has
    grown organically over the years and could use some cleanup.  This
    adapter converts that interface into the interface used by the
    new retry strategies.

    """
    def create_retry_context(self, **kwargs):
        """Create context based on needs-retry kwargs.

        The following keys are read from ``kwargs``: ``response``,
        ``attempts``, ``operation``, ``caught_exception`` and
        ``request_dict`` (whose ``'context'`` entry is used).

        :return: A ``RetryContext`` populated from those values.
        """
        response = kwargs['response']
        if response is None:
            # If response is None it means that an exception was raised
            # because we never received a response from the service.  This
            # could be something like a ConnectionError we get from our
            # http layer.
            http_response = None
            parsed_response = None
        else:
            http_response, parsed_response = response
        # This provides isolation between the kwargs emitted in the
        # needs-retry event, and what this module uses to check for
        # retries.
        context = RetryContext(
            attempt_number=kwargs['attempts'],
            operation_model=kwargs['operation'],
            http_response=http_response,
            parsed_response=parsed_response,
            caught_exception=kwargs['caught_exception'],
            request_context=kwargs['request_dict']['context'],
        )
        return context

    def adapt_retry_response_from_context(self, context):
        """Copy retry metadata from the context onto the parsed response.

        If ``context.parsed_response`` is not ``None``, its
        ``'ResponseMetadata'`` dict (created if missing) is updated in
        place with the context's retry metadata.
        """
        # This will mutate attributes that are returned back to the end
        # user.  We do it this way so that all the various retry classes
        # don't mutate any input parameters from the needs-retry event.
        metadata = context.get_retry_metadata()
        if context.parsed_response is not None:
            context.parsed_response.setdefault(
                'ResponseMetadata', {}).update(metadata)


# Implementation note: this is meant to encapsulate all the misc stuff
# that gets sent in the needs-retry event.  This is mapped so that params
# are more clear and explicit.
class RetryContext(object):
    """Normalize a response that we use to check if a retry should occur.

    This class smooths over the different types of responses we may get
    from a service including:

        * A modeled error response from the service that contains a service
          code and error message.
        * A raw HTTP response that doesn't contain service protocol specific
          error keys.
        * An exception received while attempting to retrieve a response.
          This could be a ConnectionError we receive from our HTTP layer
          which could represent that we weren't able to receive a response
          from the service.

    At least one of the corresponding attributes (``parsed_response``,
    ``http_response``, ``caught_exception``) is expected to be non None;
    this class does not itself validate that.

    This class is meant to provide a read-only view into the properties
    associated with a possible retryable response.  None of the properties
    are meant to be modified directly.

    """
    def __init__(self, attempt_number, operation_model=None,
                 parsed_response=None, http_response=None,
                 caught_exception=None, request_context=None):
        # 1-based attempt number.
        self.attempt_number = attempt_number
        self.operation_model = operation_model
        # This is the parsed response dictionary we get from parsing
        # the HTTP response from the service.
        self.parsed_response = parsed_response
        # This is an instance of botocore.awsrequest.AWSResponse.
        self.http_response = http_response
        # This is a subclass of Exception that will be non None if
        # an exception was raised when trying to retrieve a response.
        self.caught_exception = caught_exception
        # This is the request context dictionary that's added to the
        # request dict.  This is used to store any additional state
        # about the request.  We use this for storing retry quota
        # capacity.
        if request_context is None:
            request_context = {}
        self.request_context = request_context
        self._retry_metadata = {}

    # These are misc helper methods to avoid duplication in the various
    # checkers.
    def get_error_code(self):
        """Check if there was a parsed response with an error code.

        If we could not find any error codes, ``None`` is returned.
        This is the case when there is no parsed response, when its
        ``'Error'`` entry is not a dict, or when that dict has no
        ``'Code'`` key.

        """
        if self.parsed_response is None:
            return
        error = self.parsed_response.get('Error', {})
        if not isinstance(error, dict):
            return
        return error.get('Code')

    def add_retry_metadata(self, **kwargs):
        """Add key/value pairs to the retry metadata.

        This allows any objects during the retry process to add
        metadata about any checks/validations that happened.

        This gets added to the response metadata in the retry handler.

        """
        self._retry_metadata.update(**kwargs)

    def get_retry_metadata(self):
        """Return a shallow copy of the retry metadata collected so far."""
        return self._retry_metadata.copy()


class RetryPolicy(object):
    """Pair a retry checker with a backoff strategy.

    ``should_retry`` delegates to the checker's ``is_retryable`` and
    ``compute_retry_delay`` delegates to the backoff's ``delay_amount``.
    """
    def __init__(self, retry_checker, retry_backoff):
        self._retry_checker = retry_checker
        self._retry_backoff = retry_backoff

    def should_retry(self, context):
        """Return whether the retry checker considers ``context`` retryable."""
        return self._retry_checker.is_retryable(context)

    def compute_retry_delay(self, context):
        """Return the delay the backoff strategy computes for ``context``."""
        return self._retry_backoff.delay_amount(context)


class ExponentialBackoff(BaseRetryBackoff):
    """Truncated binary exponential backoff with jitter."""

    _BASE = 2
    _MAX_BACKOFF = 20

    # The default for ``max_backoff`` is taken from ``_MAX_BACKOFF`` so the
    # class constant and the default cannot drift apart.
    def __init__(self, max_backoff=_MAX_BACKOFF, random=random.random):
        """
        :param max_backoff: Upper bound on the value returned by
            ``delay_amount``.
        :param random: Zero-argument callable supplying the jitter factor;
            defaults to ``random.random``.
        """
        self._base = self._BASE
        self._max_backoff = max_backoff
        self._random = random

    def delay_amount(self, context):
        """Calculates delay based on exponential backoff.

        This class implements truncated binary exponential backoff
        with jitter::

            t_i = min(rand(0, 1) * 2 ** attempt, MAX_BACKOFF)

        where ``i`` is the request attempt (0 based).

        :param context: Object whose ``attempt_number`` (1-based) selects
            the exponent.
        :return: The delay, never larger than the configured max backoff.
        """
        # The context.attempt_number is a 1-based value, but we have
        # to calculate the delay based on a 0-based value.  We
        # want the first delay to just be ``rand(0, 1)``.
        jitter = self._random()
        try:
            delay = jitter * (self._base ** (context.attempt_number - 1))
        except OverflowError:
            # For very large attempt numbers the integer power no longer
            # fits in a float, so the multiplication raises instead of
            # producing a value.  The untruncated delay is far beyond the
            # cap at that point, so fall back to the cap rather than
            # failing the retry.
            return self._max_backoff
        return min(delay, self._max_backoff)


class MaxAttemptsChecker(BaseRetryableChecker):
    """Allow a retry only while the attempt number is below a maximum."""

    def __init__(self, max_attempts):
        self._max_attempts = max_attempts

    def is_retryable(self, context):
        """Return True if ``context.attempt_number`` is below the maximum.

        When the maximum has been reached, ``MaxAttemptsReached=True`` is
        added to the context's retry metadata.
        """
        under_max_attempts = context.attempt_number < self._max_attempts
        if not under_max_attempts:
            logger.debug("Max attempts of %s reached.", self._max_attempts)
            context.add_retry_metadata(MaxAttemptsReached=True)
        return under_max_attempts


class TransientRetryableChecker(BaseRetryableChecker):
    """Check for transient errors by error code, status code or exception."""

    _TRANSIENT_ERROR_CODES = [
        'RequestTimeout',
        'RequestTimeoutException',
        'PriorRequestNotComplete',
    ]
    _TRANSIENT_STATUS_CODES = [500, 502, 503, 504]
    _TRANSIENT_EXCEPTION_CLS = (
        ConnectionError,
        HTTPClientError,
    )

    def __init__(self, transient_error_codes=None,
                 transient_status_codes=None,
                 transient_exception_cls=None):
        """
        Each argument overrides the matching class-level default when it
        is not ``None``.

        :param transient_error_codes: Error codes treated as transient.
        :param transient_status_codes: HTTP status codes treated as
            transient.
        :param transient_exception_cls: Exception class (or tuple of
            classes) treated as transient; used with ``isinstance``.
        """
        # The list defaults are copied so that per-instance changes never
        # leak back into the class-level lists.
        if transient_error_codes is None:
            transient_error_codes = self._TRANSIENT_ERROR_CODES[:]
        if transient_status_codes is None:
            transient_status_codes = self._TRANSIENT_STATUS_CODES[:]
        if transient_exception_cls is None:
            transient_exception_cls = self._TRANSIENT_EXCEPTION_CLS
        self._transient_error_codes = transient_error_codes
        self._transient_status_codes = transient_status_codes
        self._transient_exception_cls = transient_exception_cls

    def is_retryable(self, context):
        """Return True if ``context`` represents a transient error.

        The checks are made in order: parsed error code, HTTP status
        code, then the type of the caught exception.
        """
        if context.get_error_code() in self._transient_error_codes:
            return True
        if context.http_response is not None:
            if context.http_response.status_code in \
                    self._transient_status_codes:
                return True
        if context.caught_exception is not None:
            return isinstance(context.caught_exception,
                              self._transient_exception_cls)
        return False


class ThrottledRetryableChecker(BaseRetryableChecker):
    """Check for throttling errors based on a fixed set of error codes."""

    # This is the union of all error codes we've seen that represent
    # a throttled error.
    _THROTTLED_ERROR_CODES = [
        'Throttling',
        'ThrottlingException',
        'ThrottledException',
        'RequestThrottledException',
        'TooManyRequestsException',
        'ProvisionedThroughputExceededException',
        'TransactionInProgressException',
        'RequestLimitExceeded',
        'BandwidthLimitExceeded',
        'LimitExceededException',
        'RequestThrottled',
        'SlowDown',
        'PriorRequestNotComplete',
        'EC2ThrottledException',
    ]

    def __init__(self, throttled_error_codes=None):
        """
        :param throttled_error_codes: Error codes treated as throttling
            errors.  Defaults to a copy of ``_THROTTLED_ERROR_CODES``.
        """
        if throttled_error_codes is None:
            throttled_error_codes = self._THROTTLED_ERROR_CODES[:]
        self._throttled_error_codes = throttled_error_codes

    def is_retryable(self, context):
        """Return True if the parsed error code is a throttling code."""
        # Only the error code from a parsed service response is used
        # to determine if the response is a throttled response.
        return context.get_error_code() in self._throttled_error_codes


class ModeledRetryableChecker(BaseRetryableChecker):
    """Check if an error has been modeled as retryable."""

    def __init__(self):
        self._error_detector = ModeledRetryErrorDetector()

    def is_retryable(self, context):
        """Return True if the error code matches a modeled retryable error.

        A context without an error code is never retryable here.
        """
        error_code = context.get_error_code()
        if error_code is None:
            return False
        return self._error_detector.detect_error_type(context) is not None


class ModeledRetryErrorDetector(object):
    """Checks whether or not an error is a modeled retryable error."""
    # These are the return values from the detect_error_type() method.
    TRANSIENT_ERROR = 'TRANSIENT_ERROR'
    THROTTLING_ERROR = 'THROTTLING_ERROR'
    # This class is lower level than ModeledRetryableChecker, which
    # implements BaseRetryableChecker.  This object allows you to distinguish
    # between the various types of retryable errors.

    def detect_error_type(self, context):
        """Detect the error type associated with an error code and model.

        This will either return:

            * ``self.TRANSIENT_ERROR`` - If the error is a transient error
            * ``self.THROTTLING_ERROR`` - If the error is a throttling error
            * ``None`` - If the error is neither type of error.

        ``None`` is also returned when the context has no operation model
        or the operation model has no error shapes.

        """
        error_code = context.get_error_code()
        op_model = context.operation_model
        if op_model is None or not op_model.error_shapes:
            return
        for shape in op_model.error_shapes:
            if shape.metadata.get('retryable') is not None:
                # Check if this error code matches the shape.  This can
                # be either by name or by a modeled error code.
                error_code_to_check = (
                    shape.metadata.get('error', {}).get('code') or shape.name
                )
                if error_code == error_code_to_check:
                    if shape.metadata['retryable'].get('throttling'):
                        return self.THROTTLING_ERROR
                    return self.TRANSIENT_ERROR


class ThrottlingErrorDetector(object):
    """Decide whether needs-retry kwargs describe a throttling error.

    An error counts as throttling if its error code is one of the fixed
    throttling codes, or if the operation model marks it as a retryable
    throttling error.
    """
    def __init__(self, retry_event_adapter):
        """
        :param retry_event_adapter: Object providing
            ``create_retry_context(**kwargs)`` (see ``RetryEventAdapter``).
        """
        self._modeled_error_detector = ModeledRetryErrorDetector()
        self._fixed_error_code_detector = ThrottledRetryableChecker()
        self._retry_event_adapter = retry_event_adapter

    # This expects the kwargs from needs-retry to be passed through.
    def is_throttling_error(self, **kwargs):
        """Return True if the needs-retry kwargs represent throttling."""
        context = self._retry_event_adapter.create_retry_context(**kwargs)
        if self._fixed_error_code_detector.is_retryable(context):
            return True
        error_type = self._modeled_error_detector.detect_error_type(context)
        return error_type == self._modeled_error_detector.THROTTLING_ERROR


class StandardRetryConditions(BaseRetryableChecker):
    """Concrete class that implements the standard retry policy checks.

    Specifically:

        not max_attempts and (transient or throttled or modeled_retry)

    The right-hand group additionally includes the checkers
    ``RetryIDPCommunicationError`` and ``RetryDDBChecksumError`` from
    ``botocore.retries.special``.

    """

    def __init__(self, max_attempts=DEFAULT_MAX_ATTEMPTS):
        """
        :param max_attempts: Value handed to ``MaxAttemptsChecker``.
        """
        # Note: This class is for convenience so you can have the
        # standard retry condition in a single class.
        self._max_attempts_checker = MaxAttemptsChecker(max_attempts)
        self._additional_checkers = OrRetryChecker([
            TransientRetryableChecker(),
            ThrottledRetryableChecker(),
            ModeledRetryableChecker(),
            OrRetryChecker([
                special.RetryIDPCommunicationError(),
                special.RetryDDBChecksumError(),
            ])
        ])

    def is_retryable(self, context):
        """Return True if under max attempts and any other checker matches.

        The max attempts check runs first; the remaining checkers are only
        consulted when it passes.
        """
        return (
            self._max_attempts_checker.is_retryable(context)
            and self._additional_checkers.is_retryable(context)
        )


class OrRetryChecker(BaseRetryableChecker):
    """Retryable if any one of the wrapped checkers says so."""

    def __init__(self, checkers):
        """
        :param checkers: Iterable of objects providing
            ``is_retryable(context)``.
        """
        self._checkers = checkers

    def is_retryable(self, context):
        """Return True as soon as one checker reports ``context`` retryable."""
        return any(checker.is_retryable(context) for checker in self._checkers)


class RetryQuotaChecker(object):
    """Acquire and release retry quota capacity around retried requests."""

    # Capacity acquired for a retry that was not caused by a timeout.
    _RETRY_COST = 5
    # Capacity released after a successful call that was never retried.
    _NO_RETRY_INCREMENT = 1
    # Capacity acquired for a retry caused by one of _TIMEOUT_EXCEPTIONS.
    _TIMEOUT_RETRY_REQUEST = 10
    _TIMEOUT_EXCEPTIONS = (ConnectTimeoutError, ReadTimeoutError)

    # Implementation note:  We're not making this a BaseRetryableChecker
    # because this isn't just a check if we can retry.  This also changes
    # state so we have to be careful when/how we call this.  Making it
    # a BaseRetryableChecker implies you can call .is_retryable(context)
    # as many times as you want and not affect anything.

    def __init__(self, quota):
        """
        :param quota: Object providing ``acquire(amount)`` and
            ``release(amount)`` (``register_retry_handler`` passes a
            ``botocore.retries.quota.RetryQuota``).
        """
        self._quota = quota
        # This tracks the last amount.  Nothing in this module reads or
        # updates it after construction.
        self._last_amount_acquired = None

    def acquire_retry_quota(self, context):
        """Try to acquire the capacity needed to retry this request.

        On success the acquired amount is stored in
        ``context.request_context['retry_quota_capacity']``.  On failure
        ``RetryQuotaReached=True`` is added to the retry metadata.

        :param context: The ``RetryContext`` for the request.
        :return: True if capacity was acquired, False otherwise.
        """
        if self._is_timeout_error(context):
            capacity_amount = self._TIMEOUT_RETRY_REQUEST
        else:
            capacity_amount = self._RETRY_COST
        success = self._quota.acquire(capacity_amount)
        if success:
            # We add the capacity amount to the request context so we know
            # how much to release later.  The capacity amount can vary based
            # on the error.
            context.request_context['retry_quota_capacity'] = capacity_amount
            return True
        context.add_retry_metadata(RetryQuotaReached=True)
        return False

    def _is_timeout_error(self, context):
        """Return True if the caught exception is a timeout exception."""
        return isinstance(context.caught_exception, self._TIMEOUT_EXCEPTIONS)

    # This is intended to be hooked up to ``after-call``.
    def release_retry_quota(self, context, http_response, **kwargs):
        """Give capacity back to the quota after a successful call.

        :param context: Mapping checked for ``'retry_quota_capacity'``;
            presumably the same request context dict that
            ``acquire_retry_quota`` writes to.
        :param http_response: Object with a ``status_code`` attribute, or
            ``None`` (in which case nothing is released).
        :param kwargs: Any other event arguments; ignored.
        """
        # There's three possible options.
        # 1. The HTTP response did not have a 2xx response.  In that case we
        #    give no quota back.
        # 2. The HTTP request was successful and was never retried.  In
        #    that case we give _NO_RETRY_INCREMENT back.
        # 3. The API call had retries, and we eventually receive an HTTP
        #    response with a 2xx status code.  In that case we give back
        #    whatever quota was associated with the last acquisition.
        if http_response is None:
            return
        status_code = http_response.status_code
        if 200 <= status_code < 300:
            if 'retry_quota_capacity' not in context:
                self._quota.release(self._NO_RETRY_INCREMENT)
            else:
                capacity_amount = context['retry_quota_capacity']
                self._quota.release(capacity_amount)