1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #include "mongo/platform/basic.h"
32 
33 #include <algorithm>
34 #include <map>
35 #include <utility>
36 #include <vector>
37 
38 #include "mongo/db/client.h"
39 #include "mongo/db/concurrency/lock_manager_test_help.h"
40 #include "mongo/db/db_raii.h"
41 #include "mongo/db/field_parser.h"
42 #include "mongo/db/repl/oplog.h"
43 #include "mongo/db/repl/oplog_entry.h"
44 #include "mongo/db/repl/oplog_interface_local.h"
45 #include "mongo/db/repl/repl_client_info.h"
46 #include "mongo/db/repl/replication_coordinator_mock.h"
47 #include "mongo/db/service_context_d_test_fixture.h"
48 #include "mongo/stdx/functional.h"
49 #include "mongo/stdx/mutex.h"
50 #include "mongo/unittest/barrier.h"
51 #include "mongo/util/concurrency/thread_pool.h"
52 
53 
54 namespace mongo {
55 namespace repl {
56 namespace {
57 
/**
 * Test fixture for oplog write tests. setUp() (defined below) initializes mongod,
 * installs a ReplicationCoordinatorMock, creates the oplog collection, and makes
 * the node primary so logOp() can write.
 */
class OplogTest : public ServiceContextMongoDTest {
private:
    void setUp() override;
};
62 
void OplogTest::setUp() {
    // Set up mongod.
    ServiceContextMongoDTest::setUp();

    auto service = getServiceContext();
    auto opCtx = cc().makeOperationContext();

    // Set up ReplicationCoordinator and create oplog.
    // NOTE(review): the mock coordinator is installed before createOplog(); keep this
    // ordering — oplog creation is performed under the configured coordinator.
    ReplicationCoordinator::set(service, stdx::make_unique<ReplicationCoordinatorMock>(service));
    setOplogCollectionName();
    createOplog(opCtx.get());

    // Ensure that we are primary. Tests below call logOp(), which requires that this
    // node is able to accept writes.
    auto replCoord = ReplicationCoordinator::get(opCtx.get());
    ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_PRIMARY));
}
79 
80 /**
81  * Assert that oplog only has a single entry and return that oplog entry.
82  */
_getSingleOplogEntry(OperationContext * opCtx)83 OplogEntry _getSingleOplogEntry(OperationContext* opCtx) {
84     OplogInterfaceLocal oplogInterface(opCtx, NamespaceString::kRsOplogNamespace.ns());
85     auto oplogIter = oplogInterface.makeIterator();
86     auto opEntry = unittest::assertGet(oplogIter->next());
87     ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, oplogIter->next().getStatus())
88         << "Expected only 1 document in the oplog collection " << NamespaceString::kRsOplogNamespace
89         << " but found more than 1 document instead";
90     return unittest::assertGet(OplogEntry::parse(opEntry.first));
91 }
92 
TEST_F(OplogTest, LogOpReturnsOpTimeOnSuccessfulInsertIntoOplogCollection) {
    auto opCtx = cc().makeOperationContext();

    const NamespaceString nss("test.coll");
    auto msgObj = BSON("msg"
                       << "hello, world!");

    // Write to the oplog: log a no-op ("n") entry carrying 'msgObj' as its object
    // field, inside a committed WriteUnitOfWork.
    OpTime opTime;
    {
        AutoGetDb autoDb(opCtx.get(), nss.db(), MODE_X);
        WriteUnitOfWork wunit(opCtx.get());
        // NOTE(review): the trailing {} / nullptr / false arguments are unused defaults
        // for this test — confirm against logOp()'s declaration in oplog.h if changed.
        opTime = logOp(opCtx.get(),
                       "n",
                       nss,
                       {},
                       msgObj,
                       nullptr,
                       false,
                       Date_t::now(),
                       {},
                       kUninitializedStmtId,
                       {},
                       OplogSlot());
        // logOp() must return a valid (non-null) optime on success.
        ASSERT_FALSE(opTime.isNull());
        wunit.commit();
    }

    OplogEntry oplogEntry = _getSingleOplogEntry(opCtx.get());

    // Ensure that msg fields were properly added to the oplog entry.
    ASSERT_EQUALS(opTime, oplogEntry.getOpTime())
        << "OpTime returned from logOp() did not match that in the oplog entry written to the "
           "oplog: "
        << oplogEntry.toBSON();
    ASSERT(OpTypeEnum::kNoop == oplogEntry.getOpType()) << "Expected 'n' op type but found '"
                                                        << OpType_serializer(oplogEntry.getOpType())
                                                        << "' instead: " << oplogEntry.toBSON();
    ASSERT_BSONOBJ_EQ(msgObj, oplogEntry.getObject());

    // Ensure that the msg optime returned is the same as the last optime in the ReplClientInfo.
    ASSERT_EQUALS(ReplClientInfo::forClient(&cc()).getLastOp(), opTime);
}
136 
137 /**
138  * Checks optime and namespace in oplog entry.
139  */
_checkOplogEntry(const OplogEntry & oplogEntry,const OpTime & expectedOpTime,const NamespaceString & expectedNss)140 void _checkOplogEntry(const OplogEntry& oplogEntry,
141                       const OpTime& expectedOpTime,
142                       const NamespaceString& expectedNss) {
143     ASSERT_EQUALS(expectedOpTime, oplogEntry.getOpTime()) << oplogEntry.toBSON();
144     ASSERT_EQUALS(expectedNss, oplogEntry.getNamespace()) << oplogEntry.toBSON();
145 }
_checkOplogEntry(const OplogEntry & oplogEntry,const std::pair<OpTime,NamespaceString> & expectedOpTimeAndNss)146 void _checkOplogEntry(const OplogEntry& oplogEntry,
147                       const std::pair<OpTime, NamespaceString>& expectedOpTimeAndNss) {
148     _checkOplogEntry(oplogEntry, expectedOpTimeAndNss.first, expectedOpTimeAndNss.second);
149 }
150 
151 /**
152  * Test function that schedules two concurrent logOp() tasks using a thread pool.
153  * Checks the state of the oplog collection against the optimes returned from logOp().
154  * Before returning, updates 'opTimeNssMap' with the optimes from logOp() and 'oplogEntries' with
155  * the contents of the oplog collection.
156  */
157 using OpTimeNamespaceStringMap = std::map<OpTime, NamespaceString>;
158 using MakeTaskFunction =
159     stdx::function<ThreadPoolInterface::Task(const NamespaceString& nss,
160                                              stdx::mutex* mtx,
161                                              OpTimeNamespaceStringMap* opTimeNssMap,
162                                              unittest::Barrier* barrier)>;
void _testConcurrentLogOp(const MakeTaskFunction& makeTaskFunction,
                          OpTimeNamespaceStringMap* opTimeNssMap,
                          std::vector<OplogEntry>* oplogEntries,
                          std::size_t expectedNumOplogEntries) {
    // Only 2 logOp() tasks are scheduled, so at most 2 entries can be expected.
    ASSERT_LESS_THAN_OR_EQUALS(expectedNumOplogEntries, 2U);

    // Set up a thread pool with a maximum of 2 threads, one per logOp() task.
    // Each pool thread gets its own Client via onCreateThread.
    ThreadPool::Options options;
    options.maxThreads = 2U;
    options.onCreateThread = [](const std::string& name) { Client::initThread(name); };
    ThreadPool pool(options);
    pool.startup();

    // Run 2 concurrent logOp() requests using the thread pool.
    // Use a barrier with a thread count of 3 to ensure both logOp() tasks are complete before this
    // test thread can proceed with shutting the thread pool down.
    stdx::mutex mtx;
    unittest::Barrier barrier(3U);
    const NamespaceString nss1("test1.coll");
    const NamespaceString nss2("test2.coll");
    ASSERT_OK(pool.schedule(makeTaskFunction(nss1, &mtx, opTimeNssMap, &barrier)))
        << "Failed to schedule logOp() task for namespace " << nss1;
    ASSERT_OK(pool.schedule(makeTaskFunction(nss2, &mtx, opTimeNssMap, &barrier)))
        << "Failed to schedule logOp() task for namespace " << nss2;
    barrier.countDownAndWait();

    // Shut thread pool down.
    pool.shutdown();
    pool.join();

    // Read oplog entries from the oplog collection starting with the entry with the most recent
    // optime.
    auto opCtx = cc().makeOperationContext();
    OplogInterfaceLocal oplogInterface(opCtx.get(), NamespaceString::kRsOplogNamespace.ns());
    auto oplogIter = oplogInterface.makeIterator();
    auto nextValue = oplogIter->next();
    while (nextValue.isOK()) {
        const auto& doc = nextValue.getValue().first;
        oplogEntries->emplace_back(unittest::assertGet(OplogEntry::parse(doc)));
        nextValue = oplogIter->next();
    }
    ASSERT_EQUALS(expectedNumOplogEntries, oplogEntries->size());

    // Reverse 'oplogEntries' because we inserted the oplog entries in descending order by optime.
    std::reverse(oplogEntries->begin(), oplogEntries->end());

    // Look up namespaces and their respective optimes (returned by logOp()) in the map.
    // Both tasks record their optime in the map before committing or aborting (see
    // _logOpNoopWithMsg), so the map always holds 2 entries even when a task reverts.
    stdx::lock_guard<stdx::mutex> lock(mtx);
    ASSERT_EQUALS(2U, opTimeNssMap->size());
}
213 
214 /**
215  * Inserts noop oplog entry with embedded namespace string.
216  * Inserts optime/namespace pair into map while holding a lock on the mutex.
217  * Returns optime of generated oplog entry.
218  */
_logOpNoopWithMsg(OperationContext * opCtx,stdx::mutex * mtx,OpTimeNamespaceStringMap * opTimeNssMap,const NamespaceString & nss)219 OpTime _logOpNoopWithMsg(OperationContext* opCtx,
220                          stdx::mutex* mtx,
221                          OpTimeNamespaceStringMap* opTimeNssMap,
222                          const NamespaceString& nss) {
223     stdx::lock_guard<stdx::mutex> lock(*mtx);
224 
225     // logOp() must be called while holding lock because ephemeralForTest storage engine does not
226     // support concurrent updates to its internal state.
227     const auto msgObj = BSON("msg" << nss.ns());
228     auto opTime = logOp(opCtx,
229                         "n",
230                         nss,
231                         {},
232                         msgObj,
233                         nullptr,
234                         false,
235                         Date_t::now(),
236                         {},
237                         kUninitializedStmtId,
238                         {},
239                         OplogSlot());
240     ASSERT_FALSE(opTime.isNull());
241 
242     ASSERT(opTimeNssMap->find(opTime) == opTimeNssMap->end())
243         << "Unable to add namespace " << nss << " to map - map contains duplicate entry for optime "
244         << opTime;
245     opTimeNssMap->insert(std::make_pair(opTime, nss));
246 
247     return opTime;
248 }
249 
TEST_F(OplogTest, ConcurrentLogOpWithoutDocLockingSupport) {
    OpTimeNamespaceStringMap opTimeNssMap;
    std::vector<OplogEntry> oplogEntries;

    _testConcurrentLogOp(
        [](const NamespaceString& nss,
           stdx::mutex* mtx,
           OpTimeNamespaceStringMap* opTimeNssMap,
           unittest::Barrier* barrier) {
            // Captures are copied ([=]) because the returned task outlives this factory.
            return [=] {
                auto opCtx = cc().makeOperationContext();
                AutoGetDb autoDb(opCtx.get(), nss.db(), MODE_X);
                WriteUnitOfWork wunit(opCtx.get());

                _logOpNoopWithMsg(opCtx.get(), mtx, opTimeNssMap, nss);

                // In a storage engine that does not support doc locking, upon returning from
                // logOp(), this thread still holds an implicit MODE_X lock on the oplog collection
                // until it commits the WriteUnitOfWork. Therefore, we must wait on the barrier
                // after the WUOW is committed.
                wunit.commit();
                barrier->countDownAndWait();
            };
        },
        &opTimeNssMap,
        &oplogEntries,
        2U);

    // Oplog entries (in ascending optime order after _testConcurrentLogOp's reverse)
    // must match the optime->namespace pairs recorded by the two logOp() calls.
    _checkOplogEntry(oplogEntries[0], *(opTimeNssMap.begin()));
    _checkOplogEntry(oplogEntries[1], *(opTimeNssMap.rbegin()));
}
281 
TEST_F(OplogTest, ConcurrentLogOpWithDocLockingSupport) {
    OpTimeNamespaceStringMap opTimeNssMap;
    std::vector<OplogEntry> oplogEntries;

    // Force doc-locking behavior for the duration of this test.
    ForceSupportsDocLocking support(true);
    _testConcurrentLogOp(
        [](const NamespaceString& nss,
           stdx::mutex* mtx,
           OpTimeNamespaceStringMap* opTimeNssMap,
           unittest::Barrier* barrier) {
            // Captures are copied ([=]) because the returned task outlives this factory.
            return [=] {
                auto opCtx = cc().makeOperationContext();
                AutoGetDb autoDb(opCtx.get(), nss.db(), MODE_X);
                WriteUnitOfWork wunit(opCtx.get());

                _logOpNoopWithMsg(opCtx.get(), mtx, opTimeNssMap, nss);

                // In a storage engine that supports doc locking, it is okay for multiple threads to
                // maintain uncommitted WUOWs upon returning from logOp() because each thread will
                // hold an implicit MODE_IX lock on the oplog collection.
                barrier->countDownAndWait();
                wunit.commit();
            };
        },
        &opTimeNssMap,
        &oplogEntries,
        2U);

    // Both entries must be present and match the recorded optime->namespace pairs,
    // in ascending optime order.
    _checkOplogEntry(oplogEntries[0], *(opTimeNssMap.begin()));
    _checkOplogEntry(oplogEntries[1], *(opTimeNssMap.rbegin()));
}
313 
TEST_F(OplogTest, ConcurrentLogOpWithDocLockingSupportRevertFirstOplogEntry) {
    OpTimeNamespaceStringMap opTimeNssMap;
    std::vector<OplogEntry> oplogEntries;

    // Force doc-locking behavior for the duration of this test.
    ForceSupportsDocLocking support(true);
    _testConcurrentLogOp(
        [](const NamespaceString& nss,
           stdx::mutex* mtx,
           OpTimeNamespaceStringMap* opTimeNssMap,
           unittest::Barrier* barrier) {
            // Captures are copied ([=]) because the returned task outlives this factory.
            return [=] {
                auto opCtx = cc().makeOperationContext();
                AutoGetDb autoDb(opCtx.get(), nss.db(), MODE_X);
                WriteUnitOfWork wunit(opCtx.get());

                auto opTime = _logOpNoopWithMsg(opCtx.get(), mtx, opTimeNssMap, nss);

                // In a storage engine that supports doc locking, it is okay for multiple threads to
                // maintain uncommitted WUOWs upon returning from logOp() because each thread will
                // hold an implicit MODE_IX lock on the oplog collection.
                barrier->countDownAndWait();

                // Revert the first logOp() call and confirm that there are no holes in the
                // oplog after committing the oplog entry with the more recent optime.
                {
                    stdx::lock_guard<stdx::mutex> lock(*mtx);
                    auto firstOpTimeAndNss = *(opTimeNssMap->cbegin());
                    if (opTime == firstOpTimeAndNss.first) {
                        // Message kept consistent with the RevertLastOplogEntry test below
                        // (previously contained a stray double space).
                        ASSERT_EQUALS(nss, firstOpTimeAndNss.second)
                            << "optime matches entry in optime->nss map but namespace in map is "
                               "different.";
                        // Abort WUOW by returning early. The oplog entry for this task should not
                        // be present in the oplog.
                        return;
                    }
                }

                wunit.commit();
            };
        },
        &opTimeNssMap,
        &oplogEntries,
        1U);

    // Only the entry with the later optime should remain in the oplog.
    _checkOplogEntry(oplogEntries[0], *(opTimeNssMap.crbegin()));
}
360 
TEST_F(OplogTest, ConcurrentLogOpWithDocLockingSupportRevertLastOplogEntry) {
    OpTimeNamespaceStringMap opTimeNssMap;
    std::vector<OplogEntry> oplogEntries;

    // Force doc-locking behavior for the duration of this test.
    ForceSupportsDocLocking support(true);
    _testConcurrentLogOp(
        [](const NamespaceString& nss,
           stdx::mutex* mtx,
           OpTimeNamespaceStringMap* opTimeNssMap,
           unittest::Barrier* barrier) {
            // Captures are copied ([=]) because the returned task outlives this factory.
            return [=] {
                auto opCtx = cc().makeOperationContext();
                AutoGetDb autoDb(opCtx.get(), nss.db(), MODE_X);
                WriteUnitOfWork wunit(opCtx.get());

                auto opTime = _logOpNoopWithMsg(opCtx.get(), mtx, opTimeNssMap, nss);

                // In a storage engine that supports doc locking, it is okay for multiple threads to
                // maintain uncommitted WUOWs upon returning from logOp() because each thread will
                // hold an implicit MODE_IX lock on the oplog collection.
                barrier->countDownAndWait();

                // Revert the last logOp() call and confirm that there are no holes in the
                // oplog after committing the oplog entry with the earlier optime.
                {
                    stdx::lock_guard<stdx::mutex> lock(*mtx);
                    auto lastOpTimeAndNss = *(opTimeNssMap->crbegin());
                    if (opTime == lastOpTimeAndNss.first) {
                        ASSERT_EQUALS(nss, lastOpTimeAndNss.second)
                            << "optime matches entry in optime->nss map but namespace in map is "
                               "different.";
                        // Abort WUOW by returning early. The oplog entry for this task should not
                        // be present in the oplog.
                        return;
                    }
                }

                wunit.commit();
            };
        },
        &opTimeNssMap,
        &oplogEntries,
        1U);

    // Only the entry with the earlier optime should remain in the oplog.
    _checkOplogEntry(oplogEntries[0], *(opTimeNssMap.cbegin()));
}
407 
408 }  // namespace
409 }  // namespace repl
410 }  // namespace mongo
411