1
2 /**
3 * Copyright (C) 2018-present MongoDB, Inc.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the Server Side Public License, version 1,
7 * as published by MongoDB, Inc.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * Server Side Public License for more details.
13 *
14 * You should have received a copy of the Server Side Public License
15 * along with this program. If not, see
16 * <http://www.mongodb.com/licensing/server-side-public-license>.
17 *
18 * As a special exception, the copyright holders give permission to link the
19 * code of portions of this program with the OpenSSL library under certain
20 * conditions as described in each individual source file and distribute
21 * linked combinations including the program with the OpenSSL library. You
22 * must comply with the Server Side Public License in all respects for
23 * all of the code used other than as permitted herein. If you modify file(s)
24 * with this exception, you may extend this exception to your version of the
25 * file(s), but you are not obligated to do so. If you do not wish to do so,
26 * delete this exception statement from your version. If you delete this
27 * exception statement from all source files in the program, then also delete
28 * it in the license file.
29 */
30
31 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery
32
33 #include "mongo/platform/basic.h"
34
35 #include "mongo/db/cursor_manager.h"
36
37 #include "mongo/base/data_cursor.h"
38 #include "mongo/base/init.h"
39 #include "mongo/db/audit.h"
40 #include "mongo/db/auth/authorization_session.h"
41 #include "mongo/db/background.h"
42 #include "mongo/db/catalog/collection.h"
43 #include "mongo/db/catalog/database.h"
44 #include "mongo/db/catalog/database_holder.h"
45 #include "mongo/db/client.h"
46 #include "mongo/db/cursor_server_params.h"
47 #include "mongo/db/db_raii.h"
48 #include "mongo/db/kill_sessions_common.h"
49 #include "mongo/db/logical_session_cache.h"
50 #include "mongo/db/namespace_string.h"
51 #include "mongo/db/operation_context.h"
52 #include "mongo/db/query/plan_executor.h"
53 #include "mongo/db/server_parameters.h"
54 #include "mongo/db/service_context.h"
55 #include "mongo/platform/random.h"
56 #include "mongo/stdx/memory.h"
57 #include "mongo/util/exit.h"
58 #include "mongo/util/log.h"
59 #include "mongo/util/startup_test.h"
60
61 namespace mongo {
using std::vector;

// Namespace-scope definition of the static constexpr data member declared in the CursorManager
// class. Before C++17, a static constexpr member that is odr-used needs such a definition.
constexpr int CursorManager::kNumPartitions;
65
66 namespace {
idFromCursorId(CursorId id)67 uint32_t idFromCursorId(CursorId id) {
68 uint64_t x = static_cast<uint64_t>(id);
69 x = x >> 32;
70 return static_cast<uint32_t>(x);
71 }
72
cursorIdFromParts(uint32_t collectionIdentifier,uint32_t cursor)73 CursorId cursorIdFromParts(uint32_t collectionIdentifier, uint32_t cursor) {
74 // The leading two bits of a non-global CursorId should be 0.
75 invariant((collectionIdentifier & (0b11 << 30)) == 0);
76 CursorId x = static_cast<CursorId>(collectionIdentifier) << 32;
77 x |= cursor;
78 return x;
79 }
80
81 class GlobalCursorIdCache {
82 public:
83 GlobalCursorIdCache();
84 ~GlobalCursorIdCache();
85
86 /**
87 * Returns a unique 32-bit identifier to be used as the first 32 bits of all cursor ids for a
88 * new CursorManager.
89 */
90 uint32_t registerCursorManager(const NamespaceString& nss);
91
92 /**
93 * Must be called when a CursorManager is deleted. 'id' must be the identifier returned by
94 * registerCursorManager().
95 */
96 void deregisterCursorManager(uint32_t id, const NamespaceString& nss);
97
98 /**
99 * works globally
100 */
101 bool eraseCursor(OperationContext* opCtx, CursorId id, bool checkAuth);
102
103 void appendStats(BSONObjBuilder& builder);
104
105 std::size_t timeoutCursors(OperationContext* opCtx, Date_t now);
106
107 template <typename Visitor>
108 void visitAllCursorManagers(OperationContext* opCtx, Visitor* visitor);
109
110 int64_t nextSeed();
111
112 private:
113 SimpleMutex _mutex;
114
115 typedef unordered_map<unsigned, NamespaceString> Map;
116 Map _idToNss;
117 unsigned _nextId;
118
119 std::unique_ptr<SecureRandom> _secureRandom;
120 };
121
// Note that "globalCursorIdCache" must be declared before "globalCursorManager", as the latter
// calls into the former during destruction. Namespace-scope objects in one translation unit are
// destroyed in reverse order of initialization, so declaring the cache first keeps it alive until
// after the global manager is gone.
std::unique_ptr<GlobalCursorIdCache> globalCursorIdCache;
std::unique_ptr<CursorManager> globalCursorManager;
126
// Creates the process-wide cursor id registry. The GlobalCursorManager initializer declares this
// initializer as a prerequisite.
MONGO_INITIALIZER(GlobalCursorIdCache)(InitializerContext* context) {
    globalCursorIdCache = stdx::make_unique<GlobalCursorIdCache>();
    return Status::OK();
}
131
132 MONGO_INITIALIZER_WITH_PREREQUISITES(GlobalCursorManager, ("GlobalCursorIdCache"))
133 (InitializerContext* context) {
134 globalCursorManager.reset(new CursorManager({}));
135 return Status::OK();
136 }
137
GlobalCursorIdCache()138 GlobalCursorIdCache::GlobalCursorIdCache() : _nextId(0), _secureRandom() {}
139
~GlobalCursorIdCache()140 GlobalCursorIdCache::~GlobalCursorIdCache() {}
141
nextSeed()142 int64_t GlobalCursorIdCache::nextSeed() {
143 stdx::lock_guard<SimpleMutex> lk(_mutex);
144 if (!_secureRandom)
145 _secureRandom = SecureRandom::create();
146 return _secureRandom->nextInt64();
147 }
148
registerCursorManager(const NamespaceString & nss)149 uint32_t GlobalCursorIdCache::registerCursorManager(const NamespaceString& nss) {
150 static const uint32_t kMaxIds = 1000 * 1000 * 1000;
151 static_assert((kMaxIds & (0b11 << 30)) == 0,
152 "the first two bits of a collection identifier must always be zeroes");
153
154 stdx::lock_guard<SimpleMutex> lk(_mutex);
155
156 fassert(17359, _idToNss.size() < kMaxIds);
157
158 for (uint32_t i = 0; i <= kMaxIds; i++) {
159 uint32_t id = ++_nextId;
160 if (id == 0)
161 continue;
162 if (_idToNss.count(id) > 0)
163 continue;
164 _idToNss[id] = nss;
165 return id;
166 }
167
168 MONGO_UNREACHABLE;
169 }
170
deregisterCursorManager(uint32_t id,const NamespaceString & nss)171 void GlobalCursorIdCache::deregisterCursorManager(uint32_t id, const NamespaceString& nss) {
172 stdx::lock_guard<SimpleMutex> lk(_mutex);
173 invariant(nss == _idToNss[id]);
174 _idToNss.erase(id);
175 }
176
eraseCursor(OperationContext * opCtx,CursorId id,bool checkAuth)177 bool GlobalCursorIdCache::eraseCursor(OperationContext* opCtx, CursorId id, bool checkAuth) {
178 // Figure out what the namespace of this cursor is.
179 NamespaceString nss;
180 if (CursorManager::isGloballyManagedCursor(id)) {
181 auto pin = globalCursorManager->pinCursor(opCtx, id, CursorManager::kNoCheckSession);
182 if (!pin.isOK()) {
183 invariant(pin == ErrorCodes::CursorNotFound || pin == ErrorCodes::Unauthorized);
184 // No such cursor. TODO: Consider writing to audit log here (even though we don't
185 // have a namespace).
186 return false;
187 }
188 nss = pin.getValue().getCursor()->nss();
189 } else {
190 stdx::lock_guard<SimpleMutex> lk(_mutex);
191 uint32_t nsid = idFromCursorId(id);
192 Map::const_iterator it = _idToNss.find(nsid);
193 if (it == _idToNss.end()) {
194 // No namespace corresponding to this cursor id prefix. TODO: Consider writing to
195 // audit log here (even though we don't have a namespace).
196 return false;
197 }
198 nss = it->second;
199 }
200 invariant(nss.isValid());
201
202 // Check if we are authorized to erase this cursor.
203 if (checkAuth) {
204 auto status = CursorManager::withCursorManager(
205 opCtx, id, nss, [nss, id, opCtx](CursorManager* manager) {
206 auto ccPin = manager->pinCursor(opCtx, id, CursorManager::kNoCheckSession);
207 if (!ccPin.isOK()) {
208 return ccPin.getStatus();
209 }
210 AuthorizationSession* as = AuthorizationSession::get(opCtx->getClient());
211 auto cursorOwner = ccPin.getValue().getCursor()->getAuthenticatedUsers();
212 return as->checkAuthForKillCursors(nss, cursorOwner);
213 });
214 if (!status.isOK()) {
215 audit::logKillCursorsAuthzCheck(opCtx->getClient(), nss, id, status.code());
216 return false;
217 }
218 }
219
220 // If this cursor is owned by the global cursor manager, ask it to erase the cursor for us.
221 if (CursorManager::isGloballyManagedCursor(id)) {
222 Status eraseStatus = globalCursorManager->eraseCursor(opCtx, id, checkAuth);
223 massert(28697,
224 eraseStatus.reason(),
225 eraseStatus.code() == ErrorCodes::OK ||
226 eraseStatus.code() == ErrorCodes::CursorNotFound);
227 return eraseStatus.isOK();
228 }
229
230 // If not, then the cursor must be owned by a collection. Erase the cursor under the
231 // collection lock (to prevent the collection from going away during the erase).
232 AutoGetCollectionForReadCommand ctx(opCtx, nss);
233 Collection* collection = ctx.getCollection();
234 if (!collection) {
235 if (checkAuth)
236 audit::logKillCursorsAuthzCheck(
237 opCtx->getClient(), nss, id, ErrorCodes::CursorNotFound);
238 return false;
239 }
240
241 Status eraseStatus = collection->getCursorManager()->eraseCursor(opCtx, id, checkAuth);
242 uassert(16089,
243 eraseStatus.reason(),
244 eraseStatus.code() == ErrorCodes::OK ||
245 eraseStatus.code() == ErrorCodes::CursorNotFound);
246 return eraseStatus.isOK();
247 }
248
timeoutCursors(OperationContext * opCtx,Date_t now)249 std::size_t GlobalCursorIdCache::timeoutCursors(OperationContext* opCtx, Date_t now) {
250 size_t totalTimedOut = 0;
251
252 // Time out the cursors from the global cursor manager.
253 totalTimedOut += globalCursorManager->timeoutCursors(opCtx, now);
254
255 // Compute the set of collection names that we have to time out cursors for.
256 vector<NamespaceString> todo;
257 {
258 stdx::lock_guard<SimpleMutex> lk(_mutex);
259 for (auto&& entry : _idToNss) {
260 todo.push_back(entry.second);
261 }
262 }
263
264 // For each collection, time out its cursors under the collection lock (to prevent the
265 // collection from going away during the erase).
266 for (unsigned i = 0; i < todo.size(); i++) {
267 AutoGetCollectionOrViewForReadCommand ctx(opCtx, NamespaceString(todo[i]));
268 if (!ctx.getDb()) {
269 continue;
270 }
271
272 Collection* collection = ctx.getCollection();
273 if (collection == NULL) {
274 continue;
275 }
276
277 totalTimedOut += collection->getCursorManager()->timeoutCursors(opCtx, now);
278 }
279
280 return totalTimedOut;
281 }
282 } // namespace
283
284 template <typename Visitor>
visitAllCursorManagers(OperationContext * opCtx,Visitor * visitor)285 void GlobalCursorIdCache::visitAllCursorManagers(OperationContext* opCtx, Visitor* visitor) {
286 (*visitor)(*globalCursorManager);
287
288 // Compute the set of collection names that we have to get sessions for
289 vector<NamespaceString> namespaces;
290 {
291 stdx::lock_guard<SimpleMutex> lk(_mutex);
292 for (auto&& entry : _idToNss) {
293 namespaces.push_back(entry.second);
294 }
295 }
296
297 // For each collection, get its sessions under the collection lock (to prevent the
298 // collection from going away during the erase).
299 for (auto&& ns : namespaces) {
300 AutoGetCollectionOrView ctx(opCtx, NamespaceString(ns), MODE_IS);
301 if (!ctx.getDb()) {
302 continue;
303 }
304
305 Collection* collection = ctx.getCollection();
306 if (!collection) {
307 continue;
308 }
309
310 (*visitor)(*(collection->getCursorManager()));
311 }
312 }
313
314 // ---
315
getGlobalCursorManager()316 CursorManager* CursorManager::getGlobalCursorManager() {
317 return globalCursorManager.get();
318 }
319
appendAllActiveSessions(OperationContext * opCtx,LogicalSessionIdSet * lsids)320 void CursorManager::appendAllActiveSessions(OperationContext* opCtx, LogicalSessionIdSet* lsids) {
321 auto visitor = [&](CursorManager& mgr) { mgr.appendActiveSessions(lsids); };
322 globalCursorIdCache->visitAllCursorManagers(opCtx, &visitor);
323 }
324
getAllCursors(OperationContext * opCtx)325 std::vector<GenericCursor> CursorManager::getAllCursors(OperationContext* opCtx) {
326 std::vector<GenericCursor> cursors;
327 auto visitor = [&](CursorManager& mgr) { mgr.appendActiveCursors(&cursors); };
328 globalCursorIdCache->visitAllCursorManagers(opCtx, &visitor);
329
330 return cursors;
331 }
332
killCursorsWithMatchingSessions(OperationContext * opCtx,const SessionKiller::Matcher & matcher)333 std::pair<Status, int> CursorManager::killCursorsWithMatchingSessions(
334 OperationContext* opCtx, const SessionKiller::Matcher& matcher) {
335 auto eraser = [&](CursorManager& mgr, CursorId id) {
336 uassertStatusOK(mgr.eraseCursor(opCtx, id, true));
337 log() << "killing cursor: " << id << " as part of killing session(s)";
338 };
339
340 auto visitor = makeKillSessionsCursorManagerVisitor(opCtx, matcher, std::move(eraser));
341 globalCursorIdCache->visitAllCursorManagers(opCtx, &visitor);
342
343 return std::make_pair(visitor.getStatus(), visitor.getCursorsKilled());
344 }
345
timeoutCursorsGlobal(OperationContext * opCtx,Date_t now)346 std::size_t CursorManager::timeoutCursorsGlobal(OperationContext* opCtx, Date_t now) {
347 return globalCursorIdCache->timeoutCursors(opCtx, now);
348 }
349
eraseCursorGlobalIfAuthorized(OperationContext * opCtx,int n,const char * _ids)350 int CursorManager::eraseCursorGlobalIfAuthorized(OperationContext* opCtx, int n, const char* _ids) {
351 ConstDataCursor ids(_ids);
352 int numDeleted = 0;
353 for (int i = 0; i < n; i++) {
354 if (eraseCursorGlobalIfAuthorized(opCtx, ids.readAndAdvance<LittleEndian<int64_t>>()))
355 numDeleted++;
356 if (globalInShutdownDeprecated())
357 break;
358 }
359 return numDeleted;
360 }
eraseCursorGlobalIfAuthorized(OperationContext * opCtx,CursorId id)361 bool CursorManager::eraseCursorGlobalIfAuthorized(OperationContext* opCtx, CursorId id) {
362 return globalCursorIdCache->eraseCursor(opCtx, id, true);
363 }
eraseCursorGlobal(OperationContext * opCtx,CursorId id)364 bool CursorManager::eraseCursorGlobal(OperationContext* opCtx, CursorId id) {
365 return globalCursorIdCache->eraseCursor(opCtx, id, false);
366 }
367
withCursorManager(OperationContext * opCtx,CursorId id,const NamespaceString & nss,stdx::function<Status (CursorManager *)> callback)368 Status CursorManager::withCursorManager(OperationContext* opCtx,
369 CursorId id,
370 const NamespaceString& nss,
371 stdx::function<Status(CursorManager*)> callback) {
372 boost::optional<AutoGetCollectionForReadCommand> readLock;
373 CursorManager* cursorManager = nullptr;
374
375 if (CursorManager::isGloballyManagedCursor(id)) {
376 cursorManager = CursorManager::getGlobalCursorManager();
377 } else {
378 readLock.emplace(opCtx, nss);
379 Collection* collection = readLock->getCollection();
380 if (!collection) {
381 return {ErrorCodes::CursorNotFound,
382 str::stream() << "collection does not exist: " << nss.ns()};
383 }
384 cursorManager = collection->getCursorManager();
385 }
386 invariant(cursorManager);
387
388 return callback(cursorManager);
389 }
390
391 // --------------------------
392
operator ()(const PlanExecutor * exec,const std::size_t nPartitions)393 std::size_t CursorManager::PlanExecutorPartitioner::operator()(const PlanExecutor* exec,
394 const std::size_t nPartitions) {
395 auto token = exec->getRegistrationToken();
396 invariant(token);
397 return (*token) % nPartitions;
398 }
399
CursorManager(NamespaceString nss)400 CursorManager::CursorManager(NamespaceString nss)
401 : _nss(std::move(nss)),
402 _collectionCacheRuntimeId(_nss.isEmpty() ? 0
403 : globalCursorIdCache->registerCursorManager(_nss)),
404 _random(stdx::make_unique<PseudoRandom>(globalCursorIdCache->nextSeed())),
405 _registeredPlanExecutors(),
406 _cursorMap(stdx::make_unique<Partitioned<unordered_map<CursorId, ClientCursor*>>>()) {}
407
~CursorManager()408 CursorManager::~CursorManager() {
409 // All cursors and PlanExecutors should have been deleted already.
410 invariant(_registeredPlanExecutors.empty());
411 invariant(_cursorMap->empty());
412
413 if (!isGlobalManager()) {
414 globalCursorIdCache->deregisterCursorManager(_collectionCacheRuntimeId, _nss);
415 }
416 }
417
invalidateAll(OperationContext * opCtx,bool collectionGoingAway,const std::string & reason)418 void CursorManager::invalidateAll(OperationContext* opCtx,
419 bool collectionGoingAway,
420 const std::string& reason) {
421 invariant(!isGlobalManager()); // The global cursor manager should never need to kill cursors.
422 dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_X));
423 fassert(28819, !BackgroundOperation::inProgForNs(_nss));
424 auto allExecPartitions = _registeredPlanExecutors.lockAllPartitions();
425 for (auto&& partition : allExecPartitions) {
426 for (auto&& exec : partition) {
427 // The PlanExecutor is owned elsewhere, so we just mark it as killed and let it be
428 // cleaned up later.
429 exec->markAsKilled(reason);
430 }
431 }
432 allExecPartitions.clear();
433
434 // Mark all cursors as killed, but keep around those we can in order to provide a useful error
435 // message to the user when they attempt to use it next time.
436 std::vector<std::unique_ptr<ClientCursor, ClientCursor::Deleter>> toDisposeWithoutMutex;
437 {
438 auto allCurrentPartitions = _cursorMap->lockAllPartitions();
439 for (auto&& partition : allCurrentPartitions) {
440 for (auto it = partition.begin(); it != partition.end();) {
441 auto* cursor = it->second;
442 cursor->markAsKilled(reason);
443
444 // If pinned, there is an active user of this cursor, who is now responsible for
445 // cleaning it up. Otherwise, we can immediately dispose of it.
446 if (cursor->_isPinned) {
447 it = partition.erase(it);
448 continue;
449 }
450
451 if (!collectionGoingAway) {
452 // We keep around unpinned cursors so that future attempts to use the cursor
453 // will result in a useful error message.
454 ++it;
455 } else {
456 toDisposeWithoutMutex.emplace_back(cursor);
457 it = partition.erase(it);
458 }
459 }
460 }
461 }
462
463 // Dispose of the cursors we can now delete. This might involve lock acquisitions for safe
464 // cleanup, so avoid doing it while holding mutexes.
465 for (auto&& cursor : toDisposeWithoutMutex) {
466 cursor->dispose(opCtx);
467 }
468 }
469
invalidateDocument(OperationContext * opCtx,const RecordId & dl,InvalidationType type)470 void CursorManager::invalidateDocument(OperationContext* opCtx,
471 const RecordId& dl,
472 InvalidationType type) {
473 dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));
474 invariant(!isGlobalManager()); // The global cursor manager should never receive invalidations.
475 if (supportsDocLocking()) {
476 // If a storage engine supports doc locking, then we do not need to invalidate.
477 // The transactional boundaries of the operation protect us.
478 return;
479 }
480
481 auto allExecPartitions = _registeredPlanExecutors.lockAllPartitions();
482 for (auto&& partition : allExecPartitions) {
483 for (auto&& exec : partition) {
484 exec->invalidate(opCtx, dl, type);
485 }
486 }
487
488 auto allPartitions = _cursorMap->lockAllPartitions();
489 for (auto&& partition : allPartitions) {
490 for (auto&& entry : partition) {
491 auto exec = entry.second->getExecutor();
492 exec->invalidate(opCtx, dl, type);
493 }
494 }
495 }
496
cursorShouldTimeout_inlock(const ClientCursor * cursor,Date_t now)497 bool CursorManager::cursorShouldTimeout_inlock(const ClientCursor* cursor, Date_t now) {
498 if (cursor->isNoTimeout() || cursor->_isPinned) {
499 return false;
500 }
501 return (now - cursor->_lastUseDate) >= Milliseconds(getCursorTimeoutMillis());
502 }
503
timeoutCursors(OperationContext * opCtx,Date_t now)504 std::size_t CursorManager::timeoutCursors(OperationContext* opCtx, Date_t now) {
505 std::vector<std::unique_ptr<ClientCursor, ClientCursor::Deleter>> toDisposeWithoutMutex;
506
507 for (size_t partitionId = 0; partitionId < kNumPartitions; ++partitionId) {
508 auto lockedPartition = _cursorMap->lockOnePartitionById(partitionId);
509 for (auto it = lockedPartition->begin(); it != lockedPartition->end();) {
510 auto* cursor = it->second;
511 if (cursorShouldTimeout_inlock(cursor, now)) {
512 toDisposeWithoutMutex.emplace_back(cursor);
513 it = lockedPartition->erase(it);
514 } else {
515 ++it;
516 }
517 }
518 }
519
520 // Be careful not to dispose of cursors while holding the partition lock.
521 for (auto&& cursor : toDisposeWithoutMutex) {
522 log() << "Cursor id " << cursor->cursorid() << " timed out, idle since "
523 << cursor->getLastUseDate();
524 cursor->dispose(opCtx);
525 }
526 return toDisposeWithoutMutex.size();
527 }
528
529 namespace {
530 static AtomicUInt32 registeredPlanExecutorId;
531 } // namespace
532
registerExecutor(PlanExecutor * exec)533 Partitioned<unordered_set<PlanExecutor*>>::PartitionId CursorManager::registerExecutor(
534 PlanExecutor* exec) {
535 auto partitionId = registeredPlanExecutorId.fetchAndAdd(1);
536 exec->setRegistrationToken(partitionId);
537 _registeredPlanExecutors.insert(exec);
538 return partitionId;
539 }
540
deregisterExecutor(PlanExecutor * exec)541 void CursorManager::deregisterExecutor(PlanExecutor* exec) {
542 if (auto partitionId = exec->getRegistrationToken()) {
543 _registeredPlanExecutors.erase(exec);
544 }
545 }
546
pinCursor(OperationContext * opCtx,CursorId id,AuthCheck checkSessionAuth)547 StatusWith<ClientCursorPin> CursorManager::pinCursor(OperationContext* opCtx,
548 CursorId id,
549 AuthCheck checkSessionAuth) {
550 auto lockedPartition = _cursorMap->lockOnePartition(id);
551 auto it = lockedPartition->find(id);
552 if (it == lockedPartition->end()) {
553 return {ErrorCodes::CursorNotFound, str::stream() << "cursor id " << id << " not found"};
554 }
555
556 ClientCursor* cursor = it->second;
557 uassert(ErrorCodes::CursorInUse,
558 str::stream() << "cursor id " << id << " is already in use",
559 !cursor->_isPinned);
560 if (cursor->getExecutor()->isMarkedAsKilled()) {
561 // This cursor was killed while it was idle.
562 Status error{ErrorCodes::QueryPlanKilled,
563 str::stream() << "cursor killed because: "
564 << cursor->getExecutor()->getKillReason()};
565 deregisterAndDestroyCursor(std::move(lockedPartition),
566 opCtx,
567 std::unique_ptr<ClientCursor, ClientCursor::Deleter>(cursor));
568 return error;
569 }
570
571 if (checkSessionAuth == kCheckSession) {
572 auto cursorPrivilegeStatus = checkCursorSessionPrivilege(opCtx, cursor->getSessionId());
573 if (!cursorPrivilegeStatus.isOK()) {
574 return cursorPrivilegeStatus;
575 }
576 }
577
578 cursor->_isPinned = true;
579
580 // We use pinning of a cursor as a proxy for active, user-initiated use of a cursor. Therefor,
581 // we pass down to the logical session cache and vivify the record (updating last use).
582 if (cursor->getSessionId()) {
583 auto vivifyCursorStatus =
584 LogicalSessionCache::get(opCtx)->vivify(opCtx, cursor->getSessionId().get());
585 if (!vivifyCursorStatus.isOK()) {
586 return vivifyCursorStatus;
587 }
588 }
589
590 return ClientCursorPin(opCtx, cursor);
591 }
592
unpin(OperationContext * opCtx,std::unique_ptr<ClientCursor,ClientCursor::Deleter> cursor)593 void CursorManager::unpin(OperationContext* opCtx,
594 std::unique_ptr<ClientCursor, ClientCursor::Deleter> cursor) {
595 // Avoid computing the current time within the critical section.
596 auto now = opCtx->getServiceContext()->getPreciseClockSource()->now();
597
598 auto partitionLock = _cursorMap->lockOnePartition(cursor->cursorid());
599 invariant(cursor->_isPinned);
600 cursor->_isPinned = false;
601 cursor->_lastUseDate = now;
602
603 // The cursor will stay around in '_cursorMap', so release the unique pointer to avoid deleting
604 // it.
605 cursor.release();
606 }
607
getCursorIds(std::set<CursorId> * openCursors) const608 void CursorManager::getCursorIds(std::set<CursorId>* openCursors) const {
609 auto allPartitions = _cursorMap->lockAllPartitions();
610 for (auto&& partition : allPartitions) {
611 for (auto&& entry : partition) {
612 openCursors->insert(entry.first);
613 }
614 }
615 }
616
appendActiveSessions(LogicalSessionIdSet * lsids) const617 void CursorManager::appendActiveSessions(LogicalSessionIdSet* lsids) const {
618 auto allPartitions = _cursorMap->lockAllPartitions();
619 for (auto&& partition : allPartitions) {
620 for (auto&& entry : partition) {
621 auto cursor = entry.second;
622 if (auto id = cursor->getSessionId()) {
623 lsids->insert(id.value());
624 }
625 }
626 }
627 }
628
appendActiveCursors(std::vector<GenericCursor> * cursors) const629 void CursorManager::appendActiveCursors(std::vector<GenericCursor>* cursors) const {
630 auto allPartitions = _cursorMap->lockAllPartitions();
631 for (auto&& partition : allPartitions) {
632 for (auto&& entry : partition) {
633 auto cursor = entry.second;
634 cursors->emplace_back();
635 auto& gc = cursors->back();
636 gc.setId(cursor->_cursorid);
637 gc.setNs(cursor->nss());
638 gc.setLsid(cursor->getSessionId());
639 }
640 }
641 }
642
getCursorIdsForNamespace(const NamespaceString & nss) const643 std::vector<CursorId> CursorManager::getCursorIdsForNamespace(const NamespaceString& nss) const {
644 std::vector<CursorId> cursors;
645
646 auto allPartitions = _cursorMap->lockAllPartitions();
647 for (auto&& partition : allPartitions) {
648 for (auto&& entry : partition) {
649 auto cursor = entry.second;
650 if (cursor->nss() == nss) {
651 cursors.emplace_back(cursor->cursorid());
652 }
653 }
654 }
655
656 return cursors;
657 }
658
getCursorsForSession(LogicalSessionId lsid) const659 stdx::unordered_set<CursorId> CursorManager::getCursorsForSession(LogicalSessionId lsid) const {
660 stdx::unordered_set<CursorId> cursors;
661
662 auto allPartitions = _cursorMap->lockAllPartitions();
663 for (auto&& partition : allPartitions) {
664 for (auto&& entry : partition) {
665 auto cursor = entry.second;
666 if (cursor->getSessionId() == lsid) {
667 cursors.insert(cursor->cursorid());
668 }
669 }
670 }
671
672 return cursors;
673 }
674
numCursors() const675 size_t CursorManager::numCursors() const {
676 return _cursorMap->size();
677 }
678
allocateCursorId_inlock()679 CursorId CursorManager::allocateCursorId_inlock() {
680 for (int i = 0; i < 10000; i++) {
681 // The leading two bits of a CursorId are used to determine if the cursor is registered on
682 // the global cursor manager.
683 CursorId id;
684 if (isGlobalManager()) {
685 // This is the global cursor manager, so generate a random number and make sure the
686 // first two bits are 01.
687 uint64_t mask = 0x3FFFFFFFFFFFFFFF;
688 uint64_t bitToSet = 1ULL << 62;
689 id = ((_random->nextInt64() & mask) | bitToSet);
690 } else {
691 // The first 2 bits are 0, the next 30 bits are the collection identifier, the next 32
692 // bits are random.
693 uint32_t myPart = static_cast<uint32_t>(_random->nextInt32());
694 id = cursorIdFromParts(_collectionCacheRuntimeId, myPart);
695 }
696 auto partition = _cursorMap->lockOnePartition(id);
697 if (partition->count(id) == 0)
698 return id;
699 }
700 fassertFailed(17360);
701 }
702
registerCursor(OperationContext * opCtx,ClientCursorParams && cursorParams)703 ClientCursorPin CursorManager::registerCursor(OperationContext* opCtx,
704 ClientCursorParams&& cursorParams) {
705 // Avoid computing the current time within the critical section.
706 auto now = opCtx->getServiceContext()->getPreciseClockSource()->now();
707
708 // Make sure the PlanExecutor isn't registered, since we will register the ClientCursor wrapping
709 // it.
710 invariant(cursorParams.exec);
711 deregisterExecutor(cursorParams.exec.get());
712 cursorParams.exec.get_deleter().dismissDisposal();
713 cursorParams.exec->unsetRegistered();
714
715 // Note we must hold the registration lock from now until insertion into '_cursorMap' to ensure
716 // we don't insert two cursors with the same cursor id.
717 stdx::lock_guard<SimpleMutex> lock(_registrationLock);
718 CursorId cursorId = allocateCursorId_inlock();
719 std::unique_ptr<ClientCursor, ClientCursor::Deleter> clientCursor(new ClientCursor(
720 std::move(cursorParams), this, cursorId, opCtx->getLogicalSessionId(), now));
721
722 // Transfer ownership of the cursor to '_cursorMap'.
723 auto partition = _cursorMap->lockOnePartition(cursorId);
724 ClientCursor* unownedCursor = clientCursor.release();
725 partition->emplace(cursorId, unownedCursor);
726 return ClientCursorPin(opCtx, unownedCursor);
727 }
728
deregisterCursor(ClientCursor * cc)729 void CursorManager::deregisterCursor(ClientCursor* cc) {
730 _cursorMap->erase(cc->cursorid());
731 }
732
deregisterAndDestroyCursor(Partitioned<stdx::unordered_map<CursorId,ClientCursor * >,kNumPartitions>::OnePartition && lk,OperationContext * opCtx,std::unique_ptr<ClientCursor,ClientCursor::Deleter> cursor)733 void CursorManager::deregisterAndDestroyCursor(
734 Partitioned<stdx::unordered_map<CursorId, ClientCursor*>, kNumPartitions>::OnePartition&& lk,
735 OperationContext* opCtx,
736 std::unique_ptr<ClientCursor, ClientCursor::Deleter> cursor) {
737 {
738 auto lockWithRestrictedScope = std::move(lk);
739 lockWithRestrictedScope->erase(cursor->cursorid());
740 }
741 // Dispose of the cursor without holding any cursor manager mutexes. Disposal of a cursor
742 // can require taking lock manager locks, which we want to avoid while holding a mutex. If
743 // we did so, any caller of a CursorManager method which already held a lock manager lock
744 // could induce a deadlock when trying to acquire a CursorManager lock.
745 cursor->dispose(opCtx);
746 }
747
eraseCursor(OperationContext * opCtx,CursorId id,bool shouldAudit)748 Status CursorManager::eraseCursor(OperationContext* opCtx, CursorId id, bool shouldAudit) {
749 auto lockedPartition = _cursorMap->lockOnePartition(id);
750 auto it = lockedPartition->find(id);
751 if (it == lockedPartition->end()) {
752 if (shouldAudit) {
753 audit::logKillCursorsAuthzCheck(
754 opCtx->getClient(), _nss, id, ErrorCodes::CursorNotFound);
755 }
756 return {ErrorCodes::CursorNotFound, str::stream() << "Cursor id not found: " << id};
757 }
758 auto cursor = it->second;
759
760 if (cursor->_isPinned) {
761 if (shouldAudit) {
762 audit::logKillCursorsAuthzCheck(
763 opCtx->getClient(), _nss, id, ErrorCodes::OperationFailed);
764 }
765 return {ErrorCodes::OperationFailed, str::stream() << "Cannot kill pinned cursor: " << id};
766 }
767 std::unique_ptr<ClientCursor, ClientCursor::Deleter> ownedCursor(cursor);
768
769 if (shouldAudit) {
770 audit::logKillCursorsAuthzCheck(opCtx->getClient(), _nss, id, ErrorCodes::OK);
771 }
772
773 deregisterAndDestroyCursor(std::move(lockedPartition), opCtx, std::move(ownedCursor));
774 return Status::OK();
775 }
776
777 } // namespace mongo
778