1<?php 2namespace Aws\Retry; 3 4 5/** 6 * @internal 7 */ 8class RateLimiter 9{ 10 // User-configurable constants 11 private $beta; 12 private $minCapacity; 13 private $minFillRate; 14 private $scaleConstant; 15 private $smooth; 16 17 // Optional callable time provider 18 private $timeProvider; 19 20 // Pre-set state variables 21 private $currentCapacity = 0; 22 private $enabled = false; 23 private $lastMaxRate = 0; 24 private $measuredTxRate = 0; 25 private $requestCount = 0; 26 27 // Other state variables 28 private $fillRate; 29 private $lastThrottleTime; 30 private $lastTimestamp; 31 private $lastTxRateBucket; 32 private $maxCapacity; 33 private $timeWindow; 34 35 public function __construct($options = []) 36 { 37 $this->beta = isset($options['beta']) 38 ? $options['beta'] 39 : 0.7; 40 $this->minCapacity = isset($options['min_capacity']) 41 ? $options['min_capacity'] 42 : 1; 43 $this->minFillRate = isset($options['min_fill_rate']) 44 ? $options['min_fill_rate'] 45 : 0.5; 46 $this->scaleConstant = isset($options['scale_constant']) 47 ? $options['scale_constant'] 48 : 0.4; 49 $this->smooth = isset($options['smooth']) 50 ? $options['smooth'] 51 : 0.8; 52 $this->timeProvider = isset($options['time_provider']) 53 ? $options['time_provider'] 54 : null; 55 56 $this->lastTxRateBucket = floor($this->time()); 57 $this->lastThrottleTime = $this->time(); 58 } 59 60 public function isEnabled() 61 { 62 return $this->enabled; 63 } 64 65 public function getSendToken() 66 { 67 $this->acquireToken(1); 68 } 69 70 public function updateSendingRate($isThrottled) 71 { 72 $this->updateMeasuredRate(); 73 74 if ($isThrottled) { 75 if (!$this->isEnabled()) { 76 $rateToUse = $this->measuredTxRate; 77 } else { 78 $rateToUse = min($this->measuredTxRate, $this->fillRate); 79 } 80 81 $this->lastMaxRate = $rateToUse; 82 $this->calculateTimeWindow(); 83 $this->lastThrottleTime = $this->time(); 84 $calculatedRate = $this->cubicThrottle($rateToUse); 85 $this->enableTokenBucket(); 86 } else { 87 $this->calculateTimeWindow(); 88 $calculatedRate = $this->cubicSuccess($this->time()); 89 } 90 $newRate = min($calculatedRate, 2 * $this->measuredTxRate); 91 $this->updateTokenBucketRate($newRate); 92 return $newRate; 93 } 94 95 private function acquireToken($amount) 96 { 97 if (!$this->enabled) { 98 return true; 99 } 100 101 $this->refillTokenBucket(); 102 103 if ($amount > $this->currentCapacity) { 104 usleep(1000000 * ($amount - $this->currentCapacity) / $this->fillRate); 105 } 106 107 $this->currentCapacity -= $amount; 108 return true; 109 } 110 111 private function calculateTimeWindow() 112 { 113 $this->timeWindow = pow(($this->lastMaxRate * (1 - $this->beta) / $this->scaleConstant), 0.333); 114 } 115 116 private function cubicSuccess($timestamp) 117 { 118 $dt = $timestamp - $this->lastThrottleTime; 119 return $this->scaleConstant * pow($dt - $this->timeWindow, 3) + $this->lastMaxRate; 120 } 121 122 private function cubicThrottle($rateToUse) 123 { 124 return $rateToUse * $this->beta; 125 } 126 127 private function enableTokenBucket() 128 { 129 $this->enabled = true; 130 } 131 132 private function refillTokenBucket() 133 { 134 $timestamp = $this->time(); 135 if (!isset($this->lastTimestamp)) { 136 $this->lastTimestamp = $timestamp; 137 return; 138 } 139 $fillAmount = ($timestamp - $this->lastTimestamp) * $this->fillRate; 140 $this->currentCapacity = $this->currentCapacity + $fillAmount; 141 if (!is_null($this->maxCapacity)) { 142 $this->currentCapacity = min( 143 $this->maxCapacity, 144 $this->currentCapacity 145 ); 146 } 147 148 $this->lastTimestamp = $timestamp; 149 } 150 151 private function time() 152 { 153 if (is_callable($this->timeProvider)) { 154 $provider = $this->timeProvider; 155 $time = $provider(); 156 return $time; 157 } 158 return microtime(true); 159 } 160 161 private function updateMeasuredRate() 162 { 163 $timestamp = $this->time(); 164 $timeBucket = floor(round($timestamp, 3) * 2) / 2; 165 $this->requestCount++; 166 if ($timeBucket > $this->lastTxRateBucket) { 167 $currentRate = $this->requestCount / ($timeBucket - $this->lastTxRateBucket); 168 $this->measuredTxRate = ($currentRate * $this->smooth) 169 + ($this->measuredTxRate * (1 - $this->smooth)); 170 $this->requestCount = 0; 171 $this->lastTxRateBucket = $timeBucket; 172 } 173 } 174 175 private function updateTokenBucketRate($newRps) 176 { 177 $this->refillTokenBucket(); 178 $this->fillRate = max($newRps, $this->minFillRate); 179 $this->maxCapacity = max($newRps, $this->minCapacity); 180 $this->currentCapacity = min($this->currentCapacity, $this->maxCapacity); 181 } 182} 183