<?php

namespace Doctrine\DBAL\Sharding;

use Doctrine\Common\EventManager;
use Doctrine\DBAL\Configuration;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Driver;
use Doctrine\DBAL\Driver\Connection as DriverConnection;
use Doctrine\DBAL\Event\ConnectionEventArgs;
use Doctrine\DBAL\Events;
use Doctrine\DBAL\Sharding\ShardChoser\ShardChoser;
use InvalidArgumentException;
use Throwable;
use function array_merge;
use function is_numeric;
use function is_string;

/**
 * Sharding implementation that pools many different connections
 * internally and serves data from the currently active connection.
 *
 * The internals of this class are:
 *
 * - All sharding clients are specified and given a shard-id during
 *   configuration.
 * - By default, the global shard is selected. If no global shard is configured
 *   an exception is thrown on access.
 * - Selecting a shard by distribution value delegates the mapping
 *   "distributionValue" => "client" to the ShardChoser interface.
 * - An exception is thrown if trying to switch shards during an open
 *   transaction.
 *
 * Instantiation through the DriverManager looks like:
 *
 * @example
 *
 * $conn = DriverManager::getConnection(array(
 *     'wrapperClass' => 'Doctrine\DBAL\Sharding\PoolingShardConnection',
 *     'driver' => 'pdo_mysql',
 *     'global' => array('user' => '', 'password' => '', 'host' => '', 'dbname' => ''),
 *     'shards' => array(
 *         array('id' => 1, 'user' => 'slave1', 'password' => '', 'host' => '', 'dbname' => ''),
 *         array('id' => 2, 'user' => 'slave2', 'password' => '', 'host' => '', 'dbname' => ''),
 *     ),
 *     'shardChoser' => 'Doctrine\DBAL\Sharding\ShardChoser\MultiTenantShardChoser',
 * ));
 * $shardManager = $conn->getShardManager();
 * $shardManager->selectGlobal();
 * $shardManager->selectShard($value);
 */
class PoolingShardConnection extends Connection
{
    /**
     * Driver connections opened so far, keyed by shard id (0 is the global shard).
     *
     * @var DriverConnection[]
     */
    private $activeConnections = [];

    /**
     * Id of the currently selected shard; null until the first connect() and after close().
     *
     * @var string|int|null
     */
    private $activeShardId;

    /**
     * Merged connection parameters keyed by shard id; key 0 holds the global shard.
     *
     * @var mixed[]
     */
    private $connectionParameters = [];

    /**
     * {@inheritDoc}
     *
     * Validates the sharding configuration and pre-computes the connection
     * parameters of the global shard and of every configured shard.
     *
     * @throws InvalidArgumentException If 'global', 'shards' or 'shardChoser' is missing,
     *                                  if the shard choser is not a ShardChoser, or if a
     *                                  shard has a missing, invalid or duplicated id.
     */
    public function __construct(array $params, Driver $driver, ?Configuration $config = null, ?EventManager $eventManager = null)
    {
        if (! isset($params['global'], $params['shards'])) {
            throw new InvalidArgumentException("Connection Parameters require 'global' and 'shards' configurations.");
        }

        if (! isset($params['shardChoser'])) {
            throw new InvalidArgumentException("Missing Shard Choser configuration 'shardChoser'");
        }

        // A class name is accepted as a convenience and instantiated without arguments.
        if (is_string($params['shardChoser'])) {
            $params['shardChoser'] = new $params['shardChoser']();
        }

        if (! ($params['shardChoser'] instanceof ShardChoser)) {
            throw new InvalidArgumentException("The 'shardChoser' configuration is not a valid instance of Doctrine\DBAL\Sharding\ShardChoser\ShardChoser");
        }

        // Slot 0 is the global shard: the 'global' entries override the top-level parameters.
        $this->connectionParameters[0] = array_merge($params, $params['global']);

        foreach ($params['shards'] as $shard) {
            if (! isset($shard['id'])) {
                throw new InvalidArgumentException("Missing 'id' for one configured shard. Please specify a unique shard-id.");
            }

            // Ids below 1 are rejected: 0 is already taken by the global shard above.
            if (! is_numeric($shard['id']) || $shard['id'] < 1) {
                throw new InvalidArgumentException('Shard Id has to be a non-negative number.');
            }

            if (isset($this->connectionParameters[$shard['id']])) {
                throw new InvalidArgumentException('Shard ' . $shard['id'] . ' is duplicated in the configuration.');
            }

            // The shard's own entries override the top-level parameters.
            $this->connectionParameters[$shard['id']] = array_merge($params, $shard);
        }

        parent::__construct($params, $driver, $config, $eventManager);
    }

    /**
     * Get active shard id.
     *
     * @return string|int|null Null when no shard has been selected yet (or after close()).
     */
    public function getActiveShardId()
    {
        return $this->activeShardId;
    }

    /**
     * {@inheritdoc}
     *
     * Returns the parameters of the active shard, or those of the global
     * shard when no shard (or shard 0) is active.
     */
    public function getParams()
    {
        return $this->connectionParameters[$this->activeShardId ?? 0];
    }

    /**
     * {@inheritdoc}
     *
     * Prefers the 'host' of the active shard and falls back to the parent value.
     */
    public function getHost()
    {
        $params = $this->getParams();

        return $params['host'] ?? parent::getHost();
    }

    /**
     * {@inheritdoc}
     *
     * Prefers the 'port' of the active shard and falls back to the parent value.
     */
    public function getPort()
    {
        $params = $this->getParams();

        return $params['port'] ?? parent::getPort();
    }

    /**
     * {@inheritdoc}
     *
     * Prefers the 'user' of the active shard and falls back to the parent value.
     */
    public function getUsername()
    {
        $params = $this->getParams();

        return $params['user'] ?? parent::getUsername();
    }

    /**
     * {@inheritdoc}
     *
     * Prefers the 'password' of the active shard and falls back to the parent value.
     */
    public function getPassword()
    {
        $params = $this->getParams();

        return $params['password'] ?? parent::getPassword();
    }

    /**
     * Connects to a given shard.
     *
     * Passing null selects the global shard (id 0) unless a connection is
     * already established, in which case nothing happens. Connections are
     * pooled: switching back to a shard that was opened before reuses its
     * driver connection.
     *
     * @param string|int|null $shardId
     *
     * @return bool True if a new driver connection was opened, false if the
     *              requested shard was already active or its pooled connection
     *              was reused.
     *
     * @throws ShardingException If a transaction is open while switching shards,
     *                           or if the shard id is not configured.
     */
    public function connect($shardId = null)
    {
        if ($shardId === null && $this->_conn !== null) {
            return false;
        }

        // The active id is stored as an integer, so normalise before comparing;
        // otherwise a string id such as '1' would never match the active shard
        // and re-selecting it inside a transaction would throw needlessly.
        $targetShardId = (int) $shardId;

        if ($shardId !== null && $targetShardId === $this->activeShardId) {
            return false;
        }

        if ($this->getTransactionNestingLevel() > 0) {
            throw new ShardingException('Cannot switch shard when transaction is active.');
        }

        if (isset($this->activeConnections[$targetShardId])) {
            $this->activeShardId = $targetShardId;
            $this->_conn         = $this->activeConnections[$targetShardId];

            return false;
        }

        // The active id is switched before connecting so that getParams() already
        // reflects the target shard, but it is rolled back if the connection
        // attempt fails: otherwise the id would name a shard that $this->_conn
        // does not actually point to.
        $previousShardId     = $this->activeShardId;
        $this->activeShardId = $targetShardId;

        try {
            $connection = $this->connectTo($targetShardId);
        } catch (Throwable $e) {
            $this->activeShardId = $previousShardId;

            throw $e;
        }

        $this->_conn = $this->activeConnections[$targetShardId] = $connection;

        if ($this->_eventManager->hasListeners(Events::postConnect)) {
            $eventArgs = new ConnectionEventArgs($this);
            $this->_eventManager->dispatchEvent(Events::postConnect, $eventArgs);
        }

        return true;
    }

    /**
     * Connects to a specific connection.
     *
     * Opens a new driver connection using the parameters stored for the given
     * shard id; it neither consults nor updates the connection pool.
     *
     * @param string|int $shardId
     *
     * @return \Doctrine\DBAL\Driver\Connection
     *
     * @throws ShardingException If no parameters are configured for the shard id.
     */
    protected function connectTo($shardId)
    {
        if (! isset($this->connectionParameters[$shardId])) {
            throw new ShardingException('Shard ' . $shardId . ' is not configured.');
        }

        $connectionParams = $this->connectionParameters[$shardId];

        // Read every option from the target shard itself rather than from the
        // currently active one, so the result depends only on $shardId.
        $driverOptions = $connectionParams['driverOptions'] ?? [];
        $user          = $connectionParams['user'] ?? null;
        $password      = $connectionParams['password'] ?? null;

        return $this->_driver->connect($connectionParams, $user, $password, $driverOptions);
    }

    /**
     * Tells whether a connection is established.
     *
     * @param string|int|null $shardId Shard to check; null checks whether any
     *                                 connection is currently active.
     *
     * @return bool
     */
    public function isConnected($shardId = null)
    {
        if ($shardId === null) {
            return $this->_conn !== null;
        }

        return isset($this->activeConnections[$shardId]);
    }

    /**
     * Drops the active connection, empties the connection pool and clears the
     * active shard id.
     *
     * @return void
     */
    public function close()
    {
        $this->_conn             = null;
        $this->activeConnections = [];
        $this->activeShardId     = null;
    }
}