<?php

namespace Doctrine\DBAL\Sharding;

use Doctrine\Common\EventManager;
use Doctrine\DBAL\Configuration;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Driver;
use Doctrine\DBAL\Driver\Connection as DriverConnection;
use Doctrine\DBAL\Event\ConnectionEventArgs;
use Doctrine\DBAL\Events;
use Doctrine\DBAL\Sharding\ShardChoser\ShardChoser;
use InvalidArgumentException;
use Throwable;

use function array_merge;
use function is_numeric;
use function is_string;

/**
 * Sharding implementation that pools many different connections
 * internally and serves data from the currently active connection.
 *
 * The internals of this class are:
 *
 * - All sharding clients are specified and given a shard-id during
 *   configuration. Shard id 0 is used internally for the global connection,
 *   so configured shard ids must be numeric and >= 1.
 * - By default, the global shard is selected. The constructor throws if no
 *   global shard is configured.
 * - Selecting a shard by distribution value delegates the mapping
 *   "distributionValue" => "client" to the ShardChoser interface.
 * - An exception is thrown if trying to switch shards during an open
 *   transaction.
 *
 * Instantiation through the DriverManager looks like:
 *
 * @example
 *
 * $conn = DriverManager::getConnection(array(
 *     'wrapperClass' => 'Doctrine\DBAL\Sharding\PoolingShardConnection',
 *     'driver'       => 'pdo_mysql',
 *     'global'       => array('user' => '', 'password' => '', 'host' => '', 'dbname' => ''),
 *     'shards'       => array(
 *         array('id' => 1, 'user' => 'slave1', 'password' => '', 'host' => '', 'dbname' => ''),
 *         array('id' => 2, 'user' => 'slave2', 'password' => '', 'host' => '', 'dbname' => ''),
 *     ),
 *     'shardChoser'  => 'Doctrine\DBAL\Sharding\ShardChoser\MultiTenantShardChoser',
 * ));
 * $shardManager = $conn->getShardManager();
 * $shardManager->selectGlobal();
 * $shardManager->selectShard($value);
 *
 * @deprecated
 */
class PoolingShardConnection extends Connection
{
    /**
     * Driver connections that have already been opened, keyed by shard id
     * (0 is the global connection).
     *
     * @var DriverConnection[]
     */
    private $activeConnections = [];

    /**
     * Id of the shard whose connection is currently in use, or null when no
     * connection has been selected yet (initially and after close()).
     *
     * @var string|int|null
     */
    private $activeShardId;

    /**
     * Connection parameters keyed by shard id; index 0 holds the parameters
     * of the global connection.
     *
     * @var mixed[]
     */
    private $connectionParameters = [];

    /**
     * {@inheritDoc}
     *
     * Validates the sharding configuration and builds the per-shard parameter
     * sets by merging each shard's settings over the shared parameters.
     *
     * @internal The connection can be only instantiated by the driver manager.
     *
     * @throws InvalidArgumentException If 'global', 'shards' or 'shardChoser' is missing,
     *                                  the shard choser is not a ShardChoser, or a shard
     *                                  has a missing, invalid or duplicated id.
     */
    public function __construct(
        array $params,
        Driver $driver,
        ?Configuration $config = null,
        ?EventManager $eventManager = null
    ) {
        if (! isset($params['global'], $params['shards'])) {
            throw new InvalidArgumentException("Connection Parameters require 'global' and 'shards' configurations.");
        }

        if (! isset($params['shardChoser'])) {
            throw new InvalidArgumentException("Missing Shard Choser configuration 'shardChoser'");
        }

        // A class name is accepted as a convenience and instantiated without arguments.
        if (is_string($params['shardChoser'])) {
            $params['shardChoser'] = new $params['shardChoser']();
        }

        if (! ($params['shardChoser'] instanceof ShardChoser)) {
            throw new InvalidArgumentException(
                "The 'shardChoser' configuration is not a valid instance of " . ShardChoser::class
            );
        }

        // Index 0 is reserved for the global connection.
        $this->connectionParameters[0] = array_merge($params, $params['global']);

        foreach ($params['shards'] as $shard) {
            if (! isset($shard['id'])) {
                throw new InvalidArgumentException(
                    "Missing 'id' for one configured shard. Please specify a unique shard-id."
                );
            }

            // Ids below 1 are rejected because 0 is taken by the global connection.
            if (! is_numeric($shard['id']) || $shard['id'] < 1) {
                throw new InvalidArgumentException('Shard Id has to be a non-negative number.');
            }

            if (isset($this->connectionParameters[$shard['id']])) {
                throw new InvalidArgumentException('Shard ' . $shard['id'] . ' is duplicated in the configuration.');
            }

            $this->connectionParameters[$shard['id']] = array_merge($params, $shard);
        }

        parent::__construct($params, $driver, $config, $eventManager);
    }

    /**
     * Get active shard id.
     *
     * @return string|int|null Null when no connection has been selected.
     */
    public function getActiveShardId()
    {
        return $this->activeShardId;
    }

    /**
     * {@inheritdoc}
     *
     * Returns the parameters of the active shard, or those of the global
     * connection when no shard (or shard 0) is active.
     */
    public function getParams()
    {
        return $this->activeShardId
            ? $this->connectionParameters[$this->activeShardId]
            : $this->connectionParameters[0];
    }

    /**
     * {@inheritdoc}
     */
    public function getHost()
    {
        $params = $this->getParams();

        return $params['host'] ?? parent::getHost();
    }

    /**
     * {@inheritdoc}
     */
    public function getPort()
    {
        $params = $this->getParams();

        return $params['port'] ?? parent::getPort();
    }

    /**
     * {@inheritdoc}
     */
    public function getUsername()
    {
        $params = $this->getParams();

        return $params['user'] ?? parent::getUsername();
    }

    /**
     * {@inheritdoc}
     */
    public function getPassword()
    {
        $params = $this->getParams();

        return $params['password'] ?? parent::getPassword();
    }

    /**
     * Connects to a given shard.
     *
     * Passing null connects to the global connection (shard id 0) unless a
     * connection is already in use. Connections are opened once per shard
     * and reused on later switches.
     *
     * @param string|int|null $shardId
     *
     * @return bool True if a new driver connection was opened, false if an
     *              existing connection was kept or reused.
     *
     * @throws ShardingException If a transaction is active while switching, or
     *                           the requested shard is not configured.
     */
    public function connect($shardId = null)
    {
        if ($shardId === null && $this->_conn) {
            return false;
        }

        // Normalize first so that '1' and 1 are recognised as the same shard;
        // the stored active id is always an int.
        $targetShardId = (int) $shardId;

        if ($shardId !== null && $targetShardId === $this->activeShardId) {
            return false;
        }

        if ($this->getTransactionNestingLevel() > 0) {
            throw new ShardingException('Cannot switch shard when transaction is active.');
        }

        if (isset($this->activeConnections[$targetShardId])) {
            $this->activeShardId = $targetShardId;
            $this->_conn         = $this->activeConnections[$targetShardId];

            return false;
        }

        // connectTo() reads getParams(), which depends on the active shard id,
        // so the id has to be switched before connecting. Roll it back if the
        // connection attempt fails; otherwise the id would name a shard whose
        // connection was never established while $_conn still points at the
        // previous one.
        $previousShardId     = $this->activeShardId;
        $this->activeShardId = $targetShardId;

        try {
            $connection = $this->connectTo($targetShardId);
        } catch (Throwable $e) {
            $this->activeShardId = $previousShardId;

            throw $e;
        }

        $this->_conn = $this->activeConnections[$targetShardId] = $connection;

        if ($this->_eventManager->hasListeners(Events::postConnect)) {
            $eventArgs = new ConnectionEventArgs($this);
            $this->_eventManager->dispatchEvent(Events::postConnect, $eventArgs);
        }

        return true;
    }

    /**
     * Connects to a specific connection.
     *
     * Driver options are taken from getParams(); user, password and the
     * remaining parameters come from the configuration of the given shard.
     *
     * @param string|int $shardId
     *
     * @return \Doctrine\DBAL\Driver\Connection
     *
     * @throws ShardingException If no parameters are configured for the shard id.
     */
    protected function connectTo($shardId)
    {
        // Fail with a clear error instead of an undefined-offset warning
        // followed by null parameters being handed to the driver.
        if (! isset($this->connectionParameters[$shardId])) {
            throw new ShardingException('Shard ' . $shardId . ' is not configured.');
        }

        $params = $this->getParams();

        $driverOptions = $params['driverOptions'] ?? [];

        $connectionParams = $this->connectionParameters[$shardId];

        $user     = $connectionParams['user'] ?? null;
        $password = $connectionParams['password'] ?? null;

        return $this->_driver->connect($connectionParams, $user, $password, $driverOptions);
    }

    /**
     * Tells whether a connection is open.
     *
     * @param string|int|null $shardId Shard to check; null checks whether any
     *                                 connection is currently in use.
     *
     * @return bool
     */
    public function isConnected($shardId = null)
    {
        if ($shardId === null) {
            return $this->_conn !== null;
        }

        return isset($this->activeConnections[$shardId]);
    }

    /**
     * Drops the current connection together with all pooled shard
     * connections and resets the active shard.
     *
     * @return void
     */
    public function close()
    {
        $this->_conn             = null;
        $this->activeConnections = [];
        $this->activeShardId     = null;
    }
}