1 
2 /**
3  *    Copyright (C) 2018-present MongoDB, Inc.
4  *
5  *    This program is free software: you can redistribute it and/or modify
6  *    it under the terms of the Server Side Public License, version 1,
7  *    as published by MongoDB, Inc.
8  *
9  *    This program is distributed in the hope that it will be useful,
10  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
11  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  *    Server Side Public License for more details.
13  *
14  *    You should have received a copy of the Server Side Public License
15  *    along with this program. If not, see
16  *    <http://www.mongodb.com/licensing/server-side-public-license>.
17  *
18  *    As a special exception, the copyright holders give permission to link the
19  *    code of portions of this program with the OpenSSL library under certain
20  *    conditions as described in each individual source file and distribute
21  *    linked combinations including the program with the OpenSSL library. You
22  *    must comply with the Server Side Public License in all respects for
23  *    all of the code used other than as permitted herein. If you modify file(s)
24  *    with this exception, you may extend this exception to your version of the
25  *    file(s), but you are not obligated to do so. If you do not wish to do so,
26  *    delete this exception statement from your version. If you delete this
27  *    exception statement from all source files in the program, then also delete
28  *    it in the license file.
29  */
30 
31 #include "mongo/platform/basic.h"
32 
33 #include "mongo/db/catalog/capped_utils.h"
34 #include "mongo/db/catalog/collection.h"
35 #include "mongo/db/catalog/collection_catalog_entry.h"
36 #include "mongo/db/client.h"
37 #include "mongo/db/db_raii.h"
38 #include "mongo/db/repl/replication_coordinator.h"
39 #include "mongo/db/repl/replication_coordinator_mock.h"
40 #include "mongo/db/repl/storage_interface_impl.h"
41 #include "mongo/db/service_context_d_test_fixture.h"
42 #include "mongo/stdx/memory.h"
43 #include "mongo/stdx/thread.h"
44 #include "mongo/unittest/unittest.h"
45 
46 namespace {
47 
48 using namespace mongo;
49 
50 class CollectionTest : public ServiceContextMongoDTest {
51 private:
52     void setUp() override;
53     void tearDown() override;
54 
55 protected:
56     void makeCapped(NamespaceString nss, long long cappedSize = 8192);
57     // Use StorageInterface to access storage features below catalog interface.
58     std::unique_ptr<repl::StorageInterface> _storage;
59     ServiceContext::UniqueOperationContext _opCtxOwner;
60     OperationContext* _opCtx = nullptr;
61 };
62 
setUp()63 void CollectionTest::setUp() {
64     // Set up mongod.
65     ServiceContextMongoDTest::setUp();
66 
67     auto service = getServiceContext();
68 
69     // Set up ReplicationCoordinator and ensure that we are primary.
70     auto replCoord = stdx::make_unique<repl::ReplicationCoordinatorMock>(service);
71     ASSERT_OK(replCoord->setFollowerMode(repl::MemberState::RS_PRIMARY));
72     repl::ReplicationCoordinator::set(service, std::move(replCoord));
73 
74     _storage = stdx::make_unique<repl::StorageInterfaceImpl>();
75     _opCtxOwner = cc().makeOperationContext();
76     _opCtx = _opCtxOwner.get();
77 }
78 
tearDown()79 void CollectionTest::tearDown() {
80     _storage = {};
81     _opCtxOwner = {};
82 
83     // Tear down mongod.
84     ServiceContextMongoDTest::tearDown();
85 }
86 
makeCapped(NamespaceString nss,long long cappedSize)87 void CollectionTest::makeCapped(NamespaceString nss, long long cappedSize) {
88     CollectionOptions options;
89     options.capped = true;
90     options.cappedSize = cappedSize;  // Maximum size of capped collection in bytes.
91     ASSERT_OK(_storage->createCollection(_opCtx, nss, options));
92 }
93 
/**
 * A notifier obtained from a capped collection starts out alive and reports isDead() once
 * kill() has been called on it.
 */
TEST_F(CollectionTest, CappedNotifierKillAndIsDead) {
    const NamespaceString nss("test.t");
    makeCapped(nss);

    AutoGetCollectionForRead autoColl(_opCtx, nss);
    auto* const collection = autoColl.getCollection();
    auto notifier = collection->getCappedInsertNotifier();

    ASSERT_FALSE(notifier->isDead());

    notifier->kill();

    ASSERT(notifier->isDead());
}
105 
/**
 * With nobody notifying, waitUntil() on the current version is expected to return no earlier
 * than its deadline and to leave the version untouched.
 */
TEST_F(CollectionTest, CappedNotifierTimeouts) {
    const NamespaceString nss("test.t");
    makeCapped(nss);

    AutoGetCollectionForRead autoColl(_opCtx, nss);
    auto* const collection = autoColl.getCollection();
    auto notifier = collection->getCappedInsertNotifier();
    ASSERT_EQ(notifier->getVersion(), 0u);

    const auto start = Date_t::now();
    const auto deadline = start + Milliseconds(25);
    notifier->waitUntil(0u, deadline);
    const auto elapsed = Date_t::now() - start;

    // The wait must have lasted for the full timeout.
    ASSERT_GTE(elapsed, Milliseconds(25));
    ASSERT_EQ(notifier->getVersion(), 0u);
}
121 
/**
 * After notifyAll() has bumped the version, waiting on the previous version is expected to
 * return before the (deliberately long) deadline.
 */
TEST_F(CollectionTest, CappedNotifierWaitAfterNotifyIsImmediate) {
    const NamespaceString nss("test.t");
    makeCapped(nss);

    AutoGetCollectionForRead autoColl(_opCtx, nss);
    auto* const collection = autoColl.getCollection();
    auto notifier = collection->getCappedInsertNotifier();

    // notifyAll() is expected to advance the version by exactly one.
    const auto staleVersion = notifier->getVersion();
    notifier->notifyAll();
    const auto expectedVersion = staleVersion + 1;
    ASSERT_EQ(notifier->getVersion(), expectedVersion);

    // Waiting on the stale version must not run into the 25 second deadline.
    const auto start = Date_t::now();
    notifier->waitUntil(staleVersion, start + Seconds(25));
    const auto elapsed = Date_t::now() - start;
    ASSERT_LT(elapsed, Seconds(25));
}
140 
/**
 * A waiter blocked in waitUntil() is expected to be released by a notifyAll() issued from
 * another thread: the helper thread first lets its own 25ms wait time out and then notifies,
 * so the main thread's wait must last at least 25ms but end well before its 25s deadline.
 */
TEST_F(CollectionTest, CappedNotifierWaitUntilAsynchronousNotifyAll) {
    NamespaceString nss("test.t");
    makeCapped(nss);

    AutoGetCollectionForRead acfr(_opCtx, nss);
    Collection* col = acfr.getCollection();
    auto notifier = col->getCappedInsertNotifier();
    auto prevVersion = notifier->getVersion();
    auto thisVersion = prevVersion + 1;

    auto before = Date_t::now();
    // 'notifier' is captured by reference; that is safe because the thread is joined before
    // this function returns.
    stdx::thread thread([before, prevVersion, &notifier] {
        notifier->waitUntil(prevVersion, before + Milliseconds(25));
        auto after = Date_t::now();
        ASSERT_GTE(after - before, Milliseconds(25));
        notifier->notifyAll();
    });
    notifier->waitUntil(prevVersion, before + Seconds(25));
    auto after = Date_t::now();

    // Join before asserting: a failing assertion must not unwind past a still-joinable thread,
    // whose destructor would terminate the process instead of reporting a test failure.
    thread.join();

    ASSERT_LT(after - before, Seconds(25));
    ASSERT_GTE(after - before, Milliseconds(25));
    ASSERT_EQ(notifier->getVersion(), thisVersion);
}
165 
/**
 * A waiter blocked in waitUntil() is expected to be released when another thread kills the
 * notifier: the helper thread first lets its own 25ms wait time out and then calls kill(), so
 * the main thread's wait must last at least 25ms but end well before its 25s deadline. The
 * version is expected to be unchanged by kill().
 */
TEST_F(CollectionTest, CappedNotifierWaitUntilAsynchronousKill) {
    NamespaceString nss("test.t");
    makeCapped(nss);

    AutoGetCollectionForRead acfr(_opCtx, nss);
    Collection* col = acfr.getCollection();
    auto notifier = col->getCappedInsertNotifier();
    auto prevVersion = notifier->getVersion();

    auto before = Date_t::now();
    // 'notifier' is captured by reference; that is safe because the thread is joined before
    // this function returns.
    stdx::thread thread([before, prevVersion, &notifier] {
        notifier->waitUntil(prevVersion, before + Milliseconds(25));
        auto after = Date_t::now();
        ASSERT_GTE(after - before, Milliseconds(25));
        notifier->kill();
    });
    notifier->waitUntil(prevVersion, before + Seconds(25));
    auto after = Date_t::now();

    // Join before asserting: a failing assertion must not unwind past a still-joinable thread,
    // whose destructor would terminate the process instead of reporting a test failure.
    thread.join();

    ASSERT_LT(after - before, Seconds(25));
    ASSERT_GTE(after - before, Milliseconds(25));
    ASSERT_EQ(notifier->getVersion(), prevVersion);
}
189 
/**
 * haveCappedWaiters() is expected to be true only while a notifier obtained from the
 * collection is being held.
 */
TEST_F(CollectionTest, HaveCappedWaiters) {
    const NamespaceString nss("test.t");
    makeCapped(nss);

    AutoGetCollectionForRead autoColl(_opCtx, nss);
    auto* const collection = autoColl.getCollection();

    // Nobody holds a notifier yet.
    ASSERT_FALSE(collection->haveCappedWaiters());

    {
        // Holding a notifier for the duration of this scope counts as a waiter.
        auto heldNotifier = collection->getCappedInsertNotifier();
        ASSERT(collection->haveCappedWaiters());
    }

    // The notifier was released at the end of the scope above.
    ASSERT_FALSE(collection->haveCappedWaiters());
}
203 
/**
 * notifyCappedWaitersIfNeeded() is expected to leave the version at 0 when called while no
 * notifier is held, and to advance it to 1 when called while one is held.
 */
TEST_F(CollectionTest, NotifyCappedWaitersIfNeeded) {
    const NamespaceString nss("test.t");
    makeCapped(nss);

    AutoGetCollectionForRead autoColl(_opCtx, nss);
    auto* const collection = autoColl.getCollection();

    // No notifier is held by the test at this point.
    collection->notifyCappedWaitersIfNeeded();

    auto notifier = collection->getCappedInsertNotifier();
    ASSERT_EQ(notifier->getVersion(), 0u);

    // Now that a notifier is held, the same call is expected to bump the version.
    collection->notifyCappedWaitersIfNeeded();
    ASSERT_EQ(notifier->getVersion(), 1u);
}
218 
/**
 * A waiter is expected to be released by notifyCappedWaitersIfNeeded() called on the
 * collection from another thread. The main thread first lets a 25ms wait time out, then
 * starts a helper thread that notifies through the collection, and finally waits again with a
 * 25s deadline that must not be reached.
 */
TEST_F(CollectionTest, AsynchronouslyNotifyCappedWaitersIfNeeded) {
    NamespaceString nss("test.t");
    makeCapped(nss);

    AutoGetCollectionForRead acfr(_opCtx, nss);
    Collection* col = acfr.getCollection();
    auto notifier = col->getCappedInsertNotifier();
    auto prevVersion = notifier->getVersion();
    auto thisVersion = prevVersion + 1;

    auto before = Date_t::now();
    // Nothing has notified yet, so this wait is expected to run until its 25ms deadline.
    notifier->waitUntil(prevVersion, before + Milliseconds(25));
    // The helper thread only needs the start time and the collection pointer.
    stdx::thread thread([before, col] {
        auto after = Date_t::now();
        ASSERT_GTE(after - before, Milliseconds(25));
        col->notifyCappedWaitersIfNeeded();
    });
    notifier->waitUntil(prevVersion, before + Seconds(25));
    auto after = Date_t::now();

    // Join before asserting: a failing assertion must not unwind past a still-joinable thread,
    // whose destructor would terminate the process instead of reporting a test failure.
    thread.join();

    ASSERT_LT(after - before, Seconds(25));
    ASSERT_GTE(after - before, Milliseconds(25));
    ASSERT_EQ(notifier->getVersion(), thisVersion);
}
243 }  // namespace
244