1<?php
2namespace GuzzleHttp\Ring\Client;
3
4use GuzzleHttp\Ring\Future\FutureArray;
5use React\Promise\Deferred;
6
/**
 * Returns an asynchronous response using curl_multi_* functions.
 *
 * This handler supports future responses and the "delay" request client
 * option that can be used to delay before sending a request.
 *
 * When using the CurlMultiHandler, custom curl options can be specified as an
 * associative array of curl option constants mapping to values in the
 * **curl** key of the "client" key of the request.
 *
 * @property resource $_mh Internal use only. Lazy loaded multi-handle.
 */
// The multi-handle is intentionally stored as a dynamic property so that
// __get() can lazily create it; the attribute below silences the PHP 8.2+
// dynamic property deprecation and is parsed as a plain comment before PHP 8.
#[\AllowDynamicProperties]
class CurlMultiHandler
{
    /** @var callable Invoked with the request; returns [handle, headers, body]. */
    private $factory;

    /** @var int|float Seconds passed to curl_multi_select() as its timeout. */
    private $selectTimeout;

    /** @var int|null Running-transfer count written by curl_multi_exec(). */
    private $active;

    /** @var array Outstanding request entries keyed by integer handle ID. */
    private $handles = [];

    /** @var array Handle ID => microtime(true) timestamp at which to send it. */
    private $delays = [];

    /** @var int Outstanding-request count at which __invoke() calls execute(). */
    private $maxHandles;

    /**
     * This handler accepts the following options:
     *
     * - mh: An optional curl_multi resource
     * - handle_factory: An optional callable used to generate curl handle
     *   resources. The callable accepts a request hash and returns an array
     *   of the handle, headers file resource, and the body resource.
     * - select_timeout: Optional timeout (in seconds) to block before timing
     *   out while selecting curl handles. Defaults to 1 second.
     * - max_handles: Optional integer representing the maximum number of
     *   open requests. When this number is reached, the queued futures are
     *   flushed. Defaults to 100.
     *
     * @param array $options
     */
    public function __construct(array $options = [])
    {
        if (isset($options['mh'])) {
            $this->_mh = $options['mh'];
        }
        $this->factory = isset($options['handle_factory'])
            ? $options['handle_factory'] : new CurlFactory();
        $this->selectTimeout = isset($options['select_timeout'])
            ? $options['select_timeout'] : 1;
        $this->maxHandles = isset($options['max_handles'])
            ? $options['max_handles'] : 100;
    }

    /**
     * Lazily creates the curl multi-handle the first time "_mh" is read.
     *
     * @param string $name Name of the inaccessible property being read.
     *
     * @return resource The newly created multi-handle.
     * @throws \BadMethodCallException For any property other than "_mh".
     */
    public function __get($name)
    {
        if ($name === '_mh') {
            // Assigning creates a real property, so later reads bypass __get().
            return $this->_mh = curl_multi_init();
        }

        throw new \BadMethodCallException(
            sprintf('Undefined property: %s::$%s', __CLASS__, $name)
        );
    }

    /**
     * Completes outstanding requests and closes the multi-handle if one was
     * ever created.
     */
    public function __destruct()
    {
        // Finish any open connections before terminating the script.
        if ($this->handles) {
            $this->execute();
        }

        if (isset($this->_mh)) {
            curl_multi_close($this->_mh);
            unset($this->_mh);
        }
    }

    /**
     * Queues a request and returns a future for its response.
     *
     * @param array $request Request hash handed to the handle factory.
     *
     * @return FutureArray Future whose wait function is execute() and whose
     *                     cancel function removes the queued handle.
     */
    public function __invoke(array $request)
    {
        $factory = $this->factory;
        $result = $factory($request);
        $entry = [
            'request'  => $request,
            'response' => [],
            'handle'   => $result[0],
            'headers'  => &$result[1],
            'body'     => $result[2],
            'deferred' => new Deferred(),
        ];

        $id = (int) $result[0];

        $future = new FutureArray(
            $entry['deferred']->promise(),
            [$this, 'execute'],
            function () use ($id) {
                return $this->cancel($id);
            }
        );

        $this->addRequest($entry);

        // Transfer outstanding requests if there are too many open handles.
        if (count($this->handles) >= $this->maxHandles) {
            $this->execute();
        }

        return $future;
    }

    /**
     * Runs until all outstanding connections have completed.
     */
    public function execute()
    {
        do {

            if ($this->active &&
                curl_multi_select($this->_mh, $this->selectTimeout) === -1
            ) {
                // Perform a usleep if a select returns -1.
                // See: https://bugs.php.net/bug.php?id=61141
                usleep(250);
            }

            // Add any delayed futures if needed.
            if ($this->delays) {
                $this->addDelays();
            }

            do {
                $mrc = curl_multi_exec($this->_mh, $this->active);
            } while ($mrc === CURLM_CALL_MULTI_PERFORM);

            $this->processMessages();

            // Nothing is transferring but delayed requests remain: sleep
            // until the earliest one is due instead of spinning in short
            // fixed-length naps.
            if (!$this->active && $this->delays) {
                usleep($this->timeToNext());
            }

        } while ($this->active || $this->handles);
    }

    /**
     * Computes how long to wait before the earliest delayed request is due.
     *
     * Must only be called while at least one delay is registered.
     *
     * @return int Microseconds until the next delay expires; 0 if already due.
     */
    private function timeToNext()
    {
        $remaining = min($this->delays) - microtime(true);

        // Round up so the request is due when the sleep ends.
        return $remaining > 0 ? (int) ceil($remaining * 1000000) : 0;
    }

    /**
     * Registers an entry and, unless it is delayed, adds it to the multi-handle.
     *
     * @param array $entry Entry built by __invoke().
     */
    private function addRequest(array &$entry)
    {
        $id = (int) $entry['handle'];
        $this->handles[$id] = $entry;

        // If the request is a delay, then add the request to the curl multi
        // pool only after the specified delay (given in milliseconds).
        if (isset($entry['request']['client']['delay'])) {
            $this->delays[$id] = microtime(true) + ($entry['request']['client']['delay'] / 1000);
            return;
        }

        curl_multi_add_handle($this->_mh, $entry['handle']);

        // Non-lazy futures start transferring right away. "lazy" futures are
        // only sent once the pool has many requests.
        if (!empty($entry['request']['future'])
            && $entry['request']['future'] !== 'lazy'
        ) {
            do {
                $mrc = curl_multi_exec($this->_mh, $this->active);
            } while ($mrc === CURLM_CALL_MULTI_PERFORM);
            $this->processMessages();
        }
    }

    /**
     * Detaches and closes a finished handle and forgets its bookkeeping.
     *
     * @param int $id Handle ID to remove.
     */
    private function removeProcessed($id)
    {
        if (isset($this->handles[$id])) {
            curl_multi_remove_handle(
                $this->_mh,
                $this->handles[$id]['handle']
            );
            curl_close($this->handles[$id]['handle']);
            unset($this->handles[$id], $this->delays[$id]);
        }
    }

    /**
     * Cancels a handle from sending and removes references to it.
     *
     * @param int $id Handle ID to cancel and remove.
     *
     * @return bool True on success, false on failure.
     */
    private function cancel($id)
    {
        // Cannot cancel if it has been processed.
        if (!isset($this->handles[$id])) {
            return false;
        }

        $handle = $this->handles[$id]['handle'];
        unset($this->delays[$id], $this->handles[$id]);
        curl_multi_remove_handle($this->_mh, $handle);
        curl_close($handle);

        return true;
    }

    /**
     * Adds every delayed handle whose start time has passed to the multi-handle.
     */
    private function addDelays()
    {
        $currentTime = microtime(true);

        foreach ($this->delays as $id => $delay) {
            if ($currentTime >= $delay) {
                unset($this->delays[$id]);
                curl_multi_add_handle(
                    $this->_mh,
                    $this->handles[$id]['handle']
                );
            }
        }
    }

    /**
     * Drains the multi-handle's message queue, building a response for each
     * finished transfer and resolving its deferred.
     */
    private function processMessages()
    {
        while ($done = curl_multi_info_read($this->_mh)) {
            $id = (int) $done['handle'];

            if (!isset($this->handles[$id])) {
                // Probably was cancelled.
                continue;
            }

            $entry = $this->handles[$id];
            $entry['response']['transfer_stats'] = curl_getinfo($done['handle']);

            // 'result' is the per-transfer (easy handle) CURLE_* code.
            if ($done['result'] !== CURLE_OK) {
                $entry['response']['curl']['errno'] = $done['result'];
                $entry['response']['curl']['error'] = curl_error($done['handle']);
            }

            $result = CurlFactory::createResponse(
                $this,
                $entry['request'],
                $entry['response'],
                $entry['headers'],
                $entry['body']
            );

            // Release the handle before resolving the deferred.
            $this->removeProcessed($id);
            $entry['deferred']->resolve($result);
        }
    }
}
249