1 2 /** 3 * Copyright (C) 2018-present MongoDB, Inc. 4 * 5 * This program is free software: you can redistribute it and/or modify 6 * it under the terms of the Server Side Public License, version 1, 7 * as published by MongoDB, Inc. 8 * 9 * This program is distributed in the hope that it will be useful, 10 * but WITHOUT ANY WARRANTY; without even the implied warranty of 11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12 * Server Side Public License for more details. 13 * 14 * You should have received a copy of the Server Side Public License 15 * along with this program. If not, see 16 * <http://www.mongodb.com/licensing/server-side-public-license>. 17 * 18 * As a special exception, the copyright holders give permission to link the 19 * code of portions of this program with the OpenSSL library under certain 20 * conditions as described in each individual source file and distribute 21 * linked combinations including the program with the OpenSSL library. You 22 * must comply with the Server Side Public License in all respects for 23 * all of the code used other than as permitted herein. If you modify file(s) 24 * with this exception, you may extend this exception to your version of the 25 * file(s), but you are not obligated to do so. If you do not wish to do so, 26 * delete this exception statement from your version. If you delete this 27 * exception statement from all source files in the program, then also delete 28 * it in the license file. 29 */ 30 31 #pragma once 32 33 #include <memory> 34 #include <queue> 35 36 #include "mongo/base/disallow_copying.h" 37 #include "mongo/stdx/chrono.h" 38 #include "mongo/stdx/functional.h" 39 #include "mongo/stdx/mutex.h" 40 #include "mongo/stdx/unordered_map.h" 41 #include "mongo/util/net/hostandport.h" 42 #include "mongo/util/time_support.h" 43 44 namespace mongo { 45 46 class BSONObjBuilder; 47 48 namespace executor { 49 50 struct ConnectionPoolStats; 51 52 /** 53 * The actual user visible connection pool. 54 * 55 * This pool is constructed with a DependentTypeFactoryInterface which provides the tools it 56 * needs to generate connections and manage them over time. 57 * 58 * The overall workflow here is to manage separate pools for each unique 59 * HostAndPort. See comments on the various Options for how the pool operates. 60 */ 61 class ConnectionPool { 62 class ConnectionHandleDeleter; 63 class SpecificPool; 64 65 public: 66 class ConnectionInterface; 67 class DependentTypeFactoryInterface; 68 class TimerInterface; 69 70 using ConnectionHandle = std::unique_ptr<ConnectionInterface, ConnectionHandleDeleter>; 71 72 using GetConnectionCallback = stdx::function<void(StatusWith<ConnectionHandle>)>; 73 74 static constexpr Milliseconds kDefaultHostTimeout = Milliseconds(300000); // 5mins 75 static const size_t kDefaultMaxConns; 76 static const size_t kDefaultMinConns; 77 static const size_t kDefaultMaxConnecting; 78 static constexpr Milliseconds kDefaultRefreshRequirement = Milliseconds(60000); // 1min 79 static constexpr Milliseconds kDefaultRefreshTimeout = Milliseconds(20000); // 20secs 80 81 static const Status kConnectionStateUnknown; 82 83 struct Options { OptionsOptions84 Options() {} 85 86 /** 87 * The minimum number of connections to keep alive while the pool is in 88 * operation 89 */ 90 size_t minConnections = kDefaultMinConns; 91 92 /** 93 * The maximum number of connections to spawn for a host. This includes 94 * pending connections in setup and connections checked out of the pool 95 * as well as the obvious live connections in the pool. 96 */ 97 size_t maxConnections = kDefaultMaxConns; 98 99 /** 100 * The maximum number of processing connections for a host. This includes pending 101 * connections in setup/refresh. It's designed to rate limit connection storms rather than 102 * steady state processing (as maxConnections does). 103 */ 104 size_t maxConnecting = kDefaultMaxConnecting; 105 106 /** 107 * Amount of time to wait before timing out a refresh attempt 108 */ 109 Milliseconds refreshTimeout = kDefaultRefreshTimeout; 110 111 /** 112 * Amount of time a connection may be idle before it cannot be returned 113 * for a user request and must instead be checked out and refreshed 114 * before handing to a user. 115 */ 116 Milliseconds refreshRequirement = kDefaultRefreshRequirement; 117 118 /** 119 * Amount of time to keep a specific pool around without any checked 120 * out connections or new requests 121 */ 122 Milliseconds hostTimeout = kDefaultHostTimeout; 123 }; 124 125 explicit ConnectionPool(std::unique_ptr<DependentTypeFactoryInterface> impl, 126 std::string name, 127 Options options = Options{}); 128 129 ~ConnectionPool(); 130 131 void dropConnections(const HostAndPort& hostAndPort); 132 133 void get(const HostAndPort& hostAndPort, Milliseconds timeout, GetConnectionCallback cb); 134 135 void appendConnectionStats(ConnectionPoolStats* stats) const; 136 137 size_t getNumConnectionsPerHost(const HostAndPort& hostAndPort) const; 138 139 private: 140 void returnConnection(ConnectionInterface* connection); 141 142 std::string _name; 143 144 // Options are set at startup and never changed at run time, so these are 145 // accessed outside the lock 146 const Options _options; 147 148 const std::unique_ptr<DependentTypeFactoryInterface> _factory; 149 150 // The global mutex for specific pool access and the generation counter 151 mutable stdx::mutex _mutex; 152 stdx::unordered_map<HostAndPort, std::unique_ptr<SpecificPool>> _pools; 153 }; 154 155 class ConnectionPool::ConnectionHandleDeleter { 156 public: 157 ConnectionHandleDeleter() = default; ConnectionHandleDeleter(ConnectionPool * pool)158 ConnectionHandleDeleter(ConnectionPool* pool) : _pool(pool) {} 159 operator()160 void operator()(ConnectionInterface* connection) { 161 if (_pool && connection) 162 _pool->returnConnection(connection); 163 } 164 165 private: 166 ConnectionPool* _pool = nullptr; 167 }; 168 169 /** 170 * Interface for a basic timer 171 * 172 * Minimal interface sets a timer with a callback and cancels the timer. 173 */ 174 class ConnectionPool::TimerInterface { 175 MONGO_DISALLOW_COPYING(TimerInterface); 176 177 public: 178 TimerInterface() = default; 179 180 using TimeoutCallback = stdx::function<void()>; 181 182 virtual ~TimerInterface() = default; 183 184 /** 185 * Sets the timeout for the timer. Setting an already set timer should 186 * override the previous timer. 187 */ 188 virtual void setTimeout(Milliseconds timeout, TimeoutCallback cb) = 0; 189 190 /** 191 * It should be safe to cancel a previously canceled, or never set, timer. 192 */ 193 virtual void cancelTimeout() = 0; 194 }; 195 196 /** 197 * Interface for connection pool connections 198 * 199 * Provides a minimal interface to manipulate connections within the pool, 200 * specifically callbacks to set them up (connect + auth + whatever else), 201 * refresh them (issue some kind of ping) and manage a timer. 202 */ 203 class ConnectionPool::ConnectionInterface : public TimerInterface { 204 MONGO_DISALLOW_COPYING(ConnectionInterface); 205 206 friend class ConnectionPool; 207 208 public: 209 ConnectionInterface() = default; 210 virtual ~ConnectionInterface() = default; 211 212 /** 213 * Indicates that the user is now done with this connection. Users MUST call either 214 * this method or indicateFailure() before returning the connection to its pool. 215 */ 216 virtual void indicateSuccess() = 0; 217 218 /** 219 * Indicates that a connection has failed. This will prevent the connection 220 * from re-entering the connection pool. Users MUST call either this method or 221 * indicateSuccess() before returning connections to the pool. 222 */ 223 virtual void indicateFailure(Status status) = 0; 224 225 /** 226 * The HostAndPort for the connection. This should be the same as the 227 * HostAndPort passed to DependentTypeFactoryInterface::makeConnection. 228 */ 229 virtual const HostAndPort& getHostAndPort() const = 0; 230 231 /** 232 * Check if the connection is healthy using some implementation defined condition. 233 */ 234 virtual bool isHealthy() = 0; 235 236 protected: 237 /** 238 * Making these protected makes the definitions available to override in 239 * children. 240 */ 241 using SetupCallback = stdx::function<void(ConnectionInterface*, Status)>; 242 using RefreshCallback = stdx::function<void(ConnectionInterface*, Status)>; 243 244 private: 245 /** 246 * This method updates a 'liveness' timestamp to avoid unnecessarily refreshing 247 * the connection. 248 */ 249 virtual void indicateUsed() = 0; 250 251 /** 252 * Returns the last used time point for the connection 253 */ 254 virtual Date_t getLastUsed() const = 0; 255 256 /** 257 * Returns the status associated with the connection. If the status is not 258 * OK, the connection will not be returned to the pool. 259 */ 260 virtual const Status& getStatus() const = 0; 261 262 /** 263 * Sets up the connection. This should include connection + auth + any 264 * other associated hooks. 265 */ 266 virtual void setup(Milliseconds timeout, SetupCallback cb) = 0; 267 268 /** 269 * Resets the connection's state to kConnectionStateUnknown for the next user. 270 */ 271 virtual void resetToUnknown() = 0; 272 273 /** 274 * Refreshes the connection. This should involve a network round trip and 275 * should strongly imply an active connection 276 */ 277 virtual void refresh(Milliseconds timeout, RefreshCallback cb) = 0; 278 279 /** 280 * Get the generation of the connection. This is used to track whether to 281 * continue using a connection after a call to dropConnections() by noting 282 * if the generation on the specific pool is the same as the generation on 283 * a connection (if not the connection is from a previous era and should 284 * not be re-used). 285 */ 286 virtual size_t getGeneration() const = 0; 287 }; 288 289 /** 290 * Implementation interface for the connection pool 291 * 292 * This factory provides generators for connections, timers and a clock for the 293 * connection pool. 294 */ 295 class ConnectionPool::DependentTypeFactoryInterface { 296 MONGO_DISALLOW_COPYING(DependentTypeFactoryInterface); 297 298 public: 299 DependentTypeFactoryInterface() = default; 300 301 virtual ~DependentTypeFactoryInterface() = default; 302 303 /** 304 * Makes a new connection given a host and port 305 */ 306 virtual std::unique_ptr<ConnectionInterface> makeConnection(const HostAndPort& hostAndPort, 307 size_t generation) = 0; 308 309 /** 310 * Makes a new timer 311 */ 312 virtual std::unique_ptr<TimerInterface> makeTimer() = 0; 313 314 /** 315 * Returns the current time point 316 */ 317 virtual Date_t now() = 0; 318 }; 319 320 } // namespace executor 321 } // namespace mongo 322