/**
 *    Copyright (C) 2018-present MongoDB, Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the Server Side Public License, version 1,
 *    as published by MongoDB, Inc.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    Server Side Public License for more details.
 *
 *    You should have received a copy of the Server Side Public License
 *    along with this program. If not, see
 *    <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the Server Side Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
 */

#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kASIO

#include "mongo/platform/basic.h"

#include "mongo/executor/connection_pool.h"

#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/executor/connection_pool_stats.h"
#include "mongo/executor/remote_command_request.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/destructor_guard.h"
#include "mongo/util/log.h"
#include "mongo/util/lru_cache.h"
#include "mongo/util/scopeguard.h"

// One interesting implementation note herein concerns how setup() and
// refresh() are invoked outside of the global lock, but setTimeout is not.
// This implementation detail simplifies mocks, allowing them to return
// synchronously sometimes, whereas having timeouts fire instantly adds little
// value. In practice, dumping the locks is always safe (because we restrict
// ourselves to operations over the connection).

namespace mongo {
namespace executor {

/**
 * A pool for a specific HostAndPort
 *
 * Pools come into existence the first time a connection is requested and
 * go out of existence after hostTimeout passes without any of their
 * connections being used.
 */
class ConnectionPool::SpecificPool {
public:
    /**
     * These active client methods must be used whenever entering a specific pool outside of the
     * shutdown background task.  The presence of an active client will bump a counter on the
     * specific pool which will prevent the shutdown thread from deleting it.
     *
     * The complexity comes from the need to hold a lock when writing to the
     * _activeClients counter on the specific pool.  Because the code beneath the client needs to
     * lock and unlock the parent mutex (and can leave it unlocked), we want to start the client
     * with the lock acquired, move it into the client, then re-acquire it to decrement the counter
     * on the way out.
     *
     * It's used like:
     *
     * pool.runWithActiveClient([](stdx::unique_lock<stdx::mutex> lk){ codeToBeProtected(); });
     */
    template <typename Callback>
    void runWithActiveClient(Callback&& cb) {
        runWithActiveClient(stdx::unique_lock<stdx::mutex>(_parent->_mutex),
                            std::forward<Callback>(cb));
    }

    template <typename Callback>
    void runWithActiveClient(stdx::unique_lock<stdx::mutex> lk, Callback&& cb) {
        invariant(lk.owns_lock());

        _activeClients++;

        const auto guard = MakeGuard([&] {
            invariant(!lk.owns_lock());
            stdx::lock_guard<stdx::mutex> lk(_parent->_mutex);
            _activeClients--;
        });

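        // Hand the lock off to the callback through a scoped local unique_lock: once the
        // callback returns, the outer lk no longer owns the mutex, and the guard above
        // re-acquires it only to decrement _activeClients.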
        {
            decltype(lk) localLk(std::move(lk));
            cb(std::move(localLk));
        }
    }

    SpecificPool(ConnectionPool* parent, const HostAndPort& hostAndPort);
    ~SpecificPool();

    /**
     * Gets a connection from the specific pool. Sinks a unique_lock from the
     * parent to preserve the lock on _mutex
     */
    void getConnection(const HostAndPort& hostAndPort,
                       Milliseconds timeout,
                       stdx::unique_lock<stdx::mutex> lk,
                       GetConnectionCallback cb);

    /**
     * Cascades a failure across existing connections and requests. Invoking
     * this function drops all current connections and fails all current
     * requests with the passed status.
     */
    void processFailure(const Status& status, stdx::unique_lock<stdx::mutex> lk);

    /**
     * Returns a connection to a specific pool. Sinks a unique_lock from the
     * parent to preserve the lock on _mutex
     */
    void returnConnection(ConnectionInterface* connection, stdx::unique_lock<stdx::mutex> lk);

    /**
     * Returns the number of connections currently checked out of the pool.
     */
    size_t inUseConnections(const stdx::unique_lock<stdx::mutex>& lk);

    /**
     * Returns the number of available connections in the pool.
     */
    size_t availableConnections(const stdx::unique_lock<stdx::mutex>& lk);

    /**
     * Returns the number of in progress connections in the pool.
     */
    size_t refreshingConnections(const stdx::unique_lock<stdx::mutex>& lk);

    /**
     * Returns the total number of connections ever created in this pool.
     */
    size_t createdConnections(const stdx::unique_lock<stdx::mutex>& lk);

    /**
     * Returns the total number of connections currently open that belong to
     * this pool. This is the sum of refreshingConnections, availableConnections,
     * and inUseConnections.
     */
    size_t openConnections(const stdx::unique_lock<stdx::mutex>& lk);

private:
    using OwnedConnection = std::unique_ptr<ConnectionInterface>;
    using OwnershipPool = stdx::unordered_map<ConnectionInterface*, OwnedConnection>;
    using LRUOwnershipPool = LRUCache<OwnershipPool::key_type, OwnershipPool::mapped_type>;
    using Request = std::pair<Date_t, GetConnectionCallback>;
    struct RequestComparator {
        bool operator()(const Request& a, const Request& b) {
            return a.first > b.first;
        }
    };

    void addToReady(stdx::unique_lock<stdx::mutex>& lk, OwnedConnection conn);

    void fulfillRequests(stdx::unique_lock<stdx::mutex>& lk);

    void spawnConnections(stdx::unique_lock<stdx::mutex>& lk);

    void shutdown();

    template <typename OwnershipPoolType>
    typename OwnershipPoolType::mapped_type takeFromPool(
        OwnershipPoolType& pool, typename OwnershipPoolType::key_type connPtr);

    OwnedConnection takeFromProcessingPool(ConnectionInterface* connection);

    void updateStateInLock();

private:
    ConnectionPool* const _parent;

    const HostAndPort _hostAndPort;

    LRUOwnershipPool _readyPool;
    OwnershipPool _processingPool;
    OwnershipPool _droppedProcessingPool;
    OwnershipPool _checkedOutPool;

    std::priority_queue<Request, std::vector<Request>, RequestComparator> _requests;

    std::unique_ptr<TimerInterface> _requestTimer;
    Date_t _requestTimerExpiration;
    size_t _activeClients;
    size_t _generation;
    bool _inFulfillRequests;
    bool _inSpawnConnections;

    size_t _created;

    /**
     * The current state of the pool
     *
     * The pool begins in a running state. Moves to idle when no requests
     * are pending and no connections are checked out. It finally enters
     * shutdown after hostTimeout has passed (and waits there for current
     * refreshes to process out).
     *
     * At any point a new request sets the state back to running and
     * restarts all timers.
     */
    enum class State {
        // The pool is active
        kRunning,

        // No current activity, waiting for hostTimeout to pass
        kIdle,

        // hostTimeout has passed; we're waiting for any processing
        // connections to finish before shutting down
        kInShutdown,
    };

    State _state;
};

constexpr Milliseconds ConnectionPool::kDefaultHostTimeout;
size_t const ConnectionPool::kDefaultMaxConns = std::numeric_limits<size_t>::max();
size_t const ConnectionPool::kDefaultMinConns = 1;
size_t const ConnectionPool::kDefaultMaxConnecting = std::numeric_limits<size_t>::max();
constexpr Milliseconds ConnectionPool::kDefaultRefreshRequirement;
constexpr Milliseconds ConnectionPool::kDefaultRefreshTimeout;

const Status ConnectionPool::kConnectionStateUnknown =
    Status(ErrorCodes::InternalError, "Connection is in an unknown state");

ConnectionPool::ConnectionPool(std::unique_ptr<DependentTypeFactoryInterface> impl,
                               std::string name,
                               Options options)
    : _name(std::move(name)), _options(std::move(options)), _factory(std::move(impl)) {}

ConnectionPool::~ConnectionPool() = default;

void ConnectionPool::dropConnections(const HostAndPort& hostAndPort) {
    stdx::unique_lock<stdx::mutex> lk(_mutex);

    auto iter = _pools.find(hostAndPort);

    if (iter == _pools.end())
        return;

    iter->second->runWithActiveClient(std::move(lk), [&](decltype(lk) lk) {
        iter->second->processFailure(
            Status(ErrorCodes::PooledConnectionsDropped, "Pooled connections dropped"),
            std::move(lk));
    });
}

void ConnectionPool::get(const HostAndPort& hostAndPort,
                         Milliseconds timeout,
                         GetConnectionCallback cb) {
    SpecificPool* pool;

    stdx::unique_lock<stdx::mutex> lk(_mutex);

    auto iter = _pools.find(hostAndPort);

    if (iter == _pools.end()) {
        auto handle = stdx::make_unique<SpecificPool>(this, hostAndPort);
        pool = handle.get();
        _pools[hostAndPort] = std::move(handle);
    } else {
        pool = iter->second.get();
    }

    invariant(pool);

    pool->runWithActiveClient(std::move(lk), [&](decltype(lk) lk) {
        pool->getConnection(hostAndPort, timeout, std::move(lk), std::move(cb));
    });
}
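
// Illustrative usage sketch (comment only). It assumes the StatusWith<ConnectionHandle>
// callback signature declared for GetConnectionCallback in connection_pool.h; the callback
// receives either a checked-out handle or an error status, and must mark the outcome via
// indicateSuccess()/indicateFailure() before the handle is destroyed and returned:
//
//   pool->get(host, Milliseconds(5000), [](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
//       if (!swConn.isOK())
//           return;  // e.g. NetworkInterfaceExceededTimeLimit when no connection was available
//       auto conn = std::move(swConn.getValue());
//       // ... run an operation on the connection ...
//       conn->indicateSuccess();
//   });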

void ConnectionPool::appendConnectionStats(ConnectionPoolStats* stats) const {
    stdx::unique_lock<stdx::mutex> lk(_mutex);

    for (const auto& kv : _pools) {
        HostAndPort host = kv.first;

        auto& pool = kv.second;
        ConnectionStatsPer hostStats{pool->inUseConnections(lk),
                                     pool->availableConnections(lk),
                                     pool->createdConnections(lk),
                                     pool->refreshingConnections(lk)};
        stats->updateStatsForHost(_name, host, hostStats);
    }
}

size_t ConnectionPool::getNumConnectionsPerHost(const HostAndPort& hostAndPort) const {
    stdx::unique_lock<stdx::mutex> lk(_mutex);
    auto iter = _pools.find(hostAndPort);
    if (iter != _pools.end()) {
        return iter->second->openConnections(lk);
    }

    return 0;
}

void ConnectionPool::returnConnection(ConnectionInterface* conn) {
    stdx::unique_lock<stdx::mutex> lk(_mutex);

    auto iter = _pools.find(conn->getHostAndPort());

    invariant(iter != _pools.end(),
              str::stream() << "Tried to return connection but no pool found for "
                            << conn->getHostAndPort());

    iter->second->runWithActiveClient(std::move(lk), [&](decltype(lk) lk) {
        iter->second->returnConnection(conn, std::move(lk));
    });
}

ConnectionPool::SpecificPool::SpecificPool(ConnectionPool* parent, const HostAndPort& hostAndPort)
    : _parent(parent),
      _hostAndPort(hostAndPort),
      _readyPool(std::numeric_limits<size_t>::max()),
      _requestTimer(parent->_factory->makeTimer()),
      _activeClients(0),
      _generation(0),
      _inFulfillRequests(false),
      _inSpawnConnections(false),
      _created(0),
      _state(State::kRunning) {}

ConnectionPool::SpecificPool::~SpecificPool() {
    DESTRUCTOR_GUARD(_requestTimer->cancelTimeout();)
}

size_t ConnectionPool::SpecificPool::inUseConnections(const stdx::unique_lock<stdx::mutex>& lk) {
    return _checkedOutPool.size();
}

size_t ConnectionPool::SpecificPool::availableConnections(
    const stdx::unique_lock<stdx::mutex>& lk) {
    return _readyPool.size();
}

size_t ConnectionPool::SpecificPool::refreshingConnections(
    const stdx::unique_lock<stdx::mutex>& lk) {
    return _processingPool.size();
}

size_t ConnectionPool::SpecificPool::createdConnections(const stdx::unique_lock<stdx::mutex>& lk) {
    return _created;
}

size_t ConnectionPool::SpecificPool::openConnections(const stdx::unique_lock<stdx::mutex>& lk) {
    return _checkedOutPool.size() + _readyPool.size() + _processingPool.size();
}

void ConnectionPool::SpecificPool::getConnection(const HostAndPort& hostAndPort,
                                                 Milliseconds timeout,
                                                 stdx::unique_lock<stdx::mutex> lk,
                                                 GetConnectionCallback cb) {
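    // A negative or overlong timeout falls back to the refresh timeout, which serves as the
    // upper bound on how long any request may wait for a connection.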
    if (timeout < Milliseconds(0) || timeout > _parent->_options.refreshTimeout) {
        timeout = _parent->_options.refreshTimeout;
    }

    const auto expiration = _parent->_factory->now() + timeout;

    _requests.push(make_pair(expiration, std::move(cb)));

    updateStateInLock();

    spawnConnections(lk);
    fulfillRequests(lk);
}

void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr,
                                                    stdx::unique_lock<stdx::mutex> lk) {
    auto needsRefreshTP = connPtr->getLastUsed() + _parent->_options.refreshRequirement;

    auto conn = takeFromPool(_checkedOutPool, connPtr);

    updateStateInLock();

    // Users are required to call indicateSuccess() or indicateFailure() before allowing
    // a connection to be returned. Otherwise, we have entered an unknown state.
    invariant(conn->getStatus() != kConnectionStateUnknown);

    if (conn->getGeneration() != _generation) {
        // If the connection is from an older generation, just return.
        return;
    }

    if (!conn->getStatus().isOK()) {
        // TODO: alert via some callback if the host is bad
        log() << "Ending connection to host " << _hostAndPort << " due to bad connection status; "
              << openConnections(lk) << " connections to that host remain open";
        return;
    }

    auto now = _parent->_factory->now();
    if (needsRefreshTP <= now) {
        // If we need to refresh this connection

        if (_readyPool.size() + _processingPool.size() + _checkedOutPool.size() >=
            _parent->_options.minConnections) {
            // If we already have minConnections, just let the connection lapse
            log() << "Ending idle connection to host " << _hostAndPort
                  << " because the pool meets constraints; " << openConnections(lk)
                  << " connections to that host remain open";
            return;
        }

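        // Park the connection in the processing pool while refresh() runs, so the pool still
        // owns it but it cannot be handed out until the refresh callback completes.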
        _processingPool[connPtr] = std::move(conn);

        // Unlock in case refresh can occur immediately
        lk.unlock();
        connPtr->refresh(
            _parent->_options.refreshTimeout, [this](ConnectionInterface* connPtr, Status status) {
                connPtr->indicateUsed();

                runWithActiveClient([&](stdx::unique_lock<stdx::mutex> lk) {
                    auto conn = takeFromProcessingPool(connPtr);

                    // If the host and port were dropped, let this lapse
                    if (conn->getGeneration() != _generation) {
                        spawnConnections(lk);
                        return;
                    }

                    // If we're in shutdown, we don't need refreshed connections
                    if (_state == State::kInShutdown)
                        return;

                    // If the connection refreshed successfully, throw it back in
                    // the ready pool
                    if (status.isOK()) {
                        addToReady(lk, std::move(conn));
                        spawnConnections(lk);
                        return;
                    }

                    // If we've exceeded the time limit, start a new connect,
                    // rather than failing all operations.  We do this because the
                    // various callers have their own time limit which is unrelated
                    // to our internal one.
                    if (status.code() == ErrorCodes::NetworkInterfaceExceededTimeLimit) {
                        log() << "Pending connection to host " << _hostAndPort
                              << " did not complete within the connection timeout,"
                              << " retrying with a new connection; " << openConnections(lk)
                              << " connections to that host remain open";
                        spawnConnections(lk);
                        return;
                    }

                    // Otherwise pass the failure on through
                    processFailure(status, std::move(lk));
                });
            });
        lk.lock();
    } else {
        // If it's fine as it is, just put it in the ready queue
        addToReady(lk, std::move(conn));
    }

    updateStateInLock();
}

// Adds a live connection to the ready pool
void ConnectionPool::SpecificPool::addToReady(stdx::unique_lock<stdx::mutex>& lk,
                                              OwnedConnection conn) {
    auto connPtr = conn.get();

    // This makes the connection the new most-recently-used connection.
    _readyPool.add(connPtr, std::move(conn));

    // Our strategy for refreshing connections is to check them out and
    // immediately check them back in (which kicks off the refresh logic in
    // returnConnection).
    connPtr->setTimeout(_parent->_options.refreshRequirement, [this, connPtr]() {
        OwnedConnection conn;

        runWithActiveClient([&](stdx::unique_lock<stdx::mutex> lk) {
            if (!_readyPool.count(connPtr)) {
                // We've already been checked out. We don't need to refresh
                // ourselves.
                return;
            }

            conn = takeFromPool(_readyPool, connPtr);

            // If we're in shutdown, we don't need to refresh connections
            if (_state == State::kInShutdown)
                return;

            _checkedOutPool[connPtr] = std::move(conn);

            connPtr->indicateSuccess();

            returnConnection(connPtr, std::move(lk));
        });
    });

    fulfillRequests(lk);
}

// Drop connections and fail all requests
void ConnectionPool::SpecificPool::processFailure(const Status& status,
                                                  stdx::unique_lock<stdx::mutex> lk) {
    // Bump the generation so we don't reuse any pending or checked out
    // connections
    _generation++;

    // Drop ready connections
    _readyPool.clear();

    // Log something helpful
    log() << "Dropping all pooled connections to " << _hostAndPort
          << " due to failed operation on a connection";

    // Migrate processing connections to the dropped pool
    for (auto&& x : _processingPool) {
        _droppedProcessingPool[x.first] = std::move(x.second);
    }
    _processingPool.clear();

    // Move the requests out so they aren't visible
    // in other threads
    decltype(_requests) requestsToFail;
    {
        using std::swap;
        swap(requestsToFail, _requests);
    }

    // Update state to reflect the lack of requests
    updateStateInLock();

    // Drop the lock and process all of the requests
    // with the same failed status
    lk.unlock();

    while (requestsToFail.size()) {
        requestsToFail.top().second(status);
        requestsToFail.pop();
    }
}

// fulfills as many outstanding requests as possible
void ConnectionPool::SpecificPool::fulfillRequests(stdx::unique_lock<stdx::mutex>& lk) {
    // If some other thread (possibly this thread) is fulfilling requests,
    // don't keep padding the callstack.
    if (_inFulfillRequests)
        return;

    _inFulfillRequests = true;
    auto guard = MakeGuard([&] { _inFulfillRequests = false; });

    while (_requests.size()) {
        // _readyPool is an LRUCache, so its begin() object is the MRU item.
        auto iter = _readyPool.begin();

        if (iter == _readyPool.end())
            break;

        // Grab the connection and cancel its timeout
        auto conn = std::move(iter->second);
        _readyPool.erase(iter);
        conn->cancelTimeout();

        if (!conn->isHealthy()) {
            log() << "dropping unhealthy pooled connection to " << conn->getHostAndPort();

            if (_readyPool.empty()) {
                log() << "after drop, pool was empty, going to spawn some connections";
                // Spawn some more connections to the bad host if we're all out.
                spawnConnections(lk);
            }

            // Drop the bad connection.
            conn.reset();
            // Retry.
            continue;
        }

        // Grab the request and callback
        auto cb = std::move(_requests.top().second);
        _requests.pop();

        auto connPtr = conn.get();

        // check out the connection
        _checkedOutPool[connPtr] = std::move(conn);

        updateStateInLock();

        // pass it to the user
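        // resetToUnknown() forces the caller to indicateSuccess() or indicateFailure() before
        // this connection can be returned (see the invariant in returnConnection()).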
        connPtr->resetToUnknown();
        lk.unlock();
        cb(ConnectionHandle(connPtr, ConnectionHandleDeleter(_parent)));
        lk.lock();
    }
}

// spawn enough connections to satisfy open requests and minpool, while
// honoring maxpool
void ConnectionPool::SpecificPool::spawnConnections(stdx::unique_lock<stdx::mutex>& lk) {
    // If some other thread (possibly this thread) is spawning connections,
    // don't keep padding the callstack.
    if (_inSpawnConnections)
        return;

    _inSpawnConnections = true;
    auto guard = MakeGuard([&] { _inSpawnConnections = false; });

    // Our target is the number of outstanding requests plus checked-out connections,
    // clamped to [minConnections, maxConnections]
    auto target = [&] {
        return std::max(
            _parent->_options.minConnections,
            std::min(_requests.size() + _checkedOutPool.size(), _parent->_options.maxConnections));
    };

    // While the total number of in-flight connections is below our target
    while ((_readyPool.size() + _processingPool.size() + _checkedOutPool.size() < target()) &&
           (_processingPool.size() < _parent->_options.maxConnecting)) {
        std::unique_ptr<ConnectionPool::ConnectionInterface> handle;
        try {
            // make a new connection and put it in processing
            handle = _parent->_factory->makeConnection(_hostAndPort, _generation);
        } catch (std::system_error& e) {
            severe() << "Failed to construct a new connection object: " << e.what();
            fassertFailed(40336);
        }

        auto connPtr = handle.get();
        _processingPool[connPtr] = std::move(handle);

        ++_created;

        // Run the setup callback
        lk.unlock();
        connPtr->setup(
            _parent->_options.refreshTimeout, [this](ConnectionInterface* connPtr, Status status) {
                connPtr->indicateUsed();

                runWithActiveClient([&](stdx::unique_lock<stdx::mutex> lk) {
                    auto conn = takeFromProcessingPool(connPtr);

                    if (conn->getGeneration() != _generation) {
                        // If the host and port were dropped, let the
                        // connection lapse
                        spawnConnections(lk);
                    } else if (status.isOK()) {
                        addToReady(lk, std::move(conn));
                        spawnConnections(lk);
                    } else if (status.code() == ErrorCodes::NetworkInterfaceExceededTimeLimit) {
                        // If we've exceeded the time limit, restart the connect, rather than
                        // failing all operations.  We do this because the various callers
                        // have their own time limit which is unrelated to our internal one.
                        spawnConnections(lk);
                    } else {
                        // If the setup failed, cascade the failure edge
                        processFailure(status, std::move(lk));
                    }
                });
            });
        // Note that this assumes that the refreshTimeout is sound for the
        // setupTimeout

        lk.lock();
    }
}

// Called every second after hostTimeout until all processing connections are reaped
void ConnectionPool::SpecificPool::shutdown() {
    stdx::unique_lock<stdx::mutex> lk(_parent->_mutex);

    // We're racing:
    //
    // Thread A (this thread)
    //   * Fired the shutdown timer
    //   * Came into shutdown() and blocked
    //
    // Thread B (some new consumer)
    //   * Requested a new connection
    //   * Beat thread A to the mutex
    //   * Cancelled timer (but thread A already made it in)
    //   * Set state to running
    //   * released the mutex
    //
    // So we end up in shutdown, but with kRunning.  If we're here we raced and
    // we should just bail.
    if (_state == State::kRunning) {
        return;
    }

    _state = State::kInShutdown;

    // If we have processing connections, wait for them to finish or timeout
    // before shutdown
    if (_processingPool.size() || _droppedProcessingPool.size() || _activeClients) {
        _requestTimer->setTimeout(Seconds(1), [this]() { shutdown(); });

        return;
    }

    invariant(_requests.empty());
    invariant(_checkedOutPool.empty());

    _parent->_pools.erase(_hostAndPort);
}

template <typename OwnershipPoolType>
typename OwnershipPoolType::mapped_type ConnectionPool::SpecificPool::takeFromPool(
    OwnershipPoolType& pool, typename OwnershipPoolType::key_type connPtr) {
    auto iter = pool.find(connPtr);
    invariant(iter != pool.end());

    auto conn = std::move(iter->second);
    pool.erase(iter);
    return conn;
}

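// A connection that is out being set up or refreshed normally lives in _processingPool, unless
// a processFailure() call has since moved it into _droppedProcessingPool; check both places.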
ConnectionPool::SpecificPool::OwnedConnection ConnectionPool::SpecificPool::takeFromProcessingPool(
    ConnectionInterface* connPtr) {
    if (_processingPool.count(connPtr))
        return takeFromPool(_processingPool, connPtr);

    return takeFromPool(_droppedProcessingPool, connPtr);
}


// Updates our state and manages the request timer
void ConnectionPool::SpecificPool::updateStateInLock() {
    if (_requests.size()) {
        // We have some outstanding requests, we're live

        // If we were already running and the timer is the same as it was
        // before, nothing to do
        if (_state == State::kRunning && _requestTimerExpiration == _requests.top().first)
            return;

        _state = State::kRunning;

        _requestTimer->cancelTimeout();

        _requestTimerExpiration = _requests.top().first;

        auto timeout = _requests.top().first - _parent->_factory->now();

        // We set a timer for the next request to expire; when it fires, we fail
        // every timed-out request we couldn't service
        _requestTimer->setTimeout(timeout, [this]() {
            runWithActiveClient([&](stdx::unique_lock<stdx::mutex> lk) {
                auto now = _parent->_factory->now();

                while (_requests.size()) {
                    auto& x = _requests.top();

                    if (x.first <= now) {
                        auto cb = std::move(x.second);
                        _requests.pop();

                        lk.unlock();
                        cb(Status(ErrorCodes::NetworkInterfaceExceededTimeLimit,
                                  "Couldn't get a connection within the time limit"));
                        lk.lock();
                    } else {
                        break;
                    }
                }

                updateStateInLock();
            });
        });
    } else if (_checkedOutPool.size()) {
        // If we have no requests, but someone's using a connection, we just
        // hang around until the next request or a return

        _requestTimer->cancelTimeout();
        _state = State::kRunning;
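        // Record a far-future expiration so the next request's deadline never matches it and
        // the request timer is always re-armed.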
        _requestTimerExpiration = _requestTimerExpiration.max();
    } else {
        // If we don't have any live requests and no one has checked out connections

        // If we used to be idle, just bail
        if (_state == State::kIdle)
            return;

        _state = State::kIdle;

        _requestTimer->cancelTimeout();

        _requestTimerExpiration = _parent->_factory->now() + _parent->_options.hostTimeout;

        auto timeout = _parent->_options.hostTimeout;

        // Set the shutdown timer
        _requestTimer->setTimeout(timeout, [this]() { shutdown(); });
    }
}

}  // namespace executor
}  // namespace mongo