<?php

/*
 * This file is part of the Predis package.
 *
 * (c) Daniele Alessandri <suppakilla@gmail.com>
 *
 * For the full copyright and license information, please view the LICENSE
 * file that was distributed with this source code.
 */

namespace Predis\Connection\Aggregate;

use Predis\Command\CommandInterface;
use Predis\Command\RawCommand;
use Predis\CommunicationException;
use Predis\Connection\ConnectionException;
use Predis\Connection\FactoryInterface as ConnectionFactoryInterface;
use Predis\Connection\NodeConnectionInterface;
use Predis\Connection\Parameters;
use Predis\Replication\ReplicationStrategy;
use Predis\Replication\RoleException;
use Predis\Response\ErrorInterface as ErrorResponseInterface;
use Predis\Response\ServerException;

/**
 * Aggregate connection for master/slave replication in which the master and
 * slave nodes are discovered by querying redis-sentinel servers.
 *
 * @author Daniele Alessandri <suppakilla@gmail.com>
 * @author Ville Mattila <ville@eventio.fi>
 */
class SentinelReplication implements ReplicationInterface
{
    /**
     * @var NodeConnectionInterface
     */
    protected $master;

    /**
     * @var NodeConnectionInterface[]
     */
    protected $slaves = array();

    /**
     * @var NodeConnectionInterface
     */
    protected $current;

    /**
     * @var string
     */
    protected $service;

    /**
     * @var ConnectionFactoryInterface
     */
    protected $connectionFactory;

    /**
     * @var ReplicationStrategy
     */
    protected $strategy;

    /**
     * Sentinels still available for autodiscovery. Each entry is either a
     * connection instance or connection parameters (string or array).
     *
     * @var array
     */
    protected $sentinels = array();

    /**
     * @var NodeConnectionInterface
     */
    protected $sentinelConnection;

    /**
     * @var float
     */
    protected $sentinelTimeout = 0.100;

    /**
     * Max number of automatic retries of commands upon server failure.
     *
     * -1 = unlimited retry attempts
     *  0 = no retry attempts (fails immediately)
     *  n = fail only after n retry attempts
     *
     * @var int
     */
    protected $retryLimit = 20;

    /**
     * Time to wait in milliseconds before fetching a new configuration from one
     * of the sentinel servers.
     *
     * @var int|float
     */
    protected $retryWait = 1000;

    /**
     * Flag for automatic fetching of available sentinels.
     *
     * @var bool
     */
    protected $updateSentinels = false;

    /**
     * @param string                     $service           Name of the service for autodiscovery.
     * @param array                      $sentinels         Sentinel servers connection parameters.
     * @param ConnectionFactoryInterface $connectionFactory Connection factory instance.
     * @param ReplicationStrategy        $strategy          Replication strategy instance.
     */
    public function __construct(
        $service,
        array $sentinels,
        ConnectionFactoryInterface $connectionFactory,
        ReplicationStrategy $strategy = null
    ) {
        $this->sentinels = $sentinels;
        $this->service = $service;
        $this->connectionFactory = $connectionFactory;
        $this->strategy = $strategy ?: new ReplicationStrategy();
    }

    /**
     * Sets a default timeout for connections to sentinels.
     *
     * When "timeout" is present in the connection parameters of sentinels, its
     * value overrides the default sentinel timeout.
     *
     * @param float $timeout Timeout value.
     */
    public function setSentinelTimeout($timeout)
    {
        $this->sentinelTimeout = (float) $timeout;
    }

    /**
     * Sets the maximum number of retries for commands upon server failure.
     *
     * -1 = unlimited retry attempts
     *  0 = no retry attempts (fails immediately)
     *  n = fail only after n retry attempts
     *
     * @param int $retry Number of retry attempts.
     */
    public function setRetryLimit($retry)
    {
        $this->retryLimit = (int) $retry;
    }

    /**
     * Sets the time to wait (in milliseconds) before fetching a new
     * configuration from one of the sentinels.
     *
     * NOTE: despite the name of the parameter (kept for backward compatibility)
     * the value is expressed in milliseconds, it is multiplied by 1000 before
     * being passed to usleep() when a command is retried.
     *
     * @param float $seconds Time to wait, in milliseconds, before the next attempt.
     */
    public function setRetryWait($seconds)
    {
        $this->retryWait = (float) $seconds;
    }

    /**
     * Set automatic fetching of available sentinels.
     *
     * @param bool $update Enable or disable automatic updates.
     */
    public function setUpdateSentinels($update)
    {
        $this->updateSentinels = (bool) $update;
    }

    /**
     * Resets the current connection.
     */
    protected function reset()
    {
        $this->current = null;
    }

    /**
     * Wipes the current list of master and slaves nodes.
     */
    protected function wipeServerList()
    {
        $this->reset();

        $this->master = null;
        $this->slaves = array();
    }

    /**
     * {@inheritdoc}
     *
     * A connection with alias "master" becomes the master, anything else is
     * stored as a slave indexed by its alias or, when the alias is empty, by
     * the first free integer key.
     */
    public function add(NodeConnectionInterface $connection)
    {
        $alias = $connection->getParameters()->alias;

        if ($alias === 'master') {
            $this->master = $connection;
        } else {
            if (!$alias) {
                // Start from the number of slaves but skip keys already in use:
                // after a removal count() may match the key of an existing
                // slave which would otherwise be silently overwritten.
                $alias = count($this->slaves);

                while (isset($this->slaves[$alias])) {
                    ++$alias;
                }
            }

            $this->slaves[$alias] = $connection;
        }

        $this->reset();
    }

    /**
     * {@inheritdoc}
     */
    public function remove(NodeConnectionInterface $connection)
    {
        if ($connection === $this->master) {
            $this->master = null;
            $this->reset();

            return true;
        }

        if (false !== $id = array_search($connection, $this->slaves, true)) {
            unset($this->slaves[$id]);
            $this->reset();

            return true;
        }

        return false;
    }

    /**
     * Creates a new connection to a sentinel server.
     *
     * @param NodeConnectionInterface|string|array $parameters Connection instance (returned
     *                                                         as is) or connection parameters.
     *
     * @return NodeConnectionInterface
     */
    protected function createSentinelConnection($parameters)
    {
        if ($parameters instanceof NodeConnectionInterface) {
            return $parameters;
        }

        if (is_string($parameters)) {
            $parameters = Parameters::parse($parameters);
        }

        if (is_array($parameters)) {
            // NOTE: sentinels do not accept AUTH and SELECT commands so we must
            // explicitly set them to NULL to avoid problems when using default
            // parameters set via client options. Actually AUTH is supported for
            // sentinels starting with Redis 5 but we have to differentiate from
            // sentinels passwords and nodes passwords, this will be implemented
            // in a later release.
            $parameters['database'] = null;
            $parameters['username'] = null;
            $parameters['password'] = null;

            if (!isset($parameters['timeout'])) {
                $parameters['timeout'] = $this->sentinelTimeout;
            }
        }

        return $this->connectionFactory->create($parameters);
    }

    /**
     * Returns the current sentinel connection.
     *
     * If there is no active sentinel connection, a new connection is created
     * using the first entry of the sentinels list (the entry is removed from
     * the list).
     *
     * @throws \Predis\ClientException When the list of sentinels is empty.
     *
     * @return NodeConnectionInterface
     */
    public function getSentinelConnection()
    {
        if (!$this->sentinelConnection) {
            if (!$this->sentinels) {
                throw new \Predis\ClientException('No sentinel server available for autodiscovery.');
            }

            $sentinel = array_shift($this->sentinels);
            $this->sentinelConnection = $this->createSentinelConnection($sentinel);
        }

        return $this->sentinelConnection;
    }

    /**
     * Fetches an updated list of sentinels from a sentinel.
     *
     * When the current sentinel fails with a connection error it is discarded
     * and the query is repeated against the next available sentinel.
     */
    public function updateSentinels()
    {
        while (true) {
            // Not guarded by the try block: when we run out of sentinels the
            // resulting exception must reach the caller and stop the loop.
            $sentinel = $this->getSentinelConnection();

            try {
                $payload = $sentinel->executeCommand(
                    RawCommand::create('SENTINEL', 'sentinels', $this->service)
                );

                $this->sentinels = array();
                // NOTE: sentinel server does not return itself, so we add it back.
                $this->sentinels[] = $sentinel->getParameters()->toArray();

                foreach ($payload as $details) {
                    $this->sentinels[] = array(
                        'host' => $details[3],
                        'port' => $details[5],
                    );
                }

                return;
            } catch (ConnectionException $exception) {
                $this->sentinelConnection = null;
            }
        }
    }

    /**
     * Fetches the details for the master and slave servers from a sentinel.
     */
    public function querySentinel()
    {
        $this->wipeServerList();

        $this->updateSentinels();
        $this->getMaster();
        $this->getSlaves();
    }

    /**
     * Handles error responses returned by redis-sentinel.
     *
     * @param NodeConnectionInterface $sentinel Connection to a sentinel server.
     * @param ErrorResponseInterface  $error    Error response.
     *
     * @throws ConnectionException When the error type is IDONTKNOW.
     * @throws ServerException     For any other error type.
     */
    private function handleSentinelErrorResponse(NodeConnectionInterface $sentinel, ErrorResponseInterface $error)
    {
        if ($error->getErrorType() === 'IDONTKNOW') {
            throw new ConnectionException($sentinel, $error->getMessage());
        } else {
            throw new ServerException($error->getMessage());
        }
    }

    /**
     * Fetches the details for the master server from a sentinel.
     *
     * @param NodeConnectionInterface $sentinel Connection to a sentinel server.
     * @param string                  $service  Name of the service.
     *
     * @throws ServerException When the sentinel replies with NULL or with an error.
     *
     * @return array
     */
    protected function querySentinelForMaster(NodeConnectionInterface $sentinel, $service)
    {
        $payload = $sentinel->executeCommand(
            RawCommand::create('SENTINEL', 'get-master-addr-by-name', $service)
        );

        if ($payload === null) {
            throw new ServerException('ERR No such master with that name');
        }

        if ($payload instanceof ErrorResponseInterface) {
            $this->handleSentinelErrorResponse($sentinel, $payload);
        }

        return array(
            'host' => $payload[0],
            'port' => $payload[1],
            'alias' => 'master',
        );
    }

    /**
     * Fetches the details for the slave servers from a sentinel.
     *
     * Slaves flagged as "s_down", "o_down" or "disconnected" are skipped.
     *
     * @param NodeConnectionInterface $sentinel Connection to a sentinel server.
     * @param string                  $service  Name of the service.
     *
     * @return array
     */
    protected function querySentinelForSlaves(NodeConnectionInterface $sentinel, $service)
    {
        $slaves = array();

        $payload = $sentinel->executeCommand(
            RawCommand::create('SENTINEL', 'slaves', $service)
        );

        if ($payload instanceof ErrorResponseInterface) {
            $this->handleSentinelErrorResponse($sentinel, $payload);
        }

        foreach ($payload as $slave) {
            $flags = explode(',', $slave[9]);

            if (array_intersect($flags, array('s_down', 'o_down', 'disconnected'))) {
                continue;
            }

            $slaves[] = array(
                'host' => $slave[3],
                'port' => $slave[5],
                'alias' => "slave-$slave[1]",
            );
        }

        return $slaves;
    }

    /**
     * {@inheritdoc}
     */
    public function getCurrent()
    {
        return $this->current;
    }

    /**
     * {@inheritdoc}
     *
     * When the master is not known yet its details are fetched from a sentinel,
     * moving on to the next sentinel upon connection errors.
     */
    public function getMaster()
    {
        if ($this->master) {
            return $this->master;
        }

        if ($this->updateSentinels) {
            $this->updateSentinels();
        }

        while (true) {
            // Not guarded by the try block: when we run out of sentinels the
            // resulting exception must reach the caller and stop the loop.
            $sentinel = $this->getSentinelConnection();

            try {
                $masterParameters = $this->querySentinelForMaster($sentinel, $this->service);
                $masterConnection = $this->connectionFactory->create($masterParameters);

                $this->add($masterConnection);

                return $masterConnection;
            } catch (ConnectionException $exception) {
                $this->sentinelConnection = null;
            }
        }
    }

    /**
     * {@inheritdoc}
     *
     * When no slave is known yet the list is fetched from a sentinel, moving on
     * to the next sentinel upon connection errors.
     */
    public function getSlaves()
    {
        if ($this->slaves) {
            return array_values($this->slaves);
        }

        if ($this->updateSentinels) {
            $this->updateSentinels();
        }

        while (true) {
            // Not guarded by the try block: when we run out of sentinels the
            // resulting exception must reach the caller and stop the loop.
            $sentinel = $this->getSentinelConnection();

            try {
                $slavesParameters = $this->querySentinelForSlaves($sentinel, $this->service);

                foreach ($slavesParameters as $slaveParameters) {
                    $this->add($this->connectionFactory->create($slaveParameters));
                }

                break;
            } catch (ConnectionException $exception) {
                $this->sentinelConnection = null;
            }
        }

        return array_values($this->slaves ?: array());
    }

    /**
     * Returns a random slave.
     *
     * @return NodeConnectionInterface|null NULL when there are no slaves.
     */
    protected function pickSlave()
    {
        if ($slaves = $this->getSlaves()) {
            return $slaves[rand(1, count($slaves)) - 1];
        }

        return null;
    }

    /**
     * Returns the connection instance in charge for the given command.
     *
     * @param CommandInterface $command Command instance.
     *
     * @return NodeConnectionInterface
     */
    private function getConnectionInternal(CommandInterface $command)
    {
        if (!$this->current) {
            if ($this->strategy->isReadOperation($command) && $slave = $this->pickSlave()) {
                $this->current = $slave;
            } else {
                $this->current = $this->getMaster();
            }

            return $this->current;
        }

        // Once on the master we stick to it, even for read operations.
        if ($this->current === $this->master) {
            return $this->current;
        }

        if (!$this->strategy->isReadOperation($command)) {
            $this->current = $this->getMaster();
        }

        return $this->current;
    }

    /**
     * Asserts that the specified connection matches an expected role.
     *
     * @param NodeConnectionInterface $connection Connection to a redis server.
     * @param string                  $role       Expected role of the server ("master", "slave" or "sentinel").
     *
     * @throws RoleException
     */
    protected function assertConnectionRole(NodeConnectionInterface $connection, $role)
    {
        $role = strtolower($role);
        $actualRole = $connection->executeCommand(RawCommand::create('ROLE'));

        if ($role !== $actualRole[0]) {
            throw new RoleException($connection, "Expected $role but got $actualRole[0] [$connection]");
        }
    }

    /**
     * {@inheritdoc}
     *
     * The role of the selected server is verified only when the connection is
     * not connected yet.
     */
    public function getConnection(CommandInterface $command)
    {
        $connection = $this->getConnectionInternal($command);

        if (!$connection->isConnected()) {
            // When we do not have any available slave in the pool we can expect
            // read-only operations to hit the master server.
            $expectedRole = $this->strategy->isReadOperation($command) && $this->slaves ? 'slave' : 'master';
            $this->assertConnectionRole($connection, $expectedRole);
        }

        return $connection;
    }

    /**
     * {@inheritdoc}
     *
     * Returns NULL when no slave is registered with the specified ID.
     */
    public function getConnectionById($connectionId)
    {
        if ($connectionId === 'master') {
            return $this->getMaster();
        }

        // Makes sure that the list of slaves is populated before the lookup.
        $this->getSlaves();

        if (isset($this->slaves[$connectionId])) {
            return $this->slaves[$connectionId];
        }

        return null;
    }

    /**
     * {@inheritdoc}
     *
     * @throws \InvalidArgumentException When the connection cannot be found or
     *                                   is neither the master nor a known slave.
     */
    public function switchTo($connection)
    {
        if (!$connection instanceof NodeConnectionInterface) {
            $connection = $this->getConnectionById($connection);
        }

        // An unresolved connection must be rejected right away: when there is
        // no master yet NULL would slip through the membership check below and
        // result in a method call on NULL.
        if (!$connection) {
            throw new \InvalidArgumentException('Invalid connection or connection not found.');
        }

        if ($connection === $this->current) {
            return;
        }

        if ($connection !== $this->master && !in_array($connection, $this->slaves, true)) {
            throw new \InvalidArgumentException('Invalid connection or connection not found.');
        }

        $connection->connect();

        if ($this->current) {
            $this->current->disconnect();
        }

        $this->current = $connection;
    }

    /**
     * Switches to the master server.
     */
    public function switchToMaster()
    {
        $this->switchTo('master');
    }

    /**
     * Switches to a random slave server.
     *
     * @throws \InvalidArgumentException When there are no slaves available.
     */
    public function switchToSlave()
    {
        $connection = $this->pickSlave();
        $this->switchTo($connection);
    }

    /**
     * {@inheritdoc}
     */
    public function isConnected()
    {
        return $this->current ? $this->current->isConnected() : false;
    }

    /**
     * {@inheritdoc}
     *
     * When there is no current connection a random slave is selected, falling
     * back to the master when no slave is available.
     */
    public function connect()
    {
        if (!$this->current) {
            if (!$this->current = $this->pickSlave()) {
                $this->current = $this->getMaster();
            }
        }

        $this->current->connect();
    }

    /**
     * {@inheritdoc}
     */
    public function disconnect()
    {
        if ($this->master) {
            $this->master->disconnect();
        }

        foreach ($this->slaves as $connection) {
            $connection->disconnect();
        }
    }

    /**
     * Retries the execution of a command upon server failure after asking a new
     * configuration to one of the sentinels.
     *
     * @param CommandInterface $command Command instance.
     * @param string           $method  Actual method.
     *
     * @throws CommunicationException When the retry limit has been reached.
     *
     * @return mixed
     */
    private function retryCommandOnFailure(CommandInterface $command, $method)
    {
        $retries = 0;

        while (true) {
            try {
                return $this->getConnection($command)->$method($command);
            } catch (CommunicationException $exception) {
                // Dropping the known servers forces a new query to the
                // sentinels on the next attempt.
                $this->wipeServerList();
                $exception->getConnection()->disconnect();

                // A limit of -1 never matches, resulting in unlimited retries.
                if ($retries === $this->retryLimit) {
                    throw $exception;
                }

                // The wait time is in milliseconds, usleep() expects an integer
                // number of microseconds.
                usleep((int) ($this->retryWait * 1000));

                ++$retries;
            }
        }
    }

    /**
     * {@inheritdoc}
     */
    public function writeRequest(CommandInterface $command)
    {
        $this->retryCommandOnFailure($command, __FUNCTION__);
    }

    /**
     * {@inheritdoc}
     */
    public function readResponse(CommandInterface $command)
    {
        return $this->retryCommandOnFailure($command, __FUNCTION__);
    }

    /**
     * {@inheritdoc}
     */
    public function executeCommand(CommandInterface $command)
    {
        return $this->retryCommandOnFailure($command, __FUNCTION__);
    }

    /**
     * Returns the underlying replication strategy.
     *
     * @return ReplicationStrategy
     */
    public function getReplicationStrategy()
    {
        return $this->strategy;
    }

    /**
     * {@inheritdoc}
     */
    public function __sleep()
    {
        return array(
            'master', 'slaves', 'service', 'sentinels', 'connectionFactory', 'strategy',
        );
    }
}