1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #include <atomic>
7 #include <fstream>
8 #include <memory>
9 #include <thread>
10 #include <vector>
11 
12 #include "db/db_test_util.h"
13 #include "db/write_batch_internal.h"
14 #include "db/write_thread.h"
15 #include "port/port.h"
16 #include "port/stack_trace.h"
17 #include "test_util/sync_point.h"
18 #include "util/random.h"
19 #include "util/string_util.h"
20 #include "utilities/fault_injection_env.h"
21 
22 namespace ROCKSDB_NAMESPACE {
23 
24 // Test variations of WriteImpl.
25 class DBWriteTest : public DBTestBase, public testing::WithParamInterface<int> {
26  public:
DBWriteTest()27   DBWriteTest() : DBTestBase("db_write_test", /*env_do_fsync=*/true) {}
28 
GetOptions()29   Options GetOptions() { return DBTestBase::GetOptions(GetParam()); }
30 
Open()31   void Open() { DBTestBase::Reopen(GetOptions()); }
32 };
33 
34 // It is invalid to do sync write while disabling WAL.
TEST_P(DBWriteTest, SyncAndDisableWAL) {
  // Ask for a synced write while also disabling the WAL.
  WriteOptions sync_without_wal;
  sync_without_wal.disableWAL = true;
  sync_without_wal.sync = true;

  // The single-key Put path is expected to report InvalidArgument.
  Status put_status = dbfull()->Put(sync_without_wal, "foo", "bar");
  ASSERT_TRUE(put_status.IsInvalidArgument());

  // The WriteBatch path is expected to report InvalidArgument as well.
  WriteBatch wb;
  ASSERT_OK(wb.Put("foo", "bar"));
  Status write_status = dbfull()->Write(sync_without_wal, &wb);
  ASSERT_TRUE(write_status.IsInvalidArgument());
}
44 
TEST_P(DBWriteTest,WriteStallRemoveNoSlowdownWrite)45 TEST_P(DBWriteTest, WriteStallRemoveNoSlowdownWrite) {
46   Options options = GetOptions();
47   options.level0_stop_writes_trigger = options.level0_slowdown_writes_trigger =
48       4;
49   std::vector<port::Thread> threads;
50   std::atomic<int> thread_num(0);
51   port::Mutex mutex;
52   port::CondVar cv(&mutex);
53   // Guarded by mutex
54   int writers = 0;
55 
56   Reopen(options);
57 
58   std::function<void()> write_slowdown_func = [&]() {
59     int a = thread_num.fetch_add(1);
60     std::string key = "foo" + std::to_string(a);
61     WriteOptions wo;
62     wo.no_slowdown = false;
63     ASSERT_OK(dbfull()->Put(wo, key, "bar"));
64   };
65   std::function<void()> write_no_slowdown_func = [&]() {
66     int a = thread_num.fetch_add(1);
67     std::string key = "foo" + std::to_string(a);
68     WriteOptions wo;
69     wo.no_slowdown = true;
70     Status s = dbfull()->Put(wo, key, "bar");
71     ASSERT_TRUE(s.ok() || s.IsIncomplete());
72   };
73   std::function<void(void*)> unblock_main_thread_func = [&](void*) {
74     mutex.Lock();
75     ++writers;
76     cv.SignalAll();
77     mutex.Unlock();
78   };
79 
80   // Create 3 L0 files and schedule 4th without waiting
81   ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
82   ASSERT_OK(Flush());
83   ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
84   ASSERT_OK(Flush());
85   ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
86   ASSERT_OK(Flush());
87   ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
88 
89   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
90       "WriteThread::JoinBatchGroup:Start", unblock_main_thread_func);
91   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
92       {{"DBWriteTest::WriteStallRemoveNoSlowdownWrite:1",
93         "DBImpl::BackgroundCallFlush:start"},
94        {"DBWriteTest::WriteStallRemoveNoSlowdownWrite:2",
95         "DBImplWrite::PipelinedWriteImpl:AfterJoinBatchGroup"},
96        // Make compaction start wait for the write stall to be detected and
97        // implemented by a write group leader
98        {"DBWriteTest::WriteStallRemoveNoSlowdownWrite:3",
99         "BackgroundCallCompaction:0"}});
100   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
101 
102   // Schedule creation of 4th L0 file without waiting. This will seal the
103   // memtable and then wait for a sync point before writing the file. We need
104   // to do it this way because SwitchMemtable() needs to enter the
105   // write_thread
106   FlushOptions fopt;
107   fopt.wait = false;
108   ASSERT_OK(dbfull()->Flush(fopt));
109 
110   // Create a mix of slowdown/no_slowdown write threads
111   mutex.Lock();
112   // First leader
113   threads.emplace_back(write_slowdown_func);
114   while (writers != 1) {
115     cv.Wait();
116   }
117 
118   // Second leader. Will stall writes
119   // Build a writers list with no slowdown in the middle:
120   //  +-------------+
121   //  | slowdown    +<----+ newest
122   //  +--+----------+
123   //     |
124   //     v
125   //  +--+----------+
126   //  | no slowdown |
127   //  +--+----------+
128   //     |
129   //     v
130   //  +--+----------+
131   //  | slowdown    +
132   //  +-------------+
133   threads.emplace_back(write_slowdown_func);
134   while (writers != 2) {
135     cv.Wait();
136   }
137   threads.emplace_back(write_no_slowdown_func);
138   while (writers != 3) {
139     cv.Wait();
140   }
141   threads.emplace_back(write_slowdown_func);
142   while (writers != 4) {
143     cv.Wait();
144   }
145 
146   mutex.Unlock();
147 
148   TEST_SYNC_POINT("DBWriteTest::WriteStallRemoveNoSlowdownWrite:1");
149   ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(nullptr));
150   // This would have triggered a write stall. Unblock the write group leader
151   TEST_SYNC_POINT("DBWriteTest::WriteStallRemoveNoSlowdownWrite:2");
152   // The leader is going to create missing newer links. When the leader
153   // finishes, the next leader is going to delay writes and fail writers with
154   // no_slowdown
155 
156   TEST_SYNC_POINT("DBWriteTest::WriteStallRemoveNoSlowdownWrite:3");
157   for (auto& t : threads) {
158     t.join();
159   }
160 
161   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
162   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
163 }
164 
TEST_P(DBWriteTest,WriteThreadHangOnWriteStall)165 TEST_P(DBWriteTest, WriteThreadHangOnWriteStall) {
166   Options options = GetOptions();
167   options.level0_stop_writes_trigger = options.level0_slowdown_writes_trigger = 4;
168   std::vector<port::Thread> threads;
169   std::atomic<int> thread_num(0);
170   port::Mutex mutex;
171   port::CondVar cv(&mutex);
172   // Guarded by mutex
173   int writers = 0;
174 
175   Reopen(options);
176 
177   std::function<void()> write_slowdown_func = [&]() {
178     int a = thread_num.fetch_add(1);
179     std::string key = "foo" + std::to_string(a);
180     WriteOptions wo;
181     wo.no_slowdown = false;
182     ASSERT_OK(dbfull()->Put(wo, key, "bar"));
183   };
184   std::function<void()> write_no_slowdown_func = [&]() {
185     int a = thread_num.fetch_add(1);
186     std::string key = "foo" + std::to_string(a);
187     WriteOptions wo;
188     wo.no_slowdown = true;
189     Status s = dbfull()->Put(wo, key, "bar");
190     ASSERT_TRUE(s.ok() || s.IsIncomplete());
191   };
192   std::function<void(void *)> unblock_main_thread_func = [&](void *) {
193     mutex.Lock();
194     ++writers;
195     cv.SignalAll();
196     mutex.Unlock();
197   };
198 
199   // Create 3 L0 files and schedule 4th without waiting
200   ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
201   ASSERT_OK(Flush());
202   ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
203   ASSERT_OK(Flush());
204   ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
205   ASSERT_OK(Flush());
206   ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
207 
208   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
209       "WriteThread::JoinBatchGroup:Start", unblock_main_thread_func);
210   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
211       {{"DBWriteTest::WriteThreadHangOnWriteStall:1",
212         "DBImpl::BackgroundCallFlush:start"},
213        {"DBWriteTest::WriteThreadHangOnWriteStall:2",
214         "DBImpl::WriteImpl:BeforeLeaderEnters"},
215        // Make compaction start wait for the write stall to be detected and
216        // implemented by a write group leader
217        {"DBWriteTest::WriteThreadHangOnWriteStall:3",
218         "BackgroundCallCompaction:0"}});
219   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
220 
221   // Schedule creation of 4th L0 file without waiting. This will seal the
222   // memtable and then wait for a sync point before writing the file. We need
223   // to do it this way because SwitchMemtable() needs to enter the
224   // write_thread
225   FlushOptions fopt;
226   fopt.wait = false;
227   ASSERT_OK(dbfull()->Flush(fopt));
228 
229   // Create a mix of slowdown/no_slowdown write threads
230   mutex.Lock();
231   // First leader
232   threads.emplace_back(write_slowdown_func);
233   while (writers != 1) {
234     cv.Wait();
235   }
236   // Second leader. Will stall writes
237   threads.emplace_back(write_slowdown_func);
238   threads.emplace_back(write_no_slowdown_func);
239   threads.emplace_back(write_slowdown_func);
240   threads.emplace_back(write_no_slowdown_func);
241   threads.emplace_back(write_slowdown_func);
242   while (writers != 6) {
243     cv.Wait();
244   }
245   mutex.Unlock();
246 
247   TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:1");
248   ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(nullptr));
249   // This would have triggered a write stall. Unblock the write group leader
250   TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:2");
251   // The leader is going to create missing newer links. When the leader finishes,
252   // the next leader is going to delay writes and fail writers with no_slowdown
253 
254   TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:3");
255   for (auto& t : threads) {
256     t.join();
257   }
258   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
259   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
260 }
261 
TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) {
  constexpr int kNumThreads = 5;
  std::unique_ptr<FaultInjectionTestEnv> fault_env(
      new FaultInjectionTestEnv(env_));
  Options options = GetOptions();
  options.env = fault_env.get();
  Reopen(options);
  std::atomic<int> writers_ready{0};
  std::atomic<int> leaders_seen{0};
  std::vector<port::Thread> writer_threads;
  // Deactivate the filesystem before any writer starts.
  fault_env->SetFilesystemActive(false);

  // Wait until all threads linked to write threads, to make sure
  // all threads join the same batch group.
  SyncPoint::GetInstance()->SetCallBack(
      "WriteThread::JoinBatchGroup:Wait", [&](void* arg) {
        writers_ready.fetch_add(1);
        auto* writer = static_cast<WriteThread::Writer*>(arg);
        if (writer->state != WriteThread::STATE_GROUP_LEADER) {
          return;
        }
        leaders_seen.fetch_add(1);
        // The leader spins until every writer has reached this sync point.
        while (writers_ready < kNumThreads) {
          // busy waiting
        }
      });
  SyncPoint::GetInstance()->EnableProcessing();

  auto put_and_check = [&](int index) {
    // All threads should fail.
    Status put_status = Put("key" + ToString(index), "value");
    if (!options.manual_wal_flush) {
      ASSERT_FALSE(put_status.ok());
      return;
    }
    ASSERT_TRUE(put_status.ok());
    // we should see fs error when we do the flush

    // TSAN reports a false alarm for lock-order-inversion but Open and
    // FlushWAL are not run concurrently. Disabling this until TSAN is
    // fixed.
    // res = dbfull()->FlushWAL(false);
    // ASSERT_FALSE(res.ok());
  };
  for (int i = 0; i < kNumThreads; ++i) {
    writer_threads.emplace_back(put_and_check, i);
  }
  for (port::Thread& writer : writer_threads) {
    writer.join();
  }
  // Exactly one writer must have been observed as the group leader.
  ASSERT_EQ(1, leaders_seen.load());

  // The Failed PUT operations can cause a BG error to be set.
  // Mark it as Checked for the ASSERT_STATUS_CHECKED
  dbfull()->Resume().PermitUncheckedError();

  // Close before fault_env destruct.
  Close();
}
320 
TEST_P(DBWriteTest, ManualWalFlushInEffect) {
  // After a Put, the WAL buffer is expected to be empty exactly when
  // manual_wal_flush is off; an explicit FlushWAL() must then leave it empty.
  // ASSERT_OK is used for Status results so a failure prints the Status.
  Options options = GetOptions();
  Reopen(options);
  // try the 1st WAL created during open
  ASSERT_OK(Put("key" + ToString(0), "value"));
  ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty());
  ASSERT_OK(dbfull()->FlushWAL(false));
  ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty());
  // try the 2nd wal created during SwitchWAL
  ASSERT_OK(dbfull()->TEST_SwitchWAL());
  ASSERT_OK(Put("key" + ToString(0), "value"));
  ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty());
  ASSERT_OK(dbfull()->FlushWAL(false));
  ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty());
}
336 
TEST_P(DBWriteTest, IOErrorOnWALWriteTriggersReadOnlyMode) {
  std::unique_ptr<FaultInjectionTestEnv> fault_env(
      new FaultInjectionTestEnv(env_));
  Options options = GetOptions();
  options.env = fault_env.get();
  Reopen(options);
  for (int attempt = 0; attempt < 2; ++attempt) {
    // Forcibly fail WAL write for the first Put only. Subsequent Puts should
    // fail due to read-only mode
    const bool filesystem_active = (attempt != 0);
    fault_env->SetFilesystemActive(filesystem_active);
    Status put_status = Put("key" + ToString(attempt), "value");
    // TSAN reports a false alarm for lock-order-inversion but Open and
    // FlushWAL are not run concurrently. Disabling this until TSAN is
    // fixed.
    /*
    if (options.manual_wal_flush && i == 0) {
      // even with manual_wal_flush the 2nd Put should return error because of
      // the read-only mode
      ASSERT_TRUE(res.ok());
      // we should see fs error when we do the flush
      res = dbfull()->FlushWAL(false);
    }
    */
    // With manual_wal_flush the Put is asserted to succeed; otherwise it is
    // asserted to fail.
    if (options.manual_wal_flush) {
      ASSERT_OK(put_status);
    } else {
      ASSERT_NOK(put_status);
    }
  }
  // Close before fault_env destruct.
  Close();
}
369 
TEST_P(DBWriteTest, IOErrorOnSwitchMemtable) {
  constexpr int kMaxPuts = 4 * 512;
  constexpr int kValueSize = 1024;

  Random rnd(301);
  std::unique_ptr<FaultInjectionTestEnv> fault_env(
      new FaultInjectionTestEnv(env_));
  Options options = GetOptions();
  options.env = fault_env.get();
  options.writable_file_max_buffer_size = 4 * 1024 * 1024;
  options.write_buffer_size = 3 * 512 * 1024;
  options.wal_bytes_per_sync = 256 * 1024;
  options.manual_wal_flush = true;
  Reopen(options);
  fault_env->SetFilesystemActive(false, Status::IOError("Not active"));

  // Keep writing 1 KB values until a Put fails or kMaxPuts is reached.
  Status s;
  for (int i = 0; i < kMaxPuts && s.ok(); ++i) {
    s = Put(Key(i), rnd.RandomString(kValueSize));
  }
  // The last status seen must carry fatal severity.
  ASSERT_EQ(s.severity(), Status::Severity::kFatalError);

  fault_env->SetFilesystemActive(true);
  // Close before fault_env destruct.
  Close();
}
395 
396 // Test that db->LockWAL() flushes the WAL after locking.
TEST_P(DBWriteTest, LockWalInEffect) {
  Options options = GetOptions();
  Reopen(options);
  // Round 0 tries the 1st WAL created during open; round 1 tries the 2nd WAL
  // created during SwitchWAL.
  for (int round = 0; round < 2; ++round) {
    if (round == 1) {
      ASSERT_OK(dbfull()->TEST_SwitchWAL());
    }
    ASSERT_OK(Put("key" + ToString(0), "value"));
    // Before locking, the buffer is expected to be empty exactly when
    // manual_wal_flush is off.
    const bool buffer_empty_before_lock = dbfull()->TEST_WALBufferIsEmpty();
    ASSERT_TRUE(options.manual_wal_flush != buffer_empty_before_lock);
    ASSERT_OK(dbfull()->LockWAL());
    // While locked, the buffer must be empty.
    ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty(false));
    ASSERT_OK(dbfull()->UnlockWAL());
  }
}
414 
TEST_P(DBWriteTest, ConcurrentlyDisabledWAL) {
  // Several threads interleave large WAL-disabled writes with tiny WAL-enabled
  // writes; the WAL_FILE_BYTES ticker is then checked to stay below the size
  // of a single WAL-disabled value.
  constexpr int kNumThreads = 10;
  constexpr int kWritesPerThread = 10;

  Options options = GetOptions();
  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
  options.statistics->set_stats_level(StatsLevel::kAll);
  Reopen(options);
  const std::string wal_key_prefix = "WAL_KEY_";
  const std::string no_wal_key_prefix = "K_";
  // 100 KB value each for NO-WAL operation
  const std::string no_wal_value(1024 * 100, 'X');
  // 1B value each for WAL operation
  const std::string wal_value = "0";
  std::thread threads[kNumThreads];
  for (int t = 0; t < kNumThreads; t++) {
    // The shared strings are read-only and outlive the threads (joined below),
    // so they are captured by reference instead of copied into every closure.
    threads[t] = std::thread([&, t] {
      for (int i = 0; i < kWritesPerThread; i++) {
        ROCKSDB_NAMESPACE::WriteOptions write_option_disable;
        write_option_disable.disableWAL = true;
        ROCKSDB_NAMESPACE::WriteOptions write_option_default;
        // Keys are made unique per (thread, iteration) pair.
        const std::string key_suffix =
            std::to_string(t) + "_" + std::to_string(i);
        const std::string no_wal_key = no_wal_key_prefix + key_suffix;
        ASSERT_OK(this->Put(no_wal_key, no_wal_value, write_option_disable));
        const std::string wal_key = wal_key_prefix + key_suffix;
        ASSERT_OK(this->Put(wal_key, wal_value, write_option_default));
        ASSERT_OK(dbfull()->SyncWAL());
      }
    });
  }
  for (auto& t : threads) {
    t.join();
  }
  uint64_t bytes_num = options.statistics->getTickerCount(
      ROCKSDB_NAMESPACE::Tickers::WAL_FILE_BYTES);
  // The written WAL size should be less than 100KB (even including HEADER &
  // FOOTER overhead)
  ASSERT_LE(bytes_num, 1024 * 100);
}
453 
// Instantiate every DBWriteTest case once for each listed option config:
// kDefault, kConcurrentWALWrites and kPipelinedWrite. The value is what
// GetParam() returns inside the fixture.
INSTANTIATE_TEST_CASE_P(DBWriteTestInstance, DBWriteTest,
                        testing::Values(DBTestBase::kDefault,
                                        DBTestBase::kConcurrentWALWrites,
                                        DBTestBase::kPipelinedWrite));
458 
459 }  // namespace ROCKSDB_NAMESPACE
460 
main(int argc,char ** argv)461 int main(int argc, char** argv) {
462   ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
463   ::testing::InitGoogleTest(&argc, argv);
464   return RUN_ALL_TESTS();
465 }
466