1 /** @file connpool.h */
2 
3 
4 /**
5  *    Copyright (C) 2018-present MongoDB, Inc.
6  *
7  *    This program is free software: you can redistribute it and/or modify
8  *    it under the terms of the Server Side Public License, version 1,
9  *    as published by MongoDB, Inc.
10  *
11  *    This program is distributed in the hope that it will be useful,
12  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  *    Server Side Public License for more details.
15  *
16  *    You should have received a copy of the Server Side Public License
17  *    along with this program. If not, see
18  *    <http://www.mongodb.com/licensing/server-side-public-license>.
19  *
20  *    As a special exception, the copyright holders give permission to link the
21  *    code of portions of this program with the OpenSSL library under certain
22  *    conditions as described in each individual source file and distribute
23  *    linked combinations including the program with the OpenSSL library. You
24  *    must comply with the Server Side Public License in all respects for
25  *    all of the code used other than as permitted herein. If you modify file(s)
26  *    with this exception, you may extend this exception to your version of the
27  *    file(s), but you are not obligated to do so. If you do not wish to do so,
28  *    delete this exception statement from your version. If you delete this
29  *    exception statement from all source files in the program, then also delete
30  *    it in the license file.
31  */
32 
33 #pragma once
34 
35 #include <cstdint>
36 #include <stack>
37 
38 #include "mongo/client/dbclientinterface.h"
39 #include "mongo/client/mongo_uri.h"
40 #include "mongo/platform/atomic_word.h"
41 #include "mongo/util/background.h"
42 #include "mongo/util/concurrency/mutex.h"
43 #include "mongo/util/time_support.h"
44 
45 namespace mongo {
46 
47 class BSONObjBuilder;
48 class DBConnectionPool;
49 
50 namespace executor {
51 struct ConnectionPoolStats;
52 }  // namespace executor
53 
54 /**
55  * The PoolForHost is responsible for storing a maximum of _maxPoolSize connections to a particular
56  * host. It is not responsible for creating new connections; instead, when DBConnectionPool is asked
57  * for a connection to a particular host, DBConnectionPool will check if any connections are
58  * available in the PoolForHost for that host. If so, DBConnectionPool will check out a connection
59  * from PoolForHost, and if not, DBConnectionPool will create a new connection itself, if we are
60  * below the maximum allowed number of connections. If we have already created _maxPoolSize
61  * connections, the calling thread will block until a new connection can be made for it.
62  *
63  * Once the connection is released back to DBConnectionPool, DBConnectionPool will attempt to
64  * release the connection to PoolForHost. This is how connections enter PoolForHost for the first
65  * time. If PoolForHost is below the _maxPoolSize limit, PoolForHost will take ownership of the
66  * connection, otherwise PoolForHost will clean up and destroy the connection.
67  *
68  * Additionally, PoolForHost knows how to purge itself of stale connections (since a connection can
69  * go stale while it is just sitting in the pool), but does not decide when to do so. Instead,
70  * DBConnectionPool tells PoolForHost to purge stale connections periodically.
71  *
72  * PoolForHost is not thread-safe; thread safety is handled by DBConnectionPool.
73  */
74 class PoolForHost {
75     MONGO_DISALLOW_COPYING(PoolForHost);
76 
77 public:
78     // Sentinel value indicating pool has no cleanup limit
79     static const int kPoolSizeUnlimited;
80 
81     friend class DBConnectionPool;
82 
83     PoolForHost();
84     ~PoolForHost();
85 
86     /**
87      * Returns the number of connections in this pool that went bad.
88      */
getNumBadConns()89     int getNumBadConns() const {
90         return _badConns;
91     }
92 
93     /**
94      * Returns the maximum number of connections stored in the pool
95      */
getMaxPoolSize()96     int getMaxPoolSize() const {
97         return _maxPoolSize;
98     }
99 
100     /**
101      * Sets the maximum number of connections stored in the pool
102      */
setMaxPoolSize(int maxPoolSize)103     void setMaxPoolSize(int maxPoolSize) {
104         _maxPoolSize = maxPoolSize;
105     }
106 
107     /**
108      * Sets the maximum number of in-use connections for this pool.
109      */
setMaxInUse(int maxInUse)110     void setMaxInUse(int maxInUse) {
111         _maxInUse = maxInUse;
112     }
113 
114     /**
115      * Sets the socket timeout on this host, in seconds, for reporting purposes only.
116      */
setSocketTimeout(double socketTimeout)117     void setSocketTimeout(double socketTimeout) {
118         _socketTimeoutSecs = socketTimeout;
119     }
120 
numAvailable()121     int numAvailable() const {
122         return (int)_pool.size();
123     }
124 
numInUse()125     int numInUse() const {
126         return _checkedOut;
127     }
128 
129     /**
130      * Returns the number of open connections in this pool.
131      */
openConnections()132     int openConnections() const {
133         return numInUse() + numAvailable();
134     }
135 
136     void createdOne(DBClientBase* base);
numCreated()137     long long numCreated() const {
138         return _created;
139     }
140 
type()141     ConnectionString::ConnectionType type() const {
142         verify(_created);
143         return _type;
144     }
145 
146     /**
147      * gets a connection or return NULL
148      */
149     DBClientBase* get(DBConnectionPool* pool, double socketTimeout);
150 
151     // Deletes all connections in the pool
152     void clear();
153 
154     void done(DBConnectionPool* pool, DBClientBase* c);
155 
156     void flush();
157 
158     void getStaleConnections(Date_t idleThreshold, std::vector<DBClientBase*>& stale);
159 
160     /**
161      * Sets the lower bound for creation times that can be considered as
162      *     good connections.
163      */
164     void reportBadConnectionAt(uint64_t microSec);
165 
166     /**
167      * @return true if the given creation time is considered to be not
168      *     good for use.
169      */
170     bool isBadSocketCreationTime(uint64_t microSec);
171 
172     /**
173      * Sets the host name to a new one, only if it is currently empty.
174      */
175     void initializeHostName(const std::string& hostName);
176 
177     /**
178      * If this pool has more than _maxPoolSize connections in use, blocks
179      * the calling thread until a connection is returned to the pool or
180      * is destroyed. If a non-zero timeout is given, this method will
181      * throw if a free connection cannot be acquired within that amount of
182      * time. Timeout is in seconds.
183      */
184     void waitForFreeConnection(int timeout, stdx::unique_lock<stdx::mutex>& lk);
185 
186     /**
187      * Notifies any waiters that there are new connections available.
188      */
189     void notifyWaiters();
190 
191     /**
192      * Shuts down this pool, notifying all waiters.
193      */
194     void shutdown();
195 
196 private:
197     struct StoredConnection {
198         StoredConnection(std::unique_ptr<DBClientBase> c);
199 
200         bool ok();
201 
202         /**
203          * Returns true if this connection was added before the given time.
204          */
205         bool addedBefore(Date_t time);
206 
207         std::unique_ptr<DBClientBase> conn;
208 
209         // The time when this connection was added to the pool. Will
210         // be reset if the connection is checked out and re-added.
211         Date_t added;
212     };
213 
214     std::string _hostName;
215     double _socketTimeoutSecs;
216     std::stack<StoredConnection> _pool;
217 
218     int64_t _created;
219     uint64_t _minValidCreationTimeMicroSec;
220     ConnectionString::ConnectionType _type;
221 
222     // The maximum number of connections we'll save in the pool
223     int _maxPoolSize;
224 
225     // The maximum number of connections allowed to be in-use in this pool
226     int _maxInUse;
227 
228     // The number of currently active connections from this pool
229     int _checkedOut;
230 
231     // The number of connections that we did not reuse because they went bad.
232     int _badConns;
233 
234     // Whether our parent DBConnectionPool object is in destruction
235     bool _parentDestroyed;
236 
237     stdx::condition_variable _cv;
238 
239     AtomicWord<bool> _inShutdown;
240 };
241 
242 class DBConnectionHook {
243 public:
~DBConnectionHook()244     virtual ~DBConnectionHook() {}
onCreate(DBClientBase * conn)245     virtual void onCreate(DBClientBase* conn) {}
onHandedOut(DBClientBase * conn)246     virtual void onHandedOut(DBClientBase* conn) {}
onRelease(DBClientBase * conn)247     virtual void onRelease(DBClientBase* conn) {}
onDestroy(DBClientBase * conn)248     virtual void onDestroy(DBClientBase* conn) {}
249 };
250 
251 /** Database connection pool.
252 
253     Generally, use ScopedDbConnection and do not call these directly.
254 
255     This class, so far, is suitable for use with unauthenticated connections.
256     Support for authenticated connections requires some adjustments: please
257     request...
258 
259     Usage:
260 
261     {
262        ScopedDbConnection c("myserver");
263        c.conn()...
264     }
265 */
266 class DBConnectionPool : public PeriodicTask {
267 public:
268     DBConnectionPool();
269     ~DBConnectionPool();
270 
271     /** right now just controls some asserts.  defaults to "dbconnectionpool" */
setName(const std::string & name)272     void setName(const std::string& name) {
273         _name = name;
274     }
275 
276     /**
277      * Returns the maximum number of connections pooled per-host
278      *
279      * This setting only applies to new host connection pools, previously-pooled host pools are
280      * unaffected.
281      */
getMaxPoolSize()282     int getMaxPoolSize() const {
283         return _maxPoolSize;
284     }
285 
286     /**
287      * Returns the number of connections to the given host pool.
288      */
289     int openConnections(const std::string& ident, double socketTimeout);
290 
291     /**
292      * Sets the maximum number of connections pooled per-host.
293      *
294      * This setting only applies to new host connection pools, previously-pooled host pools are
295      * unaffected.
296      */
setMaxPoolSize(int maxPoolSize)297     void setMaxPoolSize(int maxPoolSize) {
298         _maxPoolSize = maxPoolSize;
299     }
300 
301     /**
302      * Sets the maximum number of in-use connections per host.
303      */
setMaxInUse(int maxInUse)304     void setMaxInUse(int maxInUse) {
305         _maxInUse = maxInUse;
306     }
307 
308     /**
309      * Sets the timeout value for idle connections, after which we will remove them
310      * from the pool. This value is in minutes.
311      */
setIdleTimeout(int timeout)312     void setIdleTimeout(int timeout) {
313         _idleTimeout = Minutes(timeout);
314     }
315 
316     void onCreate(DBClientBase* conn);
317     void onHandedOut(DBClientBase* conn);
318     void onDestroy(DBClientBase* conn);
319     void onRelease(DBClientBase* conn);
320 
321     void flush();
322 
323     /**
324      * Gets a connection to the given host with the given timeout, in seconds.
325      */
326     DBClientBase* get(const std::string& host, double socketTimeout = 0);
327     DBClientBase* get(const ConnectionString& host, double socketTimeout = 0);
328     DBClientBase* get(const MongoURI& uri, double socketTimeout = 0);
329 
330     /**
331      * Gets the number of connections available in the pool.
332      */
333     int getNumAvailableConns(const std::string& host, double socketTimeout = 0) const;
334     int getNumBadConns(const std::string& host, double socketTimeout = 0) const;
335 
336     void release(const std::string& host, DBClientBase* c);
337     void decrementEgress(const std::string& host, DBClientBase* c);
338 
339     void addHook(DBConnectionHook* hook);  // we take ownership
340     void appendConnectionStats(executor::ConnectionPoolStats* stats) const;
341 
342     /**
343      * Clears all connections for all host.
344      */
345     void clear();
346 
347     /**
348      * Checks whether the connection for a given host is black listed or not.
349      *
350      * @param hostName the name of the host the connection connects to.
351      * @param conn the connection to check.
352      *
353      * @return true if the connection is not bad, meaning, it is good to keep it for
354      *     future use.
355      */
356     bool isConnectionGood(const std::string& host, DBClientBase* conn);
357 
358     // Removes and deletes all connections from the pool for the host (regardless of timeout)
359     void removeHost(const std::string& host);
360 
361     /** compares server namees, but is smart about replica set names */
362     struct serverNameCompare {
363         bool operator()(const std::string& a, const std::string& b) const;
364     };
365 
taskName()366     virtual std::string taskName() const {
367         return "DBConnectionPool-cleaner";
368     }
369     virtual void taskDoWork();
370 
371     /**
372      * Shuts down the connection pool, unblocking any waiters on connections.
373      */
374     void shutdown();
375 
376 private:
377     class Detail;
378 
379     DBConnectionPool(DBConnectionPool& p);
380 
381     DBClientBase* _get(const std::string& ident, double socketTimeout);
382 
383     DBClientBase* _finishCreate(const std::string& ident, double socketTimeout, DBClientBase* conn);
384 
385     struct PoolKey {
PoolKeyPoolKey386         PoolKey(const std::string& i, double t) : ident(i), timeout(t) {}
387         std::string ident;
388         double timeout;
389     };
390 
391     struct poolKeyCompare {
392         bool operator()(const PoolKey& a, const PoolKey& b) const;
393     };
394 
395     typedef std::map<PoolKey, PoolForHost, poolKeyCompare> PoolMap;  // servername -> pool
396 
397     mutable stdx::mutex _mutex;
398     std::string _name;
399 
400     // The maximum number of connections we'll save in the pool per-host
401     // PoolForHost::kPoolSizeUnlimited is a sentinel value meaning "no limit"
402     // 0 effectively disables the pool
403     int _maxPoolSize;
404 
405     int _maxInUse;
406     Minutes _idleTimeout;
407 
408     PoolMap _pools;
409 
410     AtomicWord<bool> _inShutdown;
411 
412     // pointers owned by me, right now they leak on shutdown
413     // _hooks itself also leaks because it creates a shutdown race condition
414     std::list<DBConnectionHook*>* _hooks;
415 };
416 
417 class AScopedConnection {
418     MONGO_DISALLOW_COPYING(AScopedConnection);
419 
420 public:
AScopedConnection()421     AScopedConnection() {
422         _numConnections.fetchAndAdd(1);
423     }
~AScopedConnection()424     virtual ~AScopedConnection() {
425         _numConnections.fetchAndAdd(-1);
426     }
427 
428     virtual DBClientBase* get() = 0;
429     virtual void done() = 0;
430     virtual std::string getHost() const = 0;
431 
432     /**
433      * @return true iff this has a connection to the db
434      */
435     virtual bool ok() const = 0;
436 
437     /**
438      * @return total number of current instances of AScopedConnection
439      */
getNumConnections()440     static int getNumConnections() {
441         return _numConnections.load();
442     }
443 
444 private:
445     static AtomicInt32 _numConnections;
446 };
447 
448 /** Use to get a connection from the pool.  On exceptions things
449    clean up nicely (i.e. the socket gets closed automatically when the
450    scopeddbconnection goes out of scope).
451 */
452 class ScopedDbConnection : public AScopedConnection {
453 public:
454     /** the main constructor you want to use
455         throws AssertionException if can't connect
456         */
457     explicit ScopedDbConnection(const std::string& host, double socketTimeout = 0);
458     explicit ScopedDbConnection(const ConnectionString& host, double socketTimeout = 0);
459     explicit ScopedDbConnection(const MongoURI& host, double socketTimeout = 0);
460 
ScopedDbConnection()461     ScopedDbConnection() : _host(""), _conn(0), _socketTimeoutSecs(0) {}
462 
463     /* @param conn - bind to an existing connection */
464     ScopedDbConnection(const std::string& host, DBClientBase* conn, double socketTimeout = 0)
_host(host)465         : _host(host), _conn(conn), _socketTimeoutSecs(socketTimeout) {
466         _setSocketTimeout();
467     }
468 
469     ~ScopedDbConnection();
470 
471     static void clearPool();
472 
473     /** get the associated connection object */
474     DBClientBase* operator->() {
475         uassert(11004, "connection was returned to the pool already", _conn);
476         return _conn;
477     }
478 
479     /** get the associated connection object */
conn()480     DBClientBase& conn() {
481         uassert(11005, "connection was returned to the pool already", _conn);
482         return *_conn;
483     }
484 
485     /** get the associated connection object */
get()486     DBClientBase* get() {
487         uassert(13102, "connection was returned to the pool already", _conn);
488         return _conn;
489     }
490 
ok()491     bool ok() const {
492         return _conn != NULL;
493     }
494 
getHost()495     std::string getHost() const {
496         return _host;
497     }
498 
499     /** Force closure of the connection.  You should call this if you leave it in
500         a bad state.  Destructor will do this too, but it is verbose.
501     */
502     void kill();
503 
504     /** Call this when you are done with the connection.
505 
506         If you do not call done() before this object goes out of scope,
507         we can't be sure we fully read all expected data of a reply on the socket.  so
508         we don't try to reuse the connection in that situation.
509     */
510     void done();
511 
512 private:
513     void _setSocketTimeout();
514 
515     const std::string _host;
516     DBClientBase* _conn;
517     const double _socketTimeoutSecs;
518 };
519 
520 }  // namespace mongo
521