1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #include <atomic>
7 #include <memory>
8 #include <thread>
9 #include <vector>
10 #include <fstream>
11 #include "db/db_test_util.h"
12 #include "db/write_batch_internal.h"
13 #include "db/write_thread.h"
14 #include "port/port.h"
15 #include "port/stack_trace.h"
16 #include "test_util/fault_injection_test_env.h"
17 #include "test_util/sync_point.h"
18 #include "util/string_util.h"
19 
20 namespace ROCKSDB_NAMESPACE {
21 
22 // Test variations of WriteImpl.
23 class DBWriteTest : public DBTestBase, public testing::WithParamInterface<int> {
24  public:
DBWriteTest()25   DBWriteTest() : DBTestBase("/db_write_test") {}
26 
GetOptions()27   Options GetOptions() { return DBTestBase::GetOptions(GetParam()); }
28 
Open()29   void Open() { DBTestBase::Reopen(GetOptions()); }
30 };
31 
32 // It is invalid to do sync write while disabling WAL.
TEST_P(DBWriteTest,SyncAndDisableWAL)33 TEST_P(DBWriteTest, SyncAndDisableWAL) {
34   WriteOptions write_options;
35   write_options.sync = true;
36   write_options.disableWAL = true;
37   ASSERT_TRUE(dbfull()->Put(write_options, "foo", "bar").IsInvalidArgument());
38   WriteBatch batch;
39   ASSERT_OK(batch.Put("foo", "bar"));
40   ASSERT_TRUE(dbfull()->Write(write_options, &batch).IsInvalidArgument());
41 }
42 
TEST_P(DBWriteTest,WriteThreadHangOnWriteStall)43 TEST_P(DBWriteTest, WriteThreadHangOnWriteStall) {
44   Options options = GetOptions();
45   options.level0_stop_writes_trigger = options.level0_slowdown_writes_trigger = 4;
46   std::vector<port::Thread> threads;
47   std::atomic<int> thread_num(0);
48   port::Mutex mutex;
49   port::CondVar cv(&mutex);
50 
51   Reopen(options);
52 
53   std::function<void()> write_slowdown_func = [&]() {
54     int a = thread_num.fetch_add(1);
55     std::string key = "foo" + std::to_string(a);
56     WriteOptions wo;
57     wo.no_slowdown = false;
58     dbfull()->Put(wo, key, "bar");
59   };
60   std::function<void()> write_no_slowdown_func = [&]() {
61     int a = thread_num.fetch_add(1);
62     std::string key = "foo" + std::to_string(a);
63     WriteOptions wo;
64     wo.no_slowdown = true;
65     dbfull()->Put(wo, key, "bar");
66   };
67   std::function<void(void *)> unblock_main_thread_func = [&](void *) {
68     mutex.Lock();
69     cv.SignalAll();
70     mutex.Unlock();
71   };
72 
73   // Create 3 L0 files and schedule 4th without waiting
74   Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
75   Flush();
76   Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
77   Flush();
78   Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
79   Flush();
80   Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
81 
82   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
83       "WriteThread::JoinBatchGroup:Start", unblock_main_thread_func);
84   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
85       {{"DBWriteTest::WriteThreadHangOnWriteStall:1",
86         "DBImpl::BackgroundCallFlush:start"},
87        {"DBWriteTest::WriteThreadHangOnWriteStall:2",
88         "DBImpl::WriteImpl:BeforeLeaderEnters"},
89        // Make compaction start wait for the write stall to be detected and
90        // implemented by a write group leader
91        {"DBWriteTest::WriteThreadHangOnWriteStall:3",
92         "BackgroundCallCompaction:0"}});
93   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
94 
95   // Schedule creation of 4th L0 file without waiting. This will seal the
96   // memtable and then wait for a sync point before writing the file. We need
97   // to do it this way because SwitchMemtable() needs to enter the
98   // write_thread
99   FlushOptions fopt;
100   fopt.wait = false;
101   dbfull()->Flush(fopt);
102 
103   // Create a mix of slowdown/no_slowdown write threads
104   mutex.Lock();
105   // First leader
106   threads.emplace_back(write_slowdown_func);
107   cv.Wait();
108   // Second leader. Will stall writes
109   threads.emplace_back(write_slowdown_func);
110   cv.Wait();
111   threads.emplace_back(write_no_slowdown_func);
112   cv.Wait();
113   threads.emplace_back(write_slowdown_func);
114   cv.Wait();
115   threads.emplace_back(write_no_slowdown_func);
116   cv.Wait();
117   threads.emplace_back(write_slowdown_func);
118   cv.Wait();
119   mutex.Unlock();
120 
121   TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:1");
122   dbfull()->TEST_WaitForFlushMemTable(nullptr);
123   // This would have triggered a write stall. Unblock the write group leader
124   TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:2");
125   // The leader is going to create missing newer links. When the leader finishes,
126   // the next leader is going to delay writes and fail writers with no_slowdown
127 
128   TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:3");
129   for (auto& t : threads) {
130     t.join();
131   }
132 }
133 
// Starts several concurrent writers against a deactivated filesystem and
// holds the group leader back until all of them have reached the write
// thread, then checks that each writer observes the expected Put outcome and
// that exactly one leader was seen.
TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) {
  constexpr int kNumThreads = 5;
  std::unique_ptr<FaultInjectionTestEnv> mock_env(
      new FaultInjectionTestEnv(Env::Default()));
  Options options = GetOptions();
  options.env = mock_env.get();
  Reopen(options);
  std::atomic<int> writers_linked{0};
  std::atomic<int> leaders_seen{0};
  std::vector<port::Thread> writers;
  mock_env->SetFilesystemActive(false);

  // Invoked for every writer at the JoinBatchGroup wait point. The leader
  // spins here until all writers have arrived, to make sure all threads join
  // the same batch group.
  auto on_join_wait = [&](void* arg) {
    writers_linked.fetch_add(1);
    auto* writer = reinterpret_cast<WriteThread::Writer*>(arg);
    if (writer->state != WriteThread::STATE_GROUP_LEADER) {
      return;
    }
    leaders_seen.fetch_add(1);
    while (writers_linked.load() < kNumThreads) {
      // busy waiting
    }
  };
  SyncPoint::GetInstance()->SetCallBack("WriteThread::JoinBatchGroup:Wait",
                                        on_join_wait);
  SyncPoint::GetInstance()->EnableProcessing();

  // Body of each writer thread. All threads should fail, except that with
  // manual_wal_flush the Put itself succeeds and the filesystem error would
  // only surface on the WAL flush.
  auto put_and_check = [&](int index) {
    const Status put_status = Put("key" + ToString(index), "value");
    if (!options.manual_wal_flush) {
      ASSERT_FALSE(put_status.ok());
      return;
    }
    ASSERT_TRUE(put_status.ok());
    // we should see fs error when we do the flush

    // TSAN reports a false alarm for lock-order-inversion but Open and
    // FlushWAL are not run concurrently. Disabling this until TSAN is
    // fixed.
    // res = dbfull()->FlushWAL(false);
    // ASSERT_FALSE(res.ok());
  };

  for (int index = 0; index < kNumThreads; ++index) {
    writers.emplace_back(put_and_check, index);
  }
  for (auto& writer : writers) {
    writer.join();
  }
  ASSERT_EQ(1, leaders_seen.load());
  // Close before mock_env destruct.
  Close();
}
187 
// Checks that manual_wal_flush controls whether a Put leaves data in the WAL
// buffer, and that FlushWAL() always empties it. The check is done twice:
// on the WAL created at open and on the WAL created by TEST_SwitchWAL().
TEST_P(DBWriteTest, ManualWalFlushInEffect) {
  Options options = GetOptions();
  Reopen(options);
  // try the 1st WAL created during open
  ASSERT_OK(Put("key" + ToString(0), "value"));
  // The buffer is expected to be non-empty exactly when manual_wal_flush is
  // set, hence the inequality between the two booleans.
  ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty());
  ASSERT_OK(dbfull()->FlushWAL(false));
  ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty());
  // try the 2nd wal created during SwitchWAL
  dbfull()->TEST_SwitchWAL();
  ASSERT_OK(Put("key" + ToString(0), "value"));
  ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty());
  ASSERT_OK(dbfull()->FlushWAL(false));
  ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty());
}
203 
// Issues two Puts: the first with the filesystem deactivated, the second
// with it active again. Unless manual_wal_flush is set, both must fail; the
// second one is expected to fail because of read-only mode.
TEST_P(DBWriteTest, IOErrorOnWALWriteTriggersReadOnlyMode) {
  std::unique_ptr<FaultInjectionTestEnv> fault_env(
      new FaultInjectionTestEnv(Env::Default()));
  Options options = GetOptions();
  options.env = fault_env.get();
  Reopen(options);

  for (int attempt = 0; attempt < 2; ++attempt) {
    // Forcibly fail WAL write for the first Put only. Subsequent Puts should
    // fail due to read-only mode
    const bool filesystem_active = (attempt != 0);
    fault_env->SetFilesystemActive(filesystem_active);
    const Status put_status = Put("key" + ToString(attempt), "value");

    // TSAN reports a false alarm for lock-order-inversion but Open and
    // FlushWAL are not run concurrently. Disabling this until TSAN is
    // fixed.
    //
    // if (options.manual_wal_flush && i == 0) {
    //   // even with manual_wal_flush the 2nd Put should return error because
    //   // of the read-only mode
    //   ASSERT_TRUE(res.ok());
    //   // we should see fs error when we do the flush
    //   res = dbfull()->FlushWAL(false);
    // }

    if (options.manual_wal_flush) {
      // Nothing is asserted in this mode while the check above is disabled.
      continue;
    }
    ASSERT_FALSE(put_status.ok());
  }

  // Close before mock_env destruct.
  Close();
}
234 
// With the filesystem deactivated and manual WAL flushing on, keeps writing
// 1 KB values until a Put fails (at most 4 * 512 of them) and expects the
// resulting status to carry fatal severity.
TEST_P(DBWriteTest, IOErrorOnSwitchMemtable) {
  Random rnd(301);
  std::unique_ptr<FaultInjectionTestEnv> fault_env(
      new FaultInjectionTestEnv(Env::Default()));

  Options options = GetOptions();
  options.env = fault_env.get();
  options.writable_file_max_buffer_size = 4 * 1024 * 1024;
  options.write_buffer_size = 3 * 512 * 1024;
  options.wal_bytes_per_sync = 256 * 1024;
  options.manual_wal_flush = true;
  Reopen(options);

  fault_env->SetFilesystemActive(false, Status::IOError("Not active"));

  // Stop at the first failing Put; `last_status` then holds that failure.
  constexpr int kMaxPuts = 4 * 512;
  Status last_status;
  for (int i = 0; last_status.ok() && i < kMaxPuts; ++i) {
    last_status = Put(Key(i), RandomString(&rnd, 1024));
  }
  ASSERT_EQ(last_status.severity(), Status::Severity::kFatalError);

  fault_env->SetFilesystemActive(true);
  // Close before mock_env destruct.
  Close();
}
260 
261 // Test that db->LockWAL() flushes the WAL after locking.
TEST_P(DBWriteTest,LockWalInEffect)262 TEST_P(DBWriteTest, LockWalInEffect) {
263   Options options = GetOptions();
264   Reopen(options);
265   // try the 1st WAL created during open
266   ASSERT_OK(Put("key" + ToString(0), "value"));
267   ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty());
268   ASSERT_OK(dbfull()->LockWAL());
269   ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty(false));
270   ASSERT_OK(dbfull()->UnlockWAL());
271   // try the 2nd wal created during SwitchWAL
272   dbfull()->TEST_SwitchWAL();
273   ASSERT_OK(Put("key" + ToString(0), "value"));
274   ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty());
275   ASSERT_OK(dbfull()->LockWAL());
276   ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty(false));
277   ASSERT_OK(dbfull()->UnlockWAL());
278 }
279 
// Runs 10 threads that each interleave large writes with the WAL disabled
// and tiny writes with the WAL enabled, syncing the WAL after every pair.
// Afterwards the WAL_FILE_BYTES ticker must stay at or below 100 KB, i.e.
// the large disableWAL values must not have been counted as WAL bytes.
TEST_P(DBWriteTest, ConcurrentlyDisabledWAL) {
  Options options = GetOptions();
  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
  options.statistics->set_stats_level(StatsLevel::kAll);
  Reopen(options);

  constexpr int kNumThreads = 10;
  constexpr int kWritesPerThread = 10;
  const std::string wal_key_prefix = "WAL_KEY_";
  const std::string no_wal_key_prefix = "K_";
  // 100 KB value each for NO-WAL operation
  const std::string no_wal_value(1024 * 100, 'X');
  // 1B value each for WAL operation
  const std::string wal_value = "0";

  std::vector<std::thread> threads;
  threads.reserve(kNumThreads);
  for (int t = 0; t < kNumThreads; t++) {
    // The shared strings are captured by reference: they are read-only and
    // outlive the threads, which are all joined below. Only the thread index
    // is copied. The lambda returns void so gtest assertions can be used.
    threads.emplace_back([&, t] {
      for (int i = 0; i < kWritesPerThread; i++) {
        ROCKSDB_NAMESPACE::WriteOptions write_option_disable;
        write_option_disable.disableWAL = true;
        ROCKSDB_NAMESPACE::WriteOptions write_option_default;
        const std::string suffix = std::to_string(t) + "_" + std::to_string(i);
        const std::string no_wal_key = no_wal_key_prefix + suffix;
        ASSERT_OK(this->Put(no_wal_key, no_wal_value, write_option_disable));
        // Use the same <thread>_<iteration> suffix as the no-WAL key so
        // threads do not overwrite each other's WAL keys.
        const std::string wal_key = wal_key_prefix + suffix;
        ASSERT_OK(this->Put(wal_key, wal_value, write_option_default));
        ASSERT_OK(dbfull()->SyncWAL());
      }
    });
  }
  for (auto& t : threads) {
    t.join();
  }

  uint64_t bytes_num = options.statistics->getTickerCount(
      ROCKSDB_NAMESPACE::Tickers::WAL_FILE_BYTES);
  // written WAL size should be less than 100KB (even including HEADER &
  // FOOTER overhead)
  ASSERT_LE(bytes_num, 1024 * 100);
}
317 
318 INSTANTIATE_TEST_CASE_P(DBWriteTestInstance, DBWriteTest,
319                         testing::Values(DBTestBase::kDefault,
320                                         DBTestBase::kConcurrentWALWrites,
321                                         DBTestBase::kPipelinedWrite));
322 
323 }  // namespace ROCKSDB_NAMESPACE
324 
main(int argc,char ** argv)325 int main(int argc, char** argv) {
326   ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
327   ::testing::InitGoogleTest(&argc, argv);
328   return RUN_ALL_TESTS();
329 }
330