<?php

/*
 * This file is part of the Predis package.
 *
 * (c) Daniele Alessandri <suppakilla@gmail.com>
 *
 * For the full copyright and license information, please view the LICENSE
 * file that was distributed with this source code.
 */

namespace Predis\Connection;

use Predis\Command\CommandInterface;
use Predis\NotSupportedException;
use Predis\Response\Error as ErrorResponse;
use Predis\Response\Status as StatusResponse;

/**
 * This class provides the implementation of a Predis connection that uses the
 * PHP socket extension for network communication and wraps the phpiredis C
 * extension (PHP bindings for hiredis) to parse the Redis protocol.
 *
 * This class is intended to provide an optional low-overhead alternative for
 * processing responses from Redis compared to the standard pure-PHP classes.
 * Differences in speed when dealing with short inline responses are practically
 * nonexistent, the actual speed boost is for big multibulk responses when this
 * protocol processor can parse and return responses very fast.
 *
 * For instructions on how to build and install the phpiredis extension, please
 * consult the repository of the project.
 *
 * The connection parameters supported by this class are:
 *
 *  - scheme: it can be either 'redis', 'tcp' or 'unix'.
 *  - host: hostname or IP address of the server.
 *  - port: TCP port of the server.
 *  - path: path of a UNIX domain socket when scheme is 'unix'.
 *  - timeout: timeout to perform the connection (5 seconds when not set).
 *  - read_write_timeout: timeout of read / write operations.
 *
 * @link http://github.com/nrk/phpiredis
 *
 * @author Daniele Alessandri <suppakilla@gmail.com>
 */
class PhpiredisSocketConnection extends AbstractConnection
{
    /**
     * Protocol reader created by phpiredis_reader_create().
     */
    private $reader;

    /**
     * {@inheritdoc}
     */
    public function __construct(ParametersInterface $parameters)
    {
        $this->assertExtensions();

        parent::__construct($parameters);

        $this->reader = $this->createReader();
    }

    /**
     * Disconnects from the server and destroys the underlying resource and the
     * protocol reader resource when PHP's garbage collector kicks in.
     */
    public function __destruct()
    {
        // The parent destructor must run first: disconnect() resets the reader,
        // so the reader has to be still alive while the connection is closed.
        parent::__destruct();

        phpiredis_reader_destroy($this->reader);
    }

    /**
     * Checks if the socket and phpiredis extensions are loaded in PHP.
     *
     * @throws NotSupportedException When one of the two extensions is missing.
     */
    protected function assertExtensions()
    {
        if (!extension_loaded('sockets')) {
            throw new NotSupportedException(
                'The "sockets" extension is required by this connection backend.'
            );
        }

        if (!extension_loaded('phpiredis')) {
            throw new NotSupportedException(
                'The "phpiredis" extension is required by this connection backend.'
            );
        }
    }

    /**
     * {@inheritdoc}
     *
     * @throws NotSupportedException When the "persistent" parameter is set.
     */
    protected function assertParameters(ParametersInterface $parameters)
    {
        parent::assertParameters($parameters);

        if (isset($parameters->persistent)) {
            throw new NotSupportedException(
                'Persistent connections are not supported by this connection backend.'
            );
        }

        return $parameters;
    }

    /**
     * Creates a new instance of the protocol reader resource and registers the
     * handlers used to build status and error responses.
     *
     * @return resource
     */
    private function createReader()
    {
        $reader = phpiredis_reader_create();

        phpiredis_reader_set_status_handler($reader, $this->getStatusHandler());
        phpiredis_reader_set_error_handler($reader, $this->getErrorHandler());

        return $reader;
    }

    /**
     * Returns the underlying protocol reader resource.
     *
     * @return resource
     */
    protected function getReader()
    {
        return $this->reader;
    }

    /**
     * Returns the handler used by the protocol reader for inline responses.
     *
     * @return \Closure
     */
    private function getStatusHandler()
    {
        return function ($payload) {
            return StatusResponse::get($payload);
        };
    }

    /**
     * Returns the handler used by the protocol reader for error responses.
     *
     * @return \Closure
     */
    protected function getErrorHandler()
    {
        return function ($payload) {
            return new ErrorResponse($payload);
        };
    }

    /**
     * Helper method used to throw exceptions on socket errors.
     *
     * The last socket error is captured before disconnecting and then handed
     * over to onConnectionError().
     */
    private function emitSocketError()
    {
        $errno = socket_last_error();
        $errstr = socket_strerror($errno);

        $this->disconnect();

        $this->onConnectionError(trim($errstr), $errno);
    }

    /**
     * Gets the address of an host from connection parameters.
     *
     * When the host is already a valid IP address it is returned untouched,
     * otherwise it is resolved with gethostbyname().
     *
     * @param ParametersInterface $parameters Parameters used to initialize the connection.
     *
     * @return string|false IP address, or FALSE when the hostname cannot be resolved.
     */
    protected static function getAddress(ParametersInterface $parameters)
    {
        if (filter_var($host = $parameters->host, FILTER_VALIDATE_IP)) {
            return $host;
        }

        // gethostbyname() returns its argument unmodified on failure.
        if ($host === $address = gethostbyname($host)) {
            return false;
        }

        return $address;
    }

    /**
     * {@inheritdoc}
     */
    protected function createResource()
    {
        $parameters = $this->parameters;

        if ($parameters->scheme === 'unix') {
            $address = $parameters->path;
            $domain = AF_UNIX;
            $protocol = 0;
        } else {
            if (false === $address = self::getAddress($parameters)) {
                $this->onConnectionError("Cannot resolve the address of '$parameters->host'.");
            }

            $domain = filter_var($address, FILTER_VALIDATE_IP, FILTER_FLAG_IPV6) ? AF_INET6 : AF_INET;
            $protocol = SOL_TCP;
        }

        $socket = @socket_create($domain, SOCK_STREAM, $protocol);

        // socket_create() yields a resource on PHP < 8.0 and a \Socket object
        // on PHP >= 8.0, so the only portable failure check is against FALSE
        // (is_resource() would reject every valid socket on PHP 8).
        if ($socket === false) {
            $this->emitSocketError();
        }

        $this->setSocketOptions($socket, $parameters);
        $this->connectWithTimeout($socket, $address, $parameters);

        return $socket;
    }

    /**
     * Sets options on the socket resource from the connection parameters.
     *
     * @param resource            $socket     Socket resource.
     * @param ParametersInterface $parameters Parameters used to initialize the connection.
     */
    private function setSocketOptions($socket, ParametersInterface $parameters)
    {
        if ($parameters->scheme !== 'unix') {
            if (!socket_set_option($socket, SOL_TCP, TCP_NODELAY, 1)) {
                $this->emitSocketError();
            }

            if (!socket_set_option($socket, SOL_SOCKET, SO_REUSEADDR, 1)) {
                $this->emitSocketError();
            }
        }

        if (isset($parameters->read_write_timeout)) {
            // Split the fractional timeout into whole seconds and microseconds.
            $rwtimeout = (float) $parameters->read_write_timeout;
            $timeoutSec = (int) floor($rwtimeout);
            $timeoutUsec = (int) (($rwtimeout - $timeoutSec) * 1000000);

            $timeout = array(
                'sec' => $timeoutSec,
                'usec' => $timeoutUsec,
            );

            if (!socket_set_option($socket, SOL_SOCKET, SO_SNDTIMEO, $timeout)) {
                $this->emitSocketError();
            }

            if (!socket_set_option($socket, SOL_SOCKET, SO_RCVTIMEO, $timeout)) {
                $this->emitSocketError();
            }
        }
    }

    /**
     * Opens the actual connection to the server with a timeout.
     *
     * The connection is started in non-blocking mode and socket_select() is
     * then used to wait for its outcome for at most the configured timeout.
     *
     * @param resource            $socket     Socket resource.
     * @param string              $address    IP address (DNS-resolved from hostname)
     * @param ParametersInterface $parameters Parameters used to initialize the connection.
     */
    private function connectWithTimeout($socket, $address, ParametersInterface $parameters)
    {
        socket_set_nonblock($socket);

        if (@socket_connect($socket, $address, (int) $parameters->port) === false) {
            $error = socket_last_error();

            // These two codes only mean that the non-blocking connect is still
            // pending, anything else is an actual failure.
            if ($error != SOCKET_EINPROGRESS && $error != SOCKET_EALREADY) {
                $this->emitSocketError();
            }
        }

        socket_set_block($socket);

        $null = null;
        $selectable = array($socket);

        // Without a fallback an unset timeout would be cast to 0, making
        // socket_select() return immediately and report a pending connection
        // as timed out.
        $timeout = isset($parameters->timeout) ? (float) $parameters->timeout : 5.0;
        $timeoutSecs = (int) floor($timeout);
        $timeoutUSecs = (int) (($timeout - $timeoutSecs) * 1000000);

        $selected = socket_select($selectable, $selectable, $null, $timeoutSecs, $timeoutUSecs);

        if ($selected === 2) {
            $this->onConnectionError('Connection refused.', SOCKET_ECONNREFUSED);
        }

        if ($selected === 0) {
            $this->onConnectionError('Connection timed out.', SOCKET_ETIMEDOUT);
        }

        if ($selected === false) {
            $this->emitSocketError();
        }
    }

    /**
     * {@inheritdoc}
     */
    public function connect()
    {
        if (parent::connect() && $this->initCommands) {
            foreach ($this->initCommands as $command) {
                $this->executeCommand($command);
            }
        }
    }

    /**
     * {@inheritdoc}
     */
    public function disconnect()
    {
        if ($this->isConnected()) {
            // Drop any partially parsed reply so that stale bytes are not mixed
            // with the responses read after a reconnection.
            phpiredis_reader_reset($this->reader);

            socket_close($this->getResource());
            parent::disconnect();
        }
    }

    /**
     * {@inheritdoc}
     */
    protected function write($buffer)
    {
        $socket = $this->getResource();

        // socket_write() may send only part of the buffer, keep writing the
        // remaining bytes until everything has been sent.
        while (($length = strlen($buffer)) > 0) {
            $written = socket_write($socket, $buffer, $length);

            if ($length === $written) {
                return;
            }

            if ($written === false) {
                $this->onConnectionError('Error while writing bytes to the server.');
            }

            $buffer = substr($buffer, $written);
        }
    }

    /**
     * {@inheritdoc}
     */
    public function read()
    {
        $socket = $this->getResource();
        $reader = $this->reader;

        // Feed the reader with chunks from the socket until it has a full reply
        // or detects a protocol error.
        while (PHPIREDIS_READER_STATE_INCOMPLETE === $state = phpiredis_reader_get_state($reader)) {
            if (@socket_recv($socket, $buffer, 4096, 0) === false || $buffer === '' || $buffer === null) {
                $this->emitSocketError();
            }

            phpiredis_reader_feed($reader, $buffer);
        }

        if ($state === PHPIREDIS_READER_STATE_COMPLETE) {
            return phpiredis_reader_get_reply($reader);
        } else {
            $this->onProtocolError(phpiredis_reader_get_error($reader));

            return;
        }
    }

    /**
     * {@inheritdoc}
     */
    public function writeRequest(CommandInterface $command)
    {
        // The command ID goes first, followed by its arguments.
        $arguments = $command->getArguments();
        array_unshift($arguments, $command->getId());

        $this->write(phpiredis_format_command($arguments));
    }

    /**
     * {@inheritdoc}
     */
    public function __wakeup()
    {
        $this->assertExtensions();
        $this->reader = $this->createReader();
    }
}