1<?php
2
3/*
4 * This file is part of the Predis package.
5 *
6 * (c) Daniele Alessandri <suppakilla@gmail.com>
7 *
8 * For the full copyright and license information, please view the LICENSE
9 * file that was distributed with this source code.
10 */
11
12namespace Predis\Connection;
13
14use Predis\Command\CommandInterface;
15use Predis\NotSupportedException;
16use Predis\Response\Error as ErrorResponse;
17use Predis\Response\ErrorInterface as ErrorResponseInterface;
18use Predis\Response\Status as StatusResponse;
19
20/**
21 * This class provides the implementation of a Predis connection that uses the
22 * PHP socket extension for network communication and wraps the phpiredis C
23 * extension (PHP bindings for hiredis) to parse the Redis protocol.
24 *
25 * This class is intended to provide an optional low-overhead alternative for
26 * processing responses from Redis compared to the standard pure-PHP classes.
27 * Differences in speed when dealing with short inline responses are practically
28 * nonexistent, the actual speed boost is for big multibulk responses when this
29 * protocol processor can parse and return responses very fast.
30 *
31 * For instructions on how to build and install the phpiredis extension, please
32 * consult the repository of the project.
33 *
34 * The connection parameters supported by this class are:
35 *
36 *  - scheme: it can be either 'redis', 'tcp' or 'unix'.
37 *  - host: hostname or IP address of the server.
38 *  - port: TCP port of the server.
39 *  - path: path of a UNIX domain socket when scheme is 'unix'.
40 *  - timeout: timeout to perform the connection (default is 5 seconds).
41 *  - read_write_timeout: timeout of read / write operations.
42 *
43 * @link http://github.com/nrk/phpiredis
44 *
45 * @author Daniele Alessandri <suppakilla@gmail.com>
46 */
class PhpiredisSocketConnection extends AbstractConnection
{
    /**
     * Protocol reader created by phpiredis_reader_create().
     *
     * @var resource|null
     */
    private $reader;

    /**
     * {@inheritdoc}
     */
    public function __construct(ParametersInterface $parameters)
    {
        // Check for the required extensions before the parent constructor runs.
        $this->assertExtensions();

        parent::__construct($parameters);

        $this->reader = $this->createReader();
    }

    /**
     * Destroys the protocol reader resource and then delegates to the parent
     * destructor.
     */
    public function __destruct()
    {
        // The reader may be missing, e.g. when __wakeup() threw from
        // assertExtensions() before a new reader could be created.
        if (null !== $this->reader) {
            phpiredis_reader_destroy($this->reader);
        }

        parent::__destruct();
    }

    /**
     * Checks if the socket and phpiredis extensions are loaded in PHP.
     *
     * @throws NotSupportedException When one of the two extensions is missing.
     */
    protected function assertExtensions()
    {
        if (!extension_loaded('sockets')) {
            throw new NotSupportedException(
                'The "sockets" extension is required by this connection backend.'
            );
        }

        if (!extension_loaded('phpiredis')) {
            throw new NotSupportedException(
                'The "phpiredis" extension is required by this connection backend.'
            );
        }
    }

    /**
     * {@inheritdoc}
     *
     * Only the 'tcp', 'redis' and 'unix' schemes are accepted, and the
     * `persistent` parameter must not be set at all.
     *
     * @throws \InvalidArgumentException When the scheme is not supported.
     * @throws NotSupportedException     When the `persistent` parameter is set.
     */
    protected function assertParameters(ParametersInterface $parameters)
    {
        switch ($parameters->scheme) {
            case 'tcp':
            case 'redis':
            case 'unix':
                break;

            default:
                throw new \InvalidArgumentException("Invalid scheme: '$parameters->scheme'.");
        }

        if (isset($parameters->persistent)) {
            throw new NotSupportedException(
                'Persistent connections are not supported by this connection backend.'
            );
        }

        return $parameters;
    }

    /**
     * Creates a new instance of the protocol reader resource and registers
     * the handlers used to build status and error responses.
     *
     * @return resource
     */
    private function createReader()
    {
        $reader = phpiredis_reader_create();

        phpiredis_reader_set_status_handler($reader, $this->getStatusHandler());
        phpiredis_reader_set_error_handler($reader, $this->getErrorHandler());

        return $reader;
    }

    /**
     * Returns the underlying protocol reader resource.
     *
     * @return resource
     */
    protected function getReader()
    {
        return $this->reader;
    }

    /**
     * Returns the handler used by the protocol reader for inline responses.
     *
     * The closure is created once and cached in a static variable.
     *
     * @return \Closure
     */
    protected function getStatusHandler()
    {
        static $statusHandler;

        if (!$statusHandler) {
            // Declared static so that the cached closure does not capture
            // $this: a bound closure stored in a static variable would keep
            // the first connection instance referenced until shutdown.
            $statusHandler = static function ($payload) {
                return StatusResponse::get($payload);
            };
        }

        return $statusHandler;
    }

    /**
     * Returns the handler used by the protocol reader for error responses.
     *
     * The closure is created once and cached in a static variable.
     *
     * @return \Closure
     */
    protected function getErrorHandler()
    {
        static $errorHandler;

        if (!$errorHandler) {
            // Declared static for the same reason as the status handler: the
            // cached closure must not hold a reference to $this.
            $errorHandler = static function ($errorMessage) {
                return new ErrorResponse($errorMessage);
            };
        }

        return $errorHandler;
    }

    /**
     * Helper method used to throw exceptions on socket errors.
     *
     * The error code and message are read before disconnecting and are then
     * handed over to onConnectionError().
     */
    private function emitSocketError()
    {
        $errno = socket_last_error();
        $errstr = socket_strerror($errno);

        $this->disconnect();

        $this->onConnectionError(trim($errstr), $errno);
    }

    /**
     * Splits a timeout expressed in seconds into whole seconds and
     * microseconds.
     *
     * Both values are returned as integers, which is what socket_select()
     * expects for its timeout arguments.
     *
     * @param mixed $timeout Timeout in seconds, fractions allowed.
     *
     * @return array Array with integer values for the 'sec' and 'usec' keys.
     */
    private static function toTimeval($timeout)
    {
        $timeout = (float) $timeout;
        $seconds = floor($timeout);

        return array(
            'sec' => (int) $seconds,
            // Truncation keeps the value strictly below one second.
            'usec' => (int) (($timeout - $seconds) * 1000000),
        );
    }

    /**
     * Gets the address of a host from connection parameters.
     *
     * A host that already is a valid IP address is returned untouched,
     * anything else is resolved with gethostbyname().
     *
     * @param ParametersInterface $parameters Parameters used to initialize the connection.
     *
     * @return string|false IP address, or FALSE when the host cannot be resolved.
     */
    protected static function getAddress(ParametersInterface $parameters)
    {
        $host = $parameters->host;

        if (filter_var($host, FILTER_VALIDATE_IP)) {
            return $host;
        }

        // gethostbyname() returns its argument unchanged when resolution fails.
        $address = gethostbyname($host);

        if ($address === $host) {
            return false;
        }

        return $address;
    }

    /**
     * {@inheritdoc}
     *
     * Creates the socket (UNIX domain or TCP over IPv4 / IPv6 depending on
     * the parameters), applies the socket options and connects it.
     */
    protected function createResource()
    {
        $parameters = $this->parameters;

        if ($parameters->scheme === 'unix') {
            $address = $parameters->path;
            $domain = AF_UNIX;
            $protocol = 0;
        } else {
            if (false === $address = self::getAddress($parameters)) {
                $this->onConnectionError("Cannot resolve the address of '$parameters->host'.");
            }

            $domain = filter_var($address, FILTER_VALIDATE_IP, FILTER_FLAG_IPV6) ? AF_INET6 : AF_INET;
            $protocol = SOL_TCP;
        }

        $socket = @socket_create($domain, SOCK_STREAM, $protocol);

        // socket_create() returns a resource on PHP < 8.0 and a Socket object
        // on PHP >= 8.0, so failure must be detected by comparing with FALSE
        // instead of relying on is_resource().
        if ($socket === false) {
            $this->emitSocketError();
        }

        $this->setSocketOptions($socket, $parameters);
        $this->connectWithTimeout($socket, $address, $parameters);

        return $socket;
    }

    /**
     * Sets options on the socket resource from the connection parameters.
     *
     * TCP_NODELAY and SO_REUSEADDR are enabled for every scheme but 'unix',
     * send and receive timeouts are applied only when `read_write_timeout`
     * is set.
     *
     * @param resource            $socket     Socket resource.
     * @param ParametersInterface $parameters Parameters used to initialize the connection.
     */
    private function setSocketOptions($socket, ParametersInterface $parameters)
    {
        if ($parameters->scheme !== 'unix') {
            if (!socket_set_option($socket, SOL_TCP, TCP_NODELAY, 1)) {
                $this->emitSocketError();
            }

            if (!socket_set_option($socket, SOL_SOCKET, SO_REUSEADDR, 1)) {
                $this->emitSocketError();
            }
        }

        if (isset($parameters->read_write_timeout)) {
            $timeout = self::toTimeval($parameters->read_write_timeout);

            if (!socket_set_option($socket, SOL_SOCKET, SO_SNDTIMEO, $timeout)) {
                $this->emitSocketError();
            }

            if (!socket_set_option($socket, SOL_SOCKET, SO_RCVTIMEO, $timeout)) {
                $this->emitSocketError();
            }
        }
    }

    /**
     * Opens the actual connection to the server with a timeout.
     *
     * The connection is started in non-blocking mode and socket_select() is
     * then used to wait for its outcome, for at most `timeout` seconds
     * (5 seconds when the parameter is not set).
     *
     * @param resource            $socket     Socket resource.
     * @param string              $address    IP address (DNS-resolved from hostname)
     * @param ParametersInterface $parameters Parameters used to initialize the connection.
     */
    private function connectWithTimeout($socket, $address, ParametersInterface $parameters)
    {
        socket_set_nonblock($socket);

        if (@socket_connect($socket, $address, (int) $parameters->port) === false) {
            $error = socket_last_error();

            // These two codes only signal that the non-blocking connection is
            // still being established, anything else is an actual failure.
            if ($error != SOCKET_EINPROGRESS && $error != SOCKET_EALREADY) {
                $this->emitSocketError();
            }
        }

        socket_set_block($socket);

        // socket_select() takes its arrays by reference and modifies them, so
        // each set gets its own variable.
        $read = array($socket);
        $write = array($socket);
        $except = null;

        $timeout = self::toTimeval(isset($parameters->timeout) ? $parameters->timeout : 5.0);

        $selected = socket_select($read, $write, $except, $timeout['sec'], $timeout['usec']);

        // The socket being reported as both readable and writable is treated
        // as a refused connection.
        if ($selected === 2) {
            $this->onConnectionError('Connection refused.', SOCKET_ECONNREFUSED);
        }

        if ($selected === 0) {
            $this->onConnectionError('Connection timed out.', SOCKET_ETIMEDOUT);
        }

        if ($selected === false) {
            $this->emitSocketError();
        }
    }

    /**
     * {@inheritdoc}
     *
     * When the parent reports a new connection, the initialization commands
     * are executed and an error response to any of them is reported as a
     * connection error.
     */
    public function connect()
    {
        if (parent::connect() && $this->initCommands) {
            foreach ($this->initCommands as $command) {
                $response = $this->executeCommand($command);

                if ($response instanceof ErrorResponseInterface) {
                    $this->onConnectionError("`{$command->getId()}` failed: $response", 0);
                }
            }
        }
    }

    /**
     * {@inheritdoc}
     *
     * The socket is closed only when the connection is currently open.
     */
    public function disconnect()
    {
        if ($this->isConnected()) {
            socket_close($this->getResource());
            parent::disconnect();
        }
    }

    /**
     * {@inheritdoc}
     *
     * Keeps writing until the whole buffer has been sent, since a single
     * socket_write() call may send only part of it.
     */
    protected function write($buffer)
    {
        $socket = $this->getResource();

        while (($length = strlen($buffer)) > 0) {
            $written = socket_write($socket, $buffer, $length);

            if ($length === $written) {
                return;
            }

            if ($written === false) {
                $this->onConnectionError('Error while writing bytes to the server.');
            }

            // Partial write: drop the bytes already sent and try again.
            $buffer = substr($buffer, $written);
        }
    }

    /**
     * {@inheritdoc}
     *
     * Feeds the protocol reader with chunks of up to 4096 bytes received from
     * the socket until the reader leaves the incomplete state, then returns
     * the parsed reply or reports a protocol error.
     */
    public function read()
    {
        $socket = $this->getResource();
        $reader = $this->reader;

        while (PHPIREDIS_READER_STATE_INCOMPLETE === $state = phpiredis_reader_get_state($reader)) {
            // A failed call or an empty / null buffer is handled as a socket error.
            if (@socket_recv($socket, $buffer, 4096, 0) === false || $buffer === '' || $buffer === null) {
                $this->emitSocketError();
            }

            phpiredis_reader_feed($reader, $buffer);
        }

        if ($state === PHPIREDIS_READER_STATE_COMPLETE) {
            return phpiredis_reader_get_reply($reader);
        } else {
            $this->onProtocolError(phpiredis_reader_get_error($reader));

            return;
        }
    }

    /**
     * {@inheritdoc}
     *
     * The command ID is prepended to the arguments and the resulting list is
     * serialized by phpiredis_format_command() before being written.
     */
    public function writeRequest(CommandInterface $command)
    {
        $arguments = $command->getArguments();
        array_unshift($arguments, $command->getId());

        $this->write(phpiredis_format_command($arguments));
    }

    /**
     * {@inheritdoc}
     *
     * Checks the required extensions again and creates a fresh protocol
     * reader for the unserialized instance.
     */
    public function __wakeup()
    {
        $this->assertExtensions();
        $this->reader = $this->createReader();
    }
}
419