1<?php
2
3/*
4 * This file is part of the Predis package.
5 *
6 * (c) Daniele Alessandri <suppakilla@gmail.com>
7 *
8 * For the full copyright and license information, please view the LICENSE
9 * file that was distributed with this source code.
10 */
11
12namespace Predis;
13
14use Predis\Command\CommandInterface;
15use Predis\Command\RawCommand;
16use Predis\Command\ScriptCommand;
17use Predis\Configuration\Options;
18use Predis\Configuration\OptionsInterface;
19use Predis\Connection\AggregateConnectionInterface;
20use Predis\Connection\ConnectionInterface;
21use Predis\Connection\ParametersInterface;
22use Predis\Monitor\Consumer as MonitorConsumer;
23use Predis\Pipeline\Pipeline;
24use Predis\PubSub\Consumer as PubSubConsumer;
25use Predis\Response\ErrorInterface as ErrorResponseInterface;
26use Predis\Response\ResponseInterface;
27use Predis\Response\ServerException;
28use Predis\Transaction\MultiExec as MultiExecTransaction;
29
30/**
31 * Client class used for connecting and executing commands on Redis.
32 *
33 * This is the main high-level abstraction of Predis upon which various other
34 * abstractions are built. Internally it aggregates various other classes each
35 * one with its own responsibility and scope.
36 *
37 * {@inheritdoc}
38 *
39 * @author Daniele Alessandri <suppakilla@gmail.com>
40 */
41class Client implements ClientInterface
42{
43    const VERSION = '1.0.3';
44
45    protected $connection;
46    protected $options;
47    private $profile;
48
49    /**
50     * @param mixed $parameters Connection parameters for one or more servers.
51     * @param mixed $options    Options to configure some behaviours of the client.
52     */
53    public function __construct($parameters = null, $options = null)
54    {
55        $this->options = $this->createOptions($options ?: array());
56        $this->connection = $this->createConnection($parameters ?: array());
57        $this->profile = $this->options->profile;
58    }
59
60    /**
61     * Creates a new instance of Predis\Configuration\Options from different
62     * types of arguments or simply returns the passed argument if it is an
63     * instance of Predis\Configuration\OptionsInterface.
64     *
65     * @param mixed $options Client options.
66     *
67     * @throws \InvalidArgumentException
68     *
69     * @return OptionsInterface
70     */
71    protected function createOptions($options)
72    {
73        if (is_array($options)) {
74            return new Options($options);
75        }
76
77        if ($options instanceof OptionsInterface) {
78            return $options;
79        }
80
81        throw new \InvalidArgumentException('Invalid type for client options.');
82    }
83
84    /**
85     * Creates single or aggregate connections from different types of arguments
86     * (string, array) or returns the passed argument if it is an instance of a
87     * class implementing Predis\Connection\ConnectionInterface.
88     *
89     * Accepted types for connection parameters are:
90     *
91     *  - Instance of Predis\Connection\ConnectionInterface.
92     *  - Instance of Predis\Connection\ParametersInterface.
93     *  - Array
94     *  - String
95     *  - Callable
96     *
97     * @param mixed $parameters Connection parameters or connection instance.
98     *
99     * @throws \InvalidArgumentException
100     *
101     * @return ConnectionInterface
102     */
103    protected function createConnection($parameters)
104    {
105        if ($parameters instanceof ConnectionInterface) {
106            return $parameters;
107        }
108
109        if ($parameters instanceof ParametersInterface || is_string($parameters)) {
110            return $this->options->connections->create($parameters);
111        }
112
113        if (is_array($parameters)) {
114            if (!isset($parameters[0])) {
115                return $this->options->connections->create($parameters);
116            }
117
118            $options = $this->options;
119
120            if ($options->defined('aggregate')) {
121                $initializer = $this->getConnectionInitializerWrapper($options->aggregate);
122                $connection = $initializer($parameters, $options);
123            } else {
124                if ($options->defined('replication') && $replication = $options->replication) {
125                    $connection = $replication;
126                } else {
127                    $connection = $options->cluster;
128                }
129
130                $options->connections->aggregate($connection, $parameters);
131            }
132
133            return $connection;
134        }
135
136        if (is_callable($parameters)) {
137            $initializer = $this->getConnectionInitializerWrapper($parameters);
138            $connection = $initializer($this->options);
139
140            return $connection;
141        }
142
143        throw new \InvalidArgumentException('Invalid type for connection parameters.');
144    }
145
146    /**
147     * Wraps a callable to make sure that its returned value represents a valid
148     * connection type.
149     *
150     * @param mixed $callable
151     *
152     * @return \Closure
153     */
154    protected function getConnectionInitializerWrapper($callable)
155    {
156        return function () use ($callable) {
157            $connection = call_user_func_array($callable, func_get_args());
158
159            if (!$connection instanceof ConnectionInterface) {
160                throw new \UnexpectedValueException(
161                    'The callable connection initializer returned an invalid type.'
162                );
163            }
164
165            return $connection;
166        };
167    }
168
169    /**
170     * {@inheritdoc}
171     */
172    public function getProfile()
173    {
174        return $this->profile;
175    }
176
177    /**
178     * {@inheritdoc}
179     */
180    public function getOptions()
181    {
182        return $this->options;
183    }
184
185    /**
186     * Creates a new client instance for the specified connection ID or alias,
187     * only when working with an aggregate connection (cluster, replication).
188     * The new client instances uses the same options of the original one.
189     *
190     * @param string $connectionID Identifier of a connection.
191     *
192     * @throws \InvalidArgumentException
193     *
194     * @return Client
195     */
196    public function getClientFor($connectionID)
197    {
198        if (!$connection = $this->getConnectionById($connectionID)) {
199            throw new \InvalidArgumentException("Invalid connection ID: $connectionID.");
200        }
201
202        return new static($connection, $this->options);
203    }
204
205    /**
206     * Opens the underlying connection and connects to the server.
207     */
208    public function connect()
209    {
210        $this->connection->connect();
211    }
212
213    /**
214     * Closes the underlying connection and disconnects from the server.
215     */
216    public function disconnect()
217    {
218        $this->connection->disconnect();
219    }
220
221    /**
222     * Closes the underlying connection and disconnects from the server.
223     *
224     * This is the same as `Client::disconnect()` as it does not actually send
225     * the `QUIT` command to Redis, but simply closes the connection.
226     */
227    public function quit()
228    {
229        $this->disconnect();
230    }
231
232    /**
233     * Returns the current state of the underlying connection.
234     *
235     * @return bool
236     */
237    public function isConnected()
238    {
239        return $this->connection->isConnected();
240    }
241
242    /**
243     * {@inheritdoc}
244     */
245    public function getConnection()
246    {
247        return $this->connection;
248    }
249
250    /**
251     * Retrieves the specified connection from the aggregate connection when the
252     * client is in cluster or replication mode.
253     *
254     * @param string $connectionID Index or alias of the single connection.
255     *
256     * @throws NotSupportedException
257     *
258     * @return Connection\NodeConnectionInterface
259     */
260    public function getConnectionById($connectionID)
261    {
262        if (!$this->connection instanceof AggregateConnectionInterface) {
263            throw new NotSupportedException(
264                'Retrieving connections by ID is supported only by aggregate connections.'
265            );
266        }
267
268        return $this->connection->getConnectionById($connectionID);
269    }
270
271    /**
272     * Executes a command without filtering its arguments, parsing the response,
273     * applying any prefix to keys or throwing exceptions on Redis errors even
274     * regardless of client options.
275     *
276     * It is possibile to indentify Redis error responses from normal responses
277     * using the second optional argument which is populated by reference.
278     *
279     * @param array $arguments Command arguments as defined by the command signature.
280     * @param bool  $error     Set to TRUE when Redis returned an error response.
281     *
282     * @return mixed
283     */
284    public function executeRaw(array $arguments, &$error = null)
285    {
286        $error = false;
287
288        $response = $this->connection->executeCommand(
289            new RawCommand($arguments)
290        );
291
292        if ($response instanceof ResponseInterface) {
293            if ($response instanceof ErrorResponseInterface) {
294                $error = true;
295            }
296
297            return (string) $response;
298        }
299
300        return $response;
301    }
302
303    /**
304     * {@inheritdoc}
305     */
306    public function __call($commandID, $arguments)
307    {
308        return $this->executeCommand(
309            $this->createCommand($commandID, $arguments)
310        );
311    }
312
313    /**
314     * {@inheritdoc}
315     */
316    public function createCommand($commandID, $arguments = array())
317    {
318        return $this->profile->createCommand($commandID, $arguments);
319    }
320
321    /**
322     * {@inheritdoc}
323     */
324    public function executeCommand(CommandInterface $command)
325    {
326        $response = $this->connection->executeCommand($command);
327
328        if ($response instanceof ResponseInterface) {
329            if ($response instanceof ErrorResponseInterface) {
330                $response = $this->onErrorResponse($command, $response);
331            }
332
333            return $response;
334        }
335
336        return $command->parseResponse($response);
337    }
338
339    /**
340     * Handles -ERR responses returned by Redis.
341     *
342     * @param CommandInterface       $command  Redis command that generated the error.
343     * @param ErrorResponseInterface $response Instance of the error response.
344     *
345     * @throws ServerException
346     *
347     * @return mixed
348     */
349    protected function onErrorResponse(CommandInterface $command, ErrorResponseInterface $response)
350    {
351        if ($command instanceof ScriptCommand && $response->getErrorType() === 'NOSCRIPT') {
352            $eval = $this->createCommand('EVAL');
353            $eval->setRawArguments($command->getEvalArguments());
354
355            $response = $this->executeCommand($eval);
356
357            if (!$response instanceof ResponseInterface) {
358                $response = $command->parseResponse($response);
359            }
360
361            return $response;
362        }
363
364        if ($this->options->exceptions) {
365            throw new ServerException($response->getMessage());
366        }
367
368        return $response;
369    }
370
371    /**
372     * Executes the specified initializer method on `$this` by adjusting the
373     * actual invokation depending on the arity (0, 1 or 2 arguments). This is
374     * simply an utility method to create Redis contexts instances since they
375     * follow a common initialization path.
376     *
377     * @param string $initializer Method name.
378     * @param array  $argv        Arguments for the method.
379     *
380     * @return mixed
381     */
382    private function sharedContextFactory($initializer, $argv = null)
383    {
384        switch (count($argv)) {
385            case 0:
386                return $this->$initializer();
387
388            case 1:
389                return is_array($argv[0])
390                    ? $this->$initializer($argv[0])
391                    : $this->$initializer(null, $argv[0]);
392
393            case 2:
394                list($arg0, $arg1) = $argv;
395
396                return $this->$initializer($arg0, $arg1);
397
398            default:
399                return $this->$initializer($this, $argv);
400        }
401    }
402
403    /**
404     * Creates a new pipeline context and returns it, or returns the results of
405     * a pipeline executed inside the optionally provided callable object.
406     *
407     * @param mixed ... Array of options, a callable for execution, or both.
408     *
409     * @return Pipeline|array
410     */
411    public function pipeline(/* arguments */)
412    {
413        return $this->sharedContextFactory('createPipeline', func_get_args());
414    }
415
416    /**
417     * Actual pipeline context initializer method.
418     *
419     * @param array $options  Options for the context.
420     * @param mixed $callable Optional callable used to execute the context.
421     *
422     * @return Pipeline|array
423     */
424    protected function createPipeline(array $options = null, $callable = null)
425    {
426        if (isset($options['atomic']) && $options['atomic']) {
427            $class = 'Predis\Pipeline\Atomic';
428        } elseif (isset($options['fire-and-forget']) && $options['fire-and-forget']) {
429            $class = 'Predis\Pipeline\FireAndForget';
430        } else {
431            $class = 'Predis\Pipeline\Pipeline';
432        }
433
434        /*
435         * @var ClientContextInterface
436         */
437        $pipeline = new $class($this);
438
439        if (isset($callable)) {
440            return $pipeline->execute($callable);
441        }
442
443        return $pipeline;
444    }
445
446    /**
447     * Creates a new transaction context and returns it, or returns the results
448     * of a transaction executed inside the optionally provided callable object.
449     *
450     * @param mixed ... Array of options, a callable for execution, or both.
451     *
452     * @return MultiExecTransaction|array
453     */
454    public function transaction(/* arguments */)
455    {
456        return $this->sharedContextFactory('createTransaction', func_get_args());
457    }
458
459    /**
460     * Actual transaction context initializer method.
461     *
462     * @param array $options  Options for the context.
463     * @param mixed $callable Optional callable used to execute the context.
464     *
465     * @return MultiExecTransaction|array
466     */
467    protected function createTransaction(array $options = null, $callable = null)
468    {
469        $transaction = new MultiExecTransaction($this, $options);
470
471        if (isset($callable)) {
472            return $transaction->execute($callable);
473        }
474
475        return $transaction;
476    }
477
478    /**
479     * Creates a new publis/subscribe context and returns it, or starts its loop
480     * inside the optionally provided callable object.
481     *
482     * @param mixed ... Array of options, a callable for execution, or both.
483     *
484     * @return PubSubConsumer|null
485     */
486    public function pubSubLoop(/* arguments */)
487    {
488        return $this->sharedContextFactory('createPubSub', func_get_args());
489    }
490
491    /**
492     * Actual publish/subscribe context initializer method.
493     *
494     * @param array $options  Options for the context.
495     * @param mixed $callable Optional callable used to execute the context.
496     *
497     * @return PubSubConsumer|null
498     */
499    protected function createPubSub(array $options = null, $callable = null)
500    {
501        $pubsub = new PubSubConsumer($this, $options);
502
503        if (!isset($callable)) {
504            return $pubsub;
505        }
506
507        foreach ($pubsub as $message) {
508            if (call_user_func($callable, $pubsub, $message) === false) {
509                $pubsub->stop();
510            }
511        }
512    }
513
514    /**
515     * Creates a new monitor consumer and returns it.
516     *
517     * @return MonitorConsumer
518     */
519    public function monitor()
520    {
521        return new MonitorConsumer($this);
522    }
523}
524