1<?php
2
3/*
4 * This file is part of the Predis package.
5 *
6 * (c) Daniele Alessandri <suppakilla@gmail.com>
7 *
8 * For the full copyright and license information, please view the LICENSE
9 * file that was distributed with this source code.
10 */
11
12namespace Predis\Pipeline;
13
14use Predis\CommunicationException;
15use Predis\Connection\Aggregate\ClusterInterface;
16use Predis\Connection\ConnectionInterface;
17use Predis\Connection\NodeConnectionInterface;
18use Predis\NotSupportedException;
19
/**
 * Command pipeline that does not throw exceptions on connection errors, but
 * returns the exception instances as the rest of the response elements.
 *
 * The returned array always contains one entry per queued command: either the
 * response read from the server or the CommunicationException that prevented
 * the command from being written or its response from being read.
 *
 * @todo Awful naming!
 *
 * @author Daniele Alessandri <suppakilla@gmail.com>
 */
class ConnectionErrorProof extends Pipeline
{
    /**
     * {@inheritdoc}
     */
    protected function getConnection()
    {
        // The connection of the client is returned as-is.
        return $this->getClient()->getConnection();
    }

    /**
     * {@inheritdoc}
     *
     * @throws NotSupportedException When the connection is neither a single
     *                               node nor a cluster connection.
     */
    protected function executePipeline(ConnectionInterface $connection, \SplQueue $commands)
    {
        if ($connection instanceof NodeConnectionInterface) {
            return $this->executeSingleNode($connection, $commands);
        }

        if ($connection instanceof ClusterInterface) {
            return $this->executeCluster($connection, $commands);
        }

        $class = get_class($connection);

        throw new NotSupportedException("The connection class '$class' is not supported.");
    }

    /**
     * Executes the pipeline against a single node connection.
     *
     * When a write fails, every command of the pipeline gets the exception as
     * its response. When a read fails, the responses already read are kept and
     * the failed command plus all the following ones get the exception.
     *
     * @param NodeConnectionInterface $connection Connection to the node.
     * @param \SplQueue               $commands   Queue of commands to execute.
     *
     * @return array One response (or exception instance) per queued command.
     */
    protected function executeSingleNode(NodeConnectionInterface $connection, \SplQueue $commands)
    {
        $responses = array();
        $sizeOfPipe = count($commands);

        foreach ($commands as $command) {
            try {
                $connection->writeRequest($command);
            } catch (CommunicationException $exception) {
                return array_fill(0, $sizeOfPipe, $exception);
            }
        }

        for ($i = 0; $i < $sizeOfPipe; ++$i) {
            $command = $commands->dequeue();

            try {
                $responses[$i] = $connection->readResponse($command);
            } catch (CommunicationException $exception) {
                // The failed command has already been dequeued, so the number
                // of missing responses must be computed from the initial size
                // of the pipeline and not from the current length of the queue
                // (which would produce too few entries or a negative count).
                $missing = $sizeOfPipe - count($responses);
                $responses = array_merge($responses, array_fill(0, $missing, $exception));

                break;
            }
        }

        return $responses;
    }

    /**
     * Executes the pipeline against a cluster of connections.
     *
     * Failures are tracked per node connection: once a connection fails on a
     * write or a read, the exception is returned as the response of every
     * remaining command routed to that same connection, while commands routed
     * to other connections are still processed.
     *
     * @param ClusterInterface $connection Cluster connection.
     * @param \SplQueue        $commands   Queue of commands to execute.
     *
     * @return array One response (or exception instance) per queued command.
     */
    protected function executeCluster(ClusterInterface $connection, \SplQueue $commands)
    {
        $responses = array();
        $sizeOfPipe = count($commands);
        // Exceptions indexed by the object hash of the connection that failed.
        $exceptions = array();

        foreach ($commands as $command) {
            $cmdConnection = $connection->getConnection($command);
            $connectionHash = spl_object_hash($cmdConnection);

            // Do not write to a connection that has already failed.
            if (isset($exceptions[$connectionHash])) {
                continue;
            }

            try {
                $cmdConnection->writeRequest($command);
            } catch (CommunicationException $exception) {
                $exceptions[$connectionHash] = $exception;
            }
        }

        for ($i = 0; $i < $sizeOfPipe; ++$i) {
            $command = $commands->dequeue();

            $cmdConnection = $connection->getConnection($command);
            $connectionHash = spl_object_hash($cmdConnection);

            // Reuse the recorded exception instead of reading from a
            // connection that has already failed.
            if (isset($exceptions[$connectionHash])) {
                $responses[$i] = $exceptions[$connectionHash];
                continue;
            }

            try {
                $responses[$i] = $cmdConnection->readResponse($command);
            } catch (CommunicationException $exception) {
                $responses[$i] = $exception;
                $exceptions[$connectionHash] = $exception;
            }
        }

        return $responses;
    }
}
131