1
2 /**
3 * Copyright (C) 2018-present MongoDB, Inc.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the Server Side Public License, version 1,
7 * as published by MongoDB, Inc.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * Server Side Public License for more details.
13 *
14 * You should have received a copy of the Server Side Public License
15 * along with this program. If not, see
16 * <http://www.mongodb.com/licensing/server-side-public-license>.
17 *
18 * As a special exception, the copyright holders give permission to link the
19 * code of portions of this program with the OpenSSL library under certain
20 * conditions as described in each individual source file and distribute
21 * linked combinations including the program with the OpenSSL library. You
22 * must comply with the Server Side Public License in all respects for
23 * all of the code used other than as permitted herein. If you modify file(s)
24 * with this exception, you may extend this exception to your version of the
25 * file(s), but you are not obligated to do so. If you do not wish to do so,
26 * delete this exception statement from your version. If you delete this
27 * exception statement from all source files in the program, then also delete
28 * it in the license file.
29 */
30
31 #include "mongo/platform/basic.h"
32
33 #include "mongo/db/catalog/capped_utils.h"
34 #include "mongo/db/catalog/collection.h"
35 #include "mongo/db/catalog/collection_catalog_entry.h"
36 #include "mongo/db/client.h"
37 #include "mongo/db/db_raii.h"
38 #include "mongo/db/repl/replication_coordinator.h"
39 #include "mongo/db/repl/replication_coordinator_mock.h"
40 #include "mongo/db/repl/storage_interface_impl.h"
41 #include "mongo/db/service_context_d_test_fixture.h"
42 #include "mongo/stdx/memory.h"
43 #include "mongo/stdx/thread.h"
44 #include "mongo/unittest/unittest.h"
45
46 namespace {
47
48 using namespace mongo;
49
50 class CollectionTest : public ServiceContextMongoDTest {
51 private:
52 void setUp() override;
53 void tearDown() override;
54
55 protected:
56 void makeCapped(NamespaceString nss, long long cappedSize = 8192);
57 // Use StorageInterface to access storage features below catalog interface.
58 std::unique_ptr<repl::StorageInterface> _storage;
59 ServiceContext::UniqueOperationContext _opCtxOwner;
60 OperationContext* _opCtx = nullptr;
61 };
62
setUp()63 void CollectionTest::setUp() {
64 // Set up mongod.
65 ServiceContextMongoDTest::setUp();
66
67 auto service = getServiceContext();
68
69 // Set up ReplicationCoordinator and ensure that we are primary.
70 auto replCoord = stdx::make_unique<repl::ReplicationCoordinatorMock>(service);
71 ASSERT_OK(replCoord->setFollowerMode(repl::MemberState::RS_PRIMARY));
72 repl::ReplicationCoordinator::set(service, std::move(replCoord));
73
74 _storage = stdx::make_unique<repl::StorageInterfaceImpl>();
75 _opCtxOwner = cc().makeOperationContext();
76 _opCtx = _opCtxOwner.get();
77 }
78
tearDown()79 void CollectionTest::tearDown() {
80 _storage = {};
81 _opCtxOwner = {};
82
83 // Tear down mongod.
84 ServiceContextMongoDTest::tearDown();
85 }
86
makeCapped(NamespaceString nss,long long cappedSize)87 void CollectionTest::makeCapped(NamespaceString nss, long long cappedSize) {
88 CollectionOptions options;
89 options.capped = true;
90 options.cappedSize = cappedSize; // Maximum size of capped collection in bytes.
91 ASSERT_OK(_storage->createCollection(_opCtx, nss, options));
92 }
93
// A freshly obtained capped insert notifier is not dead; after kill() it reports dead.
TEST_F(CollectionTest, CappedNotifierKillAndIsDead) {
    const NamespaceString nss("test.t");
    makeCapped(nss);

    AutoGetCollectionForRead autoColl(_opCtx, nss);
    auto notifier = autoColl.getCollection()->getCappedInsertNotifier();

    ASSERT_FALSE(notifier->isDead());
    notifier->kill();
    ASSERT(notifier->isDead());
}
105
// With nobody notifying, waitUntil() must not return before its deadline and must leave the
// notifier version untouched.
TEST_F(CollectionTest, CappedNotifierTimeouts) {
    const NamespaceString nss("test.t");
    makeCapped(nss);

    AutoGetCollectionForRead autoColl(_opCtx, nss);
    auto notifier = autoColl.getCollection()->getCappedInsertNotifier();
    ASSERT_EQ(notifier->getVersion(), 0u);

    const auto timeout = Milliseconds(25);
    const auto start = Date_t::now();
    notifier->waitUntil(0u, start + timeout);
    const auto elapsed = Date_t::now() - start;

    ASSERT_GTE(elapsed, timeout);
    ASSERT_EQ(notifier->getVersion(), 0u);
}
121
// notifyAll() bumps the version by one; waiting on the pre-notification version afterwards must
// return well before the (generous) deadline.
TEST_F(CollectionTest, CappedNotifierWaitAfterNotifyIsImmediate) {
    const NamespaceString nss("test.t");
    makeCapped(nss);

    AutoGetCollectionForRead autoColl(_opCtx, nss);
    auto notifier = autoColl.getCollection()->getCappedInsertNotifier();

    const auto versionBeforeNotify = notifier->getVersion();
    notifier->notifyAll();
    ASSERT_EQ(notifier->getVersion(), versionBeforeNotify + 1);

    const auto limit = Seconds(25);
    const auto start = Date_t::now();
    notifier->waitUntil(versionBeforeNotify, start + limit);
    const auto elapsed = Date_t::now() - start;

    ASSERT_LT(elapsed, limit);
}
140
// A helper thread first lets its own 25ms wait on the notifier expire and then calls notifyAll().
// The main thread, waiting with a 25s deadline, must be released after at least 25ms but long
// before its deadline, and the notifier version must have advanced by exactly one.
TEST_F(CollectionTest, CappedNotifierWaitUntilAsynchronousNotifyAll) {
    NamespaceString nss("test.t");
    makeCapped(nss);

    AutoGetCollectionForRead acfr(_opCtx, nss);
    Collection* col = acfr.getCollection();
    auto notifier = col->getCappedInsertNotifier();
    auto prevVersion = notifier->getVersion();
    auto thisVersion = prevVersion + 1;

    auto before = Date_t::now();
    // 'notifier' is captured by reference; the thread is joined below before 'notifier' goes out
    // of scope.
    stdx::thread thread([before, prevVersion, &notifier] {
        notifier->waitUntil(prevVersion, before + Milliseconds(25));
        auto after = Date_t::now();
        ASSERT_GTE(after - before, Milliseconds(25));
        notifier->notifyAll();
    });
    notifier->waitUntil(prevVersion, before + Seconds(25));
    auto after = Date_t::now();

    // Join before asserting on the timings. NOTE(review): assuming a failed ASSERT leaves the test
    // body by throwing, asserting first could destroy a still-joinable thread — confirm.
    thread.join();

    ASSERT_LT(after - before, Seconds(25));
    ASSERT_GTE(after - before, Milliseconds(25));
    ASSERT_EQ(notifier->getVersion(), thisVersion);
}
165
// A helper thread first lets its own 25ms wait on the notifier expire and then calls kill().
// The main thread, waiting with a 25s deadline, must be released after at least 25ms but long
// before its deadline, and the notifier version must be unchanged.
TEST_F(CollectionTest, CappedNotifierWaitUntilAsynchronousKill) {
    NamespaceString nss("test.t");
    makeCapped(nss);

    AutoGetCollectionForRead acfr(_opCtx, nss);
    Collection* col = acfr.getCollection();
    auto notifier = col->getCappedInsertNotifier();
    auto prevVersion = notifier->getVersion();

    auto before = Date_t::now();
    // 'notifier' is captured by reference; the thread is joined below before 'notifier' goes out
    // of scope.
    stdx::thread thread([before, prevVersion, &notifier] {
        notifier->waitUntil(prevVersion, before + Milliseconds(25));
        auto after = Date_t::now();
        ASSERT_GTE(after - before, Milliseconds(25));
        notifier->kill();
    });
    notifier->waitUntil(prevVersion, before + Seconds(25));
    auto after = Date_t::now();

    // Join before asserting on the timings. NOTE(review): assuming a failed ASSERT leaves the test
    // body by throwing, asserting first could destroy a still-joinable thread — confirm.
    thread.join();

    ASSERT_LT(after - before, Seconds(25));
    ASSERT_GTE(after - before, Milliseconds(25));
    ASSERT_EQ(notifier->getVersion(), prevVersion);
}
189
// haveCappedWaiters() is false without an outstanding notifier handle, true while one is held,
// and false again once that handle has gone out of scope.
TEST_F(CollectionTest, HaveCappedWaiters) {
    const NamespaceString nss("test.t");
    makeCapped(nss);

    AutoGetCollectionForRead autoColl(_opCtx, nss);
    Collection* coll = autoColl.getCollection();

    ASSERT_FALSE(coll->haveCappedWaiters());
    {
        // Hold a notifier only for the duration of this scope.
        auto heldNotifier = coll->getCappedInsertNotifier();
        ASSERT(coll->haveCappedWaiters());
    }
    ASSERT_FALSE(coll->haveCappedWaiters());
}
203
// notifyCappedWaitersIfNeeded() called before any notifier handle was obtained leaves the version
// seen by a later handle at 0; calling it while a handle is held advances the version to 1.
TEST_F(CollectionTest, NotifyCappedWaitersIfNeeded) {
    const NamespaceString nss("test.t");
    makeCapped(nss);

    AutoGetCollectionForRead autoColl(_opCtx, nss);
    Collection* coll = autoColl.getCollection();

    // No notifier handle has been obtained yet when this call is made.
    coll->notifyCappedWaitersIfNeeded();
    {
        auto heldNotifier = coll->getCappedInsertNotifier();
        ASSERT_EQ(heldNotifier->getVersion(), 0u);

        coll->notifyCappedWaitersIfNeeded();
        ASSERT_EQ(heldNotifier->getVersion(), 1u);
    }
}
218
// The main thread first waits on the notifier with a 25ms deadline, then starts a helper thread
// that calls Collection::notifyCappedWaitersIfNeeded(). A second wait with a 25s deadline must
// return long before that deadline, and the notifier version must have advanced by exactly one.
TEST_F(CollectionTest, AsynchronouslyNotifyCappedWaitersIfNeeded) {
    NamespaceString nss("test.t");
    makeCapped(nss);

    AutoGetCollectionForRead acfr(_opCtx, nss);
    Collection* col = acfr.getCollection();
    auto notifier = col->getCappedInsertNotifier();
    auto prevVersion = notifier->getVersion();
    auto thisVersion = prevVersion + 1;

    auto before = Date_t::now();
    notifier->waitUntil(prevVersion, before + Milliseconds(25));
    // The helper only needs the start time and the collection; it does not use 'prevVersion'.
    stdx::thread thread([before, col] {
        auto after = Date_t::now();
        ASSERT_GTE(after - before, Milliseconds(25));
        col->notifyCappedWaitersIfNeeded();
    });
    notifier->waitUntil(prevVersion, before + Seconds(25));
    auto after = Date_t::now();

    // Join before asserting on the timings. NOTE(review): assuming a failed ASSERT leaves the test
    // body by throwing, asserting first could destroy a still-joinable thread — confirm.
    thread.join();

    ASSERT_LT(after - before, Seconds(25));
    ASSERT_GTE(after - before, Milliseconds(25));
    ASSERT_EQ(notifier->getVersion(), thisVersion);
}
243 } // namespace
244