1<?php 2 3namespace Doctrine\DBAL\Sharding; 4 5use Doctrine\Common\EventManager; 6use Doctrine\DBAL\Configuration; 7use Doctrine\DBAL\Connection; 8use Doctrine\DBAL\Driver; 9use Doctrine\DBAL\Driver\Connection as DriverConnection; 10use Doctrine\DBAL\Event\ConnectionEventArgs; 11use Doctrine\DBAL\Events; 12use Doctrine\DBAL\Sharding\ShardChoser\ShardChoser; 13use InvalidArgumentException; 14 15use function array_merge; 16use function is_numeric; 17use function is_string; 18 19/** 20 * Sharding implementation that pools many different connections 21 * internally and serves data from the currently active connection. 22 * 23 * The internals of this class are: 24 * 25 * - All sharding clients are specified and given a shard-id during 26 * configuration. 27 * - By default, the global shard is selected. If no global shard is configured 28 * an exception is thrown on access. 29 * - Selecting a shard by distribution value delegates the mapping 30 * "distributionValue" => "client" to the ShardChoser interface. 31 * - An exception is thrown if trying to switch shards during an open 32 * transaction. 33 * 34 * Instantiation through the DriverManager looks like: 35 * 36 * @example 37 * 38 * $conn = DriverManager::getConnection(array( 39 * 'wrapperClass' => 'Doctrine\DBAL\Sharding\PoolingShardConnection', 40 * 'driver' => 'pdo_mysql', 41 * 'global' => array('user' => '', 'password' => '', 'host' => '', 'dbname' => ''), 42 * 'shards' => array( 43 * array('id' => 1, 'user' => 'slave1', 'password', 'host' => '', 'dbname' => ''), 44 * array('id' => 2, 'user' => 'slave2', 'password', 'host' => '', 'dbname' => ''), 45 * ), 46 * 'shardChoser' => 'Doctrine\DBAL\Sharding\ShardChoser\MultiTenantShardChoser', 47 * )); 48 * $shardManager = $conn->getShardManager(); 49 * $shardManager->selectGlobal(); 50 * $shardManager->selectShard($value); 51 */ 52class PoolingShardConnection extends Connection 53{ 54 /** @var DriverConnection[] */ 55 private $activeConnections = []; 56 57 /** @var string|int|null */ 58 private $activeShardId; 59 60 /** @var mixed[] */ 61 private $connectionParameters = []; 62 63 /** 64 * {@inheritDoc} 65 * 66 * @throws InvalidArgumentException 67 */ 68 public function __construct( 69 array $params, 70 Driver $driver, 71 ?Configuration $config = null, 72 ?EventManager $eventManager = null 73 ) { 74 if (! isset($params['global'], $params['shards'])) { 75 throw new InvalidArgumentException("Connection Parameters require 'global' and 'shards' configurations."); 76 } 77 78 if (! isset($params['shardChoser'])) { 79 throw new InvalidArgumentException("Missing Shard Choser configuration 'shardChoser'"); 80 } 81 82 if (is_string($params['shardChoser'])) { 83 $params['shardChoser'] = new $params['shardChoser'](); 84 } 85 86 if (! ($params['shardChoser'] instanceof ShardChoser)) { 87 throw new InvalidArgumentException( 88 "The 'shardChoser' configuration is not a valid instance of " . ShardChoser::class 89 ); 90 } 91 92 $this->connectionParameters[0] = array_merge($params, $params['global']); 93 94 foreach ($params['shards'] as $shard) { 95 if (! isset($shard['id'])) { 96 throw new InvalidArgumentException( 97 "Missing 'id' for one configured shard. Please specify a unique shard-id." 98 ); 99 } 100 101 if (! is_numeric($shard['id']) || $shard['id'] < 1) { 102 throw new InvalidArgumentException('Shard Id has to be a non-negative number.'); 103 } 104 105 if (isset($this->connectionParameters[$shard['id']])) { 106 throw new InvalidArgumentException('Shard ' . $shard['id'] . ' is duplicated in the configuration.'); 107 } 108 109 $this->connectionParameters[$shard['id']] = array_merge($params, $shard); 110 } 111 112 parent::__construct($params, $driver, $config, $eventManager); 113 } 114 115 /** 116 * Get active shard id. 117 * 118 * @return string|int|null 119 */ 120 public function getActiveShardId() 121 { 122 return $this->activeShardId; 123 } 124 125 /** 126 * {@inheritdoc} 127 */ 128 public function getParams() 129 { 130 return $this->activeShardId 131 ? $this->connectionParameters[$this->activeShardId] 132 : $this->connectionParameters[0]; 133 } 134 135 /** 136 * {@inheritdoc} 137 */ 138 public function getHost() 139 { 140 $params = $this->getParams(); 141 142 return $params['host'] ?? parent::getHost(); 143 } 144 145 /** 146 * {@inheritdoc} 147 */ 148 public function getPort() 149 { 150 $params = $this->getParams(); 151 152 return $params['port'] ?? parent::getPort(); 153 } 154 155 /** 156 * {@inheritdoc} 157 */ 158 public function getUsername() 159 { 160 $params = $this->getParams(); 161 162 return $params['user'] ?? parent::getUsername(); 163 } 164 165 /** 166 * {@inheritdoc} 167 */ 168 public function getPassword() 169 { 170 $params = $this->getParams(); 171 172 return $params['password'] ?? parent::getPassword(); 173 } 174 175 /** 176 * Connects to a given shard. 177 * 178 * @param string|int|null $shardId 179 * 180 * @return bool 181 * 182 * @throws ShardingException 183 */ 184 public function connect($shardId = null) 185 { 186 if ($shardId === null && $this->_conn) { 187 return false; 188 } 189 190 if ($shardId !== null && $shardId === $this->activeShardId) { 191 return false; 192 } 193 194 if ($this->getTransactionNestingLevel() > 0) { 195 throw new ShardingException('Cannot switch shard when transaction is active.'); 196 } 197 198 $activeShardId = $this->activeShardId = (int) $shardId; 199 200 if (isset($this->activeConnections[$activeShardId])) { 201 $this->_conn = $this->activeConnections[$activeShardId]; 202 203 return false; 204 } 205 206 $this->_conn = $this->activeConnections[$activeShardId] = $this->connectTo($activeShardId); 207 208 if ($this->_eventManager->hasListeners(Events::postConnect)) { 209 $eventArgs = new ConnectionEventArgs($this); 210 $this->_eventManager->dispatchEvent(Events::postConnect, $eventArgs); 211 } 212 213 return true; 214 } 215 216 /** 217 * Connects to a specific connection. 218 * 219 * @param string|int $shardId 220 * 221 * @return \Doctrine\DBAL\Driver\Connection 222 */ 223 protected function connectTo($shardId) 224 { 225 $params = $this->getParams(); 226 227 $driverOptions = $params['driverOptions'] ?? []; 228 229 $connectionParams = $this->connectionParameters[$shardId]; 230 231 $user = $connectionParams['user'] ?? null; 232 $password = $connectionParams['password'] ?? null; 233 234 return $this->_driver->connect($connectionParams, $user, $password, $driverOptions); 235 } 236 237 /** 238 * @param string|int|null $shardId 239 * 240 * @return bool 241 */ 242 public function isConnected($shardId = null) 243 { 244 if ($shardId === null) { 245 return $this->_conn !== null; 246 } 247 248 return isset($this->activeConnections[$shardId]); 249 } 250 251 /** 252 * @return void 253 */ 254 public function close() 255 { 256 $this->_conn = null; 257 $this->activeConnections = []; 258 $this->activeShardId = null; 259 } 260} 261