/**
 * Copyright (C) 2018-present MongoDB, Inc.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the Server Side Public License, version 1,
 * as published by MongoDB, Inc.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * Server Side Public License for more details.
 *
 * You should have received a copy of the Server Side Public License
 * along with this program. If not, see
 * <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 * As a special exception, the copyright holders give permission to link the
 * code of portions of this program with the OpenSSL library under certain
 * conditions as described in each individual source file and distribute
 * linked combinations including the program with the OpenSSL library. You
 * must comply with the Server Side Public License in all respects for
 * all of the code used other than as permitted herein. If you modify file(s)
 * with this exception, you may extend this exception to your version of the
 * file(s), but you are not obligated to do so. If you do not wish to do so,
 * delete this exception statement from your version. If you delete this
 * exception statement from all source files in the program, then also delete
 * it in the license file.
 */

#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kASIO

#include "mongo/platform/basic.h"

#include "mongo/executor/connection_pool.h"

#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/executor/connection_pool_stats.h"
#include "mongo/executor/remote_command_request.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/destructor_guard.h"
#include "mongo/util/log.h"
#include "mongo/util/lru_cache.h"
#include "mongo/util/scopeguard.h"

// One interesting implementation note herein concerns how setup() and
// refresh() are invoked outside of the global lock, but setTimeout() is not.
// This implementation detail simplifies mocks, allowing them to return
// synchronously sometimes, whereas having timeouts fire instantly adds little
// value. In practice, dropping the lock is always safe (because we restrict
// ourselves to operations over the connection).
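//
// Concretely, the pattern is the one used in returnConnection() and
// spawnConnections() below (illustrative sketch):
//
//     _processingPool[connPtr] = std::move(conn);  // bookkeeping under the lock
//     lk.unlock();                                 // drop the lock for the out-call
//     connPtr->refresh(timeout, callback);         // may complete synchronously
//     lk.lock();                                   // reacquire before more bookkeeping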

namespace mongo {
namespace executor {

/**
 * A pool for a specific HostAndPort
 *
 * Pools come into existence the first time a connection is requested and
 * go out of existence after hostTimeout passes without any of their
 * connections being used.
 */
class ConnectionPool::SpecificPool {
public:
    /**
     * These active client methods must be used whenever entering a specific pool outside of the
     * shutdown background task. The presence of an active client will bump a counter on the
     * specific pool which will prevent the shutdown thread from deleting it.
     *
     * The complexity comes from the need to hold a lock when writing to the
     * _activeClients counter on the specific pool. Because the code beneath the client needs to
     * lock and unlock the parent mutex (and may leave it unlocked), we want to start the client
     * with the lock acquired, move it into the client, then re-acquire the lock to decrement the
     * counter on the way out.
     *
     * It's used like:
     *
     * pool.runWithActiveClient([](stdx::unique_lock<stdx::mutex> lk){ codeToBeProtected(); });
     */
    template <typename Callback>
    void runWithActiveClient(Callback&& cb) {
        runWithActiveClient(stdx::unique_lock<stdx::mutex>(_parent->_mutex),
                            std::forward<Callback>(cb));
    }

    template <typename Callback>
    void runWithActiveClient(stdx::unique_lock<stdx::mutex> lk, Callback&& cb) {
        invariant(lk.owns_lock());

        _activeClients++;

        const auto guard = MakeGuard([&] {
            invariant(!lk.owns_lock());
            stdx::lock_guard<stdx::mutex> lk(_parent->_mutex);
            _activeClients--;
        });

        {
            decltype(lk) localLk(std::move(lk));
            cb(std::move(localLk));
        }
    }

    SpecificPool(ConnectionPool* parent, const HostAndPort& hostAndPort);
    ~SpecificPool();

    /**
     * Gets a connection from the specific pool. Sinks a unique_lock from the
     * parent to preserve the lock on _mutex
     */
    void getConnection(const HostAndPort& hostAndPort,
                       Milliseconds timeout,
                       stdx::unique_lock<stdx::mutex> lk,
                       GetConnectionCallback cb);
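
    // Callers typically reach this through ConnectionPool::get(). Illustratively
    // (a sketch, assuming GetConnectionCallback takes a StatusWith<ConnectionHandle>,
    // which is consistent with how callbacks are invoked below):
    //
    //     pool.get(HostAndPort("example:27017"), Milliseconds(5000),
    //              [](StatusWith<ConnectionHandle> swConn) {
    //                  // ... use *swConn, then indicateSuccess()/indicateFailure()
    //              });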

    /**
     * Cascades a failure across existing connections and requests. Invoking
     * this function drops all current connections and fails all current
     * requests with the passed status.
     */
    void processFailure(const Status& status, stdx::unique_lock<stdx::mutex> lk);

    /**
     * Returns a connection to a specific pool. Sinks a unique_lock from the
     * parent to preserve the lock on _mutex
     */
    void returnConnection(ConnectionInterface* connection, stdx::unique_lock<stdx::mutex> lk);

    /**
     * Returns the number of connections currently checked out of the pool.
     */
    size_t inUseConnections(const stdx::unique_lock<stdx::mutex>& lk);

    /**
     * Returns the number of available connections in the pool.
     */
    size_t availableConnections(const stdx::unique_lock<stdx::mutex>& lk);

    /**
     * Returns the number of in-progress connections in the pool.
     */
    size_t refreshingConnections(const stdx::unique_lock<stdx::mutex>& lk);

    /**
     * Returns the total number of connections ever created in this pool.
     */
    size_t createdConnections(const stdx::unique_lock<stdx::mutex>& lk);

    /**
     * Returns the total number of connections currently open that belong to
     * this pool. This is the sum of refreshingConnections, availableConnections,
     * and inUseConnections.
     */
    size_t openConnections(const stdx::unique_lock<stdx::mutex>& lk);

private:
    using OwnedConnection = std::unique_ptr<ConnectionInterface>;
    using OwnershipPool = stdx::unordered_map<ConnectionInterface*, OwnedConnection>;
    using LRUOwnershipPool = LRUCache<OwnershipPool::key_type, OwnershipPool::mapped_type>;
    using Request = std::pair<Date_t, GetConnectionCallback>;
    struct RequestComparator {
        bool operator()(const Request& a, const Request& b) {
            return a.first > b.first;
        }
    };
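
    // Note that operator() above returns a.first > b.first, so the
    // std::priority_queue of requests below behaves as a min-heap on expiration:
    // top() is always the soonest-expiring request. For example, pushing requests
    // that expire at now + 5s and now + 1s leaves the now + 1s request on top().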

    void addToReady(stdx::unique_lock<stdx::mutex>& lk, OwnedConnection conn);

    void fulfillRequests(stdx::unique_lock<stdx::mutex>& lk);

    void spawnConnections(stdx::unique_lock<stdx::mutex>& lk);

    void shutdown();

    template <typename OwnershipPoolType>
    typename OwnershipPoolType::mapped_type takeFromPool(
        OwnershipPoolType& pool, typename OwnershipPoolType::key_type connPtr);

    OwnedConnection takeFromProcessingPool(ConnectionInterface* connection);

    void updateStateInLock();

private:
    ConnectionPool* const _parent;

    const HostAndPort _hostAndPort;

    LRUOwnershipPool _readyPool;
    OwnershipPool _processingPool;
    OwnershipPool _droppedProcessingPool;
    OwnershipPool _checkedOutPool;

    std::priority_queue<Request, std::vector<Request>, RequestComparator> _requests;

    std::unique_ptr<TimerInterface> _requestTimer;
    Date_t _requestTimerExpiration;
    size_t _activeClients;
    size_t _generation;
    bool _inFulfillRequests;
    bool _inSpawnConnections;

    size_t _created;

    /**
     * The current state of the pool
     *
     * The pool begins in a running state. Moves to idle when no requests
     * are pending and no connections are checked out. It finally enters
     * shutdown after hostTimeout has passed (and waits there for current
     * refreshes to process out).
     *
     * At any point a new request sets the state back to running and
     * restarts all timers.
     */
    enum class State {
        // The pool is active
        kRunning,

        // No current activity, waiting for hostTimeout to pass
        kIdle,

        // hostTimeout is passed, we're waiting for any processing
        // connections to finish before shutting down
        kInShutdown,
    };
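
    // Transition sketch (derived from the comment above and from shutdown()):
    //   kRunning -> kIdle        no requests pending, nothing checked out
    //   kIdle    -> kRunning     a new request (or connection return) arrives
    //   kIdle    -> kInShutdown  hostTimeout passes and the shutdown timer fires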

    State _state;
};

constexpr Milliseconds ConnectionPool::kDefaultHostTimeout;
size_t const ConnectionPool::kDefaultMaxConns = std::numeric_limits<size_t>::max();
size_t const ConnectionPool::kDefaultMinConns = 1;
size_t const ConnectionPool::kDefaultMaxConnecting = std::numeric_limits<size_t>::max();
constexpr Milliseconds ConnectionPool::kDefaultRefreshRequirement;
constexpr Milliseconds ConnectionPool::kDefaultRefreshTimeout;

const Status ConnectionPool::kConnectionStateUnknown =
    Status(ErrorCodes::InternalError, "Connection is in an unknown state");

ConnectionPool::ConnectionPool(std::unique_ptr<DependentTypeFactoryInterface> impl,
                               std::string name,
                               Options options)
    : _name(std::move(name)), _options(std::move(options)), _factory(std::move(impl)) {}

ConnectionPool::~ConnectionPool() = default;

void ConnectionPool::dropConnections(const HostAndPort& hostAndPort) {
    stdx::unique_lock<stdx::mutex> lk(_mutex);

    auto iter = _pools.find(hostAndPort);

    if (iter == _pools.end())
        return;

    iter->second->runWithActiveClient(std::move(lk), [&](decltype(lk) lk) {
        iter->second->processFailure(
            Status(ErrorCodes::PooledConnectionsDropped, "Pooled connections dropped"),
            std::move(lk));
    });
}

void ConnectionPool::get(const HostAndPort& hostAndPort,
                         Milliseconds timeout,
                         GetConnectionCallback cb) {
    SpecificPool* pool;

    stdx::unique_lock<stdx::mutex> lk(_mutex);

    auto iter = _pools.find(hostAndPort);

    if (iter == _pools.end()) {
        auto handle = stdx::make_unique<SpecificPool>(this, hostAndPort);
        pool = handle.get();
        _pools[hostAndPort] = std::move(handle);
    } else {
        pool = iter->second.get();
    }

    invariant(pool);

    pool->runWithActiveClient(std::move(lk), [&](decltype(lk) lk) {
        pool->getConnection(hostAndPort, timeout, std::move(lk), std::move(cb));
    });
}

void ConnectionPool::appendConnectionStats(ConnectionPoolStats* stats) const {
    stdx::unique_lock<stdx::mutex> lk(_mutex);

    for (const auto& kv : _pools) {
        HostAndPort host = kv.first;

        auto& pool = kv.second;
        ConnectionStatsPer hostStats{pool->inUseConnections(lk),
                                     pool->availableConnections(lk),
                                     pool->createdConnections(lk),
                                     pool->refreshingConnections(lk)};
        stats->updateStatsForHost(_name, host, hostStats);
    }
}

size_t ConnectionPool::getNumConnectionsPerHost(const HostAndPort& hostAndPort) const {
    stdx::unique_lock<stdx::mutex> lk(_mutex);
    auto iter = _pools.find(hostAndPort);
    if (iter != _pools.end()) {
        return iter->second->openConnections(lk);
    }

    return 0;
}

void ConnectionPool::returnConnection(ConnectionInterface* conn) {
    stdx::unique_lock<stdx::mutex> lk(_mutex);

    auto iter = _pools.find(conn->getHostAndPort());

    invariant(iter != _pools.end(),
              str::stream() << "Tried to return connection but no pool found for "
                            << conn->getHostAndPort());

    iter->second->runWithActiveClient(std::move(lk), [&](decltype(lk) lk) {
        iter->second->returnConnection(conn, std::move(lk));
    });
}

ConnectionPool::SpecificPool::SpecificPool(ConnectionPool* parent, const HostAndPort& hostAndPort)
    : _parent(parent),
      _hostAndPort(hostAndPort),
      _readyPool(std::numeric_limits<size_t>::max()),
      _requestTimer(parent->_factory->makeTimer()),
      _activeClients(0),
      _generation(0),
      _inFulfillRequests(false),
      _inSpawnConnections(false),
      _created(0),
      _state(State::kRunning) {}

ConnectionPool::SpecificPool::~SpecificPool() {
    DESTRUCTOR_GUARD(_requestTimer->cancelTimeout();)
}

size_t ConnectionPool::SpecificPool::inUseConnections(const stdx::unique_lock<stdx::mutex>& lk) {
    return _checkedOutPool.size();
}

size_t ConnectionPool::SpecificPool::availableConnections(
    const stdx::unique_lock<stdx::mutex>& lk) {
    return _readyPool.size();
}

size_t ConnectionPool::SpecificPool::refreshingConnections(
    const stdx::unique_lock<stdx::mutex>& lk) {
    return _processingPool.size();
}

size_t ConnectionPool::SpecificPool::createdConnections(const stdx::unique_lock<stdx::mutex>& lk) {
    return _created;
}

size_t ConnectionPool::SpecificPool::openConnections(const stdx::unique_lock<stdx::mutex>& lk) {
    return _checkedOutPool.size() + _readyPool.size() + _processingPool.size();
}

void ConnectionPool::SpecificPool::getConnection(const HostAndPort& hostAndPort,
                                                 Milliseconds timeout,
                                                 stdx::unique_lock<stdx::mutex> lk,
                                                 GetConnectionCallback cb) {
    if (timeout < Milliseconds(0) || timeout > _parent->_options.refreshTimeout) {
        timeout = _parent->_options.refreshTimeout;
    }
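    // (A negative timeout is effectively treated as "not specified"; in both
    // cases the wait is capped at refreshTimeout, so no request outlives the
    // pool's own internal time limit.)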

    const auto expiration = _parent->_factory->now() + timeout;

    _requests.push(make_pair(expiration, std::move(cb)));

    updateStateInLock();

    spawnConnections(lk);
    fulfillRequests(lk);
}

void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr,
                                                    stdx::unique_lock<stdx::mutex> lk) {
    auto needsRefreshTP = connPtr->getLastUsed() + _parent->_options.refreshRequirement;

    auto conn = takeFromPool(_checkedOutPool, connPtr);

    updateStateInLock();

    // Users are required to call indicateSuccess() or indicateFailure() before allowing
    // a connection to be returned. Otherwise, we have entered an unknown state.
    invariant(conn->getStatus() != kConnectionStateUnknown);

    if (conn->getGeneration() != _generation) {
        // If the connection is from an older generation, just return.
        return;
    }

    if (!conn->getStatus().isOK()) {
        // TODO: alert via some callback if the host is bad
        log() << "Ending connection to host " << _hostAndPort << " due to bad connection status; "
              << openConnections(lk) << " connections to that host remain open";
        return;
    }

    auto now = _parent->_factory->now();
    if (needsRefreshTP <= now) {
        // If we need to refresh this connection

        if (_readyPool.size() + _processingPool.size() + _checkedOutPool.size() >=
            _parent->_options.minConnections) {
            // If we already have minConnections, just let the connection lapse
            log() << "Ending idle connection to host " << _hostAndPort
                  << " because the pool meets constraints; " << openConnections(lk)
                  << " connections to that host remain open";
            return;
        }

        _processingPool[connPtr] = std::move(conn);

        // Unlock in case refresh can occur immediately
        lk.unlock();
        connPtr->refresh(
            _parent->_options.refreshTimeout, [this](ConnectionInterface* connPtr, Status status) {
                connPtr->indicateUsed();

                runWithActiveClient([&](stdx::unique_lock<stdx::mutex> lk) {
                    auto conn = takeFromProcessingPool(connPtr);

                    // If the host and port were dropped, let this lapse
                    if (conn->getGeneration() != _generation) {
                        spawnConnections(lk);
                        return;
                    }

                    // If we're in shutdown, we don't need refreshed connections
                    if (_state == State::kInShutdown)
                        return;

                    // If the connection refreshed successfully, throw it back in
                    // the ready pool
                    if (status.isOK()) {
                        addToReady(lk, std::move(conn));
                        spawnConnections(lk);
                        return;
                    }

                    // If we've exceeded the time limit, start a new connect,
                    // rather than failing all operations. We do this because the
                    // various callers have their own time limit which is unrelated
                    // to our internal one.
                    if (status.code() == ErrorCodes::NetworkInterfaceExceededTimeLimit) {
                        log() << "Pending connection to host " << _hostAndPort
                              << " did not complete within the connection timeout,"
                              << " retrying with a new connection; " << openConnections(lk)
                              << " connections to that host remain open";
                        spawnConnections(lk);
                        return;
                    }

                    // Otherwise pass the failure on through
                    processFailure(status, std::move(lk));
                });
            });
        lk.lock();
    } else {
        // If it's fine as it is, just put it in the ready queue
        addToReady(lk, std::move(conn));
    }

    updateStateInLock();
}

// Adds a live connection to the ready pool
void ConnectionPool::SpecificPool::addToReady(stdx::unique_lock<stdx::mutex>& lk,
                                              OwnedConnection conn) {
    auto connPtr = conn.get();

    // This makes the connection the new most-recently-used connection.
    _readyPool.add(connPtr, std::move(conn));

    // Our strategy for refreshing connections is to check them out and
    // immediately check them back in (which kicks off the refresh logic in
    // returnConnection).
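    //
    // Illustrative timeline (assuming refreshRequirement = 60s): a connection
    // added here and left unused for 60s fires the timeout below, is moved to
    // _checkedOutPool, marked successful, and handed back to returnConnection(),
    // which sees needsRefreshTP <= now and starts an actual refresh().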
    connPtr->setTimeout(_parent->_options.refreshRequirement, [this, connPtr]() {
        OwnedConnection conn;

        runWithActiveClient([&](stdx::unique_lock<stdx::mutex> lk) {
            if (!_readyPool.count(connPtr)) {
                // We've already been checked out. We don't need to refresh
                // ourselves.
                return;
            }

            conn = takeFromPool(_readyPool, connPtr);

            // If we're in shutdown, we don't need to refresh connections
            if (_state == State::kInShutdown)
                return;

            _checkedOutPool[connPtr] = std::move(conn);

            connPtr->indicateSuccess();

            returnConnection(connPtr, std::move(lk));
        });
    });

    fulfillRequests(lk);
}

// Drop connections and fail all requests
void ConnectionPool::SpecificPool::processFailure(const Status& status,
                                                  stdx::unique_lock<stdx::mutex> lk) {
    // Bump the generation so we don't reuse any pending or checked out
    // connections
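    // (connections carry the generation they were created under, see
    // makeConnection() in spawnConnections(); any connection whose generation
    // no longer matches is silently dropped when it next surfaces in
    // returnConnection() or in a setup/refresh callback)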
    _generation++;

    // Drop ready connections
    _readyPool.clear();

    // Log something helpful
    log() << "Dropping all pooled connections to " << _hostAndPort
          << " due to failed operation on a connection";

    // Migrate processing connections to the dropped pool
    for (auto&& x : _processingPool) {
        _droppedProcessingPool[x.first] = std::move(x.second);
    }
    _processingPool.clear();

    // Move the requests out so they aren't visible
    // to other threads
    decltype(_requests) requestsToFail;
    {
        using std::swap;
        swap(requestsToFail, _requests);
    }

    // Update state to reflect the lack of requests
    updateStateInLock();

    // Drop the lock and process all of the requests
    // with the same failed status
    lk.unlock();

    while (requestsToFail.size()) {
        requestsToFail.top().second(status);
        requestsToFail.pop();
    }
}

// fulfills as many outstanding requests as possible
void ConnectionPool::SpecificPool::fulfillRequests(stdx::unique_lock<stdx::mutex>& lk) {
    // If some other thread (possibly this thread) is fulfilling requests,
    // don't keep padding the callstack.
    if (_inFulfillRequests)
        return;

    _inFulfillRequests = true;
    auto guard = MakeGuard([&] { _inFulfillRequests = false; });

    while (_requests.size()) {
        // _readyPool is an LRUCache, so its begin() object is the MRU item.
        auto iter = _readyPool.begin();

        if (iter == _readyPool.end())
            break;

        // Grab the connection and cancel its timeout
        auto conn = std::move(iter->second);
        _readyPool.erase(iter);
        conn->cancelTimeout();

        if (!conn->isHealthy()) {
            log() << "dropping unhealthy pooled connection to " << conn->getHostAndPort();

            if (_readyPool.empty()) {
                log() << "after drop, pool was empty, going to spawn some connections";
                // Spawn some more connections to the bad host if we're all out.
                spawnConnections(lk);
            }

            // Drop the bad connection.
            conn.reset();
            // Retry.
            continue;
        }

        // Grab the request and callback
        auto cb = std::move(_requests.top().second);
        _requests.pop();

        auto connPtr = conn.get();

        // check out the connection
        _checkedOutPool[connPtr] = std::move(conn);

        updateStateInLock();

        // pass it to the user
        connPtr->resetToUnknown();
        lk.unlock();
        cb(ConnectionHandle(connPtr, ConnectionHandleDeleter(_parent)));
        lk.lock();
    }
}

// spawn enough connections to satisfy open requests and minpool, while
// honoring maxpool
void ConnectionPool::SpecificPool::spawnConnections(stdx::unique_lock<stdx::mutex>& lk) {
    // If some other thread (possibly this thread) is spawning connections,
    // don't keep padding the callstack.
    if (_inSpawnConnections)
        return;

    _inSpawnConnections = true;
    auto guard = MakeGuard([&] { _inSpawnConnections = false; });

    // Our target is the current demand (outstanding requests plus checked-out
    // connections), clamped to [minConnections, maxConnections]
    auto target = [&] {
        return std::max(
            _parent->_options.minConnections,
            std::min(_requests.size() + _checkedOutPool.size(), _parent->_options.maxConnections));
    };
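
    // e.g. (illustrative) with minConnections = 2, maxConnections = 10,
    // 3 pending requests and 4 checked-out connections:
    //     target() == max(2, min(3 + 4, 10)) == 7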

    // While all of our inflight connections are less than our target
    while ((_readyPool.size() + _processingPool.size() + _checkedOutPool.size() < target()) &&
           (_processingPool.size() < _parent->_options.maxConnecting)) {
        std::unique_ptr<ConnectionPool::ConnectionInterface> handle;
        try {
            // make a new connection and put it in processing
            handle = _parent->_factory->makeConnection(_hostAndPort, _generation);
        } catch (std::system_error& e) {
            severe() << "Failed to construct a new connection object: " << e.what();
            fassertFailed(40336);
        }

        auto connPtr = handle.get();
        _processingPool[connPtr] = std::move(handle);

        ++_created;

        // Run the setup callback
        lk.unlock();
        connPtr->setup(
            _parent->_options.refreshTimeout, [this](ConnectionInterface* connPtr, Status status) {
                connPtr->indicateUsed();

                runWithActiveClient([&](stdx::unique_lock<stdx::mutex> lk) {
                    auto conn = takeFromProcessingPool(connPtr);

                    if (conn->getGeneration() != _generation) {
                        // If the host and port were dropped, let the
                        // connection lapse
                        spawnConnections(lk);
                    } else if (status.isOK()) {
                        addToReady(lk, std::move(conn));
                        spawnConnections(lk);
                    } else if (status.code() == ErrorCodes::NetworkInterfaceExceededTimeLimit) {
                        // If we've exceeded the time limit, restart the connect, rather than
                        // failing all operations. We do this because the various callers
                        // have their own time limit which is unrelated to our internal one.
                        spawnConnections(lk);
                    } else {
                        // If the setup failed, cascade the failure edge
                        processFailure(status, std::move(lk));
                    }
                });
            });
        // Note that this assumes that the refreshTimeout is sound for the
        // setupTimeout

        lk.lock();
    }
}

// Called every second after hostTimeout until all processing connections are reaped
void ConnectionPool::SpecificPool::shutdown() {
    stdx::unique_lock<stdx::mutex> lk(_parent->_mutex);

    // We're racing:
    //
    // Thread A (this thread)
    // * Fired the shutdown timer
    // * Came into shutdown() and blocked
    //
    // Thread B (some new consumer)
    // * Requested a new connection
    // * Beat thread A to the mutex
    // * Cancelled the timer (but thread A already made it in)
    // * Set state to running
    // * Released the mutex
    //
    // So we end up in shutdown, but with kRunning. If we're here we raced and
    // we should just bail.
    if (_state == State::kRunning) {
        return;
    }

    _state = State::kInShutdown;

    // If we have processing connections, wait for them to finish or timeout
    // before shutdown
    if (_processingPool.size() || _droppedProcessingPool.size() || _activeClients) {
        _requestTimer->setTimeout(Seconds(1), [this]() { shutdown(); });

        return;
    }

    invariant(_requests.empty());
    invariant(_checkedOutPool.empty());

    _parent->_pools.erase(_hostAndPort);
}

template <typename OwnershipPoolType>
typename OwnershipPoolType::mapped_type ConnectionPool::SpecificPool::takeFromPool(
    OwnershipPoolType& pool, typename OwnershipPoolType::key_type connPtr) {
    auto iter = pool.find(connPtr);
    invariant(iter != pool.end());

    auto conn = std::move(iter->second);
    pool.erase(iter);
    return conn;
}

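// An in-flight connection normally lives in _processingPool, but
// processFailure() may have migrated it to _droppedProcessingPool before the
// setup/refresh callback got a chance to run, so check both.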
ConnectionPool::SpecificPool::OwnedConnection ConnectionPool::SpecificPool::takeFromProcessingPool(
    ConnectionInterface* connPtr) {
    if (_processingPool.count(connPtr))
        return takeFromPool(_processingPool, connPtr);

    return takeFromPool(_droppedProcessingPool, connPtr);
}

// Updates our state and manages the request timer
void ConnectionPool::SpecificPool::updateStateInLock() {
    if (_requests.size()) {
        // We have some outstanding requests, we're live

        // If we were already running and the timer is the same as it was
        // before, nothing to do
        if (_state == State::kRunning && _requestTimerExpiration == _requests.top().first)
            return;

        _state = State::kRunning;

        _requestTimer->cancelTimeout();

        _requestTimerExpiration = _requests.top().first;

        auto timeout = _requests.top().first - _parent->_factory->now();

        // We set a timer for the soonest-expiring request, then invoke each
        // timed-out request we couldn't service
        _requestTimer->setTimeout(timeout, [this]() {
            runWithActiveClient([&](stdx::unique_lock<stdx::mutex> lk) {
                auto now = _parent->_factory->now();

                while (_requests.size()) {
                    auto& x = _requests.top();

                    if (x.first <= now) {
                        auto cb = std::move(x.second);
                        _requests.pop();

                        lk.unlock();
                        cb(Status(ErrorCodes::NetworkInterfaceExceededTimeLimit,
                                  "Couldn't get a connection within the time limit"));
                        lk.lock();
                    } else {
                        break;
                    }
                }

                updateStateInLock();
            });
        });
    } else if (_checkedOutPool.size()) {
        // If we have no requests, but someone's using a connection, we just
        // hang around until the next request or a return

        _requestTimer->cancelTimeout();
        _state = State::kRunning;
        _requestTimerExpiration = _requestTimerExpiration.max();
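
        // (Date_t::max() presumably acts as a sentinel here: no real request
        // expiration can equal it, so the comparison in the kRunning branch
        // above always re-arms the timer on the next request.)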
    } else {
        // If we don't have any live requests and no one has checked out connections

        // If we used to be idle, just bail
        if (_state == State::kIdle)
            return;

        _state = State::kIdle;

        _requestTimer->cancelTimeout();

        _requestTimerExpiration = _parent->_factory->now() + _parent->_options.hostTimeout;

        auto timeout = _parent->_options.hostTimeout;

        // Set the shutdown timer
        _requestTimer->setTimeout(timeout, [this]() { shutdown(); });
    }
}

}  // namespace executor
}  // namespace mongo