1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #include "mongo/platform/basic.h"
32 
33 #include "mongo/client/connection_pool.h"
34 
35 #include "mongo/client/connpool.h"
36 #include "mongo/client/mongo_uri.h"
37 #include "mongo/db/auth/authorization_manager_global.h"
38 #include "mongo/db/auth/internal_user_auth.h"
39 #include "mongo/executor/network_connection_hook.h"
40 #include "mongo/executor/remote_command_request.h"
41 #include "mongo/executor/remote_command_response.h"
42 #include "mongo/rpc/reply_interface.h"
43 #include "mongo/rpc/unique_message.h"
44 
45 namespace mongo {
46 namespace {
47 
48 const Date_t kNeverTooStale = Date_t::max();
49 
50 // TODO: Workaround for SERVER-19092. To be lowered back to 5 min / 30 sec once the bug is fixed
51 const Hours kCleanUpInterval(1);  // Note: Must be larger than kMaxConnectionAge below)
52 const Minutes kMaxConnectionAge(30);
53 
54 }  // namespace
55 
ConnectionPool(int messagingPortTags,std::unique_ptr<executor::NetworkConnectionHook> hook)56 ConnectionPool::ConnectionPool(int messagingPortTags,
57                                std::unique_ptr<executor::NetworkConnectionHook> hook)
58     : _messagingPortTags(messagingPortTags),
59       _lastCleanUpTime(Date_t::now()),
60       _hook(std::move(hook)) {}
61 
ConnectionPool(int messagingPortTags)62 ConnectionPool::ConnectionPool(int messagingPortTags)
63     : ConnectionPool(messagingPortTags, nullptr) {}
64 
~ConnectionPool()65 ConnectionPool::~ConnectionPool() {
66     cleanUpOlderThan(Date_t::max());
67 
68     invariant(_connections.empty());
69     invariant(_inUseConnections.empty());
70 }
71 
cleanUpOlderThan(Date_t now)72 void ConnectionPool::cleanUpOlderThan(Date_t now) {
73     stdx::lock_guard<stdx::mutex> lk(_mutex);
74 
75     HostConnectionMap::iterator hostConns = _connections.begin();
76     while (hostConns != _connections.end()) {
77         _cleanUpOlderThan_inlock(now, &hostConns->second);
78         if (hostConns->second.empty()) {
79             _connections.erase(hostConns++);
80         } else {
81             ++hostConns;
82         }
83     }
84 }
85 
_cleanUpOlderThan_inlock(Date_t now,ConnectionList * hostConns)86 void ConnectionPool::_cleanUpOlderThan_inlock(Date_t now, ConnectionList* hostConns) {
87     ConnectionList::iterator iter = hostConns->begin();
88     while (iter != hostConns->end()) {
89         if (_shouldKeepConnection(now, *iter)) {
90             ++iter;
91         } else {
92             _destroyConnection_inlock(hostConns, iter++);
93         }
94     }
95 }
96 
_shouldKeepConnection(Date_t now,const ConnectionInfo & connInfo) const97 bool ConnectionPool::_shouldKeepConnection(Date_t now, const ConnectionInfo& connInfo) const {
98     const Date_t expirationDate = connInfo.creationDate + kMaxConnectionAge;
99     if (expirationDate <= now) {
100         return false;
101     }
102 
103     return true;
104 }
105 
closeAllInUseConnections()106 void ConnectionPool::closeAllInUseConnections() {
107     stdx::lock_guard<stdx::mutex> lk(_mutex);
108     for (ConnectionList::iterator iter = _inUseConnections.begin(); iter != _inUseConnections.end();
109          ++iter) {
110         iter->conn->port().shutdown();
111     }
112 }
113 
_cleanUpStaleHosts_inlock(Date_t now)114 void ConnectionPool::_cleanUpStaleHosts_inlock(Date_t now) {
115     if (now > _lastCleanUpTime + kCleanUpInterval) {
116         for (HostLastUsedMap::iterator itr = _lastUsedHosts.begin(); itr != _lastUsedHosts.end();
117              itr++) {
118             if (itr->second <= _lastCleanUpTime) {
119                 ConnectionList connList = _connections.find(itr->first)->second;
120                 _cleanUpOlderThan_inlock(now, &connList);
121                 invariant(connList.empty());
122                 itr->second = kNeverTooStale;
123             }
124         }
125 
126         _lastCleanUpTime = now;
127     }
128 }
129 
acquireConnection(const HostAndPort & target,Date_t now,Milliseconds timeout)130 ConnectionPool::ConnectionList::iterator ConnectionPool::acquireConnection(
131     const HostAndPort& target, Date_t now, Milliseconds timeout) {
132     stdx::unique_lock<stdx::mutex> lk(_mutex);
133 
134     // Clean up connections on stale/unused hosts
135     _cleanUpStaleHosts_inlock(now);
136 
137     for (HostConnectionMap::iterator hostConns;
138          (hostConns = _connections.find(target)) != _connections.end();) {
139         // Clean up the requested host to remove stale/unused connections
140         _cleanUpOlderThan_inlock(now, &hostConns->second);
141 
142         if (hostConns->second.empty()) {
143             // prevent host from causing unnecessary cleanups
144             _lastUsedHosts[hostConns->first] = kNeverTooStale;
145             break;
146         }
147 
148         _inUseConnections.splice(
149             _inUseConnections.begin(), hostConns->second, hostConns->second.begin());
150 
151         const ConnectionList::iterator candidate = _inUseConnections.begin();
152         lk.unlock();
153 
154         try {
155             if (candidate->conn->isStillConnected()) {
156                 // setSoTimeout takes a double representing the number of seconds for send and
157                 // receive timeouts.  Thus, we must express 'timeout' in milliseconds and divide by
158                 // 1000.0 to get the number of seconds with a fractional part.
159                 candidate->conn->setSoTimeout(durationCount<Milliseconds>(timeout) / 1000.0);
160                 return candidate;
161             }
162         } catch (...) {
163             lk.lock();
164             _destroyConnection_inlock(&_inUseConnections, candidate);
165             throw;
166         }
167 
168         lk.lock();
169         _destroyConnection_inlock(&_inUseConnections, candidate);
170     }
171 
172     // No idle connection in the pool; make a new one.
173     lk.unlock();
174 
175     std::unique_ptr<DBClientConnection> conn;
176     if (_hook) {
177         conn.reset(new DBClientConnection(
178             false,  // auto reconnect
179             0,      // socket timeout
180             {},     // MongoURI
181             [this, target](const executor::RemoteCommandResponse& isMasterReply) {
182                 return _hook->validateHost(target, isMasterReply);
183             }));
184     } else {
185         conn.reset(new DBClientConnection());
186     }
187 
188     // setSoTimeout takes a double representing the number of seconds for send and receive
189     // timeouts.  Thus, we must express 'timeout' in milliseconds and divide by 1000.0 to get
190     // the number of seconds with a fractional part.
191     conn->setSoTimeout(durationCount<Milliseconds>(timeout) / 1000.0);
192 
193     uassertStatusOK(conn->connect(target, StringData()));
194     conn->port().setTag(conn->port().getTag() | _messagingPortTags);
195 
196     if (isInternalAuthSet()) {
197         conn->auth(getInternalUserAuthParams());
198     }
199 
200     if (_hook) {
201         auto postConnectRequest = uassertStatusOK(_hook->makeRequest(target));
202 
203         // We might not have a postConnectRequest
204         if (postConnectRequest != boost::none) {
205             auto start = Date_t::now();
206             auto reply =
207                 conn->runCommand(OpMsgRequest::fromDBAndBody(postConnectRequest->dbname,
208                                                              postConnectRequest->cmdObj,
209                                                              postConnectRequest->metadata));
210 
211             auto rcr = executor::RemoteCommandResponse(reply->getCommandReply().getOwned(),
212                                                        reply->getMetadata().getOwned(),
213                                                        Date_t::now() - start);
214 
215             uassertStatusOK(_hook->handleReply(target, std::move(rcr)));
216         }
217     }
218 
219     lk.lock();
220     return _inUseConnections.insert(_inUseConnections.begin(), ConnectionInfo(conn.release(), now));
221 }
222 
releaseConnection(ConnectionList::iterator iter,const Date_t now)223 void ConnectionPool::releaseConnection(ConnectionList::iterator iter, const Date_t now) {
224     stdx::lock_guard<stdx::mutex> lk(_mutex);
225     if (!_shouldKeepConnection(now, *iter)) {
226         _destroyConnection_inlock(&_inUseConnections, iter);
227         return;
228     }
229 
230     ConnectionList& hostConns = _connections[iter->conn->getServerHostAndPort()];
231     _cleanUpOlderThan_inlock(now, &hostConns);
232 
233     hostConns.splice(hostConns.begin(), _inUseConnections, iter);
234     _lastUsedHosts[iter->conn->getServerHostAndPort()] = now;
235 }
236 
destroyConnection(ConnectionList::iterator iter)237 void ConnectionPool::destroyConnection(ConnectionList::iterator iter) {
238     stdx::lock_guard<stdx::mutex> lk(_mutex);
239     _destroyConnection_inlock(&_inUseConnections, iter);
240 }
241 
_destroyConnection_inlock(ConnectionList * connList,ConnectionList::iterator iter)242 void ConnectionPool::_destroyConnection_inlock(ConnectionList* connList,
243                                                ConnectionList::iterator iter) {
244     delete iter->conn;
245     connList->erase(iter);
246 }
247 
248 
249 //
250 // ConnectionPool::ConnectionPtr
251 //
252 
ConnectionPtr(ConnectionPool * pool,const HostAndPort & target,Date_t now,Milliseconds timeout)253 ConnectionPool::ConnectionPtr::ConnectionPtr(ConnectionPool* pool,
254                                              const HostAndPort& target,
255                                              Date_t now,
256                                              Milliseconds timeout)
257     : _pool(pool), _connInfo(pool->acquireConnection(target, now, timeout)) {}
258 
~ConnectionPtr()259 ConnectionPool::ConnectionPtr::~ConnectionPtr() {
260     if (_pool) {
261         _pool->destroyConnection(_connInfo);
262     }
263 }
264 
ConnectionPtr(ConnectionPtr && other)265 ConnectionPool::ConnectionPtr::ConnectionPtr(ConnectionPtr&& other)
266     : _pool(std::move(other._pool)), _connInfo(std::move(other._connInfo)) {
267     other._pool = nullptr;
268 }
269 
operator =(ConnectionPtr && other)270 ConnectionPool::ConnectionPtr& ConnectionPool::ConnectionPtr::operator=(ConnectionPtr&& other) {
271     _pool = std::move(other._pool);
272     _connInfo = std::move(other._connInfo);
273     other._pool = nullptr;
274     return *this;
275 }
276 
done(Date_t now)277 void ConnectionPool::ConnectionPtr::done(Date_t now) {
278     _pool->releaseConnection(_connInfo, now);
279     _pool = NULL;
280 }
281 
282 }  // namespace mongo
283