<?php

/*
 * This file is part of the Predis package.
 *
 * (c) Daniele Alessandri <suppakilla@gmail.com>
 *
 * For the full copyright and license information, please view the LICENSE
 * file that was distributed with this source code.
 */

namespace Predis;

use Predis\Command\CommandInterface;
use Predis\Command\RawCommand;
use Predis\Command\ScriptCommand;
use Predis\Configuration\Options;
use Predis\Configuration\OptionsInterface;
use Predis\Connection\AggregateConnectionInterface;
use Predis\Connection\ConnectionInterface;
use Predis\Connection\ParametersInterface;
use Predis\Monitor\Consumer as MonitorConsumer;
use Predis\Pipeline\Pipeline;
use Predis\PubSub\Consumer as PubSubConsumer;
use Predis\Response\ErrorInterface as ErrorResponseInterface;
use Predis\Response\ResponseInterface;
use Predis\Response\ServerException;
use Predis\Transaction\MultiExec as MultiExecTransaction;

/**
 * Client class used for connecting and executing commands on Redis.
 *
 * This is the main high-level abstraction of Predis upon which various other
 * abstractions are built. Internally it aggregates various other classes each
 * one with its own responsibility and scope.
 *
 * {@inheritdoc}
 *
 * @author Daniele Alessandri <suppakilla@gmail.com>
 */
class Client implements ClientInterface
{
    const VERSION = '1.0.3';

    protected $connection;
    protected $options;
    private $profile;

    /**
     * Builds the client options first, then the connection (whose creation
     * reads those options), and finally caches the profile taken from the
     * options.
     *
     * @param mixed $parameters Connection parameters for one or more servers.
     * @param mixed $options    Options to configure some behaviours of the client.
     */
    public function __construct($parameters = null, $options = null)
    {
        // Options must exist before the connection: createConnection() reads
        // $this->options to build single or aggregate connections.
        $this->options = $this->createOptions($options ?: array());
        $this->connection = $this->createConnection($parameters ?: array());
        $this->profile = $this->options->profile;
    }

    /**
     * Creates a new instance of Predis\Configuration\Options from different
     * types of arguments or simply returns the passed argument if it is an
     * instance of Predis\Configuration\OptionsInterface.
     *
     * @param mixed $options Client options.
     *
     * @throws \InvalidArgumentException When $options is neither an array nor
     *                                   an OptionsInterface instance.
     *
     * @return OptionsInterface
     */
    protected function createOptions($options)
    {
        if (is_array($options)) {
            return new Options($options);
        }

        if ($options instanceof OptionsInterface) {
            return $options;
        }

        throw new \InvalidArgumentException('Invalid type for client options.');
    }

    /**
     * Creates single or aggregate connections from different types of arguments
     * (string, array) or returns the passed argument if it is an instance of a
     * class implementing Predis\Connection\ConnectionInterface.
     *
     * Accepted types for connection parameters are:
     *
     *  - Instance of Predis\Connection\ConnectionInterface.
     *  - Instance of Predis\Connection\ParametersInterface.
     *  - Array
     *  - String
     *  - Callable
     *
     * @param mixed $parameters Connection parameters or connection instance.
     *
     * @throws \InvalidArgumentException When $parameters has an unsupported type.
     *
     * @return ConnectionInterface
     */
    protected function createConnection($parameters)
    {
        if ($parameters instanceof ConnectionInterface) {
            return $parameters;
        }

        if ($parameters instanceof ParametersInterface || is_string($parameters)) {
            return $this->options->connections->create($parameters);
        }

        if (is_array($parameters)) {
            // No element at index 0: the array is handed as-is to the
            // connection factory to create one connection.
            if (!isset($parameters[0])) {
                return $this->options->connections->create($parameters);
            }

            $options = $this->options;

            if ($options->defined('aggregate')) {
                // A user-supplied "aggregate" initializer takes precedence and
                // receives both the parameters and the client options.
                $initializer = $this->getConnectionInitializerWrapper($options->aggregate);
                $connection = $initializer($parameters, $options);
            } else {
                // Use the replication connection only when that option is both
                // defined and truthy, otherwise fall back to the cluster one.
                if ($options->defined('replication') && $replication = $options->replication) {
                    $connection = $replication;
                } else {
                    $connection = $options->cluster;
                }

                $options->connections->aggregate($connection, $parameters);
            }

            return $connection;
        }

        if (is_callable($parameters)) {
            // A callable is invoked with the client options only and must
            // return a connection instance (enforced by the wrapper).
            $initializer = $this->getConnectionInitializerWrapper($parameters);
            $connection = $initializer($this->options);

            return $connection;
        }

        throw new \InvalidArgumentException('Invalid type for connection parameters.');
    }

    /**
     * Wraps a callable to make sure that its returned value represents a valid
     * connection type.
     *
     * The returned closure forwards every argument it receives to the wrapped
     * callable and throws \UnexpectedValueException when the result is not an
     * instance of Predis\Connection\ConnectionInterface.
     *
     * @param mixed $callable
     *
     * @return \Closure
     */
    protected function getConnectionInitializerWrapper($callable)
    {
        return function () use ($callable) {
            $connection = call_user_func_array($callable, func_get_args());

            if (!$connection instanceof ConnectionInterface) {
                throw new \UnexpectedValueException(
                    'The callable connection initializer returned an invalid type.'
                );
            }

            return $connection;
        };
    }

    /**
     * {@inheritdoc}
     */
    public function getProfile()
    {
        return $this->profile;
    }

    /**
     * {@inheritdoc}
     */
    public function getOptions()
    {
        return $this->options;
    }

    /**
     * Creates a new client instance for the specified connection ID or alias,
     * only when working with an aggregate connection (cluster, replication).
     * The new client instance uses the same options of the original one.
     *
     * @param string $connectionID Identifier of a connection.
     *
     * @throws \InvalidArgumentException When no connection matches the ID.
     * @throws NotSupportedException     When the client does not use an
     *                                   aggregate connection.
     *
     * @return Client
     */
    public function getClientFor($connectionID)
    {
        if (!$connection = $this->getConnectionById($connectionID)) {
            throw new \InvalidArgumentException("Invalid connection ID: $connectionID.");
        }

        // "static" keeps subclasses of Client intact when deriving clients.
        return new static($connection, $this->options);
    }

    /**
     * Opens the underlying connection and connects to the server.
     */
    public function connect()
    {
        $this->connection->connect();
    }

    /**
     * Closes the underlying connection and disconnects from the server.
     */
    public function disconnect()
    {
        $this->connection->disconnect();
    }

    /**
     * Closes the underlying connection and disconnects from the server.
     *
     * This is the same as `Client::disconnect()` as it does not actually send
     * the `QUIT` command to Redis, but simply closes the connection.
     */
    public function quit()
    {
        $this->disconnect();
    }

    /**
     * Returns the current state of the underlying connection.
     *
     * @return bool
     */
    public function isConnected()
    {
        return $this->connection->isConnected();
    }

    /**
     * {@inheritdoc}
     */
    public function getConnection()
    {
        return $this->connection;
    }

    /**
     * Retrieves the specified connection from the aggregate connection when the
     * client is in cluster or replication mode.
     *
     * @param string $connectionID Index or alias of the single connection.
     *
     * @throws NotSupportedException When the underlying connection is not an
     *                               aggregate connection.
     *
     * @return Connection\NodeConnectionInterface
     */
    public function getConnectionById($connectionID)
    {
        if (!$this->connection instanceof AggregateConnectionInterface) {
            throw new NotSupportedException(
                'Retrieving connections by ID is supported only by aggregate connections.'
            );
        }

        return $this->connection->getConnectionById($connectionID);
    }

    /**
     * Executes a command without filtering its arguments, parsing the response,
     * applying any prefix to keys or throwing exceptions on Redis errors,
     * regardless of client options.
     *
     * It is possible to identify Redis error responses from normal responses
     * using the second optional argument which is populated by reference.
     * Response objects (errors included) are returned cast to string.
     *
     * @param array $arguments Command arguments as defined by the command signature.
     * @param bool  $error     Set to TRUE when Redis returned an error response.
     *
     * @return mixed
     */
    public function executeRaw(array $arguments, &$error = null)
    {
        // Always reset the by-reference flag so a stale TRUE from a previous
        // call made with the same variable cannot leak through.
        $error = false;

        $response = $this->connection->executeCommand(
            new RawCommand($arguments)
        );

        if ($response instanceof ResponseInterface) {
            if ($response instanceof ErrorResponseInterface) {
                $error = true;
            }

            return (string) $response;
        }

        return $response;
    }

    /**
     * {@inheritdoc}
     */
    public function __call($commandID, $arguments)
    {
        return $this->executeCommand(
            $this->createCommand($commandID, $arguments)
        );
    }

    /**
     * {@inheritdoc}
     */
    public function createCommand($commandID, $arguments = array())
    {
        return $this->profile->createCommand($commandID, $arguments);
    }

    /**
     * {@inheritdoc}
     */
    public function executeCommand(CommandInterface $command)
    {
        $response = $this->connection->executeCommand($command);

        if ($response instanceof ResponseInterface) {
            if ($response instanceof ErrorResponseInterface) {
                $response = $this->onErrorResponse($command, $response);
            }

            // Response objects are returned untouched, they are never passed
            // to the command's own response parser.
            return $response;
        }

        return $command->parseResponse($response);
    }

    /**
     * Handles -ERR responses returned by Redis.
     *
     * A `NOSCRIPT` error for a script command is recovered by re-issuing the
     * script through `EVAL`. Any other error is thrown as ServerException when
     * the `exceptions` client option is enabled, or returned as-is otherwise.
     *
     * @param CommandInterface       $command  Redis command that generated the error.
     * @param ErrorResponseInterface $response Instance of the error response.
     *
     * @throws ServerException
     *
     * @return mixed
     */
    protected function onErrorResponse(CommandInterface $command, ErrorResponseInterface $response)
    {
        if ($command instanceof ScriptCommand && $response->getErrorType() === 'NOSCRIPT') {
            $eval = $this->createCommand('EVAL');
            $eval->setRawArguments($command->getEvalArguments());

            $response = $this->executeCommand($eval);

            // The payload came back through EVAL, so it still has to go
            // through the parser of the original script command.
            if (!$response instanceof ResponseInterface) {
                $response = $command->parseResponse($response);
            }

            return $response;
        }

        if ($this->options->exceptions) {
            throw new ServerException($response->getMessage());
        }

        return $response;
    }

    /**
     * Executes the specified initializer method on `$this` by adjusting the
     * actual invocation depending on the arity (0, 1 or 2 arguments). This is
     * simply a utility method to create Redis context instances since they
     * follow a common initialization path.
     *
     * Initializers accept at most an array of options followed by a callable,
     * so any argument past the second one is ignored.
     *
     * @param string $initializer Method name.
     * @param array  $argv        Arguments for the method.
     *
     * @return mixed
     */
    private function sharedContextFactory($initializer, $argv = null)
    {
        // Normalize a missing argument list: count(null) emits a warning on
        // PHP 7.2+ and throws a TypeError on PHP 8.
        $argv = $argv ?: array();

        switch (count($argv)) {
            case 0:
                return $this->$initializer();

            case 1:
                // A lone argument is either the options array or the callable.
                return is_array($argv[0])
                    ? $this->$initializer($argv[0])
                    : $this->$initializer(null, $argv[0]);

            default:
                // Two or more arguments: forward (options, callable) only.
                // Passing `$this` as first argument, as previously done for
                // three or more arguments, always violated the `array` type
                // declared by the initializers and resulted in a fatal error.
                list($arg0, $arg1) = $argv;

                return $this->$initializer($arg0, $arg1);
        }
    }

    /**
     * Creates a new pipeline context and returns it, or returns the results of
     * a pipeline executed inside the optionally provided callable object.
     *
     * @param mixed ... Array of options, a callable for execution, or both.
     *
     * @return Pipeline|array
     */
    public function pipeline(/* arguments */)
    {
        return $this->sharedContextFactory('createPipeline', func_get_args());
    }

    /**
     * Actual pipeline context initializer method.
     *
     * The pipeline class is selected from the `atomic` and `fire-and-forget`
     * options, in this order of precedence, falling back to the standard
     * pipeline when neither is set to a truthy value.
     *
     * @param array $options  Options for the context.
     * @param mixed $callable Optional callable used to execute the context.
     *
     * @return Pipeline|array
     */
    protected function createPipeline(array $options = null, $callable = null)
    {
        if (isset($options['atomic']) && $options['atomic']) {
            $class = 'Predis\Pipeline\Atomic';
        } elseif (isset($options['fire-and-forget']) && $options['fire-and-forget']) {
            $class = 'Predis\Pipeline\FireAndForget';
        } else {
            $class = 'Predis\Pipeline\Pipeline';
        }

        /*
         * @var ClientContextInterface
         */
        $pipeline = new $class($this);

        if (isset($callable)) {
            return $pipeline->execute($callable);
        }

        return $pipeline;
    }

    /**
     * Creates a new transaction context and returns it, or returns the results
     * of a transaction executed inside the optionally provided callable object.
     *
     * @param mixed ... Array of options, a callable for execution, or both.
     *
     * @return MultiExecTransaction|array
     */
    public function transaction(/* arguments */)
    {
        return $this->sharedContextFactory('createTransaction', func_get_args());
    }

    /**
     * Actual transaction context initializer method.
     *
     * @param array $options  Options for the context.
     * @param mixed $callable Optional callable used to execute the context.
     *
     * @return MultiExecTransaction|array
     */
    protected function createTransaction(array $options = null, $callable = null)
    {
        $transaction = new MultiExecTransaction($this, $options);

        if (isset($callable)) {
            return $transaction->execute($callable);
        }

        return $transaction;
    }

    /**
     * Creates a new publish/subscribe context and returns it, or starts its loop
     * inside the optionally provided callable object.
     *
     * @param mixed ... Array of options, a callable for execution, or both.
     *
     * @return PubSubConsumer|null
     */
    public function pubSubLoop(/* arguments */)
    {
        return $this->sharedContextFactory('createPubSub', func_get_args());
    }

    /**
     * Actual publish/subscribe context initializer method.
     *
     * Without a callable the consumer is returned to the caller. With a
     * callable, the consumer is iterated here and each message is passed to
     * the callable together with the consumer; nothing is returned once the
     * loop ends.
     *
     * @param array $options  Options for the context.
     * @param mixed $callable Optional callable used to execute the context.
     *
     * @return PubSubConsumer|null
     */
    protected function createPubSub(array $options = null, $callable = null)
    {
        $pubsub = new PubSubConsumer($this, $options);

        if (!isset($callable)) {
            return $pubsub;
        }

        foreach ($pubsub as $message) {
            // Only a strict FALSE returned by the callable stops the consumer.
            if (call_user_func($callable, $pubsub, $message) === false) {
                $pubsub->stop();
            }
        }
    }

    /**
     * Creates a new monitor consumer and returns it.
     *
     * @return MonitorConsumer
     */
    public function monitor()
    {
        return new MonitorConsumer($this);
    }
}