1<?php 2 3/* 4 * This file is part of the Predis package. 5 * 6 * (c) Daniele Alessandri <suppakilla@gmail.com> 7 * 8 * For the full copyright and license information, please view the LICENSE 9 * file that was distributed with this source code. 10 */ 11 12namespace Predis\Pipeline; 13 14use Predis\CommunicationException; 15use Predis\Connection\Aggregate\ClusterInterface; 16use Predis\Connection\ConnectionInterface; 17use Predis\Connection\NodeConnectionInterface; 18use Predis\NotSupportedException; 19 20/** 21 * Command pipeline that does not throw exceptions on connection errors, but 22 * returns the exception instances as the rest of the response elements. 23 * 24 * @todo Awful naming! 25 * 26 * @author Daniele Alessandri <suppakilla@gmail.com> 27 */ 28class ConnectionErrorProof extends Pipeline 29{ 30 /** 31 * {@inheritdoc} 32 */ 33 protected function getConnection() 34 { 35 return $this->getClient()->getConnection(); 36 } 37 38 /** 39 * {@inheritdoc} 40 */ 41 protected function executePipeline(ConnectionInterface $connection, \SplQueue $commands) 42 { 43 if ($connection instanceof NodeConnectionInterface) { 44 return $this->executeSingleNode($connection, $commands); 45 } elseif ($connection instanceof ClusterInterface) { 46 return $this->executeCluster($connection, $commands); 47 } else { 48 $class = get_class($connection); 49 50 throw new NotSupportedException("The connection class '$class' is not supported."); 51 } 52 } 53 54 /** 55 * {@inheritdoc} 56 */ 57 protected function executeSingleNode(NodeConnectionInterface $connection, \SplQueue $commands) 58 { 59 $responses = array(); 60 $sizeOfPipe = count($commands); 61 62 foreach ($commands as $command) { 63 try { 64 $connection->writeRequest($command); 65 } catch (CommunicationException $exception) { 66 return array_fill(0, $sizeOfPipe, $exception); 67 } 68 } 69 70 for ($i = 0; $i < $sizeOfPipe; ++$i) { 71 $command = $commands->dequeue(); 72 73 try { 74 $responses[$i] = $connection->readResponse($command); 75 } catch (CommunicationException $exception) { 76 $add = count($commands) - count($responses); 77 $responses = array_merge($responses, array_fill(0, $add, $exception)); 78 79 break; 80 } 81 } 82 83 return $responses; 84 } 85 86 /** 87 * {@inheritdoc} 88 */ 89 protected function executeCluster(ClusterInterface $connection, \SplQueue $commands) 90 { 91 $responses = array(); 92 $sizeOfPipe = count($commands); 93 $exceptions = array(); 94 95 foreach ($commands as $command) { 96 $cmdConnection = $connection->getConnection($command); 97 98 if (isset($exceptions[spl_object_hash($cmdConnection)])) { 99 continue; 100 } 101 102 try { 103 $cmdConnection->writeRequest($command); 104 } catch (CommunicationException $exception) { 105 $exceptions[spl_object_hash($cmdConnection)] = $exception; 106 } 107 } 108 109 for ($i = 0; $i < $sizeOfPipe; ++$i) { 110 $command = $commands->dequeue(); 111 112 $cmdConnection = $connection->getConnection($command); 113 $connectionHash = spl_object_hash($cmdConnection); 114 115 if (isset($exceptions[$connectionHash])) { 116 $responses[$i] = $exceptions[$connectionHash]; 117 continue; 118 } 119 120 try { 121 $responses[$i] = $cmdConnection->readResponse($command); 122 } catch (CommunicationException $exception) { 123 $responses[$i] = $exception; 124 $exceptions[$connectionHash] = $exception; 125 } 126 } 127 128 return $responses; 129 } 130} 131