1 // ttl.cpp
2
3
4 /**
5 * Copyright (C) 2018-present MongoDB, Inc.
6 *
7 * This program is free software: you can redistribute it and/or modify
8 * it under the terms of the Server Side Public License, version 1,
9 * as published by MongoDB, Inc.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * Server Side Public License for more details.
15 *
16 * You should have received a copy of the Server Side Public License
17 * along with this program. If not, see
18 * <http://www.mongodb.com/licensing/server-side-public-license>.
19 *
20 * As a special exception, the copyright holders give permission to link the
21 * code of portions of this program with the OpenSSL library under certain
22 * conditions as described in each individual source file and distribute
23 * linked combinations including the program with the OpenSSL library. You
24 * must comply with the Server Side Public License in all respects for
25 * all of the code used other than as permitted herein. If you modify file(s)
26 * with this exception, you may extend this exception to your version of the
27 * file(s), but you are not obligated to do so. If you do not wish to do so,
28 * delete this exception statement from your version. If you delete this
29 * exception statement from all source files in the program, then also delete
30 * it in the license file.
31 */
32
33 #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kIndex
34
35 #include "mongo/platform/basic.h"
36
37 #include "mongo/db/ttl.h"
38
39 #include "mongo/base/counter.h"
40 #include "mongo/db/auth/authorization_session.h"
41 #include "mongo/db/auth/user_name.h"
42 #include "mongo/db/catalog/collection.h"
43 #include "mongo/db/catalog/collection_catalog_entry.h"
44 #include "mongo/db/catalog/database_catalog_entry.h"
45 #include "mongo/db/catalog/database_holder.h"
46 #include "mongo/db/catalog/index_catalog.h"
47 #include "mongo/db/client.h"
48 #include "mongo/db/commands/fsync.h"
49 #include "mongo/db/commands/server_status_metric.h"
50 #include "mongo/db/concurrency/write_conflict_exception.h"
51 #include "mongo/db/db_raii.h"
52 #include "mongo/db/exec/delete.h"
53 #include "mongo/db/index/index_descriptor.h"
54 #include "mongo/db/namespace_string.h"
55 #include "mongo/db/ops/insert.h"
56 #include "mongo/db/query/internal_plans.h"
57 #include "mongo/db/repl/replication_coordinator_global.h"
58 #include "mongo/db/server_parameters.h"
59 #include "mongo/db/ttl_collection_cache.h"
60 #include "mongo/util/background.h"
61 #include "mongo/util/concurrency/idle_thread_block.h"
62 #include "mongo/util/exit.h"
63 #include "mongo/util/log.h"
64
65 namespace mongo {
66
67 Counter64 ttlPasses;
68 Counter64 ttlDeletedDocuments;
69
70 ServerStatusMetricField<Counter64> ttlPassesDisplay("ttl.passes", &ttlPasses);
71 ServerStatusMetricField<Counter64> ttlDeletedDocumentsDisplay("ttl.deletedDocuments",
72 &ttlDeletedDocuments);
73
74 MONGO_EXPORT_SERVER_PARAMETER(ttlMonitorEnabled, bool, true);
75 MONGO_EXPORT_SERVER_PARAMETER(ttlMonitorSleepSecs, int, 60); // used for testing
76
77 class TTLMonitor : public BackgroundJob {
78 public:
TTLMonitor()79 TTLMonitor() {}
~TTLMonitor()80 virtual ~TTLMonitor() {}
81
name() const82 virtual std::string name() const {
83 return "TTLMonitor";
84 }
85
86 static std::string secondsExpireField;
87
run()88 virtual void run() {
89 Client::initThread(name().c_str());
90 AuthorizationSession::get(cc())->grantInternalAuthorization(&cc());
91
92 while (!globalInShutdownDeprecated()) {
93 {
94 MONGO_IDLE_THREAD_BLOCK;
95 sleepsecs(ttlMonitorSleepSecs.load());
96 }
97
98 LOG(3) << "thread awake";
99
100 if (!ttlMonitorEnabled.load()) {
101 LOG(1) << "disabled";
102 continue;
103 }
104
105 if (lockedForWriting()) {
106 // Note: this is not perfect as you can go into fsync+lock between this and actually
107 // doing the delete later.
108 LOG(3) << "locked for writing";
109 continue;
110 }
111
112 try {
113 doTTLPass();
114 } catch (const WriteConflictException&) {
115 LOG(1) << "got WriteConflictException";
116 }
117 }
118 }
119
120 private:
doTTLPass()121 void doTTLPass() {
122 const ServiceContext::UniqueOperationContext opCtxPtr = cc().makeOperationContext();
123 OperationContext& opCtx = *opCtxPtr;
124
125 // If part of replSet but not in a readable state (e.g. during initial sync), skip.
126 if (repl::getGlobalReplicationCoordinator()->getReplicationMode() ==
127 repl::ReplicationCoordinator::modeReplSet &&
128 !repl::getGlobalReplicationCoordinator()->getMemberState().readable())
129 return;
130
131 TTLCollectionCache& ttlCollectionCache = TTLCollectionCache::get(getGlobalServiceContext());
132 std::vector<std::string> ttlCollections = ttlCollectionCache.getCollections();
133 std::vector<BSONObj> ttlIndexes;
134
135 ttlPasses.increment();
136
137 // Get all TTL indexes from every collection.
138 for (const std::string& collectionNS : ttlCollections) {
139 NamespaceString collectionNSS(collectionNS);
140 AutoGetCollection autoGetCollection(&opCtx, collectionNSS, MODE_IS);
141 Collection* coll = autoGetCollection.getCollection();
142 if (!coll) {
143 // Skip since collection has been dropped.
144 continue;
145 }
146
147 CollectionCatalogEntry* collEntry = coll->getCatalogEntry();
148 std::vector<std::string> indexNames;
149 collEntry->getAllIndexes(&opCtx, &indexNames);
150 for (const std::string& name : indexNames) {
151 BSONObj spec = collEntry->getIndexSpec(&opCtx, name);
152 if (spec.hasField(secondsExpireField)) {
153 ttlIndexes.push_back(spec.getOwned());
154 }
155 }
156 }
157
158 for (const BSONObj& idx : ttlIndexes) {
159 try {
160 doTTLForIndex(&opCtx, idx);
161 } catch (const DBException& dbex) {
162 error() << "Error processing ttl index: " << idx << " -- " << dbex.toString();
163 // Continue on to the next index.
164 continue;
165 }
166 }
167 }
168
169 /**
170 * Remove documents from the collection using the specified TTL index after a sufficient amount
171 * of time has passed according to its expiry specification.
172 */
doTTLForIndex(OperationContext * opCtx,BSONObj idx)173 void doTTLForIndex(OperationContext* opCtx, BSONObj idx) {
174 const NamespaceString collectionNSS(idx["ns"].String());
175 if (collectionNSS.isDropPendingNamespace()) {
176 return;
177 }
178 if (!userAllowedWriteNS(collectionNSS).isOK()) {
179 error() << "namespace '" << collectionNSS
180 << "' doesn't allow deletes, skipping ttl job for: " << idx;
181 return;
182 }
183
184 const BSONObj key = idx["key"].Obj();
185 const StringData name = idx["name"].valueStringData();
186 if (key.nFields() != 1) {
187 error() << "key for ttl index can only have 1 field, skipping ttl job for: " << idx;
188 return;
189 }
190
191 LOG(1) << "ns: " << collectionNSS << " key: " << key << " name: " << name;
192
193 AutoGetCollection autoGetCollection(opCtx, collectionNSS, MODE_IX);
194 Collection* collection = autoGetCollection.getCollection();
195 if (!collection) {
196 // Collection was dropped.
197 return;
198 }
199
200 if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, collectionNSS)) {
201 return;
202 }
203
204 IndexDescriptor* desc = collection->getIndexCatalog()->findIndexByName(opCtx, name);
205 if (!desc) {
206 LOG(1) << "index not found (index build in progress? index dropped?), skipping "
207 << "ttl job for: " << idx;
208 return;
209 }
210
211 // Re-read 'idx' from the descriptor, in case the collection or index definition changed
212 // before we re-acquired the collection lock.
213 idx = desc->infoObj();
214
215 if (IndexType::INDEX_BTREE != IndexNames::nameToType(desc->getAccessMethodName())) {
216 error() << "special index can't be used as a ttl index, skipping ttl job for: " << idx;
217 return;
218 }
219
220 BSONElement secondsExpireElt = idx[secondsExpireField];
221 if (!secondsExpireElt.isNumber()) {
222 error() << "ttl indexes require the " << secondsExpireField << " field to be "
223 << "numeric but received a type of " << typeName(secondsExpireElt.type())
224 << ", skipping ttl job for: " << idx;
225 return;
226 }
227
228 const Date_t kDawnOfTime =
229 Date_t::fromMillisSinceEpoch(std::numeric_limits<long long>::min());
230 const Date_t expirationTime = Date_t::now() - Seconds(secondsExpireElt.numberLong());
231 const BSONObj startKey = BSON("" << kDawnOfTime);
232 const BSONObj endKey = BSON("" << expirationTime);
233 // The canonical check as to whether a key pattern element is "ascending" or
234 // "descending" is (elt.number() >= 0). This is defined by the Ordering class.
235 const InternalPlanner::Direction direction = (key.firstElement().number() >= 0)
236 ? InternalPlanner::Direction::FORWARD
237 : InternalPlanner::Direction::BACKWARD;
238
239 // We need to pass into the DeleteStageParams (below) a CanonicalQuery with a BSONObj that
240 // queries for the expired documents correctly so that we do not delete documents that are
241 // not actually expired when our snapshot changes during deletion.
242 const char* keyFieldName = key.firstElement().fieldName();
243 BSONObj query =
244 BSON(keyFieldName << BSON("$gte" << kDawnOfTime << "$lte" << expirationTime));
245 auto qr = stdx::make_unique<QueryRequest>(collectionNSS);
246 qr->setFilter(query);
247 auto canonicalQuery = CanonicalQuery::canonicalize(opCtx, std::move(qr));
248 invariantOK(canonicalQuery.getStatus());
249
250 DeleteStageParams params;
251 params.isMulti = true;
252 params.canonicalQuery = canonicalQuery.getValue().get();
253
254 auto exec =
255 InternalPlanner::deleteWithIndexScan(opCtx,
256 collection,
257 params,
258 desc,
259 startKey,
260 endKey,
261 BoundInclusion::kIncludeBothStartAndEndKeys,
262 PlanExecutor::YIELD_AUTO,
263 direction);
264
265 Status result = exec->executePlan();
266 if (!result.isOK()) {
267 error() << "ttl query execution for index " << idx
268 << " failed with status: " << redact(result);
269 return;
270 }
271
272 const long long numDeleted = DeleteStage::getNumDeleted(*exec);
273 ttlDeletedDocuments.increment(numDeleted);
274 LOG(1) << "deleted: " << numDeleted;
275 }
276 };
277
278 namespace {
279 // The global TTLMonitor object is intentionally leaked. Even though it is only used in one
280 // function, we declare it here to indicate to the leak sanitizer that the leak of this object
281 // should not be reported.
282 TTLMonitor* ttlMonitor = nullptr;
283 } // namespace
284
startTTLBackgroundJob()285 void startTTLBackgroundJob() {
286 ttlMonitor = new TTLMonitor();
287 ttlMonitor->go();
288 }
289
290 std::string TTLMonitor::secondsExpireField = "expireAfterSeconds";
291 } // namespace mongo
292