// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include <atomic>
#include <fstream>
#include <memory>
#include <thread>
#include <vector>

#include "db/db_test_util.h"
#include "db/write_batch_internal.h"
#include "db/write_thread.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "test_util/sync_point.h"
#include "util/random.h"
#include "util/string_util.h"
#include "utilities/fault_injection_env.h"

namespace ROCKSDB_NAMESPACE {

// Test variations of WriteImpl.
class DBWriteTest : public DBTestBase, public testing::WithParamInterface<int> {
 public:
  DBWriteTest() : DBTestBase("db_write_test", /*env_do_fsync=*/true) {}

  Options GetOptions() { return DBTestBase::GetOptions(GetParam()); }

  void Open() { DBTestBase::Reopen(GetOptions()); }
};
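
// Note: each TEST_P below runs once per write policy listed in
// INSTANTIATE_TEST_CASE_P at the bottom of this file (kDefault,
// kConcurrentWALWrites, kPipelinedWrite).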

// It is invalid to do a sync write while the WAL is disabled.
TEST_P(DBWriteTest, SyncAndDisableWAL) {
  WriteOptions write_options;
  write_options.sync = true;
  write_options.disableWAL = true;
  ASSERT_TRUE(dbfull()->Put(write_options, "foo", "bar").IsInvalidArgument());
  WriteBatch batch;
  ASSERT_OK(batch.Put("foo", "bar"));
  ASSERT_TRUE(dbfull()->Write(write_options, &batch).IsInvalidArgument());
}

TEST_P(DBWriteTest, WriteStallRemoveNoSlowdownWrite) {
  Options options = GetOptions();
  options.level0_stop_writes_trigger = options.level0_slowdown_writes_trigger =
      4;
  std::vector<port::Thread> threads;
  std::atomic<int> thread_num(0);
  port::Mutex mutex;
  port::CondVar cv(&mutex);
  // Guarded by mutex
  int writers = 0;

  Reopen(options);

  std::function<void()> write_slowdown_func = [&]() {
    int a = thread_num.fetch_add(1);
    std::string key = "foo" + std::to_string(a);
    WriteOptions wo;
    wo.no_slowdown = false;
    ASSERT_OK(dbfull()->Put(wo, key, "bar"));
  };
  std::function<void()> write_no_slowdown_func = [&]() {
    int a = thread_num.fetch_add(1);
    std::string key = "foo" + std::to_string(a);
    WriteOptions wo;
    wo.no_slowdown = true;
    Status s = dbfull()->Put(wo, key, "bar");
    ASSERT_TRUE(s.ok() || s.IsIncomplete());
  };
  std::function<void(void*)> unblock_main_thread_func = [&](void*) {
    mutex.Lock();
    ++writers;
    cv.SignalAll();
    mutex.Unlock();
  };

  // Create 3 L0 files and schedule the 4th without waiting
  ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "WriteThread::JoinBatchGroup:Start", unblock_main_thread_func);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBWriteTest::WriteStallRemoveNoSlowdownWrite:1",
        "DBImpl::BackgroundCallFlush:start"},
       {"DBWriteTest::WriteStallRemoveNoSlowdownWrite:2",
        "DBImplWrite::PipelinedWriteImpl:AfterJoinBatchGroup"},
       // Make compaction start wait for the write stall to be detected and
       // implemented by a write group leader
       {"DBWriteTest::WriteStallRemoveNoSlowdownWrite:3",
        "BackgroundCallCompaction:0"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Schedule creation of the 4th L0 file without waiting. This will seal
  // the memtable and then wait for a sync point before writing the file. We
  // need to do it this way because SwitchMemtable() needs to enter the
  // write_thread.
  FlushOptions fopt;
  fopt.wait = false;
  ASSERT_OK(dbfull()->Flush(fopt));

  // Create a mix of slowdown/no_slowdown write threads
  mutex.Lock();
  // First leader
  threads.emplace_back(write_slowdown_func);
  while (writers != 1) {
    cv.Wait();
  }

  // Second leader. Will stall writes
  // Build a writers list with no slowdown in the middle:
  //  +-------------+
  //  | slowdown    +<----+ newest
  //  +--+----------+
  //     |
  //     v
  //  +--+----------+
  //  | no slowdown |
  //  +--+----------+
  //     |
  //     v
  //  +--+----------+
  //  | slowdown    +
  //  +-------------+
  threads.emplace_back(write_slowdown_func);
  while (writers != 2) {
    cv.Wait();
  }
  threads.emplace_back(write_no_slowdown_func);
  while (writers != 3) {
    cv.Wait();
  }
  threads.emplace_back(write_slowdown_func);
  while (writers != 4) {
    cv.Wait();
  }

  mutex.Unlock();

  TEST_SYNC_POINT("DBWriteTest::WriteStallRemoveNoSlowdownWrite:1");
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(nullptr));
  // This would have triggered a write stall. Unblock the write group leader
  TEST_SYNC_POINT("DBWriteTest::WriteStallRemoveNoSlowdownWrite:2");
  // The leader is going to create missing newer links. When the leader
  // finishes, the next leader is going to delay writes and fail writers with
  // no_slowdown

  TEST_SYNC_POINT("DBWriteTest::WriteStallRemoveNoSlowdownWrite:3");
  for (auto& t : threads) {
    t.join();
  }

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}

TEST_P(DBWriteTest, WriteThreadHangOnWriteStall) {
  Options options = GetOptions();
  options.level0_stop_writes_trigger = options.level0_slowdown_writes_trigger =
      4;
  std::vector<port::Thread> threads;
  std::atomic<int> thread_num(0);
  port::Mutex mutex;
  port::CondVar cv(&mutex);
  // Guarded by mutex
  int writers = 0;

  Reopen(options);

  std::function<void()> write_slowdown_func = [&]() {
    int a = thread_num.fetch_add(1);
    std::string key = "foo" + std::to_string(a);
    WriteOptions wo;
    wo.no_slowdown = false;
    ASSERT_OK(dbfull()->Put(wo, key, "bar"));
  };
  std::function<void()> write_no_slowdown_func = [&]() {
    int a = thread_num.fetch_add(1);
    std::string key = "foo" + std::to_string(a);
    WriteOptions wo;
    wo.no_slowdown = true;
    Status s = dbfull()->Put(wo, key, "bar");
    ASSERT_TRUE(s.ok() || s.IsIncomplete());
  };
  std::function<void(void*)> unblock_main_thread_func = [&](void*) {
    mutex.Lock();
    ++writers;
    cv.SignalAll();
    mutex.Unlock();
  };

  // Create 3 L0 files and schedule the 4th without waiting
  ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "WriteThread::JoinBatchGroup:Start", unblock_main_thread_func);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBWriteTest::WriteThreadHangOnWriteStall:1",
        "DBImpl::BackgroundCallFlush:start"},
       {"DBWriteTest::WriteThreadHangOnWriteStall:2",
        "DBImpl::WriteImpl:BeforeLeaderEnters"},
       // Make compaction start wait for the write stall to be detected and
       // implemented by a write group leader
       {"DBWriteTest::WriteThreadHangOnWriteStall:3",
        "BackgroundCallCompaction:0"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Schedule creation of the 4th L0 file without waiting. This will seal
  // the memtable and then wait for a sync point before writing the file. We
  // need to do it this way because SwitchMemtable() needs to enter the
  // write_thread.
  FlushOptions fopt;
  fopt.wait = false;
  ASSERT_OK(dbfull()->Flush(fopt));

  // Create a mix of slowdown/no_slowdown write threads
  mutex.Lock();
  // First leader
  threads.emplace_back(write_slowdown_func);
  while (writers != 1) {
    cv.Wait();
  }
  // Second leader. Will stall writes
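  // The five writers below are enqueued back-to-back, without waiting for
  // each one to link (unlike the previous test), so the stalled leader has
  // to link them all when it wakes up.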
  threads.emplace_back(write_slowdown_func);
  threads.emplace_back(write_no_slowdown_func);
  threads.emplace_back(write_slowdown_func);
  threads.emplace_back(write_no_slowdown_func);
  threads.emplace_back(write_slowdown_func);
  while (writers != 6) {
    cv.Wait();
  }
  mutex.Unlock();

  TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:1");
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(nullptr));
  // This would have triggered a write stall. Unblock the write group leader
  TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:2");
  // The leader is going to create missing newer links. When the leader
  // finishes, the next leader is going to delay writes and fail writers with
  // no_slowdown

  TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:3");
  for (auto& t : threads) {
    t.join();
  }
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}

TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) {
  constexpr int kNumThreads = 5;
  std::unique_ptr<FaultInjectionTestEnv> mock_env(
      new FaultInjectionTestEnv(env_));
  Options options = GetOptions();
  options.env = mock_env.get();
  Reopen(options);
  std::atomic<int> ready_count{0};
  std::atomic<int> leader_count{0};
  std::vector<port::Thread> threads;
  mock_env->SetFilesystemActive(false);

  // Wait until all threads are linked to the write thread, to make sure
  // all of them join the same batch group.
  SyncPoint::GetInstance()->SetCallBack(
      "WriteThread::JoinBatchGroup:Wait", [&](void* arg) {
        ready_count++;
        auto* w = reinterpret_cast<WriteThread::Writer*>(arg);
        if (w->state == WriteThread::STATE_GROUP_LEADER) {
          leader_count++;
          while (ready_count < kNumThreads) {
            // busy waiting
          }
        }
      });
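  // The leader busy-waits in the callback above until every follower has
  // linked into the writer list, so all five writers end up in one batch
  // group and share the outcome of the single failed WAL write.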
  SyncPoint::GetInstance()->EnableProcessing();
  for (int i = 0; i < kNumThreads; i++) {
    threads.push_back(port::Thread(
        [&](int index) {
          // All threads should fail.
          auto res = Put("key" + ToString(index), "value");
          if (options.manual_wal_flush) {
            ASSERT_TRUE(res.ok());
            // We should see an fs error when we do the flush.

            // TSAN reports a false alarm for lock-order-inversion but Open
            // and FlushWAL are not run concurrently. Disabling this until
            // TSAN is fixed.
            // res = dbfull()->FlushWAL(false);
            // ASSERT_FALSE(res.ok());
          } else {
            ASSERT_FALSE(res.ok());
          }
        },
        i));
  }
  for (int i = 0; i < kNumThreads; i++) {
    threads[i].join();
  }
  ASSERT_EQ(1, leader_count);
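  // Exactly one leader formed, which confirms all five writers joined a
  // single batch group.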

  // The failed Put operations can cause a BG error to be set.
  // Mark it as checked for ASSERT_STATUS_CHECKED.
  dbfull()->Resume().PermitUncheckedError();

  // Close before mock_env destructs.
  Close();
}

TEST_P(DBWriteTest, ManualWalFlushInEffect) {
  Options options = GetOptions();
  Reopen(options);
  // Try the 1st WAL created during open.
  ASSERT_TRUE(Put("key" + ToString(0), "value").ok());
  ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty());
  ASSERT_TRUE(dbfull()->FlushWAL(false).ok());
  ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty());
  // Try the 2nd WAL created during SwitchWAL.
  ASSERT_OK(dbfull()->TEST_SwitchWAL());
  ASSERT_TRUE(Put("key" + ToString(0), "value").ok());
  ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty());
  ASSERT_TRUE(dbfull()->FlushWAL(false).ok());
  ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty());
}

TEST_P(DBWriteTest, IOErrorOnWALWriteTriggersReadOnlyMode) {
  std::unique_ptr<FaultInjectionTestEnv> mock_env(
      new FaultInjectionTestEnv(env_));
  Options options = GetOptions();
  options.env = mock_env.get();
  Reopen(options);
  for (int i = 0; i < 2; i++) {
    // Forcibly fail the WAL write for the first Put only. Subsequent Puts
    // should fail due to read-only mode.
    mock_env->SetFilesystemActive(i != 0);
    auto res = Put("key" + ToString(i), "value");
    // TSAN reports a false alarm for lock-order-inversion but Open and
    // FlushWAL are not run concurrently. Disabling this until TSAN is
    // fixed.
    /*
    if (options.manual_wal_flush && i == 0) {
      // even with manual_wal_flush the 2nd Put should return an error
      // because of the read-only mode
      ASSERT_TRUE(res.ok());
      // we should see an fs error when we do the flush
      res = dbfull()->FlushWAL(false);
    }
    */
    if (!options.manual_wal_flush) {
      ASSERT_NOK(res);
    } else {
      ASSERT_OK(res);
    }
  }
  // Close before mock_env destructs.
  Close();
}

TEST_P(DBWriteTest, IOErrorOnSwitchMemtable) {
  Random rnd(301);
  std::unique_ptr<FaultInjectionTestEnv> mock_env(
      new FaultInjectionTestEnv(env_));
  Options options = GetOptions();
  options.env = mock_env.get();
  options.writable_file_max_buffer_size = 4 * 1024 * 1024;
  options.write_buffer_size = 3 * 512 * 1024;
  options.wal_bytes_per_sync = 256 * 1024;
  options.manual_wal_flush = true;
  Reopen(options);
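  // Note: write_buffer_size is 3 * 512KB = 1.5MB, while the loop below
  // writes up to 2048 values of ~1KB each (~2MB), so at least one memtable
  // switch is forced while the filesystem is returning IOError; the test
  // expects that failure to surface as a fatal BG error.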
  mock_env->SetFilesystemActive(false, Status::IOError("Not active"));
  Status s;
  for (int i = 0; i < 4 * 512; ++i) {
    s = Put(Key(i), rnd.RandomString(1024));
    if (!s.ok()) {
      break;
    }
  }
  ASSERT_EQ(s.severity(), Status::Severity::kFatalError);

  mock_env->SetFilesystemActive(true);
  // Close before mock_env destructs.
  Close();
}


// Test that db->LockWAL() flushes the WAL after locking.
TEST_P(DBWriteTest, LockWalInEffect) {
  Options options = GetOptions();
  Reopen(options);
  // Try the 1st WAL created during open.
  ASSERT_OK(Put("key" + ToString(0), "value"));
  ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty());
  ASSERT_OK(dbfull()->LockWAL());
  ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty(false));
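  // Assumption: the `false` argument asks TEST_WALBufferIsEmpty() to skip
  // re-acquiring the WAL write mutex, which LockWAL() is already holding.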
  ASSERT_OK(dbfull()->UnlockWAL());
  // Try the 2nd WAL created during SwitchWAL.
  ASSERT_OK(dbfull()->TEST_SwitchWAL());
  ASSERT_OK(Put("key" + ToString(0), "value"));
  ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty());
  ASSERT_OK(dbfull()->LockWAL());
  ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty(false));
  ASSERT_OK(dbfull()->UnlockWAL());
}

TEST_P(DBWriteTest, ConcurrentlyDisabledWAL) {
  Options options = GetOptions();
  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
  options.statistics->set_stats_level(StatsLevel::kAll);
  Reopen(options);
  std::string wal_key_prefix = "WAL_KEY_";
  std::string no_wal_key_prefix = "K_";
  // 100KB value for each no-WAL operation
  std::string no_wal_value(1024 * 100, 'X');
  // 1B value for each WAL operation
  std::string wal_value = "0";
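  // 10 threads x 10 iterations write ~10MB of values with the WAL disabled
  // but only 100 one-byte values through the WAL, so WAL_FILE_BYTES should
  // stay under the 100KB bound asserted below.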
  std::thread threads[10];
  for (int t = 0; t < 10; t++) {
    threads[t] = std::thread([t, wal_key_prefix, wal_value, no_wal_key_prefix,
                              no_wal_value, this] {
      for (int i = 0; i < 10; i++) {
        ROCKSDB_NAMESPACE::WriteOptions write_option_disable;
        write_option_disable.disableWAL = true;
        ROCKSDB_NAMESPACE::WriteOptions write_option_default;
        std::string no_wal_key =
            no_wal_key_prefix + std::to_string(t) + "_" + std::to_string(i);
        ASSERT_OK(this->Put(no_wal_key, no_wal_value, write_option_disable));
        std::string wal_key =
            wal_key_prefix + std::to_string(i) + "_" + std::to_string(i);
        ASSERT_OK(this->Put(wal_key, wal_value, write_option_default));
        ASSERT_OK(dbfull()->SyncWAL());
      }
    });
  }
  for (auto& t : threads) {
    t.join();
  }
  uint64_t bytes_num = options.statistics->getTickerCount(
      ROCKSDB_NAMESPACE::Tickers::WAL_FILE_BYTES);
  // The written WAL size should be less than 100KB (even including HEADER &
  // FOOTER overhead).
  ASSERT_LE(bytes_num, 1024 * 100);
}

INSTANTIATE_TEST_CASE_P(DBWriteTestInstance, DBWriteTest,
                        testing::Values(DBTestBase::kDefault,
                                        DBTestBase::kConcurrentWALWrites,
                                        DBTestBase::kPipelinedWrite));

}  // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}