1
2 /**
3 * Copyright (C) 2018-present MongoDB, Inc.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the Server Side Public License, version 1,
7 * as published by MongoDB, Inc.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * Server Side Public License for more details.
13 *
14 * You should have received a copy of the Server Side Public License
15 * along with this program. If not, see
16 * <http://www.mongodb.com/licensing/server-side-public-license>.
17 *
18 * As a special exception, the copyright holders give permission to link the
19 * code of portions of this program with the OpenSSL library under certain
20 * conditions as described in each individual source file and distribute
21 * linked combinations including the program with the OpenSSL library. You
22 * must comply with the Server Side Public License in all respects for
23 * all of the code used other than as permitted herein. If you modify file(s)
24 * with this exception, you may extend this exception to your version of the
25 * file(s), but you are not obligated to do so. If you do not wish to do so,
26 * delete this exception statement from your version. If you delete this
27 * exception statement from all source files in the program, then also delete
28 * it in the license file.
29 */
30
31 #include "mongo/platform/basic.h"
32
33 #include "mongo/client/connection_pool.h"
34
35 #include "mongo/client/connpool.h"
36 #include "mongo/client/mongo_uri.h"
37 #include "mongo/db/auth/authorization_manager_global.h"
38 #include "mongo/db/auth/internal_user_auth.h"
39 #include "mongo/executor/network_connection_hook.h"
40 #include "mongo/executor/remote_command_request.h"
41 #include "mongo/executor/remote_command_response.h"
42 #include "mongo/rpc/reply_interface.h"
43 #include "mongo/rpc/unique_message.h"
44
45 namespace mongo {
46 namespace {
47
48 const Date_t kNeverTooStale = Date_t::max();
49
50 // TODO: Workaround for SERVER-19092. To be lowered back to 5 min / 30 sec once the bug is fixed
51 const Hours kCleanUpInterval(1); // Note: Must be larger than kMaxConnectionAge below)
52 const Minutes kMaxConnectionAge(30);
53
54 } // namespace
55
ConnectionPool(int messagingPortTags,std::unique_ptr<executor::NetworkConnectionHook> hook)56 ConnectionPool::ConnectionPool(int messagingPortTags,
57 std::unique_ptr<executor::NetworkConnectionHook> hook)
58 : _messagingPortTags(messagingPortTags),
59 _lastCleanUpTime(Date_t::now()),
60 _hook(std::move(hook)) {}
61
ConnectionPool(int messagingPortTags)62 ConnectionPool::ConnectionPool(int messagingPortTags)
63 : ConnectionPool(messagingPortTags, nullptr) {}
64
~ConnectionPool()65 ConnectionPool::~ConnectionPool() {
66 cleanUpOlderThan(Date_t::max());
67
68 invariant(_connections.empty());
69 invariant(_inUseConnections.empty());
70 }
71
cleanUpOlderThan(Date_t now)72 void ConnectionPool::cleanUpOlderThan(Date_t now) {
73 stdx::lock_guard<stdx::mutex> lk(_mutex);
74
75 HostConnectionMap::iterator hostConns = _connections.begin();
76 while (hostConns != _connections.end()) {
77 _cleanUpOlderThan_inlock(now, &hostConns->second);
78 if (hostConns->second.empty()) {
79 _connections.erase(hostConns++);
80 } else {
81 ++hostConns;
82 }
83 }
84 }
85
_cleanUpOlderThan_inlock(Date_t now,ConnectionList * hostConns)86 void ConnectionPool::_cleanUpOlderThan_inlock(Date_t now, ConnectionList* hostConns) {
87 ConnectionList::iterator iter = hostConns->begin();
88 while (iter != hostConns->end()) {
89 if (_shouldKeepConnection(now, *iter)) {
90 ++iter;
91 } else {
92 _destroyConnection_inlock(hostConns, iter++);
93 }
94 }
95 }
96
_shouldKeepConnection(Date_t now,const ConnectionInfo & connInfo) const97 bool ConnectionPool::_shouldKeepConnection(Date_t now, const ConnectionInfo& connInfo) const {
98 const Date_t expirationDate = connInfo.creationDate + kMaxConnectionAge;
99 if (expirationDate <= now) {
100 return false;
101 }
102
103 return true;
104 }
105
closeAllInUseConnections()106 void ConnectionPool::closeAllInUseConnections() {
107 stdx::lock_guard<stdx::mutex> lk(_mutex);
108 for (ConnectionList::iterator iter = _inUseConnections.begin(); iter != _inUseConnections.end();
109 ++iter) {
110 iter->conn->port().shutdown();
111 }
112 }
113
_cleanUpStaleHosts_inlock(Date_t now)114 void ConnectionPool::_cleanUpStaleHosts_inlock(Date_t now) {
115 if (now > _lastCleanUpTime + kCleanUpInterval) {
116 for (HostLastUsedMap::iterator itr = _lastUsedHosts.begin(); itr != _lastUsedHosts.end();
117 itr++) {
118 if (itr->second <= _lastCleanUpTime) {
119 ConnectionList connList = _connections.find(itr->first)->second;
120 _cleanUpOlderThan_inlock(now, &connList);
121 invariant(connList.empty());
122 itr->second = kNeverTooStale;
123 }
124 }
125
126 _lastCleanUpTime = now;
127 }
128 }
129
acquireConnection(const HostAndPort & target,Date_t now,Milliseconds timeout)130 ConnectionPool::ConnectionList::iterator ConnectionPool::acquireConnection(
131 const HostAndPort& target, Date_t now, Milliseconds timeout) {
132 stdx::unique_lock<stdx::mutex> lk(_mutex);
133
134 // Clean up connections on stale/unused hosts
135 _cleanUpStaleHosts_inlock(now);
136
137 for (HostConnectionMap::iterator hostConns;
138 (hostConns = _connections.find(target)) != _connections.end();) {
139 // Clean up the requested host to remove stale/unused connections
140 _cleanUpOlderThan_inlock(now, &hostConns->second);
141
142 if (hostConns->second.empty()) {
143 // prevent host from causing unnecessary cleanups
144 _lastUsedHosts[hostConns->first] = kNeverTooStale;
145 break;
146 }
147
148 _inUseConnections.splice(
149 _inUseConnections.begin(), hostConns->second, hostConns->second.begin());
150
151 const ConnectionList::iterator candidate = _inUseConnections.begin();
152 lk.unlock();
153
154 try {
155 if (candidate->conn->isStillConnected()) {
156 // setSoTimeout takes a double representing the number of seconds for send and
157 // receive timeouts. Thus, we must express 'timeout' in milliseconds and divide by
158 // 1000.0 to get the number of seconds with a fractional part.
159 candidate->conn->setSoTimeout(durationCount<Milliseconds>(timeout) / 1000.0);
160 return candidate;
161 }
162 } catch (...) {
163 lk.lock();
164 _destroyConnection_inlock(&_inUseConnections, candidate);
165 throw;
166 }
167
168 lk.lock();
169 _destroyConnection_inlock(&_inUseConnections, candidate);
170 }
171
172 // No idle connection in the pool; make a new one.
173 lk.unlock();
174
175 std::unique_ptr<DBClientConnection> conn;
176 if (_hook) {
177 conn.reset(new DBClientConnection(
178 false, // auto reconnect
179 0, // socket timeout
180 {}, // MongoURI
181 [this, target](const executor::RemoteCommandResponse& isMasterReply) {
182 return _hook->validateHost(target, isMasterReply);
183 }));
184 } else {
185 conn.reset(new DBClientConnection());
186 }
187
188 // setSoTimeout takes a double representing the number of seconds for send and receive
189 // timeouts. Thus, we must express 'timeout' in milliseconds and divide by 1000.0 to get
190 // the number of seconds with a fractional part.
191 conn->setSoTimeout(durationCount<Milliseconds>(timeout) / 1000.0);
192
193 uassertStatusOK(conn->connect(target, StringData()));
194 conn->port().setTag(conn->port().getTag() | _messagingPortTags);
195
196 if (isInternalAuthSet()) {
197 conn->auth(getInternalUserAuthParams());
198 }
199
200 if (_hook) {
201 auto postConnectRequest = uassertStatusOK(_hook->makeRequest(target));
202
203 // We might not have a postConnectRequest
204 if (postConnectRequest != boost::none) {
205 auto start = Date_t::now();
206 auto reply =
207 conn->runCommand(OpMsgRequest::fromDBAndBody(postConnectRequest->dbname,
208 postConnectRequest->cmdObj,
209 postConnectRequest->metadata));
210
211 auto rcr = executor::RemoteCommandResponse(reply->getCommandReply().getOwned(),
212 reply->getMetadata().getOwned(),
213 Date_t::now() - start);
214
215 uassertStatusOK(_hook->handleReply(target, std::move(rcr)));
216 }
217 }
218
219 lk.lock();
220 return _inUseConnections.insert(_inUseConnections.begin(), ConnectionInfo(conn.release(), now));
221 }
222
releaseConnection(ConnectionList::iterator iter,const Date_t now)223 void ConnectionPool::releaseConnection(ConnectionList::iterator iter, const Date_t now) {
224 stdx::lock_guard<stdx::mutex> lk(_mutex);
225 if (!_shouldKeepConnection(now, *iter)) {
226 _destroyConnection_inlock(&_inUseConnections, iter);
227 return;
228 }
229
230 ConnectionList& hostConns = _connections[iter->conn->getServerHostAndPort()];
231 _cleanUpOlderThan_inlock(now, &hostConns);
232
233 hostConns.splice(hostConns.begin(), _inUseConnections, iter);
234 _lastUsedHosts[iter->conn->getServerHostAndPort()] = now;
235 }
236
destroyConnection(ConnectionList::iterator iter)237 void ConnectionPool::destroyConnection(ConnectionList::iterator iter) {
238 stdx::lock_guard<stdx::mutex> lk(_mutex);
239 _destroyConnection_inlock(&_inUseConnections, iter);
240 }
241
_destroyConnection_inlock(ConnectionList * connList,ConnectionList::iterator iter)242 void ConnectionPool::_destroyConnection_inlock(ConnectionList* connList,
243 ConnectionList::iterator iter) {
244 delete iter->conn;
245 connList->erase(iter);
246 }
247
248
249 //
250 // ConnectionPool::ConnectionPtr
251 //
252
ConnectionPtr(ConnectionPool * pool,const HostAndPort & target,Date_t now,Milliseconds timeout)253 ConnectionPool::ConnectionPtr::ConnectionPtr(ConnectionPool* pool,
254 const HostAndPort& target,
255 Date_t now,
256 Milliseconds timeout)
257 : _pool(pool), _connInfo(pool->acquireConnection(target, now, timeout)) {}
258
~ConnectionPtr()259 ConnectionPool::ConnectionPtr::~ConnectionPtr() {
260 if (_pool) {
261 _pool->destroyConnection(_connInfo);
262 }
263 }
264
ConnectionPtr(ConnectionPtr && other)265 ConnectionPool::ConnectionPtr::ConnectionPtr(ConnectionPtr&& other)
266 : _pool(std::move(other._pool)), _connInfo(std::move(other._connInfo)) {
267 other._pool = nullptr;
268 }
269
operator =(ConnectionPtr && other)270 ConnectionPool::ConnectionPtr& ConnectionPool::ConnectionPtr::operator=(ConnectionPtr&& other) {
271 _pool = std::move(other._pool);
272 _connInfo = std::move(other._connInfo);
273 other._pool = nullptr;
274 return *this;
275 }
276
done(Date_t now)277 void ConnectionPool::ConnectionPtr::done(Date_t now) {
278 _pool->releaseConnection(_connInfo, now);
279 _pool = NULL;
280 }
281
282 } // namespace mongo
283