1
2 /**
3 * Copyright (C) 2018-present MongoDB, Inc.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the Server Side Public License, version 1,
7 * as published by MongoDB, Inc.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * Server Side Public License for more details.
13 *
14 * You should have received a copy of the Server Side Public License
15 * along with this program. If not, see
16 * <http://www.mongodb.com/licensing/server-side-public-license>.
17 *
18 * As a special exception, the copyright holders give permission to link the
19 * code of portions of this program with the OpenSSL library under certain
20 * conditions as described in each individual source file and distribute
21 * linked combinations including the program with the OpenSSL library. You
22 * must comply with the Server Side Public License in all respects for
23 * all of the code used other than as permitted herein. If you modify file(s)
24 * with this exception, you may extend this exception to your version of the
25 * file(s), but you are not obligated to do so. If you do not wish to do so,
26 * delete this exception statement from your version. If you delete this
27 * exception statement from all source files in the program, then also delete
28 * it in the license file.
29 */
30
31 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
32
33 #include "mongo/platform/basic.h"
34
35 #include "mongo/s/client/shard_connection.h"
36
37 #include <set>
38
39 #include "mongo/db/commands.h"
40 #include "mongo/db/lasterror.h"
41 #include "mongo/executor/connection_pool_stats.h"
42 #include "mongo/s/chunk_manager.h"
43 #include "mongo/s/client/shard.h"
44 #include "mongo/s/client/shard_registry.h"
45 #include "mongo/s/client/version_manager.h"
46 #include "mongo/s/cluster_last_error_info.h"
47 #include "mongo/s/grid.h"
48 #include "mongo/s/is_mongos.h"
49 #include "mongo/util/concurrency/spin_lock.h"
50 #include "mongo/util/exit.h"
51 #include "mongo/util/log.h"
52 #include "mongo/util/stacktrace.h"
53
54 namespace mongo {
55
56 using std::unique_ptr;
57 using std::map;
58 using std::set;
59 using std::string;
60 using std::stringstream;
61 using std::vector;
62
63 namespace {
64
65 class ClientConnections;
66
67 /**
68 * Class which tracks ClientConnections (the client connection pool) for each incoming
69 * connection, allowing stats access.
70 */
71 class ActiveClientConnections {
72 public:
add(const ClientConnections * cc)73 void add(const ClientConnections* cc) {
74 stdx::lock_guard<stdx::mutex> lock(_mutex);
75 _clientConnections.insert(cc);
76 }
77
remove(const ClientConnections * cc)78 void remove(const ClientConnections* cc) {
79 stdx::lock_guard<stdx::mutex> lock(_mutex);
80 _clientConnections.erase(cc);
81 }
82
83 void appendInfo(BSONObjBuilder& b);
84
85 private:
86 stdx::mutex _mutex;
87 set<const ClientConnections*> _clientConnections;
88
89 } activeClientConnections;
90
91 /**
92 * Command to allow access to the sharded conn pool information in mongos.
93 */
94 class ShardedPoolStats : public BasicCommand {
95 public:
ShardedPoolStats()96 ShardedPoolStats() : BasicCommand("shardConnPoolStats") {}
help(stringstream & help) const97 virtual void help(stringstream& help) const {
98 help << "stats about the shard connection pool";
99 }
supportsWriteConcern(const BSONObj & cmd) const100 virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
101 return false;
102 }
slaveOk() const103 virtual bool slaveOk() const {
104 return true;
105 }
106
107 // Same privs as connPoolStats
addRequiredPrivileges(const std::string & dbname,const BSONObj & cmdObj,std::vector<Privilege> * out)108 virtual void addRequiredPrivileges(const std::string& dbname,
109 const BSONObj& cmdObj,
110 std::vector<Privilege>* out) {
111 ActionSet actions;
112 actions.addAction(ActionType::connPoolStats);
113 out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
114 }
115
run(OperationContext * opCtx,const string & dbname,const mongo::BSONObj & cmdObj,mongo::BSONObjBuilder & result)116 virtual bool run(OperationContext* opCtx,
117 const string& dbname,
118 const mongo::BSONObj& cmdObj,
119 mongo::BSONObjBuilder& result) {
120 // Connection information
121 executor::ConnectionPoolStats stats{};
122 shardConnectionPool.appendConnectionStats(&stats);
123 stats.appendToBSON(result);
124
125 // Thread connection information
126 activeClientConnections.appendInfo(result);
127
128 return true;
129 }
130
131 } shardedPoolStatsCmd;
132
133 /**
134 * holds all the actual db connections for a client to various servers 1 per thread, so
135 * doesn't have to be thread safe.
136 */
137 class ClientConnections {
138 MONGO_DISALLOW_COPYING(ClientConnections);
139
140 public:
141 struct Status {
Statusmongo::__anon7086fcda0111::ClientConnections::Status142 Status() : created(0), avail(0) {}
143
144 // May be read concurrently, but only written from
145 // this thread.
146 long long created;
147 DBClientBase* avail;
148 };
149
150 // Gets or creates the status object for the host
_getStatus(const string & addr)151 Status* _getStatus(const string& addr) {
152 scoped_spinlock lock(_lock);
153 Status*& temp = _hosts[addr];
154 if (!temp) {
155 temp = new Status();
156 }
157
158 return temp;
159 }
160
ClientConnections()161 ClientConnections() {
162 // Start tracking client connections
163 activeClientConnections.add(this);
164 }
165
~ClientConnections()166 ~ClientConnections() {
167 // Stop tracking these client connections
168 activeClientConnections.remove(this);
169
170 releaseAll(true);
171 }
172
releaseAll(bool fromDestructor=false)173 void releaseAll(bool fromDestructor = false) {
174 // Don't need spinlock protection because if not in the destructor, we don't modify
175 // _hosts, and if in the destructor we are not accessible to external threads.
176 for (HostMap::iterator i = _hosts.begin(); i != _hosts.end(); ++i) {
177 const string addr = i->first;
178 Status* ss = i->second;
179 invariant(ss);
180
181 if (ss->avail) {
182 // If we're shutting down, don't want to initiate release mechanism as it is
183 // slow, and isn't needed since all connections will be closed anyway.
184 if (globalInShutdownDeprecated()) {
185 if (versionManager.isVersionableCB(ss->avail)) {
186 versionManager.resetShardVersionCB(ss->avail);
187 }
188
189 delete ss->avail;
190 } else {
191 release(addr, ss->avail);
192 }
193
194 ss->avail = 0;
195 }
196
197 if (fromDestructor) {
198 delete ss;
199 }
200 }
201
202 if (fromDestructor) {
203 _hosts.clear();
204 }
205 }
206
get(const string & addr,const string & ns)207 DBClientBase* get(const string& addr, const string& ns) {
208 {
209 // We want to report ns stats
210 scoped_spinlock lock(_lock);
211 if (ns.size() > 0)
212 _seenNS.insert(ns);
213 }
214
215 Status* s = _getStatus(addr);
216
217 unique_ptr<DBClientBase> c;
218 if (s->avail) {
219 c.reset(s->avail);
220 s->avail = 0;
221
222 // May throw an exception
223 shardConnectionPool.onHandedOut(c.get());
224 } else {
225 c.reset(shardConnectionPool.get(addr));
226
227 // After, so failed creation doesn't get counted
228 s->created++;
229 }
230
231 return c.release();
232 }
233
done(const string & addr,DBClientBase * conn)234 void done(const string& addr, DBClientBase* conn) {
235 Status* s = _hosts[addr];
236 verify(s);
237
238 const bool isConnGood = shardConnectionPool.isConnectionGood(addr, conn);
239
240 if (s->avail != NULL) {
241 warning() << "Detected additional sharded connection in the "
242 << "thread local pool for " << addr;
243
244 if (DBException::traceExceptions.load()) {
245 // There shouldn't be more than one connection checked out to the same
246 // host on the same thread.
247 printStackTrace();
248 }
249
250 if (!isConnGood) {
251 delete s->avail;
252 s->avail = NULL;
253 }
254
255 // Let the internal pool handle the bad connection, this can also
256 // update the lower bounds for the known good socket creation time
257 // for this host.
258 release(addr, conn);
259 return;
260 }
261
262 if (!isConnGood) {
263 // Let the internal pool handle the bad connection.
264 release(addr, conn);
265 return;
266 }
267
268 // Note: Although we try our best to clear bad connections as much as possible,
269 // some of them can still slip through because of how ClientConnections are being
270 // used - as thread local variables. This means that threads won't be able to
271 // see the s->avail connection of other threads.
272
273 s->avail = conn;
274 }
275
checkVersions(OperationContext * opCtx,const string & ns)276 void checkVersions(OperationContext* opCtx, const string& ns) {
277 vector<ShardId> all;
278 grid.shardRegistry()->getAllShardIds(&all);
279
280 // Don't report exceptions here as errors in GetLastError
281 LastError::Disabled ignoreForGLE(&LastError::get(cc()));
282
283 // Now only check top-level shard connections
284 for (const ShardId& shardId : all) {
285 try {
286 auto shardStatus = grid.shardRegistry()->getShard(opCtx, shardId);
287 if (!shardStatus.isOK()) {
288 invariant(shardStatus == ErrorCodes::ShardNotFound);
289 continue;
290 }
291 const auto shard = shardStatus.getValue();
292
293 string sconnString = shard->getConnString().toString();
294 Status* s = _getStatus(sconnString);
295
296 if (!s->avail) {
297 s->avail = shardConnectionPool.get(sconnString);
298 s->created++; // After, so failed creation doesn't get counted
299 }
300
301 versionManager.checkShardVersionCB(opCtx, s->avail, ns, false, 1);
302 } catch (const DBException& ex) {
303 warning() << "problem while initially checking shard versions on"
304 << " " << shardId << causedBy(ex);
305
306 // NOTE: This is only a heuristic, to avoid multiple stale version retries
307 // across multiple shards, and does not affect correctness.
308 }
309 }
310 }
311
release(const string & addr,DBClientBase * conn)312 void release(const string& addr, DBClientBase* conn) {
313 shardConnectionPool.release(addr, conn);
314 }
315
316 /**
317 * Appends info about the client connection pool to a BOBuilder
318 * Safe to call with activeClientConnections lock
319 */
appendInfo(BSONObjBuilder & b) const320 void appendInfo(BSONObjBuilder& b) const {
321 scoped_spinlock lock(_lock);
322
323 BSONArrayBuilder hostsArrB(b.subarrayStart("hosts"));
324 for (HostMap::const_iterator i = _hosts.begin(); i != _hosts.end(); ++i) {
325 BSONObjBuilder bb(hostsArrB.subobjStart());
326 bb.append("host", i->first);
327 bb.append("created", i->second->created);
328 bb.appendBool("avail", static_cast<bool>(i->second->avail));
329 bb.done();
330 }
331 hostsArrB.done();
332
333 BSONArrayBuilder nsArrB(b.subarrayStart("seenNS"));
334 for (set<string>::const_iterator i = _seenNS.begin(); i != _seenNS.end(); ++i) {
335 nsArrB.append(*i);
336 }
337 nsArrB.done();
338 }
339
340 // Protects only the creation of new entries in the _hosts and _seenNS map
341 // from external threads. Reading _hosts / _seenNS in this thread doesn't
342 // need protection.
343 mutable SpinLock _lock;
344 typedef map<string, Status*, DBConnectionPool::serverNameCompare> HostMap;
345 HostMap _hosts;
346 set<string> _seenNS;
347
348 /**
349 * Clears the connections kept by this pool (ie, not including the global pool)
350 */
clearPool()351 void clearPool() {
352 for (HostMap::iterator iter = _hosts.begin(); iter != _hosts.end(); ++iter) {
353 if (iter->second->avail != NULL) {
354 delete iter->second->avail;
355 }
356 delete iter->second;
357 }
358
359 _hosts.clear();
360 }
361
forgetNS(const string & ns)362 void forgetNS(const string& ns) {
363 scoped_spinlock lock(_lock);
364 _seenNS.erase(ns);
365 }
366
367 // -----
368
369 static thread_local std::unique_ptr<ClientConnections> _perThread;
370
threadInstance()371 static ClientConnections* threadInstance() {
372 if (!_perThread) {
373 _perThread = stdx::make_unique<ClientConnections>();
374 }
375 return _perThread.get();
376 }
377 };
378
379 thread_local std::unique_ptr<ClientConnections> ClientConnections::_perThread;
380
appendInfo(BSONObjBuilder & b)381 void ActiveClientConnections::appendInfo(BSONObjBuilder& b) {
382 BSONArrayBuilder arr(64 * 1024); // There may be quite a few threads
383
384 {
385 stdx::lock_guard<stdx::mutex> lock(_mutex);
386 for (set<const ClientConnections*>::const_iterator i = _clientConnections.begin();
387 i != _clientConnections.end();
388 ++i) {
389 BSONObjBuilder bb(arr.subobjStart());
390 (*i)->appendInfo(bb);
391 bb.done();
392 }
393 }
394
395 b.appendArray("threads", arr.obj());
396 }
397
398 } // namespace
399
400 // The global connection pool
401 DBConnectionPool shardConnectionPool;
402
ShardConnection(const ConnectionString & connectionString,const string & ns,std::shared_ptr<ChunkManager> manager)403 ShardConnection::ShardConnection(const ConnectionString& connectionString,
404 const string& ns,
405 std::shared_ptr<ChunkManager> manager)
406 : _cs(connectionString), _ns(ns), _manager(manager), _finishedInit(false) {
407 invariant(_cs.isValid());
408
409 // Make sure we specified a manager for the correct namespace
410 if (_ns.size() && _manager) {
411 invariant(_manager->getns() == _ns);
412 }
413
414 auto csString = _cs.toString();
415 _conn = ClientConnections::threadInstance()->get(csString, _ns);
416 if (isMongos()) {
417 // In mongos, we record this connection as having been used for useful work to provide
418 // useful information in getLastError.
419 ClusterLastErrorInfo::get(cc())->addShardHost(csString);
420 }
421 }
422
~ShardConnection()423 ShardConnection::~ShardConnection() {
424 if (_conn) {
425 if (_conn->isFailed()) {
426 if (_conn->getSockCreationMicroSec() == DBClientBase::INVALID_SOCK_CREATION_TIME) {
427 kill();
428 } else {
429 // The pool takes care of deleting the failed connection - this
430 // will also trigger disposal of older connections in the pool
431 done();
432 }
433 } else {
434 // see done() comments above for why we log this line
435 log() << "sharded connection to " << _conn->getServerAddress()
436 << " not being returned to the pool";
437
438 kill();
439 }
440 }
441 }
442
_finishInit()443 void ShardConnection::_finishInit() {
444 if (_finishedInit)
445 return;
446 _finishedInit = true;
447
448 if (versionManager.isVersionableCB(_conn)) {
449 auto& client = cc();
450 auto opCtx = client.getOperationContext();
451 invariant(opCtx);
452 _setVersion = versionManager.checkShardVersionCB(opCtx, this, false, 1);
453 } else {
454 // Make sure we didn't specify a manager for a non-versionable connection (i.e. config)
455 verify(!_manager);
456 _setVersion = false;
457 }
458 }
459
done()460 void ShardConnection::done() {
461 if (_conn) {
462 ClientConnections::threadInstance()->done(_cs.toString(), _conn);
463 _conn = 0;
464 _finishedInit = true;
465 }
466 }
467
kill()468 void ShardConnection::kill() {
469 if (_conn) {
470 if (versionManager.isVersionableCB(_conn)) {
471 versionManager.resetShardVersionCB(_conn);
472 }
473
474 if (_conn->isFailed()) {
475 // Let the pool know about the bad connection and also delegate disposal to it.
476 ClientConnections::threadInstance()->done(_cs.toString(), _conn);
477 } else {
478 delete _conn;
479 }
480
481 _conn = 0;
482 _finishedInit = true;
483 }
484 }
485
checkMyConnectionVersions(OperationContext * opCtx,const string & ns)486 void ShardConnection::checkMyConnectionVersions(OperationContext* opCtx, const string& ns) {
487 ClientConnections::threadInstance()->checkVersions(opCtx, ns);
488 }
489
releaseMyConnections()490 void ShardConnection::releaseMyConnections() {
491 ClientConnections::threadInstance()->releaseAll();
492 }
493
clearPool()494 void ShardConnection::clearPool() {
495 shardConnectionPool.clear();
496 ClientConnections::threadInstance()->clearPool();
497 }
498
forgetNS(const string & ns)499 void ShardConnection::forgetNS(const string& ns) {
500 ClientConnections::threadInstance()->forgetNS(ns);
501 }
502
503 } // namespace mongo
504