1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
32 
#include "mongo/platform/basic.h"

#include "mongo/s/client/shard_connection.h"

#include <set>
#include <utility>

#include "mongo/db/commands.h"
#include "mongo/db/lasterror.h"
#include "mongo/executor/connection_pool_stats.h"
#include "mongo/s/chunk_manager.h"
#include "mongo/s/client/shard.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/client/version_manager.h"
#include "mongo/s/cluster_last_error_info.h"
#include "mongo/s/grid.h"
#include "mongo/s/is_mongos.h"
#include "mongo/util/concurrency/spin_lock.h"
#include "mongo/util/exit.h"
#include "mongo/util/log.h"
#include "mongo/util/stacktrace.h"
53 
54 namespace mongo {
55 
56 using std::unique_ptr;
57 using std::map;
58 using std::set;
59 using std::string;
60 using std::stringstream;
61 using std::vector;
62 
63 namespace {
64 
65 class ClientConnections;
66 
67 /**
68  * Class which tracks ClientConnections (the client connection pool) for each incoming
69  * connection, allowing stats access.
70  */
71 class ActiveClientConnections {
72 public:
add(const ClientConnections * cc)73     void add(const ClientConnections* cc) {
74         stdx::lock_guard<stdx::mutex> lock(_mutex);
75         _clientConnections.insert(cc);
76     }
77 
remove(const ClientConnections * cc)78     void remove(const ClientConnections* cc) {
79         stdx::lock_guard<stdx::mutex> lock(_mutex);
80         _clientConnections.erase(cc);
81     }
82 
83     void appendInfo(BSONObjBuilder& b);
84 
85 private:
86     stdx::mutex _mutex;
87     set<const ClientConnections*> _clientConnections;
88 
89 } activeClientConnections;
90 
91 /**
92  * Command to allow access to the sharded conn pool information in mongos.
93  */
94 class ShardedPoolStats : public BasicCommand {
95 public:
ShardedPoolStats()96     ShardedPoolStats() : BasicCommand("shardConnPoolStats") {}
help(stringstream & help) const97     virtual void help(stringstream& help) const {
98         help << "stats about the shard connection pool";
99     }
supportsWriteConcern(const BSONObj & cmd) const100     virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
101         return false;
102     }
slaveOk() const103     virtual bool slaveOk() const {
104         return true;
105     }
106 
107     // Same privs as connPoolStats
addRequiredPrivileges(const std::string & dbname,const BSONObj & cmdObj,std::vector<Privilege> * out)108     virtual void addRequiredPrivileges(const std::string& dbname,
109                                        const BSONObj& cmdObj,
110                                        std::vector<Privilege>* out) {
111         ActionSet actions;
112         actions.addAction(ActionType::connPoolStats);
113         out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
114     }
115 
run(OperationContext * opCtx,const string & dbname,const mongo::BSONObj & cmdObj,mongo::BSONObjBuilder & result)116     virtual bool run(OperationContext* opCtx,
117                      const string& dbname,
118                      const mongo::BSONObj& cmdObj,
119                      mongo::BSONObjBuilder& result) {
120         // Connection information
121         executor::ConnectionPoolStats stats{};
122         shardConnectionPool.appendConnectionStats(&stats);
123         stats.appendToBSON(result);
124 
125         // Thread connection information
126         activeClientConnections.appendInfo(result);
127 
128         return true;
129     }
130 
131 } shardedPoolStatsCmd;
132 
133 /**
134  * holds all the actual db connections for a client to various servers 1 per thread, so
135  * doesn't have to be thread safe.
136  */
137 class ClientConnections {
138     MONGO_DISALLOW_COPYING(ClientConnections);
139 
140 public:
141     struct Status {
Statusmongo::__anon7086fcda0111::ClientConnections::Status142         Status() : created(0), avail(0) {}
143 
144         // May be read concurrently, but only written from
145         // this thread.
146         long long created;
147         DBClientBase* avail;
148     };
149 
150     // Gets or creates the status object for the host
_getStatus(const string & addr)151     Status* _getStatus(const string& addr) {
152         scoped_spinlock lock(_lock);
153         Status*& temp = _hosts[addr];
154         if (!temp) {
155             temp = new Status();
156         }
157 
158         return temp;
159     }
160 
ClientConnections()161     ClientConnections() {
162         // Start tracking client connections
163         activeClientConnections.add(this);
164     }
165 
~ClientConnections()166     ~ClientConnections() {
167         // Stop tracking these client connections
168         activeClientConnections.remove(this);
169 
170         releaseAll(true);
171     }
172 
releaseAll(bool fromDestructor=false)173     void releaseAll(bool fromDestructor = false) {
174         // Don't need spinlock protection because if not in the destructor, we don't modify
175         // _hosts, and if in the destructor we are not accessible to external threads.
176         for (HostMap::iterator i = _hosts.begin(); i != _hosts.end(); ++i) {
177             const string addr = i->first;
178             Status* ss = i->second;
179             invariant(ss);
180 
181             if (ss->avail) {
182                 // If we're shutting down, don't want to initiate release mechanism as it is
183                 // slow, and isn't needed since all connections will be closed anyway.
184                 if (globalInShutdownDeprecated()) {
185                     if (versionManager.isVersionableCB(ss->avail)) {
186                         versionManager.resetShardVersionCB(ss->avail);
187                     }
188 
189                     delete ss->avail;
190                 } else {
191                     release(addr, ss->avail);
192                 }
193 
194                 ss->avail = 0;
195             }
196 
197             if (fromDestructor) {
198                 delete ss;
199             }
200         }
201 
202         if (fromDestructor) {
203             _hosts.clear();
204         }
205     }
206 
get(const string & addr,const string & ns)207     DBClientBase* get(const string& addr, const string& ns) {
208         {
209             // We want to report ns stats
210             scoped_spinlock lock(_lock);
211             if (ns.size() > 0)
212                 _seenNS.insert(ns);
213         }
214 
215         Status* s = _getStatus(addr);
216 
217         unique_ptr<DBClientBase> c;
218         if (s->avail) {
219             c.reset(s->avail);
220             s->avail = 0;
221 
222             // May throw an exception
223             shardConnectionPool.onHandedOut(c.get());
224         } else {
225             c.reset(shardConnectionPool.get(addr));
226 
227             // After, so failed creation doesn't get counted
228             s->created++;
229         }
230 
231         return c.release();
232     }
233 
done(const string & addr,DBClientBase * conn)234     void done(const string& addr, DBClientBase* conn) {
235         Status* s = _hosts[addr];
236         verify(s);
237 
238         const bool isConnGood = shardConnectionPool.isConnectionGood(addr, conn);
239 
240         if (s->avail != NULL) {
241             warning() << "Detected additional sharded connection in the "
242                       << "thread local pool for " << addr;
243 
244             if (DBException::traceExceptions.load()) {
245                 // There shouldn't be more than one connection checked out to the same
246                 // host on the same thread.
247                 printStackTrace();
248             }
249 
250             if (!isConnGood) {
251                 delete s->avail;
252                 s->avail = NULL;
253             }
254 
255             // Let the internal pool handle the bad connection, this can also
256             // update the lower bounds for the known good socket creation time
257             // for this host.
258             release(addr, conn);
259             return;
260         }
261 
262         if (!isConnGood) {
263             // Let the internal pool handle the bad connection.
264             release(addr, conn);
265             return;
266         }
267 
268         // Note: Although we try our best to clear bad connections as much as possible,
269         // some of them can still slip through because of how ClientConnections are being
270         // used - as thread local variables. This means that threads won't be able to
271         // see the s->avail connection of other threads.
272 
273         s->avail = conn;
274     }
275 
checkVersions(OperationContext * opCtx,const string & ns)276     void checkVersions(OperationContext* opCtx, const string& ns) {
277         vector<ShardId> all;
278         grid.shardRegistry()->getAllShardIds(&all);
279 
280         // Don't report exceptions here as errors in GetLastError
281         LastError::Disabled ignoreForGLE(&LastError::get(cc()));
282 
283         // Now only check top-level shard connections
284         for (const ShardId& shardId : all) {
285             try {
286                 auto shardStatus = grid.shardRegistry()->getShard(opCtx, shardId);
287                 if (!shardStatus.isOK()) {
288                     invariant(shardStatus == ErrorCodes::ShardNotFound);
289                     continue;
290                 }
291                 const auto shard = shardStatus.getValue();
292 
293                 string sconnString = shard->getConnString().toString();
294                 Status* s = _getStatus(sconnString);
295 
296                 if (!s->avail) {
297                     s->avail = shardConnectionPool.get(sconnString);
298                     s->created++;  // After, so failed creation doesn't get counted
299                 }
300 
301                 versionManager.checkShardVersionCB(opCtx, s->avail, ns, false, 1);
302             } catch (const DBException& ex) {
303                 warning() << "problem while initially checking shard versions on"
304                           << " " << shardId << causedBy(ex);
305 
306                 // NOTE: This is only a heuristic, to avoid multiple stale version retries
307                 // across multiple shards, and does not affect correctness.
308             }
309         }
310     }
311 
release(const string & addr,DBClientBase * conn)312     void release(const string& addr, DBClientBase* conn) {
313         shardConnectionPool.release(addr, conn);
314     }
315 
316     /**
317      * Appends info about the client connection pool to a BOBuilder
318      * Safe to call with activeClientConnections lock
319      */
appendInfo(BSONObjBuilder & b) const320     void appendInfo(BSONObjBuilder& b) const {
321         scoped_spinlock lock(_lock);
322 
323         BSONArrayBuilder hostsArrB(b.subarrayStart("hosts"));
324         for (HostMap::const_iterator i = _hosts.begin(); i != _hosts.end(); ++i) {
325             BSONObjBuilder bb(hostsArrB.subobjStart());
326             bb.append("host", i->first);
327             bb.append("created", i->second->created);
328             bb.appendBool("avail", static_cast<bool>(i->second->avail));
329             bb.done();
330         }
331         hostsArrB.done();
332 
333         BSONArrayBuilder nsArrB(b.subarrayStart("seenNS"));
334         for (set<string>::const_iterator i = _seenNS.begin(); i != _seenNS.end(); ++i) {
335             nsArrB.append(*i);
336         }
337         nsArrB.done();
338     }
339 
340     // Protects only the creation of new entries in the _hosts and _seenNS map
341     // from external threads.  Reading _hosts / _seenNS in this thread doesn't
342     // need protection.
343     mutable SpinLock _lock;
344     typedef map<string, Status*, DBConnectionPool::serverNameCompare> HostMap;
345     HostMap _hosts;
346     set<string> _seenNS;
347 
348     /**
349      * Clears the connections kept by this pool (ie, not including the global pool)
350      */
clearPool()351     void clearPool() {
352         for (HostMap::iterator iter = _hosts.begin(); iter != _hosts.end(); ++iter) {
353             if (iter->second->avail != NULL) {
354                 delete iter->second->avail;
355             }
356             delete iter->second;
357         }
358 
359         _hosts.clear();
360     }
361 
forgetNS(const string & ns)362     void forgetNS(const string& ns) {
363         scoped_spinlock lock(_lock);
364         _seenNS.erase(ns);
365     }
366 
367     // -----
368 
369     static thread_local std::unique_ptr<ClientConnections> _perThread;
370 
threadInstance()371     static ClientConnections* threadInstance() {
372         if (!_perThread) {
373             _perThread = stdx::make_unique<ClientConnections>();
374         }
375         return _perThread.get();
376     }
377 };
378 
379 thread_local std::unique_ptr<ClientConnections> ClientConnections::_perThread;
380 
appendInfo(BSONObjBuilder & b)381 void ActiveClientConnections::appendInfo(BSONObjBuilder& b) {
382     BSONArrayBuilder arr(64 * 1024);  // There may be quite a few threads
383 
384     {
385         stdx::lock_guard<stdx::mutex> lock(_mutex);
386         for (set<const ClientConnections*>::const_iterator i = _clientConnections.begin();
387              i != _clientConnections.end();
388              ++i) {
389             BSONObjBuilder bb(arr.subobjStart());
390             (*i)->appendInfo(bb);
391             bb.done();
392         }
393     }
394 
395     b.appendArray("threads", arr.obj());
396 }
397 
398 }  // namespace
399 
400 // The global connection pool
401 DBConnectionPool shardConnectionPool;
402 
ShardConnection(const ConnectionString & connectionString,const string & ns,std::shared_ptr<ChunkManager> manager)403 ShardConnection::ShardConnection(const ConnectionString& connectionString,
404                                  const string& ns,
405                                  std::shared_ptr<ChunkManager> manager)
406     : _cs(connectionString), _ns(ns), _manager(manager), _finishedInit(false) {
407     invariant(_cs.isValid());
408 
409     // Make sure we specified a manager for the correct namespace
410     if (_ns.size() && _manager) {
411         invariant(_manager->getns() == _ns);
412     }
413 
414     auto csString = _cs.toString();
415     _conn = ClientConnections::threadInstance()->get(csString, _ns);
416     if (isMongos()) {
417         // In mongos, we record this connection as having been used for useful work to provide
418         // useful information in getLastError.
419         ClusterLastErrorInfo::get(cc())->addShardHost(csString);
420     }
421 }
422 
~ShardConnection()423 ShardConnection::~ShardConnection() {
424     if (_conn) {
425         if (_conn->isFailed()) {
426             if (_conn->getSockCreationMicroSec() == DBClientBase::INVALID_SOCK_CREATION_TIME) {
427                 kill();
428             } else {
429                 // The pool takes care of deleting the failed connection - this
430                 // will also trigger disposal of older connections in the pool
431                 done();
432             }
433         } else {
434             // see done() comments above for why we log this line
435             log() << "sharded connection to " << _conn->getServerAddress()
436                   << " not being returned to the pool";
437 
438             kill();
439         }
440     }
441 }
442 
_finishInit()443 void ShardConnection::_finishInit() {
444     if (_finishedInit)
445         return;
446     _finishedInit = true;
447 
448     if (versionManager.isVersionableCB(_conn)) {
449         auto& client = cc();
450         auto opCtx = client.getOperationContext();
451         invariant(opCtx);
452         _setVersion = versionManager.checkShardVersionCB(opCtx, this, false, 1);
453     } else {
454         // Make sure we didn't specify a manager for a non-versionable connection (i.e. config)
455         verify(!_manager);
456         _setVersion = false;
457     }
458 }
459 
done()460 void ShardConnection::done() {
461     if (_conn) {
462         ClientConnections::threadInstance()->done(_cs.toString(), _conn);
463         _conn = 0;
464         _finishedInit = true;
465     }
466 }
467 
kill()468 void ShardConnection::kill() {
469     if (_conn) {
470         if (versionManager.isVersionableCB(_conn)) {
471             versionManager.resetShardVersionCB(_conn);
472         }
473 
474         if (_conn->isFailed()) {
475             // Let the pool know about the bad connection and also delegate disposal to it.
476             ClientConnections::threadInstance()->done(_cs.toString(), _conn);
477         } else {
478             delete _conn;
479         }
480 
481         _conn = 0;
482         _finishedInit = true;
483     }
484 }
485 
checkMyConnectionVersions(OperationContext * opCtx,const string & ns)486 void ShardConnection::checkMyConnectionVersions(OperationContext* opCtx, const string& ns) {
487     ClientConnections::threadInstance()->checkVersions(opCtx, ns);
488 }
489 
releaseMyConnections()490 void ShardConnection::releaseMyConnections() {
491     ClientConnections::threadInstance()->releaseAll();
492 }
493 
clearPool()494 void ShardConnection::clearPool() {
495     shardConnectionPool.clear();
496     ClientConnections::threadInstance()->clearPool();
497 }
498 
forgetNS(const string & ns)499 void ShardConnection::forgetNS(const string& ns) {
500     ClientConnections::threadInstance()->forgetNS(ns);
501 }
502 
503 }  // namespace mongo
504