1<?php
2
3/*
4 * This file is part of the Predis package.
5 *
6 * (c) Daniele Alessandri <suppakilla@gmail.com>
7 *
8 * For the full copyright and license information, please view the LICENSE
9 * file that was distributed with this source code.
10 */
11
12namespace Predis\Connection\Aggregate;
13
14use Predis\Command\CommandInterface;
15use Predis\Command\RawCommand;
16use Predis\CommunicationException;
17use Predis\Connection\ConnectionException;
18use Predis\Connection\FactoryInterface as ConnectionFactoryInterface;
19use Predis\Connection\NodeConnectionInterface;
20use Predis\Connection\Parameters;
21use Predis\Replication\ReplicationStrategy;
22use Predis\Replication\RoleException;
23use Predis\Response\ErrorInterface as ErrorResponseInterface;
24use Predis\Response\ServerException;
25
26/**
27 * @author Daniele Alessandri <suppakilla@gmail.com>
28 * @author Ville Mattila <ville@eventio.fi>
29 */
30class SentinelReplication implements ReplicationInterface
31{
32    /**
33     * @var NodeConnectionInterface
34     */
35    protected $master;
36
37    /**
38     * @var NodeConnectionInterface[]
39     */
40    protected $slaves = array();
41
42    /**
43     * @var NodeConnectionInterface
44     */
45    protected $current;
46
47    /**
48     * @var string
49     */
50    protected $service;
51
52    /**
53     * @var ConnectionFactoryInterface
54     */
55    protected $connectionFactory;
56
57    /**
58     * @var ReplicationStrategy
59     */
60    protected $strategy;
61
62    /**
63     * @var NodeConnectionInterface[]
64     */
65    protected $sentinels = array();
66
67    /**
68     * @var NodeConnectionInterface
69     */
70    protected $sentinelConnection;
71
72    /**
73     * @var float
74     */
75    protected $sentinelTimeout = 0.100;
76
77    /**
78     * Max number of automatic retries of commands upon server failure.
79     *
80     * -1 = unlimited retry attempts
81     *  0 = no retry attempts (fails immediatly)
82     *  n = fail only after n retry attempts
83     *
84     * @var int
85     */
86    protected $retryLimit = 20;
87
88    /**
89     * Time to wait in milliseconds before fetching a new configuration from one
90     * of the sentinel servers.
91     *
92     * @var int
93     */
94    protected $retryWait = 1000;
95
96    /**
97     * Flag for automatic fetching of available sentinels.
98     *
99     * @var bool
100     */
101    protected $updateSentinels = false;
102
103    /**
104     * @param string                     $service           Name of the service for autodiscovery.
105     * @param array                      $sentinels         Sentinel servers connection parameters.
106     * @param ConnectionFactoryInterface $connectionFactory Connection factory instance.
107     * @param ReplicationStrategy        $strategy          Replication strategy instance.
108     */
109    public function __construct(
110        $service,
111        array $sentinels,
112        ConnectionFactoryInterface $connectionFactory,
113        ReplicationStrategy $strategy = null
114    ) {
115        $this->sentinels = $sentinels;
116        $this->service = $service;
117        $this->connectionFactory = $connectionFactory;
118        $this->strategy = $strategy ?: new ReplicationStrategy();
119    }
120
121    /**
122     * Sets a default timeout for connections to sentinels.
123     *
124     * When "timeout" is present in the connection parameters of sentinels, its
125     * value overrides the default sentinel timeout.
126     *
127     * @param float $timeout Timeout value.
128     */
129    public function setSentinelTimeout($timeout)
130    {
131        $this->sentinelTimeout = (float) $timeout;
132    }
133
134    /**
135     * Sets the maximum number of retries for commands upon server failure.
136     *
137     * -1 = unlimited retry attempts
138     *  0 = no retry attempts (fails immediatly)
139     *  n = fail only after n retry attempts
140     *
141     * @param int $retry Number of retry attempts.
142     */
143    public function setRetryLimit($retry)
144    {
145        $this->retryLimit = (int) $retry;
146    }
147
148    /**
149     * Sets the time to wait (in seconds) before fetching a new configuration
150     * from one of the sentinels.
151     *
152     * @param float $seconds Time to wait before the next attempt.
153     */
154    public function setRetryWait($seconds)
155    {
156        $this->retryWait = (float) $seconds;
157    }
158
159    /**
160     * Set automatic fetching of available sentinels.
161     *
162     * @param bool $update Enable or disable automatic updates.
163     */
164    public function setUpdateSentinels($update)
165    {
166        $this->updateSentinels = (bool) $update;
167    }
168
169    /**
170     * Resets the current connection.
171     */
172    protected function reset()
173    {
174        $this->current = null;
175    }
176
177    /**
178     * Wipes the current list of master and slaves nodes.
179     */
180    protected function wipeServerList()
181    {
182        $this->reset();
183
184        $this->master = null;
185        $this->slaves = array();
186    }
187
188    /**
189     * {@inheritdoc}
190     */
191    public function add(NodeConnectionInterface $connection)
192    {
193        $alias = $connection->getParameters()->alias;
194
195        if ($alias === 'master') {
196            $this->master = $connection;
197        } else {
198            $this->slaves[$alias ?: count($this->slaves)] = $connection;
199        }
200
201        $this->reset();
202    }
203
204    /**
205     * {@inheritdoc}
206     */
207    public function remove(NodeConnectionInterface $connection)
208    {
209        if ($connection === $this->master) {
210            $this->master = null;
211            $this->reset();
212
213            return true;
214        }
215
216        if (false !== $id = array_search($connection, $this->slaves, true)) {
217            unset($this->slaves[$id]);
218            $this->reset();
219
220            return true;
221        }
222
223        return false;
224    }
225
226    /**
227     * Creates a new connection to a sentinel server.
228     *
229     * @return NodeConnectionInterface
230     */
231    protected function createSentinelConnection($parameters)
232    {
233        if ($parameters instanceof NodeConnectionInterface) {
234            return $parameters;
235        }
236
237        if (is_string($parameters)) {
238            $parameters = Parameters::parse($parameters);
239        }
240
241        if (is_array($parameters)) {
242            // NOTE: sentinels do not accept AUTH and SELECT commands so we must
243            // explicitly set them to NULL to avoid problems when using default
244            // parameters set via client options. Actually AUTH is supported for
245            // sentinels starting with Redis 5 but we have to differentiate from
246            // sentinels passwords and nodes passwords, this will be implemented
247            // in a later release.
248            $parameters['database'] = null;
249            $parameters['username'] = null;
250            $parameters['password'] = null;
251
252            if (!isset($parameters['timeout'])) {
253                $parameters['timeout'] = $this->sentinelTimeout;
254            }
255        }
256
257        $connection = $this->connectionFactory->create($parameters);
258
259        return $connection;
260    }
261
262    /**
263     * Returns the current sentinel connection.
264     *
265     * If there is no active sentinel connection, a new connection is created.
266     *
267     * @return NodeConnectionInterface
268     */
269    public function getSentinelConnection()
270    {
271        if (!$this->sentinelConnection) {
272            if (!$this->sentinels) {
273                throw new \Predis\ClientException('No sentinel server available for autodiscovery.');
274            }
275
276            $sentinel = array_shift($this->sentinels);
277            $this->sentinelConnection = $this->createSentinelConnection($sentinel);
278        }
279
280        return $this->sentinelConnection;
281    }
282
283    /**
284     * Fetches an updated list of sentinels from a sentinel.
285     */
286    public function updateSentinels()
287    {
288        SENTINEL_QUERY: {
289            $sentinel = $this->getSentinelConnection();
290
291            try {
292                $payload = $sentinel->executeCommand(
293                    RawCommand::create('SENTINEL', 'sentinels', $this->service)
294                );
295
296                $this->sentinels = array();
297                // NOTE: sentinel server does not return itself, so we add it back.
298                $this->sentinels[] = $sentinel->getParameters()->toArray();
299
300                foreach ($payload as $sentinel) {
301                    $this->sentinels[] = array(
302                        'host' => $sentinel[3],
303                        'port' => $sentinel[5],
304                    );
305                }
306            } catch (ConnectionException $exception) {
307                $this->sentinelConnection = null;
308
309                goto SENTINEL_QUERY;
310            }
311        }
312    }
313
314    /**
315     * Fetches the details for the master and slave servers from a sentinel.
316     */
317    public function querySentinel()
318    {
319        $this->wipeServerList();
320
321        $this->updateSentinels();
322        $this->getMaster();
323        $this->getSlaves();
324    }
325
326    /**
327     * Handles error responses returned by redis-sentinel.
328     *
329     * @param NodeConnectionInterface $sentinel Connection to a sentinel server.
330     * @param ErrorResponseInterface  $error    Error response.
331     */
332    private function handleSentinelErrorResponse(NodeConnectionInterface $sentinel, ErrorResponseInterface $error)
333    {
334        if ($error->getErrorType() === 'IDONTKNOW') {
335            throw new ConnectionException($sentinel, $error->getMessage());
336        } else {
337            throw new ServerException($error->getMessage());
338        }
339    }
340
341    /**
342     * Fetches the details for the master server from a sentinel.
343     *
344     * @param NodeConnectionInterface $sentinel Connection to a sentinel server.
345     * @param string                  $service  Name of the service.
346     *
347     * @return array
348     */
349    protected function querySentinelForMaster(NodeConnectionInterface $sentinel, $service)
350    {
351        $payload = $sentinel->executeCommand(
352            RawCommand::create('SENTINEL', 'get-master-addr-by-name', $service)
353        );
354
355        if ($payload === null) {
356            throw new ServerException('ERR No such master with that name');
357        }
358
359        if ($payload instanceof ErrorResponseInterface) {
360            $this->handleSentinelErrorResponse($sentinel, $payload);
361        }
362
363        return array(
364            'host' => $payload[0],
365            'port' => $payload[1],
366            'alias' => 'master',
367        );
368    }
369
370    /**
371     * Fetches the details for the slave servers from a sentinel.
372     *
373     * @param NodeConnectionInterface $sentinel Connection to a sentinel server.
374     * @param string                  $service  Name of the service.
375     *
376     * @return array
377     */
378    protected function querySentinelForSlaves(NodeConnectionInterface $sentinel, $service)
379    {
380        $slaves = array();
381
382        $payload = $sentinel->executeCommand(
383            RawCommand::create('SENTINEL', 'slaves', $service)
384        );
385
386        if ($payload instanceof ErrorResponseInterface) {
387            $this->handleSentinelErrorResponse($sentinel, $payload);
388        }
389
390        foreach ($payload as $slave) {
391            $flags = explode(',', $slave[9]);
392
393            if (array_intersect($flags, array('s_down', 'o_down', 'disconnected'))) {
394                continue;
395            }
396
397            $slaves[] = array(
398                'host' => $slave[3],
399                'port' => $slave[5],
400                'alias' => "slave-$slave[1]",
401            );
402        }
403
404        return $slaves;
405    }
406
407    /**
408     * {@inheritdoc}
409     */
410    public function getCurrent()
411    {
412        return $this->current;
413    }
414
415    /**
416     * {@inheritdoc}
417     */
418    public function getMaster()
419    {
420        if ($this->master) {
421            return $this->master;
422        }
423
424        if ($this->updateSentinels) {
425            $this->updateSentinels();
426        }
427
428        SENTINEL_QUERY: {
429            $sentinel = $this->getSentinelConnection();
430
431            try {
432                $masterParameters = $this->querySentinelForMaster($sentinel, $this->service);
433                $masterConnection = $this->connectionFactory->create($masterParameters);
434
435                $this->add($masterConnection);
436            } catch (ConnectionException $exception) {
437                $this->sentinelConnection = null;
438
439                goto SENTINEL_QUERY;
440            }
441        }
442
443        return $masterConnection;
444    }
445
446    /**
447     * {@inheritdoc}
448     */
449    public function getSlaves()
450    {
451        if ($this->slaves) {
452            return array_values($this->slaves);
453        }
454
455        if ($this->updateSentinels) {
456            $this->updateSentinels();
457        }
458
459        SENTINEL_QUERY: {
460            $sentinel = $this->getSentinelConnection();
461
462            try {
463                $slavesParameters = $this->querySentinelForSlaves($sentinel, $this->service);
464
465                foreach ($slavesParameters as $slaveParameters) {
466                    $this->add($this->connectionFactory->create($slaveParameters));
467                }
468            } catch (ConnectionException $exception) {
469                $this->sentinelConnection = null;
470
471                goto SENTINEL_QUERY;
472            }
473        }
474
475        return array_values($this->slaves ?: array());
476    }
477
478    /**
479     * Returns a random slave.
480     *
481     * @return NodeConnectionInterface
482     */
483    protected function pickSlave()
484    {
485        if ($slaves = $this->getSlaves()) {
486            return $slaves[rand(1, count($slaves)) - 1];
487        }
488    }
489
490    /**
491     * Returns the connection instance in charge for the given command.
492     *
493     * @param CommandInterface $command Command instance.
494     *
495     * @return NodeConnectionInterface
496     */
497    private function getConnectionInternal(CommandInterface $command)
498    {
499        if (!$this->current) {
500            if ($this->strategy->isReadOperation($command) && $slave = $this->pickSlave()) {
501                $this->current = $slave;
502            } else {
503                $this->current = $this->getMaster();
504            }
505
506            return $this->current;
507        }
508
509        if ($this->current === $this->master) {
510            return $this->current;
511        }
512
513        if (!$this->strategy->isReadOperation($command)) {
514            $this->current = $this->getMaster();
515        }
516
517        return $this->current;
518    }
519
520    /**
521     * Asserts that the specified connection matches an expected role.
522     *
523     * @param NodeConnectionInterface $connection Connection to a redis server.
524     * @param string                  $role       Expected role of the server ("master", "slave" or "sentinel").
525     *
526     * @throws RoleException
527     */
528    protected function assertConnectionRole(NodeConnectionInterface $connection, $role)
529    {
530        $role = strtolower($role);
531        $actualRole = $connection->executeCommand(RawCommand::create('ROLE'));
532
533        if ($role !== $actualRole[0]) {
534            throw new RoleException($connection, "Expected $role but got $actualRole[0] [$connection]");
535        }
536    }
537
538    /**
539     * {@inheritdoc}
540     */
541    public function getConnection(CommandInterface $command)
542    {
543        $connection = $this->getConnectionInternal($command);
544
545        if (!$connection->isConnected()) {
546            // When we do not have any available slave in the pool we can expect
547            // read-only operations to hit the master server.
548            $expectedRole = $this->strategy->isReadOperation($command) && $this->slaves ? 'slave' : 'master';
549            $this->assertConnectionRole($connection, $expectedRole);
550        }
551
552        return $connection;
553    }
554
555    /**
556     * {@inheritdoc}
557     */
558    public function getConnectionById($connectionId)
559    {
560        if ($connectionId === 'master') {
561            return $this->getMaster();
562        }
563
564        $this->getSlaves();
565
566        if (isset($this->slaves[$connectionId])) {
567            return $this->slaves[$connectionId];
568        }
569    }
570
571    /**
572     * {@inheritdoc}
573     */
574    public function switchTo($connection)
575    {
576        if (!$connection instanceof NodeConnectionInterface) {
577            $connection = $this->getConnectionById($connection);
578        }
579
580        if ($connection && $connection === $this->current) {
581            return;
582        }
583
584        if ($connection !== $this->master && !in_array($connection, $this->slaves, true)) {
585            throw new \InvalidArgumentException('Invalid connection or connection not found.');
586        }
587
588        $connection->connect();
589
590        if ($this->current) {
591            $this->current->disconnect();
592        }
593
594        $this->current = $connection;
595    }
596
597    /**
598     * Switches to the master server.
599     */
600    public function switchToMaster()
601    {
602        $this->switchTo('master');
603    }
604
605    /**
606     * Switches to a random slave server.
607     */
608    public function switchToSlave()
609    {
610        $connection = $this->pickSlave();
611        $this->switchTo($connection);
612    }
613
614    /**
615     * {@inheritdoc}
616     */
617    public function isConnected()
618    {
619        return $this->current ? $this->current->isConnected() : false;
620    }
621
622    /**
623     * {@inheritdoc}
624     */
625    public function connect()
626    {
627        if (!$this->current) {
628            if (!$this->current = $this->pickSlave()) {
629                $this->current = $this->getMaster();
630            }
631        }
632
633        $this->current->connect();
634    }
635
636    /**
637     * {@inheritdoc}
638     */
639    public function disconnect()
640    {
641        if ($this->master) {
642            $this->master->disconnect();
643        }
644
645        foreach ($this->slaves as $connection) {
646            $connection->disconnect();
647        }
648    }
649
650    /**
651     * Retries the execution of a command upon server failure after asking a new
652     * configuration to one of the sentinels.
653     *
654     * @param CommandInterface $command Command instance.
655     * @param string           $method  Actual method.
656     *
657     * @return mixed
658     */
659    private function retryCommandOnFailure(CommandInterface $command, $method)
660    {
661        $retries = 0;
662
663        SENTINEL_RETRY: {
664            try {
665                $response = $this->getConnection($command)->$method($command);
666            } catch (CommunicationException $exception) {
667                $this->wipeServerList();
668                $exception->getConnection()->disconnect();
669
670                if ($retries == $this->retryLimit) {
671                    throw $exception;
672                }
673
674                usleep($this->retryWait * 1000);
675
676                ++$retries;
677                goto SENTINEL_RETRY;
678            }
679        }
680
681        return $response;
682    }
683
684    /**
685     * {@inheritdoc}
686     */
687    public function writeRequest(CommandInterface $command)
688    {
689        $this->retryCommandOnFailure($command, __FUNCTION__);
690    }
691
692    /**
693     * {@inheritdoc}
694     */
695    public function readResponse(CommandInterface $command)
696    {
697        return $this->retryCommandOnFailure($command, __FUNCTION__);
698    }
699
700    /**
701     * {@inheritdoc}
702     */
703    public function executeCommand(CommandInterface $command)
704    {
705        return $this->retryCommandOnFailure($command, __FUNCTION__);
706    }
707
708    /**
709     * Returns the underlying replication strategy.
710     *
711     * @return ReplicationStrategy
712     */
713    public function getReplicationStrategy()
714    {
715        return $this->strategy;
716    }
717
718    /**
719     * {@inheritdoc}
720     */
721    public function __sleep()
722    {
723        return array(
724            'master', 'slaves', 'service', 'sentinels', 'connectionFactory', 'strategy',
725        );
726    }
727}
728