1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery
32 
33 #include "mongo/platform/basic.h"
34 
35 #include "mongo/db/cursor_manager.h"
36 
37 #include "mongo/base/data_cursor.h"
38 #include "mongo/base/init.h"
39 #include "mongo/db/audit.h"
40 #include "mongo/db/auth/authorization_session.h"
41 #include "mongo/db/background.h"
42 #include "mongo/db/catalog/collection.h"
43 #include "mongo/db/catalog/database.h"
44 #include "mongo/db/catalog/database_holder.h"
45 #include "mongo/db/client.h"
46 #include "mongo/db/cursor_server_params.h"
47 #include "mongo/db/db_raii.h"
48 #include "mongo/db/kill_sessions_common.h"
49 #include "mongo/db/logical_session_cache.h"
50 #include "mongo/db/namespace_string.h"
51 #include "mongo/db/operation_context.h"
52 #include "mongo/db/query/plan_executor.h"
53 #include "mongo/db/server_parameters.h"
54 #include "mongo/db/service_context.h"
55 #include "mongo/platform/random.h"
56 #include "mongo/stdx/memory.h"
57 #include "mongo/util/exit.h"
58 #include "mongo/util/log.h"
59 #include "mongo/util/startup_test.h"
60 
61 namespace mongo {
62 using std::vector;
63 
// Out-of-line definition of the static constexpr member. Before C++17 a namespace-scope definition
// like this one is required if the member is ever odr-used.
constexpr int CursorManager::kNumPartitions;
65 
66 namespace {
idFromCursorId(CursorId id)67 uint32_t idFromCursorId(CursorId id) {
68     uint64_t x = static_cast<uint64_t>(id);
69     x = x >> 32;
70     return static_cast<uint32_t>(x);
71 }
72 
cursorIdFromParts(uint32_t collectionIdentifier,uint32_t cursor)73 CursorId cursorIdFromParts(uint32_t collectionIdentifier, uint32_t cursor) {
74     // The leading two bits of a non-global CursorId should be 0.
75     invariant((collectionIdentifier & (0b11 << 30)) == 0);
76     CursorId x = static_cast<CursorId>(collectionIdentifier) << 32;
77     x |= cursor;
78     return x;
79 }
80 
81 class GlobalCursorIdCache {
82 public:
83     GlobalCursorIdCache();
84     ~GlobalCursorIdCache();
85 
86     /**
87      * Returns a unique 32-bit identifier to be used as the first 32 bits of all cursor ids for a
88      * new CursorManager.
89      */
90     uint32_t registerCursorManager(const NamespaceString& nss);
91 
92     /**
93      * Must be called when a CursorManager is deleted. 'id' must be the identifier returned by
94      * registerCursorManager().
95      */
96     void deregisterCursorManager(uint32_t id, const NamespaceString& nss);
97 
98     /**
99      * works globally
100      */
101     bool eraseCursor(OperationContext* opCtx, CursorId id, bool checkAuth);
102 
103     void appendStats(BSONObjBuilder& builder);
104 
105     std::size_t timeoutCursors(OperationContext* opCtx, Date_t now);
106 
107     template <typename Visitor>
108     void visitAllCursorManagers(OperationContext* opCtx, Visitor* visitor);
109 
110     int64_t nextSeed();
111 
112 private:
113     SimpleMutex _mutex;
114 
115     typedef unordered_map<unsigned, NamespaceString> Map;
116     Map _idToNss;
117     unsigned _nextId;
118 
119     std::unique_ptr<SecureRandom> _secureRandom;
120 };
121 
// Process-wide singletons; both are created by the MONGO_INITIALIZER functions in this file.
//
// Note that "globalCursorIdCache" must be declared before "globalCursorManager", as the latter
// calls into the former during destruction.
std::unique_ptr<GlobalCursorIdCache> globalCursorIdCache;
std::unique_ptr<CursorManager> globalCursorManager;
126 
// Startup initializer that creates the process-wide GlobalCursorIdCache.
MONGO_INITIALIZER(GlobalCursorIdCache)(InitializerContext* context) {
    globalCursorIdCache = stdx::make_unique<GlobalCursorIdCache>();
    return Status::OK();
}
131 
132 MONGO_INITIALIZER_WITH_PREREQUISITES(GlobalCursorManager, ("GlobalCursorIdCache"))
133 (InitializerContext* context) {
134     globalCursorManager.reset(new CursorManager({}));
135     return Status::OK();
136 }
137 
GlobalCursorIdCache()138 GlobalCursorIdCache::GlobalCursorIdCache() : _nextId(0), _secureRandom() {}
139 
~GlobalCursorIdCache()140 GlobalCursorIdCache::~GlobalCursorIdCache() {}
141 
nextSeed()142 int64_t GlobalCursorIdCache::nextSeed() {
143     stdx::lock_guard<SimpleMutex> lk(_mutex);
144     if (!_secureRandom)
145         _secureRandom = SecureRandom::create();
146     return _secureRandom->nextInt64();
147 }
148 
registerCursorManager(const NamespaceString & nss)149 uint32_t GlobalCursorIdCache::registerCursorManager(const NamespaceString& nss) {
150     static const uint32_t kMaxIds = 1000 * 1000 * 1000;
151     static_assert((kMaxIds & (0b11 << 30)) == 0,
152                   "the first two bits of a collection identifier must always be zeroes");
153 
154     stdx::lock_guard<SimpleMutex> lk(_mutex);
155 
156     fassert(17359, _idToNss.size() < kMaxIds);
157 
158     for (uint32_t i = 0; i <= kMaxIds; i++) {
159         uint32_t id = ++_nextId;
160         if (id == 0)
161             continue;
162         if (_idToNss.count(id) > 0)
163             continue;
164         _idToNss[id] = nss;
165         return id;
166     }
167 
168     MONGO_UNREACHABLE;
169 }
170 
deregisterCursorManager(uint32_t id,const NamespaceString & nss)171 void GlobalCursorIdCache::deregisterCursorManager(uint32_t id, const NamespaceString& nss) {
172     stdx::lock_guard<SimpleMutex> lk(_mutex);
173     invariant(nss == _idToNss[id]);
174     _idToNss.erase(id);
175 }
176 
eraseCursor(OperationContext * opCtx,CursorId id,bool checkAuth)177 bool GlobalCursorIdCache::eraseCursor(OperationContext* opCtx, CursorId id, bool checkAuth) {
178     // Figure out what the namespace of this cursor is.
179     NamespaceString nss;
180     if (CursorManager::isGloballyManagedCursor(id)) {
181         auto pin = globalCursorManager->pinCursor(opCtx, id, CursorManager::kNoCheckSession);
182         if (!pin.isOK()) {
183             invariant(pin == ErrorCodes::CursorNotFound || pin == ErrorCodes::Unauthorized);
184             // No such cursor.  TODO: Consider writing to audit log here (even though we don't
185             // have a namespace).
186             return false;
187         }
188         nss = pin.getValue().getCursor()->nss();
189     } else {
190         stdx::lock_guard<SimpleMutex> lk(_mutex);
191         uint32_t nsid = idFromCursorId(id);
192         Map::const_iterator it = _idToNss.find(nsid);
193         if (it == _idToNss.end()) {
194             // No namespace corresponding to this cursor id prefix.  TODO: Consider writing to
195             // audit log here (even though we don't have a namespace).
196             return false;
197         }
198         nss = it->second;
199     }
200     invariant(nss.isValid());
201 
202     // Check if we are authorized to erase this cursor.
203     if (checkAuth) {
204         auto status = CursorManager::withCursorManager(
205             opCtx, id, nss, [nss, id, opCtx](CursorManager* manager) {
206                 auto ccPin = manager->pinCursor(opCtx, id, CursorManager::kNoCheckSession);
207                 if (!ccPin.isOK()) {
208                     return ccPin.getStatus();
209                 }
210                 AuthorizationSession* as = AuthorizationSession::get(opCtx->getClient());
211                 auto cursorOwner = ccPin.getValue().getCursor()->getAuthenticatedUsers();
212                 return as->checkAuthForKillCursors(nss, cursorOwner);
213             });
214         if (!status.isOK()) {
215             audit::logKillCursorsAuthzCheck(opCtx->getClient(), nss, id, status.code());
216             return false;
217         }
218     }
219 
220     // If this cursor is owned by the global cursor manager, ask it to erase the cursor for us.
221     if (CursorManager::isGloballyManagedCursor(id)) {
222         Status eraseStatus = globalCursorManager->eraseCursor(opCtx, id, checkAuth);
223         massert(28697,
224                 eraseStatus.reason(),
225                 eraseStatus.code() == ErrorCodes::OK ||
226                     eraseStatus.code() == ErrorCodes::CursorNotFound);
227         return eraseStatus.isOK();
228     }
229 
230     // If not, then the cursor must be owned by a collection.  Erase the cursor under the
231     // collection lock (to prevent the collection from going away during the erase).
232     AutoGetCollectionForReadCommand ctx(opCtx, nss);
233     Collection* collection = ctx.getCollection();
234     if (!collection) {
235         if (checkAuth)
236             audit::logKillCursorsAuthzCheck(
237                 opCtx->getClient(), nss, id, ErrorCodes::CursorNotFound);
238         return false;
239     }
240 
241     Status eraseStatus = collection->getCursorManager()->eraseCursor(opCtx, id, checkAuth);
242     uassert(16089,
243             eraseStatus.reason(),
244             eraseStatus.code() == ErrorCodes::OK ||
245                 eraseStatus.code() == ErrorCodes::CursorNotFound);
246     return eraseStatus.isOK();
247 }
248 
timeoutCursors(OperationContext * opCtx,Date_t now)249 std::size_t GlobalCursorIdCache::timeoutCursors(OperationContext* opCtx, Date_t now) {
250     size_t totalTimedOut = 0;
251 
252     // Time out the cursors from the global cursor manager.
253     totalTimedOut += globalCursorManager->timeoutCursors(opCtx, now);
254 
255     // Compute the set of collection names that we have to time out cursors for.
256     vector<NamespaceString> todo;
257     {
258         stdx::lock_guard<SimpleMutex> lk(_mutex);
259         for (auto&& entry : _idToNss) {
260             todo.push_back(entry.second);
261         }
262     }
263 
264     // For each collection, time out its cursors under the collection lock (to prevent the
265     // collection from going away during the erase).
266     for (unsigned i = 0; i < todo.size(); i++) {
267         AutoGetCollectionOrViewForReadCommand ctx(opCtx, NamespaceString(todo[i]));
268         if (!ctx.getDb()) {
269             continue;
270         }
271 
272         Collection* collection = ctx.getCollection();
273         if (collection == NULL) {
274             continue;
275         }
276 
277         totalTimedOut += collection->getCursorManager()->timeoutCursors(opCtx, now);
278     }
279 
280     return totalTimedOut;
281 }
282 }  // namespace
283 
284 template <typename Visitor>
visitAllCursorManagers(OperationContext * opCtx,Visitor * visitor)285 void GlobalCursorIdCache::visitAllCursorManagers(OperationContext* opCtx, Visitor* visitor) {
286     (*visitor)(*globalCursorManager);
287 
288     // Compute the set of collection names that we have to get sessions for
289     vector<NamespaceString> namespaces;
290     {
291         stdx::lock_guard<SimpleMutex> lk(_mutex);
292         for (auto&& entry : _idToNss) {
293             namespaces.push_back(entry.second);
294         }
295     }
296 
297     // For each collection, get its sessions under the collection lock (to prevent the
298     // collection from going away during the erase).
299     for (auto&& ns : namespaces) {
300         AutoGetCollectionOrView ctx(opCtx, NamespaceString(ns), MODE_IS);
301         if (!ctx.getDb()) {
302             continue;
303         }
304 
305         Collection* collection = ctx.getCollection();
306         if (!collection) {
307             continue;
308         }
309 
310         (*visitor)(*(collection->getCursorManager()));
311     }
312 }
313 
314 // ---
315 
/**
 * Returns a non-owning pointer to the global CursorManager. The pointer is null until the
 * "GlobalCursorManager" initializer has run.
 */
CursorManager* CursorManager::getGlobalCursorManager() {
    return globalCursorManager.get();
}
319 
appendAllActiveSessions(OperationContext * opCtx,LogicalSessionIdSet * lsids)320 void CursorManager::appendAllActiveSessions(OperationContext* opCtx, LogicalSessionIdSet* lsids) {
321     auto visitor = [&](CursorManager& mgr) { mgr.appendActiveSessions(lsids); };
322     globalCursorIdCache->visitAllCursorManagers(opCtx, &visitor);
323 }
324 
getAllCursors(OperationContext * opCtx)325 std::vector<GenericCursor> CursorManager::getAllCursors(OperationContext* opCtx) {
326     std::vector<GenericCursor> cursors;
327     auto visitor = [&](CursorManager& mgr) { mgr.appendActiveCursors(&cursors); };
328     globalCursorIdCache->visitAllCursorManagers(opCtx, &visitor);
329 
330     return cursors;
331 }
332 
killCursorsWithMatchingSessions(OperationContext * opCtx,const SessionKiller::Matcher & matcher)333 std::pair<Status, int> CursorManager::killCursorsWithMatchingSessions(
334     OperationContext* opCtx, const SessionKiller::Matcher& matcher) {
335     auto eraser = [&](CursorManager& mgr, CursorId id) {
336         uassertStatusOK(mgr.eraseCursor(opCtx, id, true));
337         log() << "killing cursor: " << id << " as part of killing session(s)";
338     };
339 
340     auto visitor = makeKillSessionsCursorManagerVisitor(opCtx, matcher, std::move(eraser));
341     globalCursorIdCache->visitAllCursorManagers(opCtx, &visitor);
342 
343     return std::make_pair(visitor.getStatus(), visitor.getCursorsKilled());
344 }
345 
/**
 * Times out idle cursors across the global cursor manager and all registered per-collection
 * cursor managers by delegating to the GlobalCursorIdCache. Returns the number timed out.
 */
std::size_t CursorManager::timeoutCursorsGlobal(OperationContext* opCtx, Date_t now) {
    return globalCursorIdCache->timeoutCursors(opCtx, now);
}
349 
eraseCursorGlobalIfAuthorized(OperationContext * opCtx,int n,const char * _ids)350 int CursorManager::eraseCursorGlobalIfAuthorized(OperationContext* opCtx, int n, const char* _ids) {
351     ConstDataCursor ids(_ids);
352     int numDeleted = 0;
353     for (int i = 0; i < n; i++) {
354         if (eraseCursorGlobalIfAuthorized(opCtx, ids.readAndAdvance<LittleEndian<int64_t>>()))
355             numDeleted++;
356         if (globalInShutdownDeprecated())
357             break;
358     }
359     return numDeleted;
360 }
/**
 * Erases cursor 'id', whichever cursor manager owns it, after checking that the current client is
 * authorized to kill it. Returns true if the cursor was erased.
 */
bool CursorManager::eraseCursorGlobalIfAuthorized(OperationContext* opCtx, CursorId id) {
    return globalCursorIdCache->eraseCursor(opCtx, id, true);
}
/**
 * Erases cursor 'id', whichever cursor manager owns it, without any authorization check. Returns
 * true if the cursor was erased.
 */
bool CursorManager::eraseCursorGlobal(OperationContext* opCtx, CursorId id) {
    return globalCursorIdCache->eraseCursor(opCtx, id, false);
}
367 
withCursorManager(OperationContext * opCtx,CursorId id,const NamespaceString & nss,stdx::function<Status (CursorManager *)> callback)368 Status CursorManager::withCursorManager(OperationContext* opCtx,
369                                         CursorId id,
370                                         const NamespaceString& nss,
371                                         stdx::function<Status(CursorManager*)> callback) {
372     boost::optional<AutoGetCollectionForReadCommand> readLock;
373     CursorManager* cursorManager = nullptr;
374 
375     if (CursorManager::isGloballyManagedCursor(id)) {
376         cursorManager = CursorManager::getGlobalCursorManager();
377     } else {
378         readLock.emplace(opCtx, nss);
379         Collection* collection = readLock->getCollection();
380         if (!collection) {
381             return {ErrorCodes::CursorNotFound,
382                     str::stream() << "collection does not exist: " << nss.ns()};
383         }
384         cursorManager = collection->getCursorManager();
385     }
386     invariant(cursorManager);
387 
388     return callback(cursorManager);
389 }
390 
391 // --------------------------
392 
operator ()(const PlanExecutor * exec,const std::size_t nPartitions)393 std::size_t CursorManager::PlanExecutorPartitioner::operator()(const PlanExecutor* exec,
394                                                                const std::size_t nPartitions) {
395     auto token = exec->getRegistrationToken();
396     invariant(token);
397     return (*token) % nPartitions;
398 }
399 
/**
 * Constructs a cursor manager for 'nss'.
 *
 * A manager constructed with an empty namespace uses 0 as its collection identifier and does not
 * register with the GlobalCursorIdCache; any other namespace is registered and receives a fresh
 * identifier. The manager's PseudoRandom is seeded from the GlobalCursorIdCache.
 */
CursorManager::CursorManager(NamespaceString nss)
    : _nss(std::move(nss)),
      // Uses the member '_nss', not the moved-from parameter.
      _collectionCacheRuntimeId(_nss.isEmpty() ? 0
                                               : globalCursorIdCache->registerCursorManager(_nss)),
      _random(stdx::make_unique<PseudoRandom>(globalCursorIdCache->nextSeed())),
      _registeredPlanExecutors(),
      _cursorMap(stdx::make_unique<Partitioned<unordered_map<CursorId, ClientCursor*>>>()) {}
407 
~CursorManager()408 CursorManager::~CursorManager() {
409     // All cursors and PlanExecutors should have been deleted already.
410     invariant(_registeredPlanExecutors.empty());
411     invariant(_cursorMap->empty());
412 
413     if (!isGlobalManager()) {
414         globalCursorIdCache->deregisterCursorManager(_collectionCacheRuntimeId, _nss);
415     }
416 }
417 
invalidateAll(OperationContext * opCtx,bool collectionGoingAway,const std::string & reason)418 void CursorManager::invalidateAll(OperationContext* opCtx,
419                                   bool collectionGoingAway,
420                                   const std::string& reason) {
421     invariant(!isGlobalManager());  // The global cursor manager should never need to kill cursors.
422     dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_X));
423     fassert(28819, !BackgroundOperation::inProgForNs(_nss));
424     auto allExecPartitions = _registeredPlanExecutors.lockAllPartitions();
425     for (auto&& partition : allExecPartitions) {
426         for (auto&& exec : partition) {
427             // The PlanExecutor is owned elsewhere, so we just mark it as killed and let it be
428             // cleaned up later.
429             exec->markAsKilled(reason);
430         }
431     }
432     allExecPartitions.clear();
433 
434     // Mark all cursors as killed, but keep around those we can in order to provide a useful error
435     // message to the user when they attempt to use it next time.
436     std::vector<std::unique_ptr<ClientCursor, ClientCursor::Deleter>> toDisposeWithoutMutex;
437     {
438         auto allCurrentPartitions = _cursorMap->lockAllPartitions();
439         for (auto&& partition : allCurrentPartitions) {
440             for (auto it = partition.begin(); it != partition.end();) {
441                 auto* cursor = it->second;
442                 cursor->markAsKilled(reason);
443 
444                 // If pinned, there is an active user of this cursor, who is now responsible for
445                 // cleaning it up. Otherwise, we can immediately dispose of it.
446                 if (cursor->_isPinned) {
447                     it = partition.erase(it);
448                     continue;
449                 }
450 
451                 if (!collectionGoingAway) {
452                     // We keep around unpinned cursors so that future attempts to use the cursor
453                     // will result in a useful error message.
454                     ++it;
455                 } else {
456                     toDisposeWithoutMutex.emplace_back(cursor);
457                     it = partition.erase(it);
458                 }
459             }
460         }
461     }
462 
463     // Dispose of the cursors we can now delete. This might involve lock acquisitions for safe
464     // cleanup, so avoid doing it while holding mutexes.
465     for (auto&& cursor : toDisposeWithoutMutex) {
466         cursor->dispose(opCtx);
467     }
468 }
469 
invalidateDocument(OperationContext * opCtx,const RecordId & dl,InvalidationType type)470 void CursorManager::invalidateDocument(OperationContext* opCtx,
471                                        const RecordId& dl,
472                                        InvalidationType type) {
473     dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));
474     invariant(!isGlobalManager());  // The global cursor manager should never receive invalidations.
475     if (supportsDocLocking()) {
476         // If a storage engine supports doc locking, then we do not need to invalidate.
477         // The transactional boundaries of the operation protect us.
478         return;
479     }
480 
481     auto allExecPartitions = _registeredPlanExecutors.lockAllPartitions();
482     for (auto&& partition : allExecPartitions) {
483         for (auto&& exec : partition) {
484             exec->invalidate(opCtx, dl, type);
485         }
486     }
487 
488     auto allPartitions = _cursorMap->lockAllPartitions();
489     for (auto&& partition : allPartitions) {
490         for (auto&& entry : partition) {
491             auto exec = entry.second->getExecutor();
492             exec->invalidate(opCtx, dl, type);
493         }
494     }
495 }
496 
cursorShouldTimeout_inlock(const ClientCursor * cursor,Date_t now)497 bool CursorManager::cursorShouldTimeout_inlock(const ClientCursor* cursor, Date_t now) {
498     if (cursor->isNoTimeout() || cursor->_isPinned) {
499         return false;
500     }
501     return (now - cursor->_lastUseDate) >= Milliseconds(getCursorTimeoutMillis());
502 }
503 
timeoutCursors(OperationContext * opCtx,Date_t now)504 std::size_t CursorManager::timeoutCursors(OperationContext* opCtx, Date_t now) {
505     std::vector<std::unique_ptr<ClientCursor, ClientCursor::Deleter>> toDisposeWithoutMutex;
506 
507     for (size_t partitionId = 0; partitionId < kNumPartitions; ++partitionId) {
508         auto lockedPartition = _cursorMap->lockOnePartitionById(partitionId);
509         for (auto it = lockedPartition->begin(); it != lockedPartition->end();) {
510             auto* cursor = it->second;
511             if (cursorShouldTimeout_inlock(cursor, now)) {
512                 toDisposeWithoutMutex.emplace_back(cursor);
513                 it = lockedPartition->erase(it);
514             } else {
515                 ++it;
516             }
517         }
518     }
519 
520     // Be careful not to dispose of cursors while holding the partition lock.
521     for (auto&& cursor : toDisposeWithoutMutex) {
522         log() << "Cursor id " << cursor->cursorid() << " timed out, idle since "
523               << cursor->getLastUseDate();
524         cursor->dispose(opCtx);
525     }
526     return toDisposeWithoutMutex.size();
527 }
528 
namespace {
// Counter shared by all CursorManagers. registerExecutor() reads and increments it to produce the
// registration token given to each PlanExecutor.
static AtomicUInt32 registeredPlanExecutorId;
}  // namespace
532 
registerExecutor(PlanExecutor * exec)533 Partitioned<unordered_set<PlanExecutor*>>::PartitionId CursorManager::registerExecutor(
534     PlanExecutor* exec) {
535     auto partitionId = registeredPlanExecutorId.fetchAndAdd(1);
536     exec->setRegistrationToken(partitionId);
537     _registeredPlanExecutors.insert(exec);
538     return partitionId;
539 }
540 
deregisterExecutor(PlanExecutor * exec)541 void CursorManager::deregisterExecutor(PlanExecutor* exec) {
542     if (auto partitionId = exec->getRegistrationToken()) {
543         _registeredPlanExecutors.erase(exec);
544     }
545 }
546 
pinCursor(OperationContext * opCtx,CursorId id,AuthCheck checkSessionAuth)547 StatusWith<ClientCursorPin> CursorManager::pinCursor(OperationContext* opCtx,
548                                                      CursorId id,
549                                                      AuthCheck checkSessionAuth) {
550     auto lockedPartition = _cursorMap->lockOnePartition(id);
551     auto it = lockedPartition->find(id);
552     if (it == lockedPartition->end()) {
553         return {ErrorCodes::CursorNotFound, str::stream() << "cursor id " << id << " not found"};
554     }
555 
556     ClientCursor* cursor = it->second;
557     uassert(ErrorCodes::CursorInUse,
558             str::stream() << "cursor id " << id << " is already in use",
559             !cursor->_isPinned);
560     if (cursor->getExecutor()->isMarkedAsKilled()) {
561         // This cursor was killed while it was idle.
562         Status error{ErrorCodes::QueryPlanKilled,
563                      str::stream() << "cursor killed because: "
564                                    << cursor->getExecutor()->getKillReason()};
565         deregisterAndDestroyCursor(std::move(lockedPartition),
566                                    opCtx,
567                                    std::unique_ptr<ClientCursor, ClientCursor::Deleter>(cursor));
568         return error;
569     }
570 
571     if (checkSessionAuth == kCheckSession) {
572         auto cursorPrivilegeStatus = checkCursorSessionPrivilege(opCtx, cursor->getSessionId());
573         if (!cursorPrivilegeStatus.isOK()) {
574             return cursorPrivilegeStatus;
575         }
576     }
577 
578     cursor->_isPinned = true;
579 
580     // We use pinning of a cursor as a proxy for active, user-initiated use of a cursor.  Therefor,
581     // we pass down to the logical session cache and vivify the record (updating last use).
582     if (cursor->getSessionId()) {
583         auto vivifyCursorStatus =
584             LogicalSessionCache::get(opCtx)->vivify(opCtx, cursor->getSessionId().get());
585         if (!vivifyCursorStatus.isOK()) {
586             return vivifyCursorStatus;
587         }
588     }
589 
590     return ClientCursorPin(opCtx, cursor);
591 }
592 
/**
 * Returns a pinned cursor to the manager: clears its pinned flag and records the current time as
 * its last-use date. 'cursor' must currently be pinned.
 */
void CursorManager::unpin(OperationContext* opCtx,
                          std::unique_ptr<ClientCursor, ClientCursor::Deleter> cursor) {
    // Avoid computing the current time within the critical section.
    auto now = opCtx->getServiceContext()->getPreciseClockSource()->now();

    // The pinned flag and last-use date are only modified while the cursor's partition is locked.
    auto partitionLock = _cursorMap->lockOnePartition(cursor->cursorid());
    invariant(cursor->_isPinned);
    cursor->_isPinned = false;
    cursor->_lastUseDate = now;

    // The cursor will stay around in '_cursorMap', so release the unique pointer to avoid deleting
    // it.
    cursor.release();
}
607 
getCursorIds(std::set<CursorId> * openCursors) const608 void CursorManager::getCursorIds(std::set<CursorId>* openCursors) const {
609     auto allPartitions = _cursorMap->lockAllPartitions();
610     for (auto&& partition : allPartitions) {
611         for (auto&& entry : partition) {
612             openCursors->insert(entry.first);
613         }
614     }
615 }
616 
appendActiveSessions(LogicalSessionIdSet * lsids) const617 void CursorManager::appendActiveSessions(LogicalSessionIdSet* lsids) const {
618     auto allPartitions = _cursorMap->lockAllPartitions();
619     for (auto&& partition : allPartitions) {
620         for (auto&& entry : partition) {
621             auto cursor = entry.second;
622             if (auto id = cursor->getSessionId()) {
623                 lsids->insert(id.value());
624             }
625         }
626     }
627 }
628 
appendActiveCursors(std::vector<GenericCursor> * cursors) const629 void CursorManager::appendActiveCursors(std::vector<GenericCursor>* cursors) const {
630     auto allPartitions = _cursorMap->lockAllPartitions();
631     for (auto&& partition : allPartitions) {
632         for (auto&& entry : partition) {
633             auto cursor = entry.second;
634             cursors->emplace_back();
635             auto& gc = cursors->back();
636             gc.setId(cursor->_cursorid);
637             gc.setNs(cursor->nss());
638             gc.setLsid(cursor->getSessionId());
639         }
640     }
641 }
642 
getCursorIdsForNamespace(const NamespaceString & nss) const643 std::vector<CursorId> CursorManager::getCursorIdsForNamespace(const NamespaceString& nss) const {
644     std::vector<CursorId> cursors;
645 
646     auto allPartitions = _cursorMap->lockAllPartitions();
647     for (auto&& partition : allPartitions) {
648         for (auto&& entry : partition) {
649             auto cursor = entry.second;
650             if (cursor->nss() == nss) {
651                 cursors.emplace_back(cursor->cursorid());
652             }
653         }
654     }
655 
656     return cursors;
657 }
658 
getCursorsForSession(LogicalSessionId lsid) const659 stdx::unordered_set<CursorId> CursorManager::getCursorsForSession(LogicalSessionId lsid) const {
660     stdx::unordered_set<CursorId> cursors;
661 
662     auto allPartitions = _cursorMap->lockAllPartitions();
663     for (auto&& partition : allPartitions) {
664         for (auto&& entry : partition) {
665             auto cursor = entry.second;
666             if (cursor->getSessionId() == lsid) {
667                 cursors.insert(cursor->cursorid());
668             }
669         }
670     }
671 
672     return cursors;
673 }
674 
/**
 * Returns the number of cursors currently held in this manager's cursor map.
 */
size_t CursorManager::numCursors() const {
    return _cursorMap->size();
}
678 
allocateCursorId_inlock()679 CursorId CursorManager::allocateCursorId_inlock() {
680     for (int i = 0; i < 10000; i++) {
681         // The leading two bits of a CursorId are used to determine if the cursor is registered on
682         // the global cursor manager.
683         CursorId id;
684         if (isGlobalManager()) {
685             // This is the global cursor manager, so generate a random number and make sure the
686             // first two bits are 01.
687             uint64_t mask = 0x3FFFFFFFFFFFFFFF;
688             uint64_t bitToSet = 1ULL << 62;
689             id = ((_random->nextInt64() & mask) | bitToSet);
690         } else {
691             // The first 2 bits are 0, the next 30 bits are the collection identifier, the next 32
692             // bits are random.
693             uint32_t myPart = static_cast<uint32_t>(_random->nextInt32());
694             id = cursorIdFromParts(_collectionCacheRuntimeId, myPart);
695         }
696         auto partition = _cursorMap->lockOnePartition(id);
697         if (partition->count(id) == 0)
698             return id;
699     }
700     fassertFailed(17360);
701 }
702 
registerCursor(OperationContext * opCtx,ClientCursorParams && cursorParams)703 ClientCursorPin CursorManager::registerCursor(OperationContext* opCtx,
704                                               ClientCursorParams&& cursorParams) {
705     // Avoid computing the current time within the critical section.
706     auto now = opCtx->getServiceContext()->getPreciseClockSource()->now();
707 
708     // Make sure the PlanExecutor isn't registered, since we will register the ClientCursor wrapping
709     // it.
710     invariant(cursorParams.exec);
711     deregisterExecutor(cursorParams.exec.get());
712     cursorParams.exec.get_deleter().dismissDisposal();
713     cursorParams.exec->unsetRegistered();
714 
715     // Note we must hold the registration lock from now until insertion into '_cursorMap' to ensure
716     // we don't insert two cursors with the same cursor id.
717     stdx::lock_guard<SimpleMutex> lock(_registrationLock);
718     CursorId cursorId = allocateCursorId_inlock();
719     std::unique_ptr<ClientCursor, ClientCursor::Deleter> clientCursor(new ClientCursor(
720         std::move(cursorParams), this, cursorId, opCtx->getLogicalSessionId(), now));
721 
722     // Transfer ownership of the cursor to '_cursorMap'.
723     auto partition = _cursorMap->lockOnePartition(cursorId);
724     ClientCursor* unownedCursor = clientCursor.release();
725     partition->emplace(cursorId, unownedCursor);
726     return ClientCursorPin(opCtx, unownedCursor);
727 }
728 
/**
 * Removes 'cc' from the cursor map by its cursor id. The cursor itself is neither disposed of nor
 * deleted here.
 */
void CursorManager::deregisterCursor(ClientCursor* cc) {
    _cursorMap->erase(cc->cursorid());
}
732 
deregisterAndDestroyCursor(Partitioned<stdx::unordered_map<CursorId,ClientCursor * >,kNumPartitions>::OnePartition && lk,OperationContext * opCtx,std::unique_ptr<ClientCursor,ClientCursor::Deleter> cursor)733 void CursorManager::deregisterAndDestroyCursor(
734     Partitioned<stdx::unordered_map<CursorId, ClientCursor*>, kNumPartitions>::OnePartition&& lk,
735     OperationContext* opCtx,
736     std::unique_ptr<ClientCursor, ClientCursor::Deleter> cursor) {
737     {
738         auto lockWithRestrictedScope = std::move(lk);
739         lockWithRestrictedScope->erase(cursor->cursorid());
740     }
741     // Dispose of the cursor without holding any cursor manager mutexes. Disposal of a cursor
742     // can require taking lock manager locks, which we want to avoid while holding a mutex. If
743     // we did so, any caller of a CursorManager method which already held a lock manager lock
744     // could induce a deadlock when trying to acquire a CursorManager lock.
745     cursor->dispose(opCtx);
746 }
747 
eraseCursor(OperationContext * opCtx,CursorId id,bool shouldAudit)748 Status CursorManager::eraseCursor(OperationContext* opCtx, CursorId id, bool shouldAudit) {
749     auto lockedPartition = _cursorMap->lockOnePartition(id);
750     auto it = lockedPartition->find(id);
751     if (it == lockedPartition->end()) {
752         if (shouldAudit) {
753             audit::logKillCursorsAuthzCheck(
754                 opCtx->getClient(), _nss, id, ErrorCodes::CursorNotFound);
755         }
756         return {ErrorCodes::CursorNotFound, str::stream() << "Cursor id not found: " << id};
757     }
758     auto cursor = it->second;
759 
760     if (cursor->_isPinned) {
761         if (shouldAudit) {
762             audit::logKillCursorsAuthzCheck(
763                 opCtx->getClient(), _nss, id, ErrorCodes::OperationFailed);
764         }
765         return {ErrorCodes::OperationFailed, str::stream() << "Cannot kill pinned cursor: " << id};
766     }
767     std::unique_ptr<ClientCursor, ClientCursor::Deleter> ownedCursor(cursor);
768 
769     if (shouldAudit) {
770         audit::logKillCursorsAuthzCheck(opCtx->getClient(), _nss, id, ErrorCodes::OK);
771     }
772 
773     deregisterAndDestroyCursor(std::move(lockedPartition), opCtx, std::move(ownedCursor));
774     return Status::OK();
775 }
776 
777 }  // namespace mongo
778