1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 #include <algorithm>
7 #include <vector>
8 #include "env/composite_env_wrapper.h"
9 #include "file/random_access_file_reader.h"
10 #include "file/readahead_raf.h"
11 #include "file/sequence_file_reader.h"
12 #include "file/writable_file_writer.h"
13 #include "test_util/testharness.h"
14 #include "test_util/testutil.h"
15 #include "util/random.h"
16 
17 namespace ROCKSDB_NAMESPACE {
18 
19 class WritableFileWriterTest : public testing::Test {};
20 
// One mebibyte in bytes (2^20); the size unit used by the writer tests.
constexpr uint32_t kMb = 1u << 20;
22 
TEST_F(WritableFileWriterTest,RangeSync)23 TEST_F(WritableFileWriterTest, RangeSync) {
24   class FakeWF : public WritableFile {
25    public:
26     explicit FakeWF() : size_(0), last_synced_(0) {}
27     ~FakeWF() override {}
28 
29     Status Append(const Slice& data) override {
30       size_ += data.size();
31       return Status::OK();
32     }
33     Status Truncate(uint64_t /*size*/) override { return Status::OK(); }
34     Status Close() override {
35       EXPECT_GE(size_, last_synced_ + kMb);
36       EXPECT_LT(size_, last_synced_ + 2 * kMb);
37       // Make sure random writes generated enough writes.
38       EXPECT_GT(size_, 10 * kMb);
39       return Status::OK();
40     }
41     Status Flush() override { return Status::OK(); }
42     Status Sync() override { return Status::OK(); }
43     Status Fsync() override { return Status::OK(); }
44     void SetIOPriority(Env::IOPriority /*pri*/) override {}
45     uint64_t GetFileSize() override { return size_; }
46     void GetPreallocationStatus(size_t* /*block_size*/,
47                                 size_t* /*last_allocated_block*/) override {}
48     size_t GetUniqueId(char* /*id*/, size_t /*max_size*/) const override {
49       return 0;
50     }
51     Status InvalidateCache(size_t /*offset*/, size_t /*length*/) override {
52       return Status::OK();
53     }
54 
55    protected:
56     Status Allocate(uint64_t /*offset*/, uint64_t /*len*/) override {
57       return Status::OK();
58     }
59     Status RangeSync(uint64_t offset, uint64_t nbytes) override {
60       EXPECT_EQ(offset % 4096, 0u);
61       EXPECT_EQ(nbytes % 4096, 0u);
62 
63       EXPECT_EQ(offset, last_synced_);
64       last_synced_ = offset + nbytes;
65       EXPECT_GE(size_, last_synced_ + kMb);
66       if (size_ > 2 * kMb) {
67         EXPECT_LT(size_, last_synced_ + 2 * kMb);
68       }
69       return Status::OK();
70     }
71 
72     uint64_t size_;
73     uint64_t last_synced_;
74   };
75 
76   EnvOptions env_options;
77   env_options.bytes_per_sync = kMb;
78   std::unique_ptr<FakeWF> wf(new FakeWF);
79   std::unique_ptr<WritableFileWriter> writer(
80       new WritableFileWriter(NewLegacyWritableFileWrapper(std::move(wf)),
81                              "" /* don't care */, env_options));
82   Random r(301);
83   std::unique_ptr<char[]> large_buf(new char[10 * kMb]);
84   for (int i = 0; i < 1000; i++) {
85     int skew_limit = (i < 700) ? 10 : 15;
86     uint32_t num = r.Skewed(skew_limit) * 100 + r.Uniform(100);
87     writer->Append(Slice(large_buf.get(), num));
88 
89     // Flush in a chance of 1/10.
90     if (r.Uniform(10) == 0) {
91       writer->Flush();
92     }
93   }
94   writer->Close();
95 }
96 
TEST_F(WritableFileWriterTest,IncrementalBuffer)97 TEST_F(WritableFileWriterTest, IncrementalBuffer) {
98   class FakeWF : public WritableFile {
99    public:
100     explicit FakeWF(std::string* _file_data, bool _use_direct_io,
101                     bool _no_flush)
102         : file_data_(_file_data),
103           use_direct_io_(_use_direct_io),
104           no_flush_(_no_flush) {}
105     ~FakeWF() override {}
106 
107     Status Append(const Slice& data) override {
108       file_data_->append(data.data(), data.size());
109       size_ += data.size();
110       return Status::OK();
111     }
112     Status PositionedAppend(const Slice& data, uint64_t pos) override {
113       EXPECT_TRUE(pos % 512 == 0);
114       EXPECT_TRUE(data.size() % 512 == 0);
115       file_data_->resize(pos);
116       file_data_->append(data.data(), data.size());
117       size_ += data.size();
118       return Status::OK();
119     }
120 
121     Status Truncate(uint64_t size) override {
122       file_data_->resize(size);
123       return Status::OK();
124     }
125     Status Close() override { return Status::OK(); }
126     Status Flush() override { return Status::OK(); }
127     Status Sync() override { return Status::OK(); }
128     Status Fsync() override { return Status::OK(); }
129     void SetIOPriority(Env::IOPriority /*pri*/) override {}
130     uint64_t GetFileSize() override { return size_; }
131     void GetPreallocationStatus(size_t* /*block_size*/,
132                                 size_t* /*last_allocated_block*/) override {}
133     size_t GetUniqueId(char* /*id*/, size_t /*max_size*/) const override {
134       return 0;
135     }
136     Status InvalidateCache(size_t /*offset*/, size_t /*length*/) override {
137       return Status::OK();
138     }
139     bool use_direct_io() const override { return use_direct_io_; }
140 
141     std::string* file_data_;
142     bool use_direct_io_;
143     bool no_flush_;
144     size_t size_ = 0;
145   };
146 
147   Random r(301);
148   const int kNumAttempts = 50;
149   for (int attempt = 0; attempt < kNumAttempts; attempt++) {
150     bool no_flush = (attempt % 3 == 0);
151     EnvOptions env_options;
152     env_options.writable_file_max_buffer_size =
153         (attempt < kNumAttempts / 2) ? 512 * 1024 : 700 * 1024;
154     std::string actual;
155     std::unique_ptr<FakeWF> wf(new FakeWF(&actual,
156 #ifndef ROCKSDB_LITE
157                                           attempt % 2 == 1,
158 #else
159                                           false,
160 #endif
161                                           no_flush));
162     std::unique_ptr<WritableFileWriter> writer(
163         new WritableFileWriter(NewLegacyWritableFileWrapper(std::move(wf)),
164                                "" /* don't care */, env_options));
165 
166     std::string target;
167     for (int i = 0; i < 20; i++) {
168       uint32_t num = r.Skewed(16) * 100 + r.Uniform(100);
169       std::string random_string;
170       test::RandomString(&r, num, &random_string);
171       writer->Append(Slice(random_string.c_str(), num));
172       target.append(random_string.c_str(), num);
173 
174       // In some attempts, flush in a chance of 1/10.
175       if (!no_flush && r.Uniform(10) == 0) {
176         writer->Flush();
177       }
178     }
179     writer->Flush();
180     writer->Close();
181     ASSERT_EQ(target.size(), actual.size());
182     ASSERT_EQ(target, actual);
183   }
184 }
185 
186 #ifndef ROCKSDB_LITE
// Checks that an error returned by the underlying file during an append is
// propagated back through WritableFileWriter::Append().
TEST_F(WritableFileWriterTest, AppendStatusReturn) {
  // Fake file whose append calls succeed until SetIOError(true) is called.
  class FakeWF : public WritableFile {
   public:
    FakeWF() = default;

    bool use_direct_io() const override { return use_direct_io_; }
    Status Append(const Slice& /*data*/) override {
      return io_error_ ? Status::IOError("Fake IO error") : Status::OK();
    }
    Status PositionedAppend(const Slice& /*data*/, uint64_t) override {
      return io_error_ ? Status::IOError("Fake IO error") : Status::OK();
    }
    Status Close() override { return Status::OK(); }
    Status Flush() override { return Status::OK(); }
    Status Sync() override { return Status::OK(); }
    void Setuse_direct_io(bool val) { use_direct_io_ = val; }
    void SetIOError(bool val) { io_error_ = val; }

   protected:
    bool use_direct_io_ = false;
    bool io_error_ = false;
  };

  std::unique_ptr<FakeWF> fake_file(new FakeWF());
  fake_file->Setuse_direct_io(true);
  std::unique_ptr<WritableFileWriter> writer(new WritableFileWriter(
      NewLegacyWritableFileWrapper(std::move(fake_file)),
      "" /* don't care */, EnvOptions()));

  // While the fake file is healthy the append goes through.
  const std::string first_payload(2 * kMb, 'a');
  ASSERT_OK(writer->Append(first_payload));

  // Reach the fake file through the writer's legacy wrapper and make the
  // next call to WritableFile::Append() fail.
  auto* wrapper =
      static_cast<LegacyWritableFileWrapper*>(writer->writable_file());
  auto* underlying = static_cast<FakeWF*>(wrapper->target());
  underlying->SetIOError(true);

  const std::string second_payload(2 * kMb, 'b');
  ASSERT_NOK(writer->Append(second_payload));
}
229 #endif
230 
231 class ReadaheadRandomAccessFileTest
232     : public testing::Test,
233       public testing::WithParamInterface<size_t> {
234  public:
GetReadaheadSizeList()235   static std::vector<size_t> GetReadaheadSizeList() {
236     return {1lu << 12, 1lu << 16};
237   }
SetUp()238   void SetUp() override {
239     readahead_size_ = GetParam();
240     scratch_.reset(new char[2 * readahead_size_]);
241     ResetSourceStr();
242   }
ReadaheadRandomAccessFileTest()243   ReadaheadRandomAccessFileTest() : control_contents_() {}
Read(uint64_t offset,size_t n)244   std::string Read(uint64_t offset, size_t n) {
245     Slice result;
246     test_read_holder_->Read(offset, n, &result, scratch_.get());
247     return std::string(result.data(), result.size());
248   }
ResetSourceStr(const std::string & str="")249   void ResetSourceStr(const std::string& str = "") {
250     auto write_holder =
251         std::unique_ptr<WritableFileWriter>(test::GetWritableFileWriter(
252             new test::StringSink(&control_contents_), "" /* don't care */));
253     write_holder->Append(Slice(str));
254     write_holder->Flush();
255     auto read_holder = std::unique_ptr<RandomAccessFile>(
256         new test::StringSource(control_contents_));
257     test_read_holder_ =
258         NewReadaheadRandomAccessFile(std::move(read_holder), readahead_size_);
259   }
GetReadaheadSize() const260   size_t GetReadaheadSize() const { return readahead_size_; }
261 
262  private:
263   size_t readahead_size_;
264   Slice control_contents_;
265   std::unique_ptr<RandomAccessFile> test_read_holder_;
266   std::unique_ptr<char[]> scratch_;
267 };
268 
// With the fixture's default (empty) source, every read yields no data,
// whatever offset and length are requested.
TEST_P(ReadaheadRandomAccessFileTest, EmptySourceStr) {
  const std::string nothing;
  ASSERT_EQ(nothing, Read(0, 1));
  ASSERT_EQ(nothing, Read(0, 0));
  ASSERT_EQ(nothing, Read(13, 13));
}
274 
// Reads from a 19-byte source, which is shorter than every readahead size
// the fixture is instantiated with.
TEST_P(ReadaheadRandomAccessFileTest, SourceStrLenLessThanReadaheadSize) {
  const std::string source = "abcdefghijklmnopqrs";
  ResetSourceStr(source);

  // Reads that lie fully inside the source.
  ASSERT_EQ(source.substr(3, 4), Read(3, 4));
  ASSERT_EQ(source.substr(0, 3), Read(0, 3));
  ASSERT_EQ(source, Read(0, source.size()));

  // A read that runs past the end returns only the bytes that exist.
  const int tail_len = std::min(static_cast<int>(source.size()) - 7, 30);
  ASSERT_EQ(source.substr(7, tail_len), Read(7, 30));

  // A read that starts beyond the end returns nothing.
  ASSERT_EQ("", Read(100, 100));
}
285 
// Random reads of fewer than readahead-size bytes over sources whose length
// grows from under one readahead unit up to just under 100 units.
TEST_P(ReadaheadRandomAccessFileTest, SourceStrLenGreaterThanReadaheadSize) {
  Random rng(42);
  for (int k = 0; k < 100; ++k) {
    size_t strLen = k * GetReadaheadSize() +
                    rng.Uniform(static_cast<int>(GetReadaheadSize()));
    std::string str =
        test::RandomHumanReadableString(&rng, static_cast<int>(strLen));
    ResetSourceStr(str);
    for (int test = 1; test <= 100; ++test) {
      // For k == 0 the drawn length can be 0. Uniform(0) is assumed to be
      // invalid (modulo by zero -- confirm against util/random.h), so an
      // empty source is always read at offset 0. Non-empty sources draw
      // from the generator exactly as before.
      size_t offset =
          (strLen == 0) ? 0 : rng.Uniform(static_cast<int>(strLen));
      size_t n = rng.Uniform(static_cast<int>(GetReadaheadSize()));
      // The expected result is clamped to the bytes left after `offset`.
      ASSERT_EQ(str.substr(offset, std::min(n, strLen - offset)),
                Read(offset, n));
    }
  }
}
302 
// Random reads whose requested length is at least one readahead unit (and
// below two), over a source between four and five readahead units long.
TEST_P(ReadaheadRandomAccessFileTest, ReadExceedsReadaheadSize) {
  Random rng(7);
  const size_t readahead = GetReadaheadSize();
  const size_t source_len =
      4 * readahead + rng.Uniform(static_cast<int>(readahead));
  const std::string source =
      test::RandomHumanReadableString(&rng, static_cast<int>(source_len));
  ResetSourceStr(source);

  for (int round = 1; round <= 100; ++round) {
    // Draw order matters for reproducibility: offset first, then length.
    const size_t offset = rng.Uniform(static_cast<int>(source_len));
    const size_t n = readahead + rng.Uniform(static_cast<int>(readahead));
    // Expect only the bytes that exist from `offset` onwards.
    const size_t available = source_len - offset;
    const std::string expected = source.substr(offset, std::min(n, available));
    ASSERT_EQ(expected, Read(offset, n));
  }
}
318 
319 INSTANTIATE_TEST_CASE_P(
320     EmptySourceStr, ReadaheadRandomAccessFileTest,
321     ::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));
322 INSTANTIATE_TEST_CASE_P(
323     SourceStrLenLessThanReadaheadSize, ReadaheadRandomAccessFileTest,
324     ::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));
325 INSTANTIATE_TEST_CASE_P(
326     SourceStrLenGreaterThanReadaheadSize, ReadaheadRandomAccessFileTest,
327     ::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));
328 INSTANTIATE_TEST_CASE_P(
329     ReadExceedsReadaheadSize, ReadaheadRandomAccessFileTest,
330     ::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));
331 
332 class ReadaheadSequentialFileTest : public testing::Test,
333                                     public testing::WithParamInterface<size_t> {
334  public:
GetReadaheadSizeList()335   static std::vector<size_t> GetReadaheadSizeList() {
336     return {1lu << 8, 1lu << 12, 1lu << 16, 1lu << 18};
337   }
SetUp()338   void SetUp() override {
339     readahead_size_ = GetParam();
340     scratch_.reset(new char[2 * readahead_size_]);
341     ResetSourceStr();
342   }
ReadaheadSequentialFileTest()343   ReadaheadSequentialFileTest() {}
Read(size_t n)344   std::string Read(size_t n) {
345     Slice result;
346     test_read_holder_->Read(n, &result, scratch_.get());
347     return std::string(result.data(), result.size());
348   }
Skip(size_t n)349   void Skip(size_t n) { test_read_holder_->Skip(n); }
ResetSourceStr(const std::string & str="")350   void ResetSourceStr(const std::string& str = "") {
351     auto read_holder = std::unique_ptr<SequentialFile>(
352         new test::SeqStringSource(str, &seq_read_count_));
353     test_read_holder_.reset(new SequentialFileReader(
354         NewLegacySequentialFileWrapper(read_holder), "test", readahead_size_));
355   }
GetReadaheadSize() const356   size_t GetReadaheadSize() const { return readahead_size_; }
357 
358  private:
359   size_t readahead_size_;
360   std::unique_ptr<SequentialFileReader> test_read_holder_;
361   std::unique_ptr<char[]> scratch_;
362   std::atomic<int> seq_read_count_;
363 };
364 
// With the fixture's default (empty) source, sequential reads of any length
// yield no data.
TEST_P(ReadaheadSequentialFileTest, EmptySourceStr) {
  const std::string nothing;
  ASSERT_EQ(nothing, Read(0));
  ASSERT_EQ(nothing, Read(1));
  ASSERT_EQ(nothing, Read(13));
}
370 
// Consumes a 19-byte source in three sequential reads, then reads once more
// after the end has been reached.
TEST_P(ReadaheadSequentialFileTest, SourceStrLenLessThanReadaheadSize) {
  const std::string source = "abcdefghijklmnopqrs";
  ResetSourceStr(source);

  // First three bytes, then the single byte that follows.
  ASSERT_EQ(source.substr(0, 3), Read(3));
  ASSERT_EQ(source.substr(3, 1), Read(1));
  // Asking for more than remains returns just the remainder.
  ASSERT_EQ(source.substr(4), Read(source.size()));
  // Nothing is left after that.
  ASSERT_EQ("", Read(100));
}
379 
// Sequential reads of fewer than readahead-size bytes over sources whose
// length grows from under one readahead unit up to just under 100 units.
TEST_P(ReadaheadSequentialFileTest, SourceStrLenGreaterThanReadaheadSize) {
  Random rng(42);
  const size_t readahead = GetReadaheadSize();
  // NOTE(review): the mode loop only runs for mode == 0, so the Skip()
  // branch below is never taken as written.
  for (int mode = 0; mode < 1; ++mode) {
    for (int multiple = 0; multiple < 100; ++multiple) {
      const size_t source_len =
          multiple * readahead + rng.Uniform(static_cast<int>(readahead));
      const std::string source =
          test::RandomHumanReadableString(&rng, static_cast<int>(source_len));
      ResetSourceStr(source);

      // Position the reader is expected to be at, capped at the source end.
      size_t consumed = 0;
      for (int round = 1; round <= 100; ++round) {
        const size_t n = rng.Uniform(static_cast<int>(readahead));
        const bool skip_only = (mode != 0) && (round % 2 != 0);
        if (skip_only) {
          Skip(n);
        } else {
          const size_t remaining = source_len - consumed;
          ASSERT_EQ(source.substr(consumed, std::min(n, remaining)), Read(n));
        }
        consumed = std::min(consumed + n, source_len);
      }
    }
  }
}
402 
// Sequential reads whose requested length is at least one readahead unit
// (and below two), over sources of growing length.
TEST_P(ReadaheadSequentialFileTest, ReadExceedsReadaheadSize) {
  Random rng(42);
  const size_t readahead = GetReadaheadSize();
  // NOTE(review): the mode loop only runs for mode == 0, so the Skip()
  // branch below is never taken as written.
  for (int mode = 0; mode < 1; ++mode) {
    for (int multiple = 0; multiple < 100; ++multiple) {
      const size_t source_len =
          multiple * readahead + rng.Uniform(static_cast<int>(readahead));
      const std::string source =
          test::RandomHumanReadableString(&rng, static_cast<int>(source_len));
      ResetSourceStr(source);

      // Position the reader is expected to be at, capped at the source end.
      size_t consumed = 0;
      for (int round = 1; round <= 100; ++round) {
        const size_t n = readahead + rng.Uniform(static_cast<int>(readahead));
        const bool skip_only = (mode != 0) && (round % 2 != 0);
        if (skip_only) {
          Skip(n);
        } else {
          const size_t remaining = source_len - consumed;
          ASSERT_EQ(source.substr(consumed, std::min(n, remaining)), Read(n));
        }
        consumed = std::min(consumed + n, source_len);
      }
    }
  }
}
426 
427 INSTANTIATE_TEST_CASE_P(
428     EmptySourceStr, ReadaheadSequentialFileTest,
429     ::testing::ValuesIn(ReadaheadSequentialFileTest::GetReadaheadSizeList()));
430 INSTANTIATE_TEST_CASE_P(
431     SourceStrLenLessThanReadaheadSize, ReadaheadSequentialFileTest,
432     ::testing::ValuesIn(ReadaheadSequentialFileTest::GetReadaheadSizeList()));
433 INSTANTIATE_TEST_CASE_P(
434     SourceStrLenGreaterThanReadaheadSize, ReadaheadSequentialFileTest,
435     ::testing::ValuesIn(ReadaheadSequentialFileTest::GetReadaheadSizeList()));
436 INSTANTIATE_TEST_CASE_P(
437     ReadExceedsReadaheadSize, ReadaheadSequentialFileTest,
438     ::testing::ValuesIn(ReadaheadSequentialFileTest::GetReadaheadSizeList()));
439 }  // namespace ROCKSDB_NAMESPACE
440 
main(int argc,char ** argv)441 int main(int argc, char** argv) {
442   ::testing::InitGoogleTest(&argc, argv);
443   return RUN_ALL_TESTS();
444 }
445