1<?php
2
3/*
4 * This file is part of the Predis package.
5 *
6 * (c) Daniele Alessandri <suppakilla@gmail.com>
7 *
8 * For the full copyright and license information, please view the LICENSE
9 * file that was distributed with this source code.
10 */
11
12namespace Predis\Transaction;
13
14use Predis\ClientContextInterface;
15use Predis\ClientException;
16use Predis\ClientInterface;
17use Predis\Command\CommandInterface;
18use Predis\CommunicationException;
19use Predis\Connection\AggregateConnectionInterface;
20use Predis\NotSupportedException;
21use Predis\Protocol\ProtocolException;
22use Predis\Response\ErrorInterface as ErrorResponseInterface;
23use Predis\Response\ServerException;
24use Predis\Response\Status as StatusResponse;
25
26/**
27 * Client-side abstraction of a Redis transaction based on MULTI / EXEC.
28 *
29 * {@inheritdoc}
30 *
31 * @author Daniele Alessandri <suppakilla@gmail.com>
32 */
33class MultiExec implements ClientContextInterface
34{
35    private $state;
36
37    protected $client;
38    protected $commands;
39    protected $exceptions = true;
40    protected $attempts = 0;
41    protected $watchKeys = array();
42    protected $modeCAS = false;
43
44    /**
45     * @param ClientInterface $client  Client instance used by the transaction.
46     * @param array           $options Initialization options.
47     */
48    public function __construct(ClientInterface $client, array $options = null)
49    {
50        $this->assertClient($client);
51
52        $this->client = $client;
53        $this->state = new MultiExecState();
54
55        $this->configure($client, $options ?: array());
56        $this->reset();
57    }
58
59    /**
60     * Checks if the passed client instance satisfies the required conditions
61     * needed to initialize the transaction object.
62     *
63     * @param ClientInterface $client Client instance used by the transaction object.
64     *
65     * @throws NotSupportedException
66     */
67    private function assertClient(ClientInterface $client)
68    {
69        if ($client->getConnection() instanceof AggregateConnectionInterface) {
70            throw new NotSupportedException(
71                'Cannot initialize a MULTI/EXEC transaction over aggregate connections.'
72            );
73        }
74
75        if (!$client->getProfile()->supportsCommands(array('MULTI', 'EXEC', 'DISCARD'))) {
76            throw new NotSupportedException(
77                'The current profile does not support MULTI, EXEC and DISCARD.'
78            );
79        }
80    }
81
82    /**
83     * Configures the transaction using the provided options.
84     *
85     * @param ClientInterface $client  Underlying client instance.
86     * @param array           $options Array of options for the transaction.
87     **/
88    protected function configure(ClientInterface $client, array $options)
89    {
90        if (isset($options['exceptions'])) {
91            $this->exceptions = (bool) $options['exceptions'];
92        } else {
93            $this->exceptions = $client->getOptions()->exceptions;
94        }
95
96        if (isset($options['cas'])) {
97            $this->modeCAS = (bool) $options['cas'];
98        }
99
100        if (isset($options['watch']) && $keys = $options['watch']) {
101            $this->watchKeys = $keys;
102        }
103
104        if (isset($options['retry'])) {
105            $this->attempts = (int) $options['retry'];
106        }
107    }
108
109    /**
110     * Resets the state of the transaction.
111     */
112    protected function reset()
113    {
114        $this->state->reset();
115        $this->commands = new \SplQueue();
116    }
117
118    /**
119     * Initializes the transaction context.
120     */
121    protected function initialize()
122    {
123        if ($this->state->isInitialized()) {
124            return;
125        }
126
127        if ($this->modeCAS) {
128            $this->state->flag(MultiExecState::CAS);
129        }
130
131        if ($this->watchKeys) {
132            $this->watch($this->watchKeys);
133        }
134
135        $cas = $this->state->isCAS();
136        $discarded = $this->state->isDiscarded();
137
138        if (!$cas || ($cas && $discarded)) {
139            $this->call('MULTI');
140
141            if ($discarded) {
142                $this->state->unflag(MultiExecState::CAS);
143            }
144        }
145
146        $this->state->unflag(MultiExecState::DISCARDED);
147        $this->state->flag(MultiExecState::INITIALIZED);
148    }
149
150    /**
151     * Dynamically invokes a Redis command with the specified arguments.
152     *
153     * @param string $method    Command ID.
154     * @param array  $arguments Arguments for the command.
155     *
156     * @return mixed
157     */
158    public function __call($method, $arguments)
159    {
160        return $this->executeCommand(
161            $this->client->createCommand($method, $arguments)
162        );
163    }
164
165    /**
166     * Executes a Redis command bypassing the transaction logic.
167     *
168     * @param string $commandID Command ID.
169     * @param array  $arguments Arguments for the command.
170     *
171     * @throws ServerException
172     *
173     * @return mixed
174     */
175    protected function call($commandID, array $arguments = array())
176    {
177        $response = $this->client->executeCommand(
178            $this->client->createCommand($commandID, $arguments)
179        );
180
181        if ($response instanceof ErrorResponseInterface) {
182            throw new ServerException($response->getMessage());
183        }
184
185        return $response;
186    }
187
188    /**
189     * Executes the specified Redis command.
190     *
191     * @param CommandInterface $command Command instance.
192     *
193     * @throws AbortedMultiExecException
194     * @throws CommunicationException
195     *
196     * @return $this|mixed
197     */
198    public function executeCommand(CommandInterface $command)
199    {
200        $this->initialize();
201
202        if ($this->state->isCAS()) {
203            return $this->client->executeCommand($command);
204        }
205
206        $response = $this->client->getConnection()->executeCommand($command);
207
208        if ($response instanceof StatusResponse && $response == 'QUEUED') {
209            $this->commands->enqueue($command);
210        } elseif ($response instanceof ErrorResponseInterface) {
211            throw new AbortedMultiExecException($this, $response->getMessage());
212        } else {
213            $this->onProtocolError('The server did not return a +QUEUED status response.');
214        }
215
216        return $this;
217    }
218
219    /**
220     * Executes WATCH against one or more keys.
221     *
222     * @param string|array $keys One or more keys.
223     *
224     * @throws NotSupportedException
225     * @throws ClientException
226     *
227     * @return mixed
228     */
229    public function watch($keys)
230    {
231        if (!$this->client->getProfile()->supportsCommand('WATCH')) {
232            throw new NotSupportedException('WATCH is not supported by the current profile.');
233        }
234
235        if ($this->state->isWatchAllowed()) {
236            throw new ClientException('Sending WATCH after MULTI is not allowed.');
237        }
238
239        $response = $this->call('WATCH', is_array($keys) ? $keys : array($keys));
240        $this->state->flag(MultiExecState::WATCH);
241
242        return $response;
243    }
244
245    /**
246     * Finalizes the transaction by executing MULTI on the server.
247     *
248     * @return MultiExec
249     */
250    public function multi()
251    {
252        if ($this->state->check(MultiExecState::INITIALIZED | MultiExecState::CAS)) {
253            $this->state->unflag(MultiExecState::CAS);
254            $this->call('MULTI');
255        } else {
256            $this->initialize();
257        }
258
259        return $this;
260    }
261
262    /**
263     * Executes UNWATCH.
264     *
265     * @throws NotSupportedException
266     *
267     * @return MultiExec
268     */
269    public function unwatch()
270    {
271        if (!$this->client->getProfile()->supportsCommand('UNWATCH')) {
272            throw new NotSupportedException(
273                'UNWATCH is not supported by the current profile.'
274            );
275        }
276
277        $this->state->unflag(MultiExecState::WATCH);
278        $this->__call('UNWATCH', array());
279
280        return $this;
281    }
282
283    /**
284     * Resets the transaction by UNWATCH-ing the keys that are being WATCHed and
285     * DISCARD-ing pending commands that have been already sent to the server.
286     *
287     * @return MultiExec
288     */
289    public function discard()
290    {
291        if ($this->state->isInitialized()) {
292            $this->call($this->state->isCAS() ? 'UNWATCH' : 'DISCARD');
293
294            $this->reset();
295            $this->state->flag(MultiExecState::DISCARDED);
296        }
297
298        return $this;
299    }
300
301    /**
302     * Executes the whole transaction.
303     *
304     * @return mixed
305     */
306    public function exec()
307    {
308        return $this->execute();
309    }
310
311    /**
312     * Checks the state of the transaction before execution.
313     *
314     * @param mixed $callable Callback for execution.
315     *
316     * @throws \InvalidArgumentException
317     * @throws ClientException
318     */
319    private function checkBeforeExecution($callable)
320    {
321        if ($this->state->isExecuting()) {
322            throw new ClientException(
323                'Cannot invoke "execute" or "exec" inside an active transaction context.'
324            );
325        }
326
327        if ($callable) {
328            if (!is_callable($callable)) {
329                throw new \InvalidArgumentException('The argument must be a callable object.');
330            }
331
332            if (!$this->commands->isEmpty()) {
333                $this->discard();
334
335                throw new ClientException(
336                    'Cannot execute a transaction block after using fluent interface.'
337                );
338            }
339        } elseif ($this->attempts) {
340            $this->discard();
341
342            throw new ClientException(
343                'Automatic retries are supported only when a callable block is provided.'
344            );
345        }
346    }
347
348    /**
349     * Handles the actual execution of the whole transaction.
350     *
351     * @param mixed $callable Optional callback for execution.
352     *
353     * @throws CommunicationException
354     * @throws AbortedMultiExecException
355     * @throws ServerException
356     *
357     * @return array
358     */
359    public function execute($callable = null)
360    {
361        $this->checkBeforeExecution($callable);
362
363        $execResponse = null;
364        $attempts = $this->attempts;
365
366        do {
367            if ($callable) {
368                $this->executeTransactionBlock($callable);
369            }
370
371            if ($this->commands->isEmpty()) {
372                if ($this->state->isWatching()) {
373                    $this->discard();
374                }
375
376                return;
377            }
378
379            $execResponse = $this->call('EXEC');
380
381            if ($execResponse === null) {
382                if ($attempts === 0) {
383                    throw new AbortedMultiExecException(
384                        $this, 'The current transaction has been aborted by the server.'
385                    );
386                }
387
388                $this->reset();
389
390                continue;
391            }
392
393            break;
394        } while ($attempts-- > 0);
395
396        $response = array();
397        $commands = $this->commands;
398        $size = count($execResponse);
399
400        if ($size !== count($commands)) {
401            $this->onProtocolError('EXEC returned an unexpected number of response items.');
402        }
403
404        for ($i = 0; $i < $size; ++$i) {
405            $cmdResponse = $execResponse[$i];
406
407            if ($cmdResponse instanceof ErrorResponseInterface && $this->exceptions) {
408                throw new ServerException($cmdResponse->getMessage());
409            }
410
411            $response[$i] = $commands->dequeue()->parseResponse($cmdResponse);
412        }
413
414        return $response;
415    }
416
417    /**
418     * Passes the current transaction object to a callable block for execution.
419     *
420     * @param mixed $callable Callback.
421     *
422     * @throws CommunicationException
423     * @throws ServerException
424     */
425    protected function executeTransactionBlock($callable)
426    {
427        $exception = null;
428        $this->state->flag(MultiExecState::INSIDEBLOCK);
429
430        try {
431            call_user_func($callable, $this);
432        } catch (CommunicationException $exception) {
433            // NOOP
434        } catch (ServerException $exception) {
435            // NOOP
436        } catch (\Exception $exception) {
437            $this->discard();
438        }
439
440        $this->state->unflag(MultiExecState::INSIDEBLOCK);
441
442        if ($exception) {
443            throw $exception;
444        }
445    }
446
447    /**
448     * Helper method for protocol errors encountered inside the transaction.
449     *
450     * @param string $message Error message.
451     */
452    private function onProtocolError($message)
453    {
454        // Since a MULTI/EXEC block cannot be initialized when using aggregate
455        // connections we can safely assume that Predis\Client::getConnection()
456        // will return a Predis\Connection\NodeConnectionInterface instance.
457        CommunicationException::handle(new ProtocolException(
458            $this->client->getConnection(), $message
459        ));
460    }
461}
462