1<?php
2
3/*
4 * This file is part of the Predis package.
5 *
6 * (c) Daniele Alessandri <suppakilla@gmail.com>
7 *
8 * For the full copyright and license information, please view the LICENSE
9 * file that was distributed with this source code.
10 */
11
12namespace Predis\Connection;
13
14use Predis\Command\CommandInterface;
15use Predis\NotSupportedException;
16use Predis\Response\Error as ErrorResponse;
17use Predis\Response\Status as StatusResponse;
18
19/**
20 * This class provides the implementation of a Predis connection that uses the
21 * PHP socket extension for network communication and wraps the phpiredis C
22 * extension (PHP bindings for hiredis) to parse the Redis protocol.
23 *
24 * This class is intended to provide an optional low-overhead alternative for
25 * processing responses from Redis compared to the standard pure-PHP classes.
26 * Differences in speed when dealing with short inline responses are practically
27 * nonexistent, the actual speed boost is for big multibulk responses when this
28 * protocol processor can parse and return responses very fast.
29 *
30 * For instructions on how to build and install the phpiredis extension, please
31 * consult the repository of the project.
32 *
33 * The connection parameters supported by this class are:
34 *
35 *  - scheme: it can be either 'redis', 'tcp' or 'unix'.
36 *  - host: hostname or IP address of the server.
37 *  - port: TCP port of the server.
38 *  - path: path of a UNIX domain socket when scheme is 'unix'.
39 *  - timeout: timeout to perform the connection.
40 *  - read_write_timeout: timeout of read / write operations.
41 *
42 * @link http://github.com/nrk/phpiredis
43 *
44 * @author Daniele Alessandri <suppakilla@gmail.com>
45 */
46class PhpiredisSocketConnection extends AbstractConnection
47{
48    private $reader;
49
50    /**
51     * {@inheritdoc}
52     */
53    public function __construct(ParametersInterface $parameters)
54    {
55        $this->assertExtensions();
56
57        parent::__construct($parameters);
58
59        $this->reader = $this->createReader();
60    }
61
62    /**
63     * Disconnects from the server and destroys the underlying resource and the
64     * protocol reader resource when PHP's garbage collector kicks in.
65     */
66    public function __destruct()
67    {
68        phpiredis_reader_destroy($this->reader);
69
70        parent::__destruct();
71    }
72
73    /**
74     * Checks if the socket and phpiredis extensions are loaded in PHP.
75     */
76    protected function assertExtensions()
77    {
78        if (!extension_loaded('sockets')) {
79            throw new NotSupportedException(
80                'The "sockets" extension is required by this connection backend.'
81            );
82        }
83
84        if (!extension_loaded('phpiredis')) {
85            throw new NotSupportedException(
86                'The "phpiredis" extension is required by this connection backend.'
87            );
88        }
89    }
90
91    /**
92     * {@inheritdoc}
93     */
94    protected function assertParameters(ParametersInterface $parameters)
95    {
96        parent::assertParameters($parameters);
97
98        if (isset($parameters->persistent)) {
99            throw new NotSupportedException(
100                'Persistent connections are not supported by this connection backend.'
101            );
102        }
103
104        return $parameters;
105    }
106
107    /**
108     * Creates a new instance of the protocol reader resource.
109     *
110     * @return resource
111     */
112    private function createReader()
113    {
114        $reader = phpiredis_reader_create();
115
116        phpiredis_reader_set_status_handler($reader, $this->getStatusHandler());
117        phpiredis_reader_set_error_handler($reader, $this->getErrorHandler());
118
119        return $reader;
120    }
121
122    /**
123     * Returns the underlying protocol reader resource.
124     *
125     * @return resource
126     */
127    protected function getReader()
128    {
129        return $this->reader;
130    }
131
132    /**
133     * Returns the handler used by the protocol reader for inline responses.
134     *
135     * @return \Closure
136     */
137    private function getStatusHandler()
138    {
139        return function ($payload) {
140            return StatusResponse::get($payload);
141        };
142    }
143
144    /**
145     * Returns the handler used by the protocol reader for error responses.
146     *
147     * @return \Closure
148     */
149    protected function getErrorHandler()
150    {
151        return function ($payload) {
152            return new ErrorResponse($payload);
153        };
154    }
155
156    /**
157     * Helper method used to throw exceptions on socket errors.
158     */
159    private function emitSocketError()
160    {
161        $errno = socket_last_error();
162        $errstr = socket_strerror($errno);
163
164        $this->disconnect();
165
166        $this->onConnectionError(trim($errstr), $errno);
167    }
168
169    /**
170     * Gets the address of an host from connection parameters.
171     *
172     * @param ParametersInterface $parameters Parameters used to initialize the connection.
173     *
174     * @return string
175     */
176    protected static function getAddress(ParametersInterface $parameters)
177    {
178        if (filter_var($host = $parameters->host, FILTER_VALIDATE_IP)) {
179            return $host;
180        }
181
182        if ($host === $address = gethostbyname($host)) {
183            return false;
184        }
185
186        return $address;
187    }
188
189    /**
190     * {@inheritdoc}
191     */
192    protected function createResource()
193    {
194        $parameters = $this->parameters;
195
196        if ($parameters->scheme === 'unix') {
197            $address = $parameters->path;
198            $domain = AF_UNIX;
199            $protocol = 0;
200        } else {
201            if (false === $address = self::getAddress($parameters)) {
202                $this->onConnectionError("Cannot resolve the address of '$parameters->host'.");
203            }
204
205            $domain = filter_var($address, FILTER_VALIDATE_IP, FILTER_FLAG_IPV6) ? AF_INET6 : AF_INET;
206            $protocol = SOL_TCP;
207        }
208
209        $socket = @socket_create($domain, SOCK_STREAM, $protocol);
210
211        if (!is_resource($socket)) {
212            $this->emitSocketError();
213        }
214
215        $this->setSocketOptions($socket, $parameters);
216        $this->connectWithTimeout($socket, $address, $parameters);
217
218        return $socket;
219    }
220
221    /**
222     * Sets options on the socket resource from the connection parameters.
223     *
224     * @param resource            $socket     Socket resource.
225     * @param ParametersInterface $parameters Parameters used to initialize the connection.
226     */
227    private function setSocketOptions($socket, ParametersInterface $parameters)
228    {
229        if ($parameters->scheme !== 'unix') {
230            if (!socket_set_option($socket, SOL_TCP, TCP_NODELAY, 1)) {
231                $this->emitSocketError();
232            }
233
234            if (!socket_set_option($socket, SOL_SOCKET, SO_REUSEADDR, 1)) {
235                $this->emitSocketError();
236            }
237        }
238
239        if (isset($parameters->read_write_timeout)) {
240            $rwtimeout = (float) $parameters->read_write_timeout;
241            $timeoutSec = floor($rwtimeout);
242            $timeoutUsec = ($rwtimeout - $timeoutSec) * 1000000;
243
244            $timeout = array(
245                'sec' => $timeoutSec,
246                'usec' => $timeoutUsec,
247            );
248
249            if (!socket_set_option($socket, SOL_SOCKET, SO_SNDTIMEO, $timeout)) {
250                $this->emitSocketError();
251            }
252
253            if (!socket_set_option($socket, SOL_SOCKET, SO_RCVTIMEO, $timeout)) {
254                $this->emitSocketError();
255            }
256        }
257    }
258
259    /**
260     * Opens the actual connection to the server with a timeout.
261     *
262     * @param resource            $socket     Socket resource.
263     * @param string              $address    IP address (DNS-resolved from hostname)
264     * @param ParametersInterface $parameters Parameters used to initialize the connection.
265     *
266     * @return string
267     */
268    private function connectWithTimeout($socket, $address, ParametersInterface $parameters)
269    {
270        socket_set_nonblock($socket);
271
272        if (@socket_connect($socket, $address, (int) $parameters->port) === false) {
273            $error = socket_last_error();
274
275            if ($error != SOCKET_EINPROGRESS && $error != SOCKET_EALREADY) {
276                $this->emitSocketError();
277            }
278        }
279
280        socket_set_block($socket);
281
282        $null = null;
283        $selectable = array($socket);
284
285        $timeout = (float) $parameters->timeout;
286        $timeoutSecs = floor($timeout);
287        $timeoutUSecs = ($timeout - $timeoutSecs) * 1000000;
288
289        $selected = socket_select($selectable, $selectable, $null, $timeoutSecs, $timeoutUSecs);
290
291        if ($selected === 2) {
292            $this->onConnectionError('Connection refused.', SOCKET_ECONNREFUSED);
293        }
294
295        if ($selected === 0) {
296            $this->onConnectionError('Connection timed out.', SOCKET_ETIMEDOUT);
297        }
298
299        if ($selected === false) {
300            $this->emitSocketError();
301        }
302    }
303
304    /**
305     * {@inheritdoc}
306     */
307    public function connect()
308    {
309        if (parent::connect() && $this->initCommands) {
310            foreach ($this->initCommands as $command) {
311                $this->executeCommand($command);
312            }
313        }
314    }
315
316    /**
317     * {@inheritdoc}
318     */
319    public function disconnect()
320    {
321        if ($this->isConnected()) {
322            socket_close($this->getResource());
323            parent::disconnect();
324        }
325    }
326
327    /**
328     * {@inheritdoc}
329     */
330    protected function write($buffer)
331    {
332        $socket = $this->getResource();
333
334        while (($length = strlen($buffer)) > 0) {
335            $written = socket_write($socket, $buffer, $length);
336
337            if ($length === $written) {
338                return;
339            }
340
341            if ($written === false) {
342                $this->onConnectionError('Error while writing bytes to the server.');
343            }
344
345            $buffer = substr($buffer, $written);
346        }
347    }
348
349    /**
350     * {@inheritdoc}
351     */
352    public function read()
353    {
354        $socket = $this->getResource();
355        $reader = $this->reader;
356
357        while (PHPIREDIS_READER_STATE_INCOMPLETE === $state = phpiredis_reader_get_state($reader)) {
358            if (@socket_recv($socket, $buffer, 4096, 0) === false || $buffer === '' || $buffer === null) {
359                $this->emitSocketError();
360            }
361
362            phpiredis_reader_feed($reader, $buffer);
363        }
364
365        if ($state === PHPIREDIS_READER_STATE_COMPLETE) {
366            return phpiredis_reader_get_reply($reader);
367        } else {
368            $this->onProtocolError(phpiredis_reader_get_error($reader));
369
370            return;
371        }
372    }
373
374    /**
375     * {@inheritdoc}
376     */
377    public function writeRequest(CommandInterface $command)
378    {
379        $arguments = $command->getArguments();
380        array_unshift($arguments, $command->getId());
381
382        $this->write(phpiredis_format_command($arguments));
383    }
384
385    /**
386     * {@inheritdoc}
387     */
388    public function __wakeup()
389    {
390        $this->assertExtensions();
391        $this->reader = $this->createReader();
392    }
393}
394