<?php
namespace Aws;

use Aws\Exception\AwsException;
use Aws\Retry\ConfigurationInterface;
use Aws\Retry\QuotaManager;
use Aws\Retry\RateLimiter;
use Aws\Retry\RetryHelperTrait;
use GuzzleHttp\Exception\RequestException;
use GuzzleHttp\Promise;
use Psr\Http\Message\RequestInterface;

/**
 * Middleware that retries failures. V2 implementation that supports 'standard'
 * and 'adaptive' modes.
 *
 * The middleware wraps the next handler in the stack. Every settled result or
 * rejection is passed to a "decider" callable; when the decider approves a
 * retry, a "delayer" callable computes the delay (in milliseconds) that is
 * attached to the command before the next handler is invoked again.
 *
 * @internal
 */
class RetryMiddlewareV2
{
    use RetryHelperTrait;

    /** @var array Error codes treated as throttling (code => true). */
    private static $standardThrottlingErrors = [
        'Throttling'                                => true,
        'ThrottlingException'                       => true,
        'ThrottledException'                        => true,
        'RequestThrottledException'                 => true,
        'TooManyRequestsException'                  => true,
        'ProvisionedThroughputExceededException'    => true,
        'TransactionInProgressException'            => true,
        'RequestLimitExceeded'                      => true,
        'BandwidthLimitExceeded'                    => true,
        'LimitExceededException'                    => true,
        'RequestThrottled'                          => true,
        'SlowDown'                                  => true,
        'PriorRequestNotComplete'                   => true,
        'EC2ThrottledException'                     => true,
    ];

    /** @var array Error codes treated as transient (code => true). */
    private static $standardTransientErrors = [
        'RequestTimeout'            => true,
        'RequestTimeoutException'   => true,
    ];

    /** @var array HTTP status codes treated as transient (status => true). */
    private static $standardTransientStatusCodes = [
        500 => true,
        502 => true,
        503 => true,
        504 => true,
    ];

    /** @var bool Whether per-retry stats are recorded via updateStats(). */
    private $collectStats;

    /** @var callable Decides whether a result/exception should be retried. */
    private $decider;

    /** @var callable Maps an attempt number to a delay in milliseconds. */
    private $delayer;

    /** @var mixed Maximum number of attempts, as reported by the config. */
    private $maxAttempts;

    /** @var mixed Upper bound applied to the computed delay (milliseconds). */
    private $maxBackoff;

    /** @var mixed Retry mode reported by the config (e.g. 'adaptive'). */
    private $mode;

    /** @var callable Next handler in the middleware chain. */
    private $nextHandler;

    /** @var array Options passed to the constructor. */
    private $options;

    /** @var QuotaManager Retry quota shared by this middleware instance. */
    private $quotaManager;

    /** @var RateLimiter|mixed Rate limiter; only set in 'adaptive' mode. */
    private $rateLimiter;

    /**
     * Builds a middleware factory suitable for a handler list.
     *
     * @param ConfigurationInterface $config  Retry configuration.
     * @param array                  $options Options forwarded to the
     *                                        constructor.
     *
     * @return callable Function that accepts the next handler and returns the
     *                  retry middleware wrapping it.
     */
    public static function wrap($config, $options)
    {
        return function (callable $handler) use (
            $config,
            $options
        ) {
            return new static(
                $config,
                $handler,
                $options
            );
        };
    }

    /**
     * Creates the default decider used when no 'decider' option is supplied.
     *
     * The returned callable has the signature
     * `function ($attempts, CommandInterface $command, $result): bool` and
     * returns true when the request should be retried.
     *
     * @param QuotaManager $quotaManager Retry quota to release to / draw from.
     * @param int          $maxAttempts  Default maximum number of attempts; a
     *                                   command-level '@retries' value
     *                                   overrides it (attempts = retries + 1).
     * @param array        $options      Options forwarded to isRetryable().
     *
     * @return callable
     */
    public static function createDefaultDecider(
        QuotaManager $quotaManager,
        $maxAttempts = 3,
        $options = []
    ) {
        $retryCurlErrors = [];
        if (extension_loaded('curl')) {
            $retryCurlErrors[CURLE_RECV_ERROR] = true;
        }

        return function(
            $attempts,
            CommandInterface $command,
            $result
        ) use ($options, $quotaManager, $retryCurlErrors, $maxAttempts) {

            // Release retry tokens back to quota on a successful result
            $quotaManager->releaseToQuota($result);

            // Allow command-level option to override this value
            // # of attempts = # of retries + 1
            $maxAttempts = (null !== $command['@retries'])
                ? $command['@retries'] + 1
                : $maxAttempts;

            $isRetryable = self::isRetryable(
                $result,
                $retryCurlErrors,
                $options
            );

            if (!$isRetryable) {
                return false;
            }

            // Check the attempt budget BEFORE touching the quota: retrieving
            // retry tokens for a retry that will never be sent would drain
            // the quota for no reason, and an empty quota must not hide the
            // fact that the maximum number of attempts was reached.
            if ($attempts >= $maxAttempts) {
                if (!empty($result) && $result instanceof AwsException) {
                    $result->setMaxRetriesExceeded();
                }
                return false;
            }

            // Retrieve retry tokens and check if quota has been exceeded
            if (!$quotaManager->hasRetryQuota($result)) {
                return false;
            }

            return true;
        };
    }

    /**
     * @param ConfigurationInterface $config  Supplies the max attempts and
     *                                        the retry mode.
     * @param callable               $handler Next handler to invoke.
     * @param array                  $options Recognised keys read here:
     *     - max_backoff: upper bound for the delay in milliseconds
     *       (default 20000).
     *     - collect_stats: cast to bool; enables updateStats() on retries
     *       (default false).
     *     - decider: callable replacing the default decider.
     *     - delayer: callable replacing exponentialDelayWithJitter().
     *     - rate_limiter: rate limiter instance, only used in 'adaptive'
     *       mode (default: new RateLimiter()).
     *     The full array is also forwarded to the default decider.
     */
    public function __construct(
        ConfigurationInterface $config,
        callable $handler,
        $options = []
    ) {
        $this->options = $options;
        $this->maxAttempts = $config->getMaxAttempts();
        $this->mode = $config->getMode();
        $this->nextHandler = $handler;
        $this->quotaManager = new QuotaManager();

        $this->maxBackoff = isset($options['max_backoff'])
            ? $options['max_backoff']
            : 20000;

        $this->collectStats = isset($options['collect_stats'])
            ? (bool) $options['collect_stats']
            : false;

        $this->decider = isset($options['decider'])
            ? $options['decider']
            : self::createDefaultDecider(
                $this->quotaManager,
                $this->maxAttempts,
                $options
            );

        $this->delayer = isset($options['delayer'])
            ? $options['delayer']
            : function ($attempts) {
                return $this->exponentialDelayWithJitter($attempts);
            };

        if ($this->mode === 'adaptive') {
            $this->rateLimiter = isset($options['rate_limiter'])
                ? $options['rate_limiter']
                : new RateLimiter();
        }
    }

    /**
     * Sends the command through the next handler, retrying as directed by the
     * decider.
     *
     * NOTE(review): addRetryHeader(), updateHttpStats(), updateStats() and
     * bindStatsToReturn() are not defined in this class; presumably they are
     * provided by RetryHelperTrait.
     *
     * @param CommandInterface $cmd Command being executed.
     * @param RequestInterface $req Serialized request for the command.
     *
     * @return mixed Whatever the next handler's `then()` returns (a promise).
     */
    public function __invoke(CommandInterface $cmd, RequestInterface $req)
    {
        $decider = $this->decider;
        $delayer = $this->delayer;
        $handler = $this->nextHandler;

        $attempts = 1;
        $monitoringEvents = [];
        $requestStats = [];

        $req = $this->addRetryHeader($req, 0, 0);

        // The same callback handles both fulfilment and rejection; it refers
        // to itself by reference so that each retry is chained to it again.
        $callback = function ($value) use (
            $handler,
            $cmd,
            $req,
            $decider,
            $delayer,
            &$attempts,
            &$requestStats,
            &$monitoringEvents,
            &$callback
        ) {
            if ($this->mode === 'adaptive') {
                $this->rateLimiter->updateSendingRate($this->isThrottlingError($value));
            }

            $this->updateHttpStats($value, $requestStats);

            if ($value instanceof MonitoringEventsInterface) {
                // Snapshot the events of earlier attempts before merging in
                // this attempt's events, then prepend the snapshot (reversed,
                // so the original order is restored) to the current value.
                $reversedEvents = array_reverse($monitoringEvents);
                $monitoringEvents = array_merge($monitoringEvents, $value->getMonitoringEvents());
                foreach ($reversedEvents as $event) {
                    $value->prependMonitoringEvent($event);
                }
            }
            if ($value instanceof \Exception || $value instanceof \Throwable) {
                if (!$decider($attempts, $cmd, $value)) {
                    return Promise\rejection_for(
                        $this->bindStatsToReturn($value, $requestStats)
                    );
                }
            } elseif ($value instanceof ResultInterface
                && !$decider($attempts, $cmd, $value)
            ) {
                return $this->bindStatsToReturn($value, $requestStats);
            }

            // The delayer receives the number of the attempt that just
            // finished; $attempts then becomes the number of the next one.
            $delayBy = $delayer($attempts++);
            $cmd['@http']['delay'] = $delayBy;
            if ($this->collectStats) {
                $this->updateStats($attempts - 1, $delayBy, $requestStats);
            }

            // Update retry header with retry count and delayBy
            $req = $this->addRetryHeader($req, $attempts - 1, $delayBy);

            // Get token from rate limiter, which will sleep if necessary
            if ($this->mode === 'adaptive') {
                $this->rateLimiter->getSendToken();
            }

            return $handler($cmd, $req)->then($callback, $callback);
        };

        // Get token from rate limiter, which will sleep if necessary
        if ($this->mode === 'adaptive') {
            $this->rateLimiter->getSendToken();
        }

        return $handler($cmd, $req)->then($callback, $callback);
    }

    /**
     * Amount of milliseconds to delay as a function of attempt number
     *
     * Computes `1000 * rand * 2^attempts`, where `rand` is
     * `mt_rand() / mt_getrandmax()`, and caps the result at the configured
     * maximum backoff.
     *
     * @param $attempts Attempt number used as the exponent.
     * @return mixed The smaller of the computed delay and the max backoff.
     */
    public function exponentialDelayWithJitter($attempts)
    {
        $rand = mt_rand() / mt_getrandmax();
        return min(1000 * $rand * pow(2, $attempts) , $this->maxBackoff);
    }

    /**
     * Determines whether a result or exception qualifies for a retry.
     *
     * Non-exception results are retryable only when their
     * '@metadata'/'statusCode' is a retryable status code. Exceptions must be
     * AwsException instances and are retryable when any of the following
     * holds: it is a connection error, its AWS error code or status code is
     * in the retryable sets, its previous RequestException carries a
     * retryable cURL error, or its error shape is marked 'retryable'.
     *
     * @param mixed $result          Result or exception to inspect.
     * @param array $retryCurlErrors Retryable cURL error numbers
     *                               (errno => true).
     * @param array $options         Optional extra codes, each a list merged
     *                               into the defaults: 'transient_error_codes',
     *                               'throttling_error_codes', 'status_codes',
     *                               'curl_errors'.
     *
     * @return bool
     */
    private static function isRetryable(
        $result,
        $retryCurlErrors,
        $options = []
    ) {
        $errorCodes = self::$standardThrottlingErrors + self::$standardTransientErrors;
        if (!empty($options['transient_error_codes'])
            && is_array($options['transient_error_codes'])
        ) {
            foreach($options['transient_error_codes'] as $code) {
                $errorCodes[$code] = true;
            }
        }
        if (!empty($options['throttling_error_codes'])
            && is_array($options['throttling_error_codes'])
        ) {
            foreach($options['throttling_error_codes'] as $code) {
                $errorCodes[$code] = true;
            }
        }

        $statusCodes = self::$standardTransientStatusCodes;
        if (!empty($options['status_codes'])
            && is_array($options['status_codes'])
        ) {
            foreach($options['status_codes'] as $code) {
                $statusCodes[$code] = true;
            }
        }

        if (!empty($options['curl_errors'])
            && is_array($options['curl_errors'])
        ) {
            foreach($options['curl_errors'] as $code) {
                $retryCurlErrors[$code] = true;
            }
        }

        $isError = $result instanceof \Exception
            || $result instanceof \Throwable;

        if (!$isError) {
            if (!isset($result['@metadata']['statusCode'])) {
                return false;
            }
            return isset($statusCodes[$result['@metadata']['statusCode']]);
        }

        if (!($result instanceof AwsException)) {
            return false;
        }

        if ($result->isConnectionError()) {
            return true;
        }

        if (!empty($errorCodes[$result->getAwsErrorCode()])) {
            return true;
        }

        if (!empty($statusCodes[$result->getStatusCode()])) {
            return true;
        }

        if (count($retryCurlErrors)
            && ($previous = $result->getPrevious())
            && $previous instanceof RequestException
        ) {
            if (method_exists($previous, 'getHandlerContext')) {
                // Only a positive match is conclusive. A non-matching errno
                // must not return false here, otherwise the retryable-trait
                // check below would never run for such exceptions.
                $context = $previous->getHandlerContext();
                if (!empty($context['errno'])
                    && isset($retryCurlErrors[$context['errno']])
                ) {
                    return true;
                }
            } else {
                // No handler context available: fall back to matching the
                // error number at the start of the exception message.
                $message = $previous->getMessage();
                foreach (array_keys($retryCurlErrors) as $curlError) {
                    if (strpos($message, 'cURL error ' . $curlError . ':') === 0) {
                        return true;
                    }
                }
            }
        }

        // Check error shape for the retryable trait
        if (!empty($errorShape = $result->getAwsErrorShape())) {
            $definition = $errorShape->toArray();
            if (!empty($definition['retryable'])) {
                return true;
            }
        }

        return false;
    }

    /**
     * Tells whether the given value is an AwsException that represents
     * throttling.
     *
     * An exception counts as throttling when its AWS error code is one of the
     * standard throttling codes (extended by the 'throttling_error_codes'
     * option), or when its error shape definition has a truthy
     * ['retryable']['throttling'] entry.
     *
     * @param mixed $result Result or exception to inspect.
     *
     * @return bool
     */
    private function isThrottlingError($result)
    {
        if ($result instanceof AwsException) {
            // Check pre-defined throttling errors
            $throttlingErrors = self::$standardThrottlingErrors;
            if (!empty($this->options['throttling_error_codes'])
                && is_array($this->options['throttling_error_codes'])
            ) {
                foreach($this->options['throttling_error_codes'] as $code) {
                    $throttlingErrors[$code] = true;
                }
            }
            if (!empty($result->getAwsErrorCode())
                && !empty($throttlingErrors[$result->getAwsErrorCode()])
            ) {
                return true;
            }

            // Check error shape for the throttling trait
            if (!empty($errorShape = $result->getAwsErrorShape())) {
                $definition = $errorShape->toArray();
                if (!empty($definition['retryable']['throttling'])) {
                    return true;
                }
            }
        }

        return false;
    }
}