<?php

/*
 * This file is part of the Predis package.
 *
 * (c) Daniele Alessandri <suppakilla@gmail.com>
 *
 * For the full copyright and license information, please view the LICENSE
 * file that was distributed with this source code.
 */

namespace Predis\Connection;

use Predis\Command\CommandInterface;
use Predis\NotSupportedException;
use Predis\Response\Error as ErrorResponse;
use Predis\Response\ErrorInterface as ErrorResponseInterface;
use Predis\Response\Status as StatusResponse;

/**
 * This class provides the implementation of a Predis connection that uses the
 * PHP socket extension for network communication and wraps the phpiredis C
 * extension (PHP bindings for hiredis) to parse the Redis protocol.
 *
 * This class is intended to provide an optional low-overhead alternative for
 * processing responses from Redis compared to the standard pure-PHP classes.
 * Differences in speed when dealing with short inline responses are practically
 * nonexistent, the actual speed boost is for big multibulk responses when this
 * protocol processor can parse and return responses very fast.
 *
 * For instructions on how to build and install the phpiredis extension, please
 * consult the repository of the project.
 *
 * The connection parameters supported by this class are:
 *
 *  - scheme: it can be either 'redis', 'tcp' or 'unix'.
 *  - host: hostname or IP address of the server.
 *  - port: TCP port of the server.
 *  - path: path of a UNIX domain socket when scheme is 'unix'.
 *  - timeout: timeout to perform the connection (default is 5 seconds).
 *  - read_write_timeout: timeout of read / write operations.
 *
 * @link http://github.com/nrk/phpiredis
 *
 * @author Daniele Alessandri <suppakilla@gmail.com>
 */
class PhpiredisSocketConnection extends AbstractConnection
{
    private $reader;

    /**
     * {@inheritdoc}
     */
    public function __construct(ParametersInterface $parameters)
    {
        $this->assertExtensions();

        parent::__construct($parameters);

        $this->reader = $this->createReader();
    }

    /**
     * Disconnects from the server and destroys the underlying resource and the
     * protocol reader resource when PHP's garbage collector kicks in.
     */
    public function __destruct()
    {
        // The parent destructor runs first so that any disconnection it
        // triggers (which resets the reader, see disconnect()) still operates
        // on a valid reader. Only afterwards is the reader destroyed.
        parent::__destruct();

        phpiredis_reader_destroy($this->reader);
    }

    /**
     * Checks if the socket and phpiredis extensions are loaded in PHP.
     *
     * @throws NotSupportedException When one of the extensions is missing.
     */
    protected function assertExtensions()
    {
        if (!extension_loaded('sockets')) {
            throw new NotSupportedException(
                'The "sockets" extension is required by this connection backend.'
            );
        }

        if (!extension_loaded('phpiredis')) {
            throw new NotSupportedException(
                'The "phpiredis" extension is required by this connection backend.'
            );
        }
    }

    /**
     * {@inheritdoc}
     *
     * Only the 'tcp', 'redis' and 'unix' schemes are accepted and persistent
     * connections are rejected.
     */
    protected function assertParameters(ParametersInterface $parameters)
    {
        switch ($parameters->scheme) {
            case 'tcp':
            case 'redis':
            case 'unix':
                break;

            default:
                throw new \InvalidArgumentException("Invalid scheme: '$parameters->scheme'.");
        }

        if (isset($parameters->persistent)) {
            throw new NotSupportedException(
                'Persistent connections are not supported by this connection backend.'
            );
        }

        return $parameters;
    }

    /**
     * Creates a new instance of the protocol reader resource and registers
     * the handlers used to build status and error responses.
     *
     * @return resource
     */
    private function createReader()
    {
        $reader = phpiredis_reader_create();

        phpiredis_reader_set_status_handler($reader, $this->getStatusHandler());
        phpiredis_reader_set_error_handler($reader, $this->getErrorHandler());

        return $reader;
    }

    /**
     * Returns the underlying protocol reader resource.
     *
     * @return resource
     */
    protected function getReader()
    {
        return $this->reader;
    }

    /**
     * Returns the handler used by the protocol reader for inline responses.
     *
     * The closure is created once and cached in a static variable.
     *
     * @return \Closure
     */
    protected function getStatusHandler()
    {
        static $statusHandler;

        if (!$statusHandler) {
            $statusHandler = function ($payload) {
                return StatusResponse::get($payload);
            };
        }

        return $statusHandler;
    }

    /**
     * Returns the handler used by the protocol reader for error responses.
     *
     * The closure is created once and cached in a static variable.
     *
     * @return \Closure
     */
    protected function getErrorHandler()
    {
        static $errorHandler;

        if (!$errorHandler) {
            $errorHandler = function ($errorMessage) {
                return new ErrorResponse($errorMessage);
            };
        }

        return $errorHandler;
    }

    /**
     * Helper method used to throw exceptions on socket errors.
     *
     * The last socket error is captured before disconnecting, then reported
     * through onConnectionError().
     */
    private function emitSocketError()
    {
        $errno = socket_last_error();
        $errstr = socket_strerror($errno);

        $this->disconnect();

        $this->onConnectionError(trim($errstr), $errno);
    }

    /**
     * Gets the address of a host from connection parameters.
     *
     * @param ParametersInterface $parameters Parameters used to initialize the connection.
     *
     * @return string|false IP address, or FALSE when the hostname cannot be resolved.
     */
    protected static function getAddress(ParametersInterface $parameters)
    {
        if (filter_var($host = $parameters->host, FILTER_VALIDATE_IP)) {
            return $host;
        }

        // gethostbyname() returns its argument unchanged on failure.
        if ($host === $address = gethostbyname($host)) {
            return false;
        }

        return $address;
    }

    /**
     * {@inheritdoc}
     */
    protected function createResource()
    {
        $parameters = $this->parameters;

        if ($parameters->scheme === 'unix') {
            $address = $parameters->path;
            $domain = AF_UNIX;
            $protocol = 0;
        } else {
            if (false === $address = self::getAddress($parameters)) {
                $this->onConnectionError("Cannot resolve the address of '$parameters->host'.");
            }

            $domain = filter_var($address, FILTER_VALIDATE_IP, FILTER_FLAG_IPV6) ? AF_INET6 : AF_INET;
            $protocol = SOL_TCP;
        }

        $socket = @socket_create($domain, SOCK_STREAM, $protocol);

        // socket_create() returns a resource on PHP < 8 and a Socket object
        // on PHP >= 8, so failure must be detected by comparing with FALSE
        // instead of relying on is_resource().
        if ($socket === false) {
            $this->emitSocketError();
        }

        $this->setSocketOptions($socket, $parameters);
        $this->connectWithTimeout($socket, $address, $parameters);

        return $socket;
    }

    /**
     * Sets options on the socket resource from the connection parameters.
     *
     * @param resource|\Socket    $socket     Socket resource.
     * @param ParametersInterface $parameters Parameters used to initialize the connection.
     */
    private function setSocketOptions($socket, ParametersInterface $parameters)
    {
        if ($parameters->scheme !== 'unix') {
            if (!socket_set_option($socket, SOL_TCP, TCP_NODELAY, 1)) {
                $this->emitSocketError();
            }

            if (!socket_set_option($socket, SOL_SOCKET, SO_REUSEADDR, 1)) {
                $this->emitSocketError();
            }
        }

        if (isset($parameters->read_write_timeout)) {
            $rwtimeout = (float) $parameters->read_write_timeout;
            $timeoutSec = floor($rwtimeout);

            // Explicit integer casts avoid implicit float-to-int conversions
            // of the (possibly fractional) microseconds value.
            $timeout = array(
                'sec' => (int) $timeoutSec,
                'usec' => (int) (($rwtimeout - $timeoutSec) * 1000000),
            );

            if (!socket_set_option($socket, SOL_SOCKET, SO_SNDTIMEO, $timeout)) {
                $this->emitSocketError();
            }

            if (!socket_set_option($socket, SOL_SOCKET, SO_RCVTIMEO, $timeout)) {
                $this->emitSocketError();
            }
        }
    }

    /**
     * Opens the actual connection to the server with a timeout.
     *
     * The connection is started in non-blocking mode and socket_select() is
     * then used to wait for its outcome for at most `timeout` seconds (5.0
     * when the parameter is not set).
     *
     * @param resource|\Socket    $socket     Socket resource.
     * @param string              $address    IP address (DNS-resolved from hostname)
     * @param ParametersInterface $parameters Parameters used to initialize the connection.
     */
    private function connectWithTimeout($socket, $address, ParametersInterface $parameters)
    {
        socket_set_nonblock($socket);

        if (@socket_connect($socket, $address, (int) $parameters->port) === false) {
            $error = socket_last_error();

            // A connection still in progress is expected in non-blocking mode.
            if ($error !== SOCKET_EINPROGRESS && $error !== SOCKET_EALREADY) {
                $this->emitSocketError();
            }
        }

        socket_set_block($socket);

        $null = null;
        $selectable = array($socket);

        $timeout = (isset($parameters->timeout) ? (float) $parameters->timeout : 5.0);
        $timeoutSecs = floor($timeout);

        // socket_select() expects integers for both timeout arguments.
        $seconds = (int) $timeoutSecs;
        $microseconds = (int) (($timeout - $timeoutSecs) * 1000000);

        $selected = socket_select($selectable, $selectable, $null, $seconds, $microseconds);

        // The same array is watched for both readability and writability: a
        // count of 2 means the socket was reported in both sets, which is
        // treated here as a refused connection.
        if ($selected === 2) {
            $this->onConnectionError('Connection refused.', SOCKET_ECONNREFUSED);
        }

        if ($selected === 0) {
            $this->onConnectionError('Connection timed out.', SOCKET_ETIMEDOUT);
        }

        if ($selected === false) {
            $this->emitSocketError();
        }
    }

    /**
     * {@inheritdoc}
     *
     * After a successful connection the initialization commands, if any, are
     * executed and a connection error is raised when one of them fails.
     */
    public function connect()
    {
        if (parent::connect() && $this->initCommands) {
            foreach ($this->initCommands as $command) {
                $response = $this->executeCommand($command);

                if ($response instanceof ErrorResponseInterface) {
                    $this->onConnectionError("`{$command->getId()}` failed: $response", 0);
                }
            }
        }
    }

    /**
     * {@inheritdoc}
     */
    public function disconnect()
    {
        if ($this->isConnected()) {
            // Drop any partially parsed reply so that stale bytes buffered by
            // the reader cannot corrupt responses read after a reconnection.
            phpiredis_reader_reset($this->reader);

            socket_close($this->getResource());

            parent::disconnect();
        }
    }

    /**
     * {@inheritdoc}
     *
     * Keeps writing until the whole buffer has been sent, since a single
     * socket_write() call may send only part of it.
     */
    protected function write($buffer)
    {
        $socket = $this->getResource();

        while (($length = strlen($buffer)) > 0) {
            $written = socket_write($socket, $buffer, $length);

            if ($length === $written) {
                return;
            }

            if ($written === false) {
                $this->onConnectionError('Error while writing bytes to the server.');
            }

            $buffer = substr($buffer, $written);
        }
    }

    /**
     * {@inheritdoc}
     *
     * Feeds the protocol reader with chunks of up to 4096 bytes received from
     * the socket until the reader leaves the "incomplete" state.
     */
    public function read()
    {
        $socket = $this->getResource();
        $reader = $this->reader;

        while (PHPIREDIS_READER_STATE_INCOMPLETE === $state = phpiredis_reader_get_state($reader)) {
            // A failed call or an empty / null buffer is handled as a socket error.
            if (@socket_recv($socket, $buffer, 4096, 0) === false || $buffer === '' || $buffer === null) {
                $this->emitSocketError();
            }

            phpiredis_reader_feed($reader, $buffer);
        }

        if ($state === PHPIREDIS_READER_STATE_COMPLETE) {
            return phpiredis_reader_get_reply($reader);
        } else {
            $this->onProtocolError(phpiredis_reader_get_error($reader));

            return;
        }
    }

    /**
     * {@inheritdoc}
     *
     * The command ID is prepended to the arguments and the request is
     * serialized by phpiredis before being written to the socket.
     */
    public function writeRequest(CommandInterface $command)
    {
        $arguments = $command->getArguments();
        array_unshift($arguments, $command->getId());

        $this->write(phpiredis_format_command($arguments));
    }

    /**
     * {@inheritdoc}
     *
     * Checks the required extensions again and creates a fresh protocol
     * reader for the unserialized instance.
     */
    public function __wakeup()
    {
        $this->assertExtensions();
        $this->reader = $this->createReader();
    }
}