1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #pragma once
32 
33 #include <memory>
34 #include <queue>
35 
36 #include "mongo/base/disallow_copying.h"
37 #include "mongo/stdx/chrono.h"
38 #include "mongo/stdx/functional.h"
39 #include "mongo/stdx/mutex.h"
40 #include "mongo/stdx/unordered_map.h"
41 #include "mongo/util/net/hostandport.h"
42 #include "mongo/util/time_support.h"
43 
44 namespace mongo {
45 
46 class BSONObjBuilder;
47 
48 namespace executor {
49 
50 struct ConnectionPoolStats;
51 
52 /**
53  * The actual user visible connection pool.
54  *
55  * This pool is constructed with a DependentTypeFactoryInterface which provides the tools it
56  * needs to generate connections and manage them over time.
57  *
58  * The overall workflow here is to manage separate pools for each unique
59  * HostAndPort. See comments on the various Options for how the pool operates.
60  */
61 class ConnectionPool {
62     class ConnectionHandleDeleter;
63     class SpecificPool;
64 
65 public:
66     class ConnectionInterface;
67     class DependentTypeFactoryInterface;
68     class TimerInterface;
69 
70     using ConnectionHandle = std::unique_ptr<ConnectionInterface, ConnectionHandleDeleter>;
71 
72     using GetConnectionCallback = stdx::function<void(StatusWith<ConnectionHandle>)>;
73 
74     static constexpr Milliseconds kDefaultHostTimeout = Milliseconds(300000);  // 5mins
75     static const size_t kDefaultMaxConns;
76     static const size_t kDefaultMinConns;
77     static const size_t kDefaultMaxConnecting;
78     static constexpr Milliseconds kDefaultRefreshRequirement = Milliseconds(60000);  // 1min
79     static constexpr Milliseconds kDefaultRefreshTimeout = Milliseconds(20000);      // 20secs
80 
81     static const Status kConnectionStateUnknown;
82 
83     struct Options {
OptionsOptions84         Options() {}
85 
86         /**
87          * The minimum number of connections to keep alive while the pool is in
88          * operation
89          */
90         size_t minConnections = kDefaultMinConns;
91 
92         /**
93          * The maximum number of connections to spawn for a host. This includes
94          * pending connections in setup and connections checked out of the pool
95          * as well as the obvious live connections in the pool.
96          */
97         size_t maxConnections = kDefaultMaxConns;
98 
99         /**
100          * The maximum number of processing connections for a host.  This includes pending
101          * connections in setup/refresh. It's designed to rate limit connection storms rather than
102          * steady state processing (as maxConnections does).
103          */
104         size_t maxConnecting = kDefaultMaxConnecting;
105 
106         /**
107          * Amount of time to wait before timing out a refresh attempt
108          */
109         Milliseconds refreshTimeout = kDefaultRefreshTimeout;
110 
111         /**
112          * Amount of time a connection may be idle before it cannot be returned
113          * for a user request and must instead be checked out and refreshed
114          * before handing to a user.
115          */
116         Milliseconds refreshRequirement = kDefaultRefreshRequirement;
117 
118         /**
119          * Amount of time to keep a specific pool around without any checked
120          * out connections or new requests
121          */
122         Milliseconds hostTimeout = kDefaultHostTimeout;
123     };
124 
125     explicit ConnectionPool(std::unique_ptr<DependentTypeFactoryInterface> impl,
126                             std::string name,
127                             Options options = Options{});
128 
129     ~ConnectionPool();
130 
131     void dropConnections(const HostAndPort& hostAndPort);
132 
133     void get(const HostAndPort& hostAndPort, Milliseconds timeout, GetConnectionCallback cb);
134 
135     void appendConnectionStats(ConnectionPoolStats* stats) const;
136 
137     size_t getNumConnectionsPerHost(const HostAndPort& hostAndPort) const;
138 
139 private:
140     void returnConnection(ConnectionInterface* connection);
141 
142     std::string _name;
143 
144     // Options are set at startup and never changed at run time, so these are
145     // accessed outside the lock
146     const Options _options;
147 
148     const std::unique_ptr<DependentTypeFactoryInterface> _factory;
149 
150     // The global mutex for specific pool access and the generation counter
151     mutable stdx::mutex _mutex;
152     stdx::unordered_map<HostAndPort, std::unique_ptr<SpecificPool>> _pools;
153 };
154 
155 class ConnectionPool::ConnectionHandleDeleter {
156 public:
157     ConnectionHandleDeleter() = default;
ConnectionHandleDeleter(ConnectionPool * pool)158     ConnectionHandleDeleter(ConnectionPool* pool) : _pool(pool) {}
159 
operator()160     void operator()(ConnectionInterface* connection) {
161         if (_pool && connection)
162             _pool->returnConnection(connection);
163     }
164 
165 private:
166     ConnectionPool* _pool = nullptr;
167 };
168 
169 /**
170  * Interface for a basic timer
171  *
172  * Minimal interface sets a timer with a callback and cancels the timer.
173  */
174 class ConnectionPool::TimerInterface {
175     MONGO_DISALLOW_COPYING(TimerInterface);
176 
177 public:
178     TimerInterface() = default;
179 
180     using TimeoutCallback = stdx::function<void()>;
181 
182     virtual ~TimerInterface() = default;
183 
184     /**
185      * Sets the timeout for the timer. Setting an already set timer should
186      * override the previous timer.
187      */
188     virtual void setTimeout(Milliseconds timeout, TimeoutCallback cb) = 0;
189 
190     /**
191      * It should be safe to cancel a previously canceled, or never set, timer.
192      */
193     virtual void cancelTimeout() = 0;
194 };
195 
196 /**
197  * Interface for connection pool connections
198  *
199  * Provides a minimal interface to manipulate connections within the pool,
200  * specifically callbacks to set them up (connect + auth + whatever else),
201  * refresh them (issue some kind of ping) and manage a timer.
202  */
203 class ConnectionPool::ConnectionInterface : public TimerInterface {
204     MONGO_DISALLOW_COPYING(ConnectionInterface);
205 
206     friend class ConnectionPool;
207 
208 public:
209     ConnectionInterface() = default;
210     virtual ~ConnectionInterface() = default;
211 
212     /**
213      * Indicates that the user is now done with this connection. Users MUST call either
214      * this method or indicateFailure() before returning the connection to its pool.
215      */
216     virtual void indicateSuccess() = 0;
217 
218     /**
219      * Indicates that a connection has failed. This will prevent the connection
220      * from re-entering the connection pool. Users MUST call either this method or
221      * indicateSuccess() before returning connections to the pool.
222      */
223     virtual void indicateFailure(Status status) = 0;
224 
225     /**
226      * The HostAndPort for the connection. This should be the same as the
227      * HostAndPort passed to DependentTypeFactoryInterface::makeConnection.
228      */
229     virtual const HostAndPort& getHostAndPort() const = 0;
230 
231     /**
232      * Check if the connection is healthy using some implementation defined condition.
233      */
234     virtual bool isHealthy() = 0;
235 
236 protected:
237     /**
238      * Making these protected makes the definitions available to override in
239      * children.
240      */
241     using SetupCallback = stdx::function<void(ConnectionInterface*, Status)>;
242     using RefreshCallback = stdx::function<void(ConnectionInterface*, Status)>;
243 
244 private:
245     /**
246      * This method updates a 'liveness' timestamp to avoid unnecessarily refreshing
247      * the connection.
248      */
249     virtual void indicateUsed() = 0;
250 
251     /**
252      * Returns the last used time point for the connection
253      */
254     virtual Date_t getLastUsed() const = 0;
255 
256     /**
257      * Returns the status associated with the connection. If the status is not
258      * OK, the connection will not be returned to the pool.
259      */
260     virtual const Status& getStatus() const = 0;
261 
262     /**
263      * Sets up the connection. This should include connection + auth + any
264      * other associated hooks.
265      */
266     virtual void setup(Milliseconds timeout, SetupCallback cb) = 0;
267 
268     /**
269      * Resets the connection's state to kConnectionStateUnknown for the next user.
270      */
271     virtual void resetToUnknown() = 0;
272 
273     /**
274      * Refreshes the connection. This should involve a network round trip and
275      * should strongly imply an active connection
276      */
277     virtual void refresh(Milliseconds timeout, RefreshCallback cb) = 0;
278 
279     /**
280      * Get the generation of the connection. This is used to track whether to
281      * continue using a connection after a call to dropConnections() by noting
282      * if the generation on the specific pool is the same as the generation on
283      * a connection (if not the connection is from a previous era and should
284      * not be re-used).
285      */
286     virtual size_t getGeneration() const = 0;
287 };
288 
289 /**
290  * Implementation interface for the connection pool
291  *
292  * This factory provides generators for connections, timers and a clock for the
293  * connection pool.
294  */
295 class ConnectionPool::DependentTypeFactoryInterface {
296     MONGO_DISALLOW_COPYING(DependentTypeFactoryInterface);
297 
298 public:
299     DependentTypeFactoryInterface() = default;
300 
301     virtual ~DependentTypeFactoryInterface() = default;
302 
303     /**
304      * Makes a new connection given a host and port
305      */
306     virtual std::unique_ptr<ConnectionInterface> makeConnection(const HostAndPort& hostAndPort,
307                                                                 size_t generation) = 0;
308 
309     /**
310      * Makes a new timer
311      */
312     virtual std::unique_ptr<TimerInterface> makeTimer() = 0;
313 
314     /**
315      * Returns the current time point
316      */
317     virtual Date_t now() = 0;
318 };
319 
320 }  // namespace executor
321 }  // namespace mongo
322