1<?php
2namespace Aws\Retry;
3
4
5/**
6 * @internal
7 */
8class RateLimiter
9{
10    // User-configurable constants
11    private $beta;
12    private $minCapacity;
13    private $minFillRate;
14    private $scaleConstant;
15    private $smooth;
16
17    // Optional callable time provider
18    private $timeProvider;
19
20    // Pre-set state variables
21    private $currentCapacity = 0;
22    private $enabled = false;
23    private $lastMaxRate = 0;
24    private $measuredTxRate = 0;
25    private $requestCount = 0;
26
27    // Other state variables
28    private $fillRate;
29    private $lastThrottleTime;
30    private $lastTimestamp;
31    private $lastTxRateBucket;
32    private $maxCapacity;
33    private $timeWindow;
34
35    public function __construct($options = [])
36    {
37        $this->beta = isset($options['beta'])
38            ? $options['beta']
39            : 0.7;
40        $this->minCapacity = isset($options['min_capacity'])
41            ? $options['min_capacity']
42            : 1;
43        $this->minFillRate = isset($options['min_fill_rate'])
44            ? $options['min_fill_rate']
45            : 0.5;
46        $this->scaleConstant = isset($options['scale_constant'])
47            ? $options['scale_constant']
48            : 0.4;
49        $this->smooth = isset($options['smooth'])
50            ? $options['smooth']
51            : 0.8;
52        $this->timeProvider = isset($options['time_provider'])
53            ? $options['time_provider']
54            : null;
55
56        $this->lastTxRateBucket = floor($this->time());
57        $this->lastThrottleTime = $this->time();
58    }
59
60    public function isEnabled()
61    {
62        return $this->enabled;
63    }
64
65    public function getSendToken()
66    {
67        $this->acquireToken(1);
68    }
69
70    public function updateSendingRate($isThrottled)
71    {
72        $this->updateMeasuredRate();
73
74        if ($isThrottled) {
75            if (!$this->isEnabled()) {
76                $rateToUse = $this->measuredTxRate;
77            } else {
78                $rateToUse = min($this->measuredTxRate, $this->fillRate);
79            }
80
81            $this->lastMaxRate = $rateToUse;
82            $this->calculateTimeWindow();
83            $this->lastThrottleTime = $this->time();
84            $calculatedRate = $this->cubicThrottle($rateToUse);
85            $this->enableTokenBucket();
86        } else {
87            $this->calculateTimeWindow();
88            $calculatedRate = $this->cubicSuccess($this->time());
89        }
90        $newRate = min($calculatedRate, 2 * $this->measuredTxRate);
91        $this->updateTokenBucketRate($newRate);
92        return $newRate;
93    }
94
95    private function acquireToken($amount)
96    {
97        if (!$this->enabled) {
98            return true;
99        }
100
101        $this->refillTokenBucket();
102
103        if ($amount > $this->currentCapacity) {
104            usleep(1000000 * ($amount - $this->currentCapacity) / $this->fillRate);
105        }
106
107        $this->currentCapacity -= $amount;
108        return true;
109    }
110
111    private function calculateTimeWindow()
112    {
113        $this->timeWindow = pow(($this->lastMaxRate * (1 - $this->beta) / $this->scaleConstant), 0.333);
114    }
115
116    private function cubicSuccess($timestamp)
117    {
118        $dt = $timestamp - $this->lastThrottleTime;
119        return $this->scaleConstant * pow($dt - $this->timeWindow, 3) + $this->lastMaxRate;
120    }
121
122    private function cubicThrottle($rateToUse)
123    {
124        return $rateToUse * $this->beta;
125    }
126
127    private function enableTokenBucket()
128    {
129        $this->enabled = true;
130    }
131
132    private function refillTokenBucket()
133    {
134        $timestamp = $this->time();
135        if (!isset($this->lastTimestamp)) {
136            $this->lastTimestamp = $timestamp;
137            return;
138        }
139        $fillAmount = ($timestamp - $this->lastTimestamp) * $this->fillRate;
140        $this->currentCapacity = $this->currentCapacity + $fillAmount;
141        if (!is_null($this->maxCapacity)) {
142            $this->currentCapacity = min(
143                $this->maxCapacity,
144                $this->currentCapacity
145            );
146        }
147
148        $this->lastTimestamp = $timestamp;
149    }
150
151    private function time()
152    {
153        if (is_callable($this->timeProvider)) {
154            $provider = $this->timeProvider;
155            $time = $provider();
156            return $time;
157        }
158        return microtime(true);
159    }
160
161    private function updateMeasuredRate()
162    {
163        $timestamp = $this->time();
164        $timeBucket = floor(round($timestamp, 3) * 2) / 2;
165        $this->requestCount++;
166        if ($timeBucket > $this->lastTxRateBucket) {
167            $currentRate = $this->requestCount / ($timeBucket - $this->lastTxRateBucket);
168            $this->measuredTxRate = ($currentRate * $this->smooth)
169                + ($this->measuredTxRate * (1 - $this->smooth));
170            $this->requestCount = 0;
171            $this->lastTxRateBucket = $timeBucket;
172        }
173    }
174
175    private function updateTokenBucketRate($newRps)
176    {
177        $this->refillTokenBucket();
178        $this->fillRate = max($newRps, $this->minFillRate);
179        $this->maxCapacity = max($newRps, $this->minCapacity);
180        $this->currentCapacity = min($this->currentCapacity, $this->maxCapacity);
181    }
182}
183