1<?php
2
3namespace Doctrine\DBAL\Connections;
4
5use Doctrine\Common\EventManager;
6use Doctrine\DBAL\Configuration;
7use Doctrine\DBAL\Connection;
8use Doctrine\DBAL\Driver;
9use Doctrine\DBAL\Driver\Connection as DriverConnection;
10use Doctrine\DBAL\DriverManager;
11use Doctrine\DBAL\Event\ConnectionEventArgs;
12use Doctrine\DBAL\Events;
13use InvalidArgumentException;
14
15use function array_rand;
16use function assert;
17use function count;
18use function func_get_args;
19
20/**
21 * Primary-Replica Connection
22 *
23 * Connection can be used with primary-replica setups.
24 *
25 * Important for the understanding of this connection should be how and when
26 * it picks the replica or primary.
27 *
28 * 1. Replica if primary was never picked before and ONLY if 'getWrappedConnection'
29 *    or 'executeQuery' is used.
 * 2. Primary picked when 'exec', 'executeUpdate', 'executeStatement', 'insert', 'delete', 'update', 'createSavepoint',
 *    'releaseSavepoint', 'rollbackSavepoint', 'beginTransaction', 'rollBack', 'commit', 'query' or
 *    'prepare' is called.
33 * 3. If Primary was picked once during the lifetime of the connection it will always get picked afterwards.
34 * 4. One replica connection is randomly picked ONCE during a request.
35 *
36 * ATTENTION: You can write to the replica with this connection if you execute a write query without
37 * opening up a transaction. For example:
38 *
39 *      $conn = DriverManager::getConnection(...);
40 *      $conn->executeQuery("DELETE FROM table");
41 *
42 * Be aware that Connection#executeQuery is a method specifically for READ
43 * operations only.
44 *
45 * Use Connection#executeStatement for any SQL statement that changes/updates
46 * state in the database (UPDATE, INSERT, DELETE or DDL statements).
47 *
48 * This connection is limited to replica operations using the
49 * Connection#executeQuery operation only, because it wouldn't be compatible
50 * with the ORM or SchemaManager code otherwise. Both use all the other
51 * operations in a context where writes could happen to a replica, which makes
52 * this restricted approach necessary.
53 *
54 * You can manually connect to the primary at any time by calling:
55 *
56 *      $conn->ensureConnectedToPrimary();
57 *
58 * Instantiation through the DriverManager looks like:
59 *
60 * @psalm-import-type Params from DriverManager
61 * @example
62 *
63 * $conn = DriverManager::getConnection(array(
64 *    'wrapperClass' => 'Doctrine\DBAL\Connections\PrimaryReadReplicaConnection',
65 *    'driver' => 'pdo_mysql',
66 *    'primary' => array('user' => '', 'password' => '', 'host' => '', 'dbname' => ''),
67 *    'replica' => array(
 *        array('user' => 'replica1', 'password' => '', 'host' => '', 'dbname' => ''),
 *        array('user' => 'replica2', 'password' => '', 'host' => '', 'dbname' => ''),
70 *    )
71 * ));
72 *
 * You can also pass 'driverOptions' and any other documented option to each of these drivers
 * to pass additional information.
75 */
class PrimaryReadReplicaConnection extends Connection
{
    /**
     * Primary and Replica connection (one of the randomly picked replicas).
     *
     * Both slots are null until the respective connection has been opened. Unless
     * the "keepReplica" option is enabled, the 'replica' slot is pointed at the
     * primary connection as soon as the primary has been opened.
     *
     * @var DriverConnection[]|null[]
     */
    protected $connections = ['primary' => null, 'replica' => null];

    /**
     * You can keep the replica connection and then switch back to it
     * during the request if you know what you are doing.
     *
     * @var bool
     */
    protected $keepReplica = false;

    /**
     * Creates Primary Replica Connection.
     *
     * A top-level 'driver' parameter, when present, is copied into the 'primary'
     * configuration and into every 'replica' configuration before the parameters
     * are handed to the parent constructor.
     *
     * @internal The connection can be only instantiated by the driver manager.
     *
     * @param array<string,mixed> $params
     * @psalm-param Params $params
     * @phpstan-param array<string,mixed> $params
     *
     * @throws InvalidArgumentException If the 'primary' or 'replica' configuration is missing,
     *                                  or if the list of replicas is empty.
     */
    public function __construct(
        array $params,
        Driver $driver,
        ?Configuration $config = null,
        ?EventManager $eventManager = null
    ) {
        if (! isset($params['replica'], $params['primary'])) {
            throw new InvalidArgumentException('primary or replica configuration missing');
        }

        if (count($params['replica']) === 0) {
            throw new InvalidArgumentException('You have to configure at least one replica.');
        }

        if (isset($params['driver'])) {
            $params['primary']['driver'] = $params['driver'];

            foreach ($params['replica'] as $replicaKey => $replica) {
                $params['replica'][$replicaKey]['driver'] = $params['driver'];
            }
        }

        $this->keepReplica = (bool) ($params['keepReplica'] ?? false);

        parent::__construct($params, $driver, $config, $eventManager);
    }

    /**
     * Checks if the connection is currently towards the primary or not.
     *
     * Returns false when no connection is open at all.
     */
    public function isConnectedToPrimary(): bool
    {
        return $this->_conn !== null && $this->_conn === $this->connections['primary'];
    }

    /**
     * Opens the default (replica) connection unless a connection is already open.
     *
     * @param string|null $connectionName Must be null; selecting a connection by name is
     *                                    no longer supported through this method.
     *
     * @return bool TRUE if a new driver connection was established, FALSE otherwise.
     *
     * @throws InvalidArgumentException If a connection name is passed.
     */
    public function connect($connectionName = null)
    {
        if ($connectionName !== null) {
            throw new InvalidArgumentException(
                'Passing a connection name as first argument is not supported anymore.'
                    . ' Use ensureConnectedToPrimary()/ensureConnectedToReplica() instead.'
            );
        }

        return $this->performConnect();
    }

    /**
     * Selects, and if necessary opens, the requested connection.
     *
     * @param string|null $connectionName 'primary', 'replica', or null for "whatever is
     *                                    already open, otherwise the replica".
     *
     * @return bool TRUE if a new driver connection was established, FALSE if an
     *              already opened connection was (re)used.
     *
     * @throws InvalidArgumentException If the name is neither 'primary' nor 'replica'.
     */
    protected function performConnect(?string $connectionName = null): bool
    {
        $requestedConnectionChange = ($connectionName !== null);

        // Only NULL selects the default. A short ternary would also turn the invalid
        // names '' and '0' into 'replica' and let them slip past the validation below.
        $connectionName = $connectionName ?? 'replica';

        if ($connectionName !== 'replica' && $connectionName !== 'primary') {
            throw new InvalidArgumentException('Invalid option to connect(), only primary or replica allowed.');
        }

        // If we have a connection open, and this is not an explicit connection
        // change request, then abort right here, because we are already done.
        // This prevents writes to the replica in case of "keepReplica" option enabled.
        if ($this->_conn !== null && ! $requestedConnectionChange) {
            return false;
        }

        $forcePrimaryAsReplica = false;

        // While a transaction is open, every request is routed to the primary,
        // regardless of which connection was asked for.
        if ($this->getTransactionNestingLevel() > 0) {
            $connectionName        = 'primary';
            $forcePrimaryAsReplica = true;
        }

        // The requested connection is already open: just switch to it.
        if (isset($this->connections[$connectionName])) {
            $this->_conn = $this->connections[$connectionName];

            if ($forcePrimaryAsReplica && ! $this->keepReplica) {
                $this->connections['replica'] = $this->_conn;
            }

            return false;
        }

        if ($connectionName === 'primary') {
            $this->connections['primary'] = $this->_conn = $this->connectTo($connectionName);

            // Set replica connection to primary to avoid invalid reads
            if (! $this->keepReplica) {
                $this->connections['replica'] = $this->connections['primary'];
            }
        } else {
            $this->connections['replica'] = $this->_conn = $this->connectTo($connectionName);
        }

        // The event is only dispatched when a new driver connection was opened.
        if ($this->_eventManager->hasListeners(Events::postConnect)) {
            $eventArgs = new ConnectionEventArgs($this);
            $this->_eventManager->dispatchEvent(Events::postConnect, $eventArgs);
        }

        return true;
    }

    /**
     * Connects to the primary node of the database cluster.
     *
     * All following statements after this will be executed against the primary node.
     *
     * @return bool TRUE if a new driver connection was established, FALSE otherwise.
     */
    public function ensureConnectedToPrimary(): bool
    {
        return $this->performConnect('primary');
    }

    /**
     * Connects to a replica node of the database cluster.
     *
     * All following statements after this will be executed against the replica node,
     * unless the keepReplica option is set to false and a primary connection
     * was already opened.
     *
     * @return bool TRUE if a new driver connection was established, FALSE otherwise.
     */
    public function ensureConnectedToReplica(): bool
    {
        return $this->performConnect('replica');
    }

    /**
     * Connects to a specific connection.
     *
     * The 'driverOptions' are read from the top-level connection parameters, while
     * user and password come from the configuration chosen for the given name.
     *
     * @param string $connectionName
     *
     * @return DriverConnection
     */
    protected function connectTo($connectionName)
    {
        $params = $this->getParams();

        $driverOptions = $params['driverOptions'] ?? [];

        $connectionParams = $this->chooseConnectionConfiguration($connectionName, $params);

        $user     = $connectionParams['user'] ?? null;
        $password = $connectionParams['password'] ?? null;

        return $this->_driver->connect($connectionParams, $user, $password, $driverOptions);
    }

    /**
     * Picks the configuration to connect with.
     *
     * For 'primary' the primary configuration is returned as is. For any other name
     * one of the replica configurations is picked at random; if it defines no
     * 'charset', the charset of the primary configuration (if any) is applied.
     *
     * @param string  $connectionName
     * @param mixed[] $params
     *
     * @return mixed
     */
    protected function chooseConnectionConfiguration($connectionName, $params)
    {
        if ($connectionName === 'primary') {
            return $params['primary'];
        }

        $config = $params['replica'][array_rand($params['replica'])];

        if (! isset($config['charset']) && isset($params['primary']['charset'])) {
            $config['charset'] = $params['primary']['charset'];
        }

        return $config;
    }

    /**
     * {@inheritDoc}
     *
     * Switches to the primary connection before executing.
     *
     * @deprecated Use {@link executeStatement()} instead.
     */
    public function executeUpdate($sql, array $params = [], array $types = [])
    {
        $this->ensureConnectedToPrimary();

        return parent::executeUpdate($sql, $params, $types);
    }

    /**
     * {@inheritDoc}
     *
     * Switches to the primary connection before executing.
     */
    public function executeStatement($sql, array $params = [], array $types = [])
    {
        $this->ensureConnectedToPrimary();

        return parent::executeStatement($sql, $params, $types);
    }

    /**
     * {@inheritDoc}
     *
     * Switches to the primary connection before starting the transaction.
     */
    public function beginTransaction()
    {
        $this->ensureConnectedToPrimary();

        return parent::beginTransaction();
    }

    /**
     * {@inheritDoc}
     *
     * Switches to the primary connection before committing.
     */
    public function commit()
    {
        $this->ensureConnectedToPrimary();

        return parent::commit();
    }

    /**
     * {@inheritDoc}
     *
     * Switches to the primary connection before rolling back.
     */
    public function rollBack()
    {
        $this->ensureConnectedToPrimary();

        return parent::rollBack();
    }

    /**
     * {@inheritDoc}
     *
     * Switches to the primary connection before executing.
     */
    public function delete($table, array $criteria, array $types = [])
    {
        $this->ensureConnectedToPrimary();

        return parent::delete($table, $criteria, $types);
    }

    /**
     * {@inheritDoc}
     *
     * Also drops the references to both the primary and the replica connection.
     */
    public function close()
    {
        unset($this->connections['primary'], $this->connections['replica']);

        parent::close();

        // Restore the initial state so that the next connect starts from scratch.
        $this->_conn       = null;
        $this->connections = ['primary' => null, 'replica' => null];
    }

    /**
     * {@inheritDoc}
     *
     * Switches to the primary connection before executing.
     */
    public function update($table, array $data, array $criteria, array $types = [])
    {
        $this->ensureConnectedToPrimary();

        return parent::update($table, $data, $criteria, $types);
    }

    /**
     * {@inheritDoc}
     *
     * Switches to the primary connection before executing.
     */
    public function insert($table, array $data, array $types = [])
    {
        $this->ensureConnectedToPrimary();

        return parent::insert($table, $data, $types);
    }

    /**
     * {@inheritDoc}
     *
     * Switches to the primary connection before executing.
     */
    public function exec($statement)
    {
        $this->ensureConnectedToPrimary();

        return parent::exec($statement);
    }

    /**
     * {@inheritDoc}
     *
     * Switches to the primary connection before creating the savepoint.
     */
    public function createSavepoint($savepoint)
    {
        $this->ensureConnectedToPrimary();

        parent::createSavepoint($savepoint);
    }

    /**
     * {@inheritDoc}
     *
     * Switches to the primary connection before releasing the savepoint.
     */
    public function releaseSavepoint($savepoint)
    {
        $this->ensureConnectedToPrimary();

        parent::releaseSavepoint($savepoint);
    }

    /**
     * {@inheritDoc}
     *
     * Switches to the primary connection before rolling back to the savepoint.
     */
    public function rollbackSavepoint($savepoint)
    {
        $this->ensureConnectedToPrimary();

        parent::rollbackSavepoint($savepoint);
    }

    /**
     * {@inheritDoc}
     *
     * Switches to the primary connection and runs the query directly on the driver
     * connection. The first argument is reported to the configured SQL logger, if any.
     */
    public function query()
    {
        $this->ensureConnectedToPrimary();
        assert($this->_conn instanceof DriverConnection);

        $args = func_get_args();

        $logger = $this->getConfiguration()->getSQLLogger();
        if ($logger !== null) {
            $logger->startQuery($args[0]);
        }

        try {
            $statement = $this->_conn->query(...$args);

            $statement->setFetchMode($this->defaultFetchMode);
        } finally {
            // Always balance startQuery(), even if the driver throws; otherwise
            // the logger is left with a query that never finishes.
            if ($logger !== null) {
                $logger->stopQuery();
            }
        }

        return $statement;
    }

    /**
     * {@inheritDoc}
     *
     * Switches to the primary connection before preparing the statement.
     */
    public function prepare($statement)
    {
        $this->ensureConnectedToPrimary();

        return parent::prepare($statement);
    }
}
443