1
2 /**
3 * Copyright (C) 2018-present MongoDB, Inc.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the Server Side Public License, version 1,
7 * as published by MongoDB, Inc.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * Server Side Public License for more details.
13 *
14 * You should have received a copy of the Server Side Public License
15 * along with this program. If not, see
16 * <http://www.mongodb.com/licensing/server-side-public-license>.
17 *
18 * As a special exception, the copyright holders give permission to link the
19 * code of portions of this program with the OpenSSL library under certain
20 * conditions as described in each individual source file and distribute
21 * linked combinations including the program with the OpenSSL library. You
22 * must comply with the Server Side Public License in all respects for
23 * all of the code used other than as permitted herein. If you modify file(s)
24 * with this exception, you may extend this exception to your version of the
25 * file(s), but you are not obligated to do so. If you do not wish to do so,
26 * delete this exception statement from your version. If you delete this
27 * exception statement from all source files in the program, then also delete
28 * it in the license file.
29 */
30
31 #include "mongo/platform/basic.h"
32
33 #include <algorithm>
34 #include <map>
35 #include <utility>
36 #include <vector>
37
38 #include "mongo/db/client.h"
39 #include "mongo/db/concurrency/lock_manager_test_help.h"
40 #include "mongo/db/db_raii.h"
41 #include "mongo/db/field_parser.h"
42 #include "mongo/db/repl/oplog.h"
43 #include "mongo/db/repl/oplog_entry.h"
44 #include "mongo/db/repl/oplog_interface_local.h"
45 #include "mongo/db/repl/repl_client_info.h"
46 #include "mongo/db/repl/replication_coordinator_mock.h"
47 #include "mongo/db/service_context_d_test_fixture.h"
48 #include "mongo/stdx/functional.h"
49 #include "mongo/stdx/mutex.h"
50 #include "mongo/unittest/barrier.h"
51 #include "mongo/util/concurrency/thread_pool.h"
52
53
54 namespace mongo {
55 namespace repl {
56 namespace {
57
58 class OplogTest : public ServiceContextMongoDTest {
59 private:
60 void setUp() override;
61 };
62
setUp()63 void OplogTest::setUp() {
64 // Set up mongod.
65 ServiceContextMongoDTest::setUp();
66
67 auto service = getServiceContext();
68 auto opCtx = cc().makeOperationContext();
69
70 // Set up ReplicationCoordinator and create oplog.
71 ReplicationCoordinator::set(service, stdx::make_unique<ReplicationCoordinatorMock>(service));
72 setOplogCollectionName();
73 createOplog(opCtx.get());
74
75 // Ensure that we are primary.
76 auto replCoord = ReplicationCoordinator::get(opCtx.get());
77 ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_PRIMARY));
78 }
79
80 /**
81 * Assert that oplog only has a single entry and return that oplog entry.
82 */
_getSingleOplogEntry(OperationContext * opCtx)83 OplogEntry _getSingleOplogEntry(OperationContext* opCtx) {
84 OplogInterfaceLocal oplogInterface(opCtx, NamespaceString::kRsOplogNamespace.ns());
85 auto oplogIter = oplogInterface.makeIterator();
86 auto opEntry = unittest::assertGet(oplogIter->next());
87 ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, oplogIter->next().getStatus())
88 << "Expected only 1 document in the oplog collection " << NamespaceString::kRsOplogNamespace
89 << " but found more than 1 document instead";
90 return unittest::assertGet(OplogEntry::parse(opEntry.first));
91 }
92
/**
 * Writes one no-op ('n') entry with logOp() inside a committed WriteUnitOfWork and checks that
 * the returned optime is non-null, matches the single entry read back from the oplog, and equals
 * the last op recorded in ReplClientInfo for the current client.
 */
TEST_F(OplogTest, LogOpReturnsOpTimeOnSuccessfulInsertIntoOplogCollection) {
    auto opCtx = cc().makeOperationContext();
    auto rawOpCtx = opCtx.get();

    const NamespaceString nss("test.coll");
    const auto msgObj = BSON("msg"
                             << "hello, world!");

    // Perform the oplog write under a MODE_X database lock. The lock and the WriteUnitOfWork are
    // scoped so that both are released before the oplog is read back.
    OpTime opTime;
    {
        AutoGetDb autoDb(rawOpCtx, nss.db(), MODE_X);
        WriteUnitOfWork wunit(rawOpCtx);
        opTime = logOp(rawOpCtx,
                       "n",
                       nss,
                       {},
                       msgObj,
                       nullptr,
                       false,
                       Date_t::now(),
                       {},
                       kUninitializedStmtId,
                       {},
                       OplogSlot());
        ASSERT_FALSE(opTime.isNull());
        wunit.commit();
    }

    // The oplog must now contain exactly the entry written above.
    OplogEntry oplogEntry = _getSingleOplogEntry(rawOpCtx);

    // Optime, op type and payload of the stored entry must match what was passed to logOp().
    ASSERT_EQUALS(opTime, oplogEntry.getOpTime())
        << "OpTime returned from logOp() did not match that in the oplog entry written to the "
           "oplog: "
        << oplogEntry.toBSON();
    ASSERT(OpTypeEnum::kNoop == oplogEntry.getOpType())
        << "Expected 'n' op type but found '" << OpType_serializer(oplogEntry.getOpType())
        << "' instead: " << oplogEntry.toBSON();
    ASSERT_BSONOBJ_EQ(msgObj, oplogEntry.getObject());

    // The client's last-op bookkeeping must agree with the optime returned by logOp().
    ASSERT_EQUALS(ReplClientInfo::forClient(&cc()).getLastOp(), opTime);
}
136
137 /**
138 * Checks optime and namespace in oplog entry.
139 */
_checkOplogEntry(const OplogEntry & oplogEntry,const OpTime & expectedOpTime,const NamespaceString & expectedNss)140 void _checkOplogEntry(const OplogEntry& oplogEntry,
141 const OpTime& expectedOpTime,
142 const NamespaceString& expectedNss) {
143 ASSERT_EQUALS(expectedOpTime, oplogEntry.getOpTime()) << oplogEntry.toBSON();
144 ASSERT_EQUALS(expectedNss, oplogEntry.getNamespace()) << oplogEntry.toBSON();
145 }
_checkOplogEntry(const OplogEntry & oplogEntry,const std::pair<OpTime,NamespaceString> & expectedOpTimeAndNss)146 void _checkOplogEntry(const OplogEntry& oplogEntry,
147 const std::pair<OpTime, NamespaceString>& expectedOpTimeAndNss) {
148 _checkOplogEntry(oplogEntry, expectedOpTimeAndNss.first, expectedOpTimeAndNss.second);
149 }
150
151 /**
152 * Test function that schedules two concurrent logOp() tasks using a thread pool.
153 * Checks the state of the oplog collection against the optimes returned from logOp().
154 * Before returning, updates 'opTimeNssMap' with the optimes from logOp() and 'oplogEntries' with
155 * the contents of the oplog collection.
156 */
157 using OpTimeNamespaceStringMap = std::map<OpTime, NamespaceString>;
158 using MakeTaskFunction =
159 stdx::function<ThreadPoolInterface::Task(const NamespaceString& nss,
160 stdx::mutex* mtx,
161 OpTimeNamespaceStringMap* opTimeNssMap,
162 unittest::Barrier* barrier)>;
_testConcurrentLogOp(const MakeTaskFunction & makeTaskFunction,OpTimeNamespaceStringMap * opTimeNssMap,std::vector<OplogEntry> * oplogEntries,std::size_t expectedNumOplogEntries)163 void _testConcurrentLogOp(const MakeTaskFunction& makeTaskFunction,
164 OpTimeNamespaceStringMap* opTimeNssMap,
165 std::vector<OplogEntry>* oplogEntries,
166 std::size_t expectedNumOplogEntries) {
167 ASSERT_LESS_THAN_OR_EQUALS(expectedNumOplogEntries, 2U);
168
169 // Run 2 concurrent logOp() requests using the thread pool.
170 ThreadPool::Options options;
171 options.maxThreads = 2U;
172 options.onCreateThread = [](const std::string& name) { Client::initThread(name); };
173 ThreadPool pool(options);
174 pool.startup();
175
176 // Run 2 concurrent logOp() requests using the thread pool.
177 // Use a barrier with a thread count of 3 to ensure both logOp() tasks are complete before this
178 // test thread can proceed with shutting the thread pool down.
179 stdx::mutex mtx;
180 unittest::Barrier barrier(3U);
181 const NamespaceString nss1("test1.coll");
182 const NamespaceString nss2("test2.coll");
183 ASSERT_OK(pool.schedule(makeTaskFunction(nss1, &mtx, opTimeNssMap, &barrier)))
184 << "Failed to schedule logOp() task for namespace " << nss1;
185 ASSERT_OK(pool.schedule(makeTaskFunction(nss2, &mtx, opTimeNssMap, &barrier)))
186 << "Failed to schedule logOp() task for namespace " << nss2;
187 barrier.countDownAndWait();
188
189 // Shut thread pool down.
190 pool.shutdown();
191 pool.join();
192
193 // Read oplog entries from the oplog collection starting with the entry with the most recent
194 // optime.
195 auto opCtx = cc().makeOperationContext();
196 OplogInterfaceLocal oplogInterface(opCtx.get(), NamespaceString::kRsOplogNamespace.ns());
197 auto oplogIter = oplogInterface.makeIterator();
198 auto nextValue = oplogIter->next();
199 while (nextValue.isOK()) {
200 const auto& doc = nextValue.getValue().first;
201 oplogEntries->emplace_back(unittest::assertGet(OplogEntry::parse(doc)));
202 nextValue = oplogIter->next();
203 }
204 ASSERT_EQUALS(expectedNumOplogEntries, oplogEntries->size());
205
206 // Reverse 'oplogEntries' because we inserted the oplog entries in descending order by optime.
207 std::reverse(oplogEntries->begin(), oplogEntries->end());
208
209 // Look up namespaces and their respective optimes (returned by logOp()) in the map.
210 stdx::lock_guard<stdx::mutex> lock(mtx);
211 ASSERT_EQUALS(2U, opTimeNssMap->size());
212 }
213
214 /**
215 * Inserts noop oplog entry with embedded namespace string.
216 * Inserts optime/namespace pair into map while holding a lock on the mutex.
217 * Returns optime of generated oplog entry.
218 */
_logOpNoopWithMsg(OperationContext * opCtx,stdx::mutex * mtx,OpTimeNamespaceStringMap * opTimeNssMap,const NamespaceString & nss)219 OpTime _logOpNoopWithMsg(OperationContext* opCtx,
220 stdx::mutex* mtx,
221 OpTimeNamespaceStringMap* opTimeNssMap,
222 const NamespaceString& nss) {
223 stdx::lock_guard<stdx::mutex> lock(*mtx);
224
225 // logOp() must be called while holding lock because ephemeralForTest storage engine does not
226 // support concurrent updates to its internal state.
227 const auto msgObj = BSON("msg" << nss.ns());
228 auto opTime = logOp(opCtx,
229 "n",
230 nss,
231 {},
232 msgObj,
233 nullptr,
234 false,
235 Date_t::now(),
236 {},
237 kUninitializedStmtId,
238 {},
239 OplogSlot());
240 ASSERT_FALSE(opTime.isNull());
241
242 ASSERT(opTimeNssMap->find(opTime) == opTimeNssMap->end())
243 << "Unable to add namespace " << nss << " to map - map contains duplicate entry for optime "
244 << opTime;
245 opTimeNssMap->insert(std::make_pair(opTime, nss));
246
247 return opTime;
248 }
249
/**
 * Runs two concurrent logOp() tasks that each commit their WriteUnitOfWork before waiting on the
 * barrier, and expects both oplog entries to be present, ordered by optime.
 */
TEST_F(OplogTest, ConcurrentLogOpWithoutDocLockingSupport) {
    OpTimeNamespaceStringMap opTimeNssMap;
    std::vector<OplogEntry> oplogEntries;

    const MakeTaskFunction makeTask = [](const NamespaceString& nss,
                                         stdx::mutex* mtx,
                                         OpTimeNamespaceStringMap* opTimeNssMap,
                                         unittest::Barrier* barrier) {
        // 'nss' is captured by value so that the task owns its own copy of the namespace.
        return [nss, mtx, opTimeNssMap, barrier] {
            auto opCtx = cc().makeOperationContext();
            AutoGetDb autoDb(opCtx.get(), nss.db(), MODE_X);
            WriteUnitOfWork wunit(opCtx.get());

            _logOpNoopWithMsg(opCtx.get(), mtx, opTimeNssMap, nss);

            // In a storage engine that does not support doc locking, upon returning from
            // logOp(), this thread still holds an implicit MODE_X lock on the oplog collection
            // until it commits the WriteUnitOfWork. Therefore, we must wait on the barrier
            // after the WUOW is committed.
            wunit.commit();
            barrier->countDownAndWait();
        };
    };

    _testConcurrentLogOp(makeTask, &opTimeNssMap, &oplogEntries, 2U);

    // 'oplogEntries' is in ascending optime order, matching the iteration order of the map.
    _checkOplogEntry(oplogEntries.front(), *opTimeNssMap.begin());
    _checkOplogEntry(oplogEntries.back(), *opTimeNssMap.rbegin());
}
281
/**
 * Same as the test without doc locking support, except that a ForceSupportsDocLocking(true)
 * object is alive for the duration of the test and each task waits on the barrier before
 * committing, so both WriteUnitOfWorks are uncommitted at the same time.
 * Expects both oplog entries to be present, ordered by optime.
 */
TEST_F(OplogTest, ConcurrentLogOpWithDocLockingSupport) {
    OpTimeNamespaceStringMap opTimeNssMap;
    std::vector<OplogEntry> oplogEntries;

    ForceSupportsDocLocking support(true);

    const MakeTaskFunction makeTask = [](const NamespaceString& nss,
                                         stdx::mutex* mtx,
                                         OpTimeNamespaceStringMap* opTimeNssMap,
                                         unittest::Barrier* barrier) {
        // 'nss' is captured by value so that the task owns its own copy of the namespace.
        return [nss, mtx, opTimeNssMap, barrier] {
            auto opCtx = cc().makeOperationContext();
            AutoGetDb autoDb(opCtx.get(), nss.db(), MODE_X);
            WriteUnitOfWork wunit(opCtx.get());

            _logOpNoopWithMsg(opCtx.get(), mtx, opTimeNssMap, nss);

            // In a storage engine that supports doc locking, it is okay for multiple threads to
            // maintain uncommitted WUOWs upon returning from logOp() because each thread will
            // hold an implicit MODE_IX lock on the oplog collection.
            barrier->countDownAndWait();
            wunit.commit();
        };
    };

    _testConcurrentLogOp(makeTask, &opTimeNssMap, &oplogEntries, 2U);

    // 'oplogEntries' is in ascending optime order, matching the iteration order of the map.
    _checkOplogEntry(oplogEntries.front(), *opTimeNssMap.begin());
    _checkOplogEntry(oplogEntries.back(), *opTimeNssMap.rbegin());
}
313
/**
 * Runs two concurrent logOp() tasks with a ForceSupportsDocLocking(true) object alive. The task
 * that received the earliest optime returns without committing its WriteUnitOfWork; the other
 * task commits. Expects the oplog to contain only the entry with the later optime.
 */
TEST_F(OplogTest, ConcurrentLogOpWithDocLockingSupportRevertFirstOplogEntry) {
    OpTimeNamespaceStringMap opTimeNssMap;
    std::vector<OplogEntry> oplogEntries;

    ForceSupportsDocLocking support(true);

    const MakeTaskFunction makeTask = [](const NamespaceString& nss,
                                         stdx::mutex* mtx,
                                         OpTimeNamespaceStringMap* opTimeNssMap,
                                         unittest::Barrier* barrier) {
        // 'nss' is captured by value so that the task owns its own copy of the namespace.
        return [nss, mtx, opTimeNssMap, barrier] {
            auto opCtx = cc().makeOperationContext();
            AutoGetDb autoDb(opCtx.get(), nss.db(), MODE_X);
            WriteUnitOfWork wunit(opCtx.get());

            auto opTime = _logOpNoopWithMsg(opCtx.get(), mtx, opTimeNssMap, nss);

            // In a storage engine that supports doc locking, it is okay for multiple threads to
            // maintain uncommitted WUOWs upon returning from logOp() because each thread will
            // hold an implicit MODE_IX lock on the oplog collection.
            barrier->countDownAndWait();

            // Revert the first logOp() call and confirm that there are no holes in the
            // oplog after committing the oplog entry with the more recent optime.
            // The map is inspected under the mutex; the mutex is released again before this
            // task either returns or commits.
            const bool ownsFirstOpTime = [&] {
                stdx::lock_guard<stdx::mutex> lk(*mtx);
                const auto& firstOpTimeAndNss = *(opTimeNssMap->cbegin());
                if (opTime == firstOpTimeAndNss.first) {
                    ASSERT_EQUALS(nss, firstOpTimeAndNss.second)
                        << "optime matches entry in optime->nss map but namespace in map is "
                           "different.";
                    return true;
                }
                return false;
            }();

            if (ownsFirstOpTime) {
                // Abort WUOW by returning early. The oplog entry for this task should not
                // be present in the oplog.
                return;
            }

            wunit.commit();
        };
    };

    _testConcurrentLogOp(makeTask, &opTimeNssMap, &oplogEntries, 1U);

    // Only the entry with the latest optime is expected in the oplog.
    _checkOplogEntry(oplogEntries.front(), *opTimeNssMap.crbegin());
}
360
/**
 * Runs two concurrent logOp() tasks with a ForceSupportsDocLocking(true) object alive. The task
 * that received the latest optime returns without committing its WriteUnitOfWork; the other
 * task commits. Expects the oplog to contain only the entry with the earlier optime.
 */
TEST_F(OplogTest, ConcurrentLogOpWithDocLockingSupportRevertLastOplogEntry) {
    OpTimeNamespaceStringMap opTimeNssMap;
    std::vector<OplogEntry> oplogEntries;

    ForceSupportsDocLocking support(true);

    const MakeTaskFunction makeTask = [](const NamespaceString& nss,
                                         stdx::mutex* mtx,
                                         OpTimeNamespaceStringMap* opTimeNssMap,
                                         unittest::Barrier* barrier) {
        // 'nss' is captured by value so that the task owns its own copy of the namespace.
        return [nss, mtx, opTimeNssMap, barrier] {
            auto opCtx = cc().makeOperationContext();
            AutoGetDb autoDb(opCtx.get(), nss.db(), MODE_X);
            WriteUnitOfWork wunit(opCtx.get());

            auto opTime = _logOpNoopWithMsg(opCtx.get(), mtx, opTimeNssMap, nss);

            // In a storage engine that supports doc locking, it is okay for multiple threads to
            // maintain uncommitted WUOWs upon returning from logOp() because each thread will
            // hold an implicit MODE_IX lock on the oplog collection.
            barrier->countDownAndWait();

            // Revert the last logOp() call and confirm that there are no holes in the
            // oplog after committing the oplog entry with the earlier optime.
            // The map is inspected under the mutex; the mutex is released again before this
            // task either returns or commits.
            const bool ownsLastOpTime = [&] {
                stdx::lock_guard<stdx::mutex> lk(*mtx);
                const auto& lastOpTimeAndNss = *(opTimeNssMap->crbegin());
                if (opTime == lastOpTimeAndNss.first) {
                    ASSERT_EQUALS(nss, lastOpTimeAndNss.second)
                        << "optime matches entry in optime->nss map but namespace in map is "
                           "different.";
                    return true;
                }
                return false;
            }();

            if (ownsLastOpTime) {
                // Abort WUOW by returning early. The oplog entry for this task should not
                // be present in the oplog.
                return;
            }

            wunit.commit();
        };
    };

    _testConcurrentLogOp(makeTask, &opTimeNssMap, &oplogEntries, 1U);

    // Only the entry with the earliest optime is expected in the oplog.
    _checkOplogEntry(oplogEntries.front(), *opTimeNssMap.cbegin());
}
407
408 } // namespace
409 } // namespace repl
410 } // namespace mongo
411