1<?php
2
3namespace Doctrine\DBAL\Sharding;
4
5use Doctrine\Common\EventManager;
6use Doctrine\DBAL\Configuration;
7use Doctrine\DBAL\Connection;
8use Doctrine\DBAL\Driver;
9use Doctrine\DBAL\Driver\Connection as DriverConnection;
10use Doctrine\DBAL\Event\ConnectionEventArgs;
11use Doctrine\DBAL\Events;
12use Doctrine\DBAL\Sharding\ShardChoser\ShardChoser;
13use InvalidArgumentException;
14use function array_merge;
15use function is_numeric;
16use function is_string;
17
18/**
19 * Sharding implementation that pools many different connections
20 * internally and serves data from the currently active connection.
21 *
22 * The internals of this class are:
23 *
24 * - All sharding clients are specified and given a shard-id during
25 *   configuration.
26 * - By default, the global shard is selected. If no global shard is configured
27 *   an exception is thrown on access.
28 * - Selecting a shard by distribution value delegates the mapping
29 *   "distributionValue" => "client" to the ShardChoser interface.
30 * - An exception is thrown if trying to switch shards during an open
31 *   transaction.
32 *
33 * Instantiation through the DriverManager looks like:
34 *
35 * @example
36 *
37 * $conn = DriverManager::getConnection(array(
38 *    'wrapperClass' => 'Doctrine\DBAL\Sharding\PoolingShardConnection',
39 *    'driver' => 'pdo_mysql',
40 *    'global' => array('user' => '', 'password' => '', 'host' => '', 'dbname' => ''),
41 *    'shards' => array(
 *        array('id' => 1, 'user' => 'slave1', 'password' => '', 'host' => '', 'dbname' => ''),
 *        array('id' => 2, 'user' => 'slave2', 'password' => '', 'host' => '', 'dbname' => ''),
44 *    ),
45 *    'shardChoser' => 'Doctrine\DBAL\Sharding\ShardChoser\MultiTenantShardChoser',
46 * ));
47 * $shardManager = $conn->getShardManager();
48 * $shardManager->selectGlobal();
49 * $shardManager->selectShard($value);
50 */
class PoolingShardConnection extends Connection
{
    /**
     * Driver connections opened so far, keyed by shard id (0 is the global shard).
     *
     * @var DriverConnection[]
     */
    private $activeConnections = [];

    /**
     * Id of the currently selected shard, or null while no shard has been
     * selected (initially and after close()).
     *
     * @var string|int|null
     */
    private $activeShardId;

    /**
     * Connection parameters per shard id. Index 0 holds the parameters of the
     * global shard, every other index the parameters of a configured shard.
     *
     * @var mixed[]
     */
    private $connectionParameters = [];

    /**
     * {@inheritDoc}
     *
     * Requires the 'global', 'shards' and 'shardChoser' parameters. A
     * 'shardChoser' given as a string is treated as a class name and
     * instantiated without arguments.
     *
     * @throws InvalidArgumentException If a required parameter is missing, the
     *                                  shard choser is not a ShardChoser, or a
     *                                  shard id is missing, invalid or duplicated.
     */
    public function __construct(array $params, Driver $driver, ?Configuration $config = null, ?EventManager $eventManager = null)
    {
        if (! isset($params['global'], $params['shards'])) {
            throw new InvalidArgumentException("Connection Parameters require 'global' and 'shards' configurations.");
        }

        if (! isset($params['shardChoser'])) {
            throw new InvalidArgumentException("Missing Shard Choser configuration 'shardChoser'");
        }

        if (is_string($params['shardChoser'])) {
            $params['shardChoser'] = new $params['shardChoser']();
        }

        if (! ($params['shardChoser'] instanceof ShardChoser)) {
            throw new InvalidArgumentException("The 'shardChoser' configuration is not a valid instance of Doctrine\DBAL\Sharding\ShardChoser\ShardChoser");
        }

        // Shard-specific values override the shared top-level parameters.
        $this->connectionParameters[0] = array_merge($params, $params['global']);

        foreach ($params['shards'] as $shard) {
            if (! isset($shard['id'])) {
                throw new InvalidArgumentException("Missing 'id' for one configured shard. Please specify a unique shard-id.");
            }

            // Id 0 is reserved for the global shard, so ids below 1 are rejected.
            // NOTE(review): the message says "non-negative" although 0 is rejected
            // as well; the wording is kept unchanged because callers may match on it.
            if (! is_numeric($shard['id']) || $shard['id'] < 1) {
                throw new InvalidArgumentException('Shard Id has to be a non-negative number.');
            }

            if (isset($this->connectionParameters[$shard['id']])) {
                throw new InvalidArgumentException('Shard ' . $shard['id'] . ' is duplicated in the configuration.');
            }

            $this->connectionParameters[$shard['id']] = array_merge($params, $shard);
        }

        parent::__construct($params, $driver, $config, $eventManager);
    }

    /**
     * Get active shard id.
     *
     * @return string|int|null Null while no shard has been selected.
     */
    public function getActiveShardId()
    {
        return $this->activeShardId;
    }

    /**
     * {@inheritdoc}
     *
     * Returns the parameters of the active shard, or those of the global shard
     * (index 0) while no shard, or the global shard itself, is selected.
     */
    public function getParams()
    {
        return $this->connectionParameters[$this->activeShardId ?: 0];
    }

    /**
     * {@inheritdoc}
     */
    public function getHost()
    {
        $params = $this->getParams();

        return $params['host'] ?? parent::getHost();
    }

    /**
     * {@inheritdoc}
     */
    public function getPort()
    {
        $params = $this->getParams();

        return $params['port'] ?? parent::getPort();
    }

    /**
     * {@inheritdoc}
     */
    public function getUsername()
    {
        $params = $this->getParams();

        return $params['user'] ?? parent::getUsername();
    }

    /**
     * {@inheritdoc}
     */
    public function getPassword()
    {
        $params = $this->getParams();

        return $params['password'] ?? parent::getPassword();
    }

    /**
     * Connects to a given shard.
     *
     * Passing null keeps the current connection if there is one and selects the
     * global shard (id 0) otherwise. Connections are pooled per shard id, so
     * switching back to a shard that was opened earlier reuses its connection.
     *
     * @param string|int|null $shardId
     *
     * @return bool True if a new driver connection was opened, false if an
     *              existing connection was kept or reused.
     *
     * @throws ShardingException If a different shard is requested while a
     *                           transaction is active, or the shard is unknown.
     */
    public function connect($shardId = null)
    {
        if ($shardId === null && $this->_conn) {
            return false;
        }

        // The active shard id is stored as an integer, so the requested id is
        // normalised before comparing. Otherwise re-selecting the active shard
        // with a numeric string ('1' vs 1) would be treated as a shard switch
        // and rejected while a transaction is open.
        $targetShardId = (int) $shardId;

        if ($shardId !== null && $targetShardId === $this->activeShardId) {
            return false;
        }

        if ($this->getTransactionNestingLevel() > 0) {
            throw new ShardingException('Cannot switch shard when transaction is active.');
        }

        if (isset($this->activeConnections[$targetShardId])) {
            $this->activeShardId = $targetShardId;
            $this->_conn         = $this->activeConnections[$targetShardId];

            return false;
        }

        // The target shard is marked active before connecting so that getParams()
        // already reflects it inside connectTo(). If connecting fails, the previous
        // selection is restored; otherwise the active id would point at a shard
        // without a connection and a retry would be short-circuited above.
        $previousShardId     = $this->activeShardId;
        $this->activeShardId = $targetShardId;

        try {
            $connection = $this->connectTo($targetShardId);
        } catch (\Throwable $e) {
            $this->activeShardId = $previousShardId;

            throw $e;
        }

        $this->_conn = $this->activeConnections[$targetShardId] = $connection;

        if ($this->_eventManager->hasListeners(Events::postConnect)) {
            $eventArgs = new ConnectionEventArgs($this);
            $this->_eventManager->dispatchEvent(Events::postConnect, $eventArgs);
        }

        return true;
    }

    /**
     * Connects to a specific connection.
     *
     * All settings, including the driver options, are taken from the parameters
     * configured for the requested shard rather than from the active one.
     *
     * @param string|int $shardId
     *
     * @return DriverConnection
     *
     * @throws ShardingException If no parameters are configured for the shard.
     */
    protected function connectTo($shardId)
    {
        if (! isset($this->connectionParameters[$shardId])) {
            throw new ShardingException('No connection parameters configured for shard ' . $shardId . '.');
        }

        $connectionParams = $this->connectionParameters[$shardId];

        $driverOptions = $connectionParams['driverOptions'] ?? [];
        $user          = $connectionParams['user'] ?? null;
        $password      = $connectionParams['password'] ?? null;

        return $this->_driver->connect($connectionParams, $user, $password, $driverOptions);
    }

    /**
     * Tells whether a connection is open.
     *
     * @param string|int|null $shardId Shard to check; null checks whether any
     *                                 connection is currently selected.
     *
     * @return bool
     */
    public function isConnected($shardId = null)
    {
        if ($shardId === null) {
            return $this->_conn !== null;
        }

        return isset($this->activeConnections[$shardId]);
    }

    /**
     * Drops the current connection together with all pooled shard connections
     * and clears the shard selection.
     *
     * @return void
     */
    public function close()
    {
        $this->_conn             = null;
        $this->activeConnections = [];
        $this->activeShardId     = null;
    }
}
250