1<?php 2 3/* 4 * This file is part of the Predis package. 5 * 6 * (c) Daniele Alessandri <suppakilla@gmail.com> 7 * 8 * For the full copyright and license information, please view the LICENSE 9 * file that was distributed with this source code. 10 */ 11 12namespace Predis\Transaction; 13 14use Predis\ClientContextInterface; 15use Predis\ClientException; 16use Predis\ClientInterface; 17use Predis\Command\CommandInterface; 18use Predis\CommunicationException; 19use Predis\Connection\AggregateConnectionInterface; 20use Predis\NotSupportedException; 21use Predis\Protocol\ProtocolException; 22use Predis\Response\ErrorInterface as ErrorResponseInterface; 23use Predis\Response\ServerException; 24use Predis\Response\Status as StatusResponse; 25 26/** 27 * Client-side abstraction of a Redis transaction based on MULTI / EXEC. 28 * 29 * {@inheritdoc} 30 * 31 * @author Daniele Alessandri <suppakilla@gmail.com> 32 */ 33class MultiExec implements ClientContextInterface 34{ 35 private $state; 36 37 protected $client; 38 protected $commands; 39 protected $exceptions = true; 40 protected $attempts = 0; 41 protected $watchKeys = array(); 42 protected $modeCAS = false; 43 44 /** 45 * @param ClientInterface $client Client instance used by the transaction. 46 * @param array $options Initialization options. 47 */ 48 public function __construct(ClientInterface $client, array $options = null) 49 { 50 $this->assertClient($client); 51 52 $this->client = $client; 53 $this->state = new MultiExecState(); 54 55 $this->configure($client, $options ?: array()); 56 $this->reset(); 57 } 58 59 /** 60 * Checks if the passed client instance satisfies the required conditions 61 * needed to initialize the transaction object. 62 * 63 * @param ClientInterface $client Client instance used by the transaction object. 64 * 65 * @throws NotSupportedException 66 */ 67 private function assertClient(ClientInterface $client) 68 { 69 if ($client->getConnection() instanceof AggregateConnectionInterface) { 70 throw new NotSupportedException( 71 'Cannot initialize a MULTI/EXEC transaction over aggregate connections.' 72 ); 73 } 74 75 if (!$client->getProfile()->supportsCommands(array('MULTI', 'EXEC', 'DISCARD'))) { 76 throw new NotSupportedException( 77 'The current profile does not support MULTI, EXEC and DISCARD.' 78 ); 79 } 80 } 81 82 /** 83 * Configures the transaction using the provided options. 84 * 85 * @param ClientInterface $client Underlying client instance. 86 * @param array $options Array of options for the transaction. 87 **/ 88 protected function configure(ClientInterface $client, array $options) 89 { 90 if (isset($options['exceptions'])) { 91 $this->exceptions = (bool) $options['exceptions']; 92 } else { 93 $this->exceptions = $client->getOptions()->exceptions; 94 } 95 96 if (isset($options['cas'])) { 97 $this->modeCAS = (bool) $options['cas']; 98 } 99 100 if (isset($options['watch']) && $keys = $options['watch']) { 101 $this->watchKeys = $keys; 102 } 103 104 if (isset($options['retry'])) { 105 $this->attempts = (int) $options['retry']; 106 } 107 } 108 109 /** 110 * Resets the state of the transaction. 111 */ 112 protected function reset() 113 { 114 $this->state->reset(); 115 $this->commands = new \SplQueue(); 116 } 117 118 /** 119 * Initializes the transaction context. 120 */ 121 protected function initialize() 122 { 123 if ($this->state->isInitialized()) { 124 return; 125 } 126 127 if ($this->modeCAS) { 128 $this->state->flag(MultiExecState::CAS); 129 } 130 131 if ($this->watchKeys) { 132 $this->watch($this->watchKeys); 133 } 134 135 $cas = $this->state->isCAS(); 136 $discarded = $this->state->isDiscarded(); 137 138 if (!$cas || ($cas && $discarded)) { 139 $this->call('MULTI'); 140 141 if ($discarded) { 142 $this->state->unflag(MultiExecState::CAS); 143 } 144 } 145 146 $this->state->unflag(MultiExecState::DISCARDED); 147 $this->state->flag(MultiExecState::INITIALIZED); 148 } 149 150 /** 151 * Dynamically invokes a Redis command with the specified arguments. 152 * 153 * @param string $method Command ID. 154 * @param array $arguments Arguments for the command. 155 * 156 * @return mixed 157 */ 158 public function __call($method, $arguments) 159 { 160 return $this->executeCommand( 161 $this->client->createCommand($method, $arguments) 162 ); 163 } 164 165 /** 166 * Executes a Redis command bypassing the transaction logic. 167 * 168 * @param string $commandID Command ID. 169 * @param array $arguments Arguments for the command. 170 * 171 * @throws ServerException 172 * 173 * @return mixed 174 */ 175 protected function call($commandID, array $arguments = array()) 176 { 177 $response = $this->client->executeCommand( 178 $this->client->createCommand($commandID, $arguments) 179 ); 180 181 if ($response instanceof ErrorResponseInterface) { 182 throw new ServerException($response->getMessage()); 183 } 184 185 return $response; 186 } 187 188 /** 189 * Executes the specified Redis command. 190 * 191 * @param CommandInterface $command Command instance. 192 * 193 * @throws AbortedMultiExecException 194 * @throws CommunicationException 195 * 196 * @return $this|mixed 197 */ 198 public function executeCommand(CommandInterface $command) 199 { 200 $this->initialize(); 201 202 if ($this->state->isCAS()) { 203 return $this->client->executeCommand($command); 204 } 205 206 $response = $this->client->getConnection()->executeCommand($command); 207 208 if ($response instanceof StatusResponse && $response == 'QUEUED') { 209 $this->commands->enqueue($command); 210 } elseif ($response instanceof ErrorResponseInterface) { 211 throw new AbortedMultiExecException($this, $response->getMessage()); 212 } else { 213 $this->onProtocolError('The server did not return a +QUEUED status response.'); 214 } 215 216 return $this; 217 } 218 219 /** 220 * Executes WATCH against one or more keys. 221 * 222 * @param string|array $keys One or more keys. 223 * 224 * @throws NotSupportedException 225 * @throws ClientException 226 * 227 * @return mixed 228 */ 229 public function watch($keys) 230 { 231 if (!$this->client->getProfile()->supportsCommand('WATCH')) { 232 throw new NotSupportedException('WATCH is not supported by the current profile.'); 233 } 234 235 if ($this->state->isWatchAllowed()) { 236 throw new ClientException('Sending WATCH after MULTI is not allowed.'); 237 } 238 239 $response = $this->call('WATCH', is_array($keys) ? $keys : array($keys)); 240 $this->state->flag(MultiExecState::WATCH); 241 242 return $response; 243 } 244 245 /** 246 * Finalizes the transaction by executing MULTI on the server. 247 * 248 * @return MultiExec 249 */ 250 public function multi() 251 { 252 if ($this->state->check(MultiExecState::INITIALIZED | MultiExecState::CAS)) { 253 $this->state->unflag(MultiExecState::CAS); 254 $this->call('MULTI'); 255 } else { 256 $this->initialize(); 257 } 258 259 return $this; 260 } 261 262 /** 263 * Executes UNWATCH. 264 * 265 * @throws NotSupportedException 266 * 267 * @return MultiExec 268 */ 269 public function unwatch() 270 { 271 if (!$this->client->getProfile()->supportsCommand('UNWATCH')) { 272 throw new NotSupportedException( 273 'UNWATCH is not supported by the current profile.' 274 ); 275 } 276 277 $this->state->unflag(MultiExecState::WATCH); 278 $this->__call('UNWATCH', array()); 279 280 return $this; 281 } 282 283 /** 284 * Resets the transaction by UNWATCH-ing the keys that are being WATCHed and 285 * DISCARD-ing pending commands that have been already sent to the server. 286 * 287 * @return MultiExec 288 */ 289 public function discard() 290 { 291 if ($this->state->isInitialized()) { 292 $this->call($this->state->isCAS() ? 'UNWATCH' : 'DISCARD'); 293 294 $this->reset(); 295 $this->state->flag(MultiExecState::DISCARDED); 296 } 297 298 return $this; 299 } 300 301 /** 302 * Executes the whole transaction. 303 * 304 * @return mixed 305 */ 306 public function exec() 307 { 308 return $this->execute(); 309 } 310 311 /** 312 * Checks the state of the transaction before execution. 313 * 314 * @param mixed $callable Callback for execution. 315 * 316 * @throws \InvalidArgumentException 317 * @throws ClientException 318 */ 319 private function checkBeforeExecution($callable) 320 { 321 if ($this->state->isExecuting()) { 322 throw new ClientException( 323 'Cannot invoke "execute" or "exec" inside an active transaction context.' 324 ); 325 } 326 327 if ($callable) { 328 if (!is_callable($callable)) { 329 throw new \InvalidArgumentException('The argument must be a callable object.'); 330 } 331 332 if (!$this->commands->isEmpty()) { 333 $this->discard(); 334 335 throw new ClientException( 336 'Cannot execute a transaction block after using fluent interface.' 337 ); 338 } 339 } elseif ($this->attempts) { 340 $this->discard(); 341 342 throw new ClientException( 343 'Automatic retries are supported only when a callable block is provided.' 344 ); 345 } 346 } 347 348 /** 349 * Handles the actual execution of the whole transaction. 350 * 351 * @param mixed $callable Optional callback for execution. 352 * 353 * @throws CommunicationException 354 * @throws AbortedMultiExecException 355 * @throws ServerException 356 * 357 * @return array 358 */ 359 public function execute($callable = null) 360 { 361 $this->checkBeforeExecution($callable); 362 363 $execResponse = null; 364 $attempts = $this->attempts; 365 366 do { 367 if ($callable) { 368 $this->executeTransactionBlock($callable); 369 } 370 371 if ($this->commands->isEmpty()) { 372 if ($this->state->isWatching()) { 373 $this->discard(); 374 } 375 376 return; 377 } 378 379 $execResponse = $this->call('EXEC'); 380 381 if ($execResponse === null) { 382 if ($attempts === 0) { 383 throw new AbortedMultiExecException( 384 $this, 'The current transaction has been aborted by the server.' 385 ); 386 } 387 388 $this->reset(); 389 390 continue; 391 } 392 393 break; 394 } while ($attempts-- > 0); 395 396 $response = array(); 397 $commands = $this->commands; 398 $size = count($execResponse); 399 400 if ($size !== count($commands)) { 401 $this->onProtocolError('EXEC returned an unexpected number of response items.'); 402 } 403 404 for ($i = 0; $i < $size; ++$i) { 405 $cmdResponse = $execResponse[$i]; 406 407 if ($cmdResponse instanceof ErrorResponseInterface && $this->exceptions) { 408 throw new ServerException($cmdResponse->getMessage()); 409 } 410 411 $response[$i] = $commands->dequeue()->parseResponse($cmdResponse); 412 } 413 414 return $response; 415 } 416 417 /** 418 * Passes the current transaction object to a callable block for execution. 419 * 420 * @param mixed $callable Callback. 421 * 422 * @throws CommunicationException 423 * @throws ServerException 424 */ 425 protected function executeTransactionBlock($callable) 426 { 427 $exception = null; 428 $this->state->flag(MultiExecState::INSIDEBLOCK); 429 430 try { 431 call_user_func($callable, $this); 432 } catch (CommunicationException $exception) { 433 // NOOP 434 } catch (ServerException $exception) { 435 // NOOP 436 } catch (\Exception $exception) { 437 $this->discard(); 438 } 439 440 $this->state->unflag(MultiExecState::INSIDEBLOCK); 441 442 if ($exception) { 443 throw $exception; 444 } 445 } 446 447 /** 448 * Helper method for protocol errors encountered inside the transaction. 449 * 450 * @param string $message Error message. 451 */ 452 private function onProtocolError($message) 453 { 454 // Since a MULTI/EXEC block cannot be initialized when using aggregate 455 // connections we can safely assume that Predis\Client::getConnection() 456 // will return a Predis\Connection\NodeConnectionInterface instance. 457 CommunicationException::handle(new ProtocolException( 458 $this->client->getConnection(), $message 459 )); 460 } 461} 462