1<?php 2 3namespace Doctrine\DBAL\Connections; 4 5use Doctrine\Common\EventManager; 6use Doctrine\DBAL\Configuration; 7use Doctrine\DBAL\Connection; 8use Doctrine\DBAL\Driver; 9use Doctrine\DBAL\Driver\Connection as DriverConnection; 10use Doctrine\DBAL\Event\ConnectionEventArgs; 11use Doctrine\DBAL\Events; 12use InvalidArgumentException; 13 14use function array_rand; 15use function assert; 16use function count; 17use function func_get_args; 18 19/** 20 * Primary-Replica Connection 21 * 22 * Connection can be used with primary-replica setups. 23 * 24 * Important for the understanding of this connection should be how and when 25 * it picks the replica or primary. 26 * 27 * 1. Replica if primary was never picked before and ONLY if 'getWrappedConnection' 28 * or 'executeQuery' is used. 29 * 2. Primary picked when 'exec', 'executeUpdate', 'executeStatement', 'insert', 'delete', 'update', 'createSavepoint', 30 * 'releaseSavepoint', 'beginTransaction', 'rollback', 'commit', 'query' or 31 * 'prepare' is called. 32 * 3. If Primary was picked once during the lifetime of the connection it will always get picked afterwards. 33 * 4. One replica connection is randomly picked ONCE during a request. 34 * 35 * ATTENTION: You can write to the replica with this connection if you execute a write query without 36 * opening up a transaction. For example: 37 * 38 * $conn = DriverManager::getConnection(...); 39 * $conn->executeQuery("DELETE FROM table"); 40 * 41 * Be aware that Connection#executeQuery is a method specifically for READ 42 * operations only. 43 * 44 * Use Connection#executeStatement for any SQL statement that changes/updates 45 * state in the database (UPDATE, INSERT, DELETE or DDL statements). 46 * 47 * This connection is limited to replica operations using the 48 * Connection#executeQuery operation only, because it wouldn't be compatible 49 * with the ORM or SchemaManager code otherwise. Both use all the other 50 * operations in a context where writes could happen to a replica, which makes 51 * this restricted approach necessary. 52 * 53 * You can manually connect to the primary at any time by calling: 54 * 55 * $conn->ensureConnectedToPrimary(); 56 * 57 * Instantiation through the DriverManager looks like: 58 * 59 * @psalm-import-type Params from \Doctrine\DBAL\DriverManager 60 * @example 61 * 62 * $conn = DriverManager::getConnection(array( 63 * 'wrapperClass' => 'Doctrine\DBAL\Connections\PrimaryReadReplicaConnection', 64 * 'driver' => 'pdo_mysql', 65 * 'primary' => array('user' => '', 'password' => '', 'host' => '', 'dbname' => ''), 66 * 'replica' => array( 67 * array('user' => 'replica1', 'password', 'host' => '', 'dbname' => ''), 68 * array('user' => 'replica2', 'password', 'host' => '', 'dbname' => ''), 69 * ) 70 * )); 71 * 72 * You can also pass 'driverOptions' and any other documented option to each of this drivers 73 * to pass additional information. 74 */ 75class PrimaryReadReplicaConnection extends Connection 76{ 77 /** 78 * Primary and Replica connection (one of the randomly picked replicas). 79 * 80 * @var DriverConnection[]|null[] 81 */ 82 protected $connections = ['primary' => null, 'replica' => null]; 83 84 /** 85 * You can keep the replica connection and then switch back to it 86 * during the request if you know what you are doing. 87 * 88 * @var bool 89 */ 90 protected $keepReplica = false; 91 92 /** 93 * Creates Primary Replica Connection. 94 * 95 * @internal The connection can be only instantiated by the driver manager. 96 * 97 * @param array<string,mixed> $params 98 * 99 * @throws InvalidArgumentException 100 * 101 * @phpstan-param array<string,mixed> $params 102 * @psalm-param Params $params 103 */ 104 public function __construct( 105 array $params, 106 Driver $driver, 107 ?Configuration $config = null, 108 ?EventManager $eventManager = null 109 ) { 110 if (! isset($params['replica'], $params['primary'])) { 111 throw new InvalidArgumentException('primary or replica configuration missing'); 112 } 113 114 if (count($params['replica']) === 0) { 115 throw new InvalidArgumentException('You have to configure at least one replica.'); 116 } 117 118 if (isset($params['driver'])) { 119 $params['primary']['driver'] = $params['driver']; 120 121 foreach ($params['replica'] as $replicaKey => $replica) { 122 $params['replica'][$replicaKey]['driver'] = $params['driver']; 123 } 124 } 125 126 $this->keepReplica = (bool) ($params['keepReplica'] ?? false); 127 128 parent::__construct($params, $driver, $config, $eventManager); 129 } 130 131 /** 132 * Checks if the connection is currently towards the primary or not. 133 */ 134 public function isConnectedToPrimary(): bool 135 { 136 return $this->_conn !== null && $this->_conn === $this->connections['primary']; 137 } 138 139 /** 140 * @param string|null $connectionName 141 * 142 * @return bool 143 */ 144 public function connect($connectionName = null) 145 { 146 if ($connectionName !== null) { 147 throw new InvalidArgumentException( 148 'Passing a connection name as first argument is not supported anymore.' 149 . ' Use ensureConnectedToPrimary()/ensureConnectedToReplica() instead.' 150 ); 151 } 152 153 return $this->performConnect(); 154 } 155 156 protected function performConnect(?string $connectionName = null): bool 157 { 158 $requestedConnectionChange = ($connectionName !== null); 159 $connectionName = $connectionName ?: 'replica'; 160 161 if ($connectionName !== 'replica' && $connectionName !== 'primary') { 162 throw new InvalidArgumentException('Invalid option to connect(), only primary or replica allowed.'); 163 } 164 165 // If we have a connection open, and this is not an explicit connection 166 // change request, then abort right here, because we are already done. 167 // This prevents writes to the replica in case of "keepReplica" option enabled. 168 if ($this->_conn !== null && ! $requestedConnectionChange) { 169 return false; 170 } 171 172 $forcePrimaryAsReplica = false; 173 174 if ($this->getTransactionNestingLevel() > 0) { 175 $connectionName = 'primary'; 176 $forcePrimaryAsReplica = true; 177 } 178 179 if (isset($this->connections[$connectionName])) { 180 $this->_conn = $this->connections[$connectionName]; 181 182 if ($forcePrimaryAsReplica && ! $this->keepReplica) { 183 $this->connections['replica'] = $this->_conn; 184 } 185 186 return false; 187 } 188 189 if ($connectionName === 'primary') { 190 $this->connections['primary'] = $this->_conn = $this->connectTo($connectionName); 191 192 // Set replica connection to primary to avoid invalid reads 193 if (! $this->keepReplica) { 194 $this->connections['replica'] = $this->connections['primary']; 195 } 196 } else { 197 $this->connections['replica'] = $this->_conn = $this->connectTo($connectionName); 198 } 199 200 if ($this->_eventManager->hasListeners(Events::postConnect)) { 201 $eventArgs = new ConnectionEventArgs($this); 202 $this->_eventManager->dispatchEvent(Events::postConnect, $eventArgs); 203 } 204 205 return true; 206 } 207 208 /** 209 * Connects to the primary node of the database cluster. 210 * 211 * All following statements after this will be executed against the primary node. 212 */ 213 public function ensureConnectedToPrimary(): bool 214 { 215 return $this->performConnect('primary'); 216 } 217 218 /** 219 * Connects to a replica node of the database cluster. 220 * 221 * All following statements after this will be executed against the replica node, 222 * unless the keepReplica option is set to false and a primary connection 223 * was already opened. 224 */ 225 public function ensureConnectedToReplica(): bool 226 { 227 return $this->performConnect('replica'); 228 } 229 230 /** 231 * Connects to a specific connection. 232 * 233 * @param string $connectionName 234 * 235 * @return DriverConnection 236 */ 237 protected function connectTo($connectionName) 238 { 239 $params = $this->getParams(); 240 241 $driverOptions = $params['driverOptions'] ?? []; 242 243 $connectionParams = $this->chooseConnectionConfiguration($connectionName, $params); 244 245 $user = $connectionParams['user'] ?? null; 246 $password = $connectionParams['password'] ?? null; 247 248 return $this->_driver->connect($connectionParams, $user, $password, $driverOptions); 249 } 250 251 /** 252 * @param string $connectionName 253 * @param mixed[] $params 254 * 255 * @return mixed 256 */ 257 protected function chooseConnectionConfiguration($connectionName, $params) 258 { 259 if ($connectionName === 'primary') { 260 return $params['primary']; 261 } 262 263 $config = $params['replica'][array_rand($params['replica'])]; 264 265 if (! isset($config['charset']) && isset($params['primary']['charset'])) { 266 $config['charset'] = $params['primary']['charset']; 267 } 268 269 return $config; 270 } 271 272 /** 273 * {@inheritDoc} 274 * 275 * @deprecated Use {@link executeStatement()} instead. 276 */ 277 public function executeUpdate($sql, array $params = [], array $types = []) 278 { 279 $this->ensureConnectedToPrimary(); 280 281 return parent::executeUpdate($sql, $params, $types); 282 } 283 284 /** 285 * {@inheritDoc} 286 */ 287 public function executeStatement($sql, array $params = [], array $types = []) 288 { 289 $this->ensureConnectedToPrimary(); 290 291 return parent::executeStatement($sql, $params, $types); 292 } 293 294 /** 295 * {@inheritDoc} 296 */ 297 public function beginTransaction() 298 { 299 $this->ensureConnectedToPrimary(); 300 301 return parent::beginTransaction(); 302 } 303 304 /** 305 * {@inheritDoc} 306 */ 307 public function commit() 308 { 309 $this->ensureConnectedToPrimary(); 310 311 return parent::commit(); 312 } 313 314 /** 315 * {@inheritDoc} 316 */ 317 public function rollBack() 318 { 319 $this->ensureConnectedToPrimary(); 320 321 return parent::rollBack(); 322 } 323 324 /** 325 * {@inheritDoc} 326 */ 327 public function delete($table, array $criteria, array $types = []) 328 { 329 $this->ensureConnectedToPrimary(); 330 331 return parent::delete($table, $criteria, $types); 332 } 333 334 /** 335 * {@inheritDoc} 336 */ 337 public function close() 338 { 339 unset($this->connections['primary'], $this->connections['replica']); 340 341 parent::close(); 342 343 $this->_conn = null; 344 $this->connections = ['primary' => null, 'replica' => null]; 345 } 346 347 /** 348 * {@inheritDoc} 349 */ 350 public function update($table, array $data, array $criteria, array $types = []) 351 { 352 $this->ensureConnectedToPrimary(); 353 354 return parent::update($table, $data, $criteria, $types); 355 } 356 357 /** 358 * {@inheritDoc} 359 */ 360 public function insert($table, array $data, array $types = []) 361 { 362 $this->ensureConnectedToPrimary(); 363 364 return parent::insert($table, $data, $types); 365 } 366 367 /** 368 * {@inheritDoc} 369 */ 370 public function exec($statement) 371 { 372 $this->ensureConnectedToPrimary(); 373 374 return parent::exec($statement); 375 } 376 377 /** 378 * {@inheritDoc} 379 */ 380 public function createSavepoint($savepoint) 381 { 382 $this->ensureConnectedToPrimary(); 383 384 parent::createSavepoint($savepoint); 385 } 386 387 /** 388 * {@inheritDoc} 389 */ 390 public function releaseSavepoint($savepoint) 391 { 392 $this->ensureConnectedToPrimary(); 393 394 parent::releaseSavepoint($savepoint); 395 } 396 397 /** 398 * {@inheritDoc} 399 */ 400 public function rollbackSavepoint($savepoint) 401 { 402 $this->ensureConnectedToPrimary(); 403 404 parent::rollbackSavepoint($savepoint); 405 } 406 407 /** 408 * {@inheritDoc} 409 */ 410 public function query() 411 { 412 $this->ensureConnectedToPrimary(); 413 assert($this->_conn instanceof DriverConnection); 414 415 $args = func_get_args(); 416 417 $logger = $this->getConfiguration()->getSQLLogger(); 418 if ($logger) { 419 $logger->startQuery($args[0]); 420 } 421 422 $statement = $this->_conn->query(...$args); 423 424 $statement->setFetchMode($this->defaultFetchMode); 425 426 if ($logger) { 427 $logger->stopQuery(); 428 } 429 430 return $statement; 431 } 432 433 /** 434 * {@inheritDoc} 435 */ 436 public function prepare($statement) 437 { 438 $this->ensureConnectedToPrimary(); 439 440 return parent::prepare($statement); 441 } 442} 443