1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 #include <algorithm>
7 #include <vector>
8 
9 #include "env/mock_env.h"
10 #include "file/line_file_reader.h"
11 #include "file/random_access_file_reader.h"
12 #include "file/readahead_raf.h"
13 #include "file/sequence_file_reader.h"
14 #include "file/writable_file_writer.h"
15 #include "rocksdb/file_system.h"
16 #include "test_util/testharness.h"
17 #include "test_util/testutil.h"
18 #include "util/random.h"
19 
20 namespace ROCKSDB_NAMESPACE {
21 
22 class WritableFileWriterTest : public testing::Test {};
23 
24 const uint32_t kMb = 1 << 20;
25 
TEST_F(WritableFileWriterTest,RangeSync)26 TEST_F(WritableFileWriterTest, RangeSync) {
27   class FakeWF : public FSWritableFile {
28    public:
29     explicit FakeWF() : size_(0), last_synced_(0) {}
30     ~FakeWF() override {}
31 
32     using FSWritableFile::Append;
33     IOStatus Append(const Slice& data, const IOOptions& /*options*/,
34                     IODebugContext* /*dbg*/) override {
35       size_ += data.size();
36       return IOStatus::OK();
37     }
38     IOStatus Truncate(uint64_t /*size*/, const IOOptions& /*options*/,
39                       IODebugContext* /*dbg*/) override {
40       return IOStatus::OK();
41     }
42     IOStatus Close(const IOOptions& /*options*/,
43                    IODebugContext* /*dbg*/) override {
44       EXPECT_GE(size_, last_synced_ + kMb);
45       EXPECT_LT(size_, last_synced_ + 2 * kMb);
46       // Make sure random writes generated enough writes.
47       EXPECT_GT(size_, 10 * kMb);
48       return IOStatus::OK();
49     }
50     IOStatus Flush(const IOOptions& /*options*/,
51                    IODebugContext* /*dbg*/) override {
52       return IOStatus::OK();
53     }
54     IOStatus Sync(const IOOptions& /*options*/,
55                   IODebugContext* /*dbg*/) override {
56       return IOStatus::OK();
57     }
58     IOStatus Fsync(const IOOptions& /*options*/,
59                    IODebugContext* /*dbg*/) override {
60       return IOStatus::OK();
61     }
62     void SetIOPriority(Env::IOPriority /*pri*/) override {}
63     uint64_t GetFileSize(const IOOptions& /*options*/,
64                          IODebugContext* /*dbg*/) override {
65       return size_;
66     }
67     void GetPreallocationStatus(size_t* /*block_size*/,
68                                 size_t* /*last_allocated_block*/) override {}
69     size_t GetUniqueId(char* /*id*/, size_t /*max_size*/) const override {
70       return 0;
71     }
72     IOStatus InvalidateCache(size_t /*offset*/, size_t /*length*/) override {
73       return IOStatus::OK();
74     }
75 
76    protected:
77     IOStatus Allocate(uint64_t /*offset*/, uint64_t /*len*/,
78                       const IOOptions& /*options*/,
79                       IODebugContext* /*dbg*/) override {
80       return IOStatus::OK();
81     }
82     IOStatus RangeSync(uint64_t offset, uint64_t nbytes,
83                        const IOOptions& /*options*/,
84                        IODebugContext* /*dbg*/) override {
85       EXPECT_EQ(offset % 4096, 0u);
86       EXPECT_EQ(nbytes % 4096, 0u);
87 
88       EXPECT_EQ(offset, last_synced_);
89       last_synced_ = offset + nbytes;
90       EXPECT_GE(size_, last_synced_ + kMb);
91       if (size_ > 2 * kMb) {
92         EXPECT_LT(size_, last_synced_ + 2 * kMb);
93       }
94       return IOStatus::OK();
95     }
96 
97     uint64_t size_;
98     uint64_t last_synced_;
99   };
100 
101   EnvOptions env_options;
102   env_options.bytes_per_sync = kMb;
103   std::unique_ptr<FakeWF> wf(new FakeWF);
104   std::unique_ptr<WritableFileWriter> writer(
105       new WritableFileWriter(std::move(wf), "" /* don't care */, env_options));
106   Random r(301);
107   Status s;
108   std::unique_ptr<char[]> large_buf(new char[10 * kMb]);
109   for (int i = 0; i < 1000; i++) {
110     int skew_limit = (i < 700) ? 10 : 15;
111     uint32_t num = r.Skewed(skew_limit) * 100 + r.Uniform(100);
112     s = writer->Append(Slice(large_buf.get(), num));
113     ASSERT_OK(s);
114 
115     // Flush in a chance of 1/10.
116     if (r.Uniform(10) == 0) {
117       s = writer->Flush();
118       ASSERT_OK(s);
119     }
120   }
121   s = writer->Close();
122   ASSERT_OK(s);
123 }
124 
TEST_F(WritableFileWriterTest,IncrementalBuffer)125 TEST_F(WritableFileWriterTest, IncrementalBuffer) {
126   class FakeWF : public FSWritableFile {
127    public:
128     explicit FakeWF(std::string* _file_data, bool _use_direct_io,
129                     bool _no_flush)
130         : file_data_(_file_data),
131           use_direct_io_(_use_direct_io),
132           no_flush_(_no_flush) {}
133     ~FakeWF() override {}
134 
135     using FSWritableFile::Append;
136     IOStatus Append(const Slice& data, const IOOptions& /*options*/,
137                     IODebugContext* /*dbg*/) override {
138       file_data_->append(data.data(), data.size());
139       size_ += data.size();
140       return IOStatus::OK();
141     }
142     using FSWritableFile::PositionedAppend;
143     IOStatus PositionedAppend(const Slice& data, uint64_t pos,
144                               const IOOptions& /*options*/,
145                               IODebugContext* /*dbg*/) override {
146       EXPECT_TRUE(pos % 512 == 0);
147       EXPECT_TRUE(data.size() % 512 == 0);
148       file_data_->resize(pos);
149       file_data_->append(data.data(), data.size());
150       size_ += data.size();
151       return IOStatus::OK();
152     }
153 
154     IOStatus Truncate(uint64_t size, const IOOptions& /*options*/,
155                       IODebugContext* /*dbg*/) override {
156       file_data_->resize(size);
157       return IOStatus::OK();
158     }
159     IOStatus Close(const IOOptions& /*options*/,
160                    IODebugContext* /*dbg*/) override {
161       return IOStatus::OK();
162     }
163     IOStatus Flush(const IOOptions& /*options*/,
164                    IODebugContext* /*dbg*/) override {
165       return IOStatus::OK();
166     }
167     IOStatus Sync(const IOOptions& /*options*/,
168                   IODebugContext* /*dbg*/) override {
169       return IOStatus::OK();
170     }
171     IOStatus Fsync(const IOOptions& /*options*/,
172                    IODebugContext* /*dbg*/) override {
173       return IOStatus::OK();
174     }
175     void SetIOPriority(Env::IOPriority /*pri*/) override {}
176     uint64_t GetFileSize(const IOOptions& /*options*/,
177                          IODebugContext* /*dbg*/) override {
178       return size_;
179     }
180     void GetPreallocationStatus(size_t* /*block_size*/,
181                                 size_t* /*last_allocated_block*/) override {}
182     size_t GetUniqueId(char* /*id*/, size_t /*max_size*/) const override {
183       return 0;
184     }
185     IOStatus InvalidateCache(size_t /*offset*/, size_t /*length*/) override {
186       return IOStatus::OK();
187     }
188     bool use_direct_io() const override { return use_direct_io_; }
189 
190     std::string* file_data_;
191     bool use_direct_io_;
192     bool no_flush_;
193     size_t size_ = 0;
194   };
195 
196   Random r(301);
197   const int kNumAttempts = 50;
198   for (int attempt = 0; attempt < kNumAttempts; attempt++) {
199     bool no_flush = (attempt % 3 == 0);
200     EnvOptions env_options;
201     env_options.writable_file_max_buffer_size =
202         (attempt < kNumAttempts / 2) ? 512 * 1024 : 700 * 1024;
203     std::string actual;
204     std::unique_ptr<FakeWF> wf(new FakeWF(&actual,
205 #ifndef ROCKSDB_LITE
206                                           attempt % 2 == 1,
207 #else
208                                           false,
209 #endif
210                                           no_flush));
211     std::unique_ptr<WritableFileWriter> writer(new WritableFileWriter(
212         std::move(wf), "" /* don't care */, env_options));
213 
214     std::string target;
215     for (int i = 0; i < 20; i++) {
216       uint32_t num = r.Skewed(16) * 100 + r.Uniform(100);
217       std::string random_string = r.RandomString(num);
218       ASSERT_OK(writer->Append(Slice(random_string.c_str(), num)));
219       target.append(random_string.c_str(), num);
220 
221       // In some attempts, flush in a chance of 1/10.
222       if (!no_flush && r.Uniform(10) == 0) {
223         ASSERT_OK(writer->Flush());
224       }
225     }
226     ASSERT_OK(writer->Flush());
227     ASSERT_OK(writer->Close());
228     ASSERT_EQ(target.size(), actual.size());
229     ASSERT_EQ(target, actual);
230   }
231 }
232 
233 #ifndef ROCKSDB_LITE
TEST_F(WritableFileWriterTest,AppendStatusReturn)234 TEST_F(WritableFileWriterTest, AppendStatusReturn) {
235   class FakeWF : public FSWritableFile {
236    public:
237     explicit FakeWF() : use_direct_io_(false), io_error_(false) {}
238 
239     bool use_direct_io() const override { return use_direct_io_; }
240 
241     using FSWritableFile::Append;
242     IOStatus Append(const Slice& /*data*/, const IOOptions& /*options*/,
243                     IODebugContext* /*dbg*/) override {
244       if (io_error_) {
245         return IOStatus::IOError("Fake IO error");
246       }
247       return IOStatus::OK();
248     }
249     using FSWritableFile::PositionedAppend;
250     IOStatus PositionedAppend(const Slice& /*data*/, uint64_t,
251                               const IOOptions& /*options*/,
252                               IODebugContext* /*dbg*/) override {
253       if (io_error_) {
254         return IOStatus::IOError("Fake IO error");
255       }
256       return IOStatus::OK();
257     }
258     IOStatus Close(const IOOptions& /*options*/,
259                    IODebugContext* /*dbg*/) override {
260       return IOStatus::OK();
261     }
262     IOStatus Flush(const IOOptions& /*options*/,
263                    IODebugContext* /*dbg*/) override {
264       return IOStatus::OK();
265     }
266     IOStatus Sync(const IOOptions& /*options*/,
267                   IODebugContext* /*dbg*/) override {
268       return IOStatus::OK();
269     }
270     void Setuse_direct_io(bool val) { use_direct_io_ = val; }
271     void SetIOError(bool val) { io_error_ = val; }
272 
273    protected:
274     bool use_direct_io_;
275     bool io_error_;
276   };
277   std::unique_ptr<FakeWF> wf(new FakeWF());
278   wf->Setuse_direct_io(true);
279   std::unique_ptr<WritableFileWriter> writer(
280       new WritableFileWriter(std::move(wf), "" /* don't care */, EnvOptions()));
281 
282   ASSERT_OK(writer->Append(std::string(2 * kMb, 'a')));
283 
284   // Next call to WritableFile::Append() should fail
285   FakeWF* fwf = static_cast<FakeWF*>(writer->writable_file());
286   fwf->SetIOError(true);
287   ASSERT_NOK(writer->Append(std::string(2 * kMb, 'b')));
288 }
289 #endif
290 
291 class ReadaheadRandomAccessFileTest
292     : public testing::Test,
293       public testing::WithParamInterface<size_t> {
294  public:
GetReadaheadSizeList()295   static std::vector<size_t> GetReadaheadSizeList() {
296     return {1lu << 12, 1lu << 16};
297   }
SetUp()298   void SetUp() override {
299     readahead_size_ = GetParam();
300     scratch_.reset(new char[2 * readahead_size_]);
301     ResetSourceStr();
302   }
ReadaheadRandomAccessFileTest()303   ReadaheadRandomAccessFileTest() : control_contents_() {}
Read(uint64_t offset,size_t n)304   std::string Read(uint64_t offset, size_t n) {
305     Slice result;
306     Status s = test_read_holder_->Read(offset, n, IOOptions(), &result,
307                                        scratch_.get(), nullptr);
308     EXPECT_TRUE(s.ok() || s.IsInvalidArgument());
309     return std::string(result.data(), result.size());
310   }
ResetSourceStr(const std::string & str="")311   void ResetSourceStr(const std::string& str = "") {
312     std::unique_ptr<FSWritableFile> sink(
313         new test::StringSink(&control_contents_));
314     std::unique_ptr<WritableFileWriter> write_holder(new WritableFileWriter(
315         std::move(sink), "" /* don't care */, FileOptions()));
316     Status s = write_holder->Append(Slice(str));
317     EXPECT_OK(s);
318     s = write_holder->Flush();
319     EXPECT_OK(s);
320     std::unique_ptr<FSRandomAccessFile> read_holder(
321         new test::StringSource(control_contents_));
322     test_read_holder_ =
323         NewReadaheadRandomAccessFile(std::move(read_holder), readahead_size_);
324   }
GetReadaheadSize() const325   size_t GetReadaheadSize() const { return readahead_size_; }
326 
327  private:
328   size_t readahead_size_;
329   Slice control_contents_;
330   std::unique_ptr<FSRandomAccessFile> test_read_holder_;
331   std::unique_ptr<char[]> scratch_;
332 };
333 
TEST_P(ReadaheadRandomAccessFileTest,EmptySourceStr)334 TEST_P(ReadaheadRandomAccessFileTest, EmptySourceStr) {
335   ASSERT_EQ("", Read(0, 1));
336   ASSERT_EQ("", Read(0, 0));
337   ASSERT_EQ("", Read(13, 13));
338 }
339 
TEST_P(ReadaheadRandomAccessFileTest,SourceStrLenLessThanReadaheadSize)340 TEST_P(ReadaheadRandomAccessFileTest, SourceStrLenLessThanReadaheadSize) {
341   std::string str = "abcdefghijklmnopqrs";
342   ResetSourceStr(str);
343   ASSERT_EQ(str.substr(3, 4), Read(3, 4));
344   ASSERT_EQ(str.substr(0, 3), Read(0, 3));
345   ASSERT_EQ(str, Read(0, str.size()));
346   ASSERT_EQ(str.substr(7, std::min(static_cast<int>(str.size()) - 7, 30)),
347             Read(7, 30));
348   ASSERT_EQ("", Read(100, 100));
349 }
350 
TEST_P(ReadaheadRandomAccessFileTest,SourceStrLenGreaterThanReadaheadSize)351 TEST_P(ReadaheadRandomAccessFileTest, SourceStrLenGreaterThanReadaheadSize) {
352   Random rng(42);
353   for (int k = 0; k < 100; ++k) {
354     size_t strLen = k * GetReadaheadSize() +
355                     rng.Uniform(static_cast<int>(GetReadaheadSize()));
356     std::string str = rng.HumanReadableString(static_cast<int>(strLen));
357     ResetSourceStr(str);
358     for (int test = 1; test <= 100; ++test) {
359       size_t offset = rng.Uniform(static_cast<int>(strLen));
360       size_t n = rng.Uniform(static_cast<int>(GetReadaheadSize()));
361       ASSERT_EQ(str.substr(offset, std::min(n, strLen - offset)),
362                 Read(offset, n));
363     }
364   }
365 }
366 
TEST_P(ReadaheadRandomAccessFileTest,ReadExceedsReadaheadSize)367 TEST_P(ReadaheadRandomAccessFileTest, ReadExceedsReadaheadSize) {
368   Random rng(7);
369   size_t strLen = 4 * GetReadaheadSize() +
370                   rng.Uniform(static_cast<int>(GetReadaheadSize()));
371   std::string str = rng.HumanReadableString(static_cast<int>(strLen));
372   ResetSourceStr(str);
373   for (int test = 1; test <= 100; ++test) {
374     size_t offset = rng.Uniform(static_cast<int>(strLen));
375     size_t n =
376         GetReadaheadSize() + rng.Uniform(static_cast<int>(GetReadaheadSize()));
377     ASSERT_EQ(str.substr(offset, std::min(n, strLen - offset)),
378               Read(offset, n));
379   }
380 }
381 
382 INSTANTIATE_TEST_CASE_P(
383     EmptySourceStr, ReadaheadRandomAccessFileTest,
384     ::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));
385 INSTANTIATE_TEST_CASE_P(
386     SourceStrLenLessThanReadaheadSize, ReadaheadRandomAccessFileTest,
387     ::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));
388 INSTANTIATE_TEST_CASE_P(
389     SourceStrLenGreaterThanReadaheadSize, ReadaheadRandomAccessFileTest,
390     ::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));
391 INSTANTIATE_TEST_CASE_P(
392     ReadExceedsReadaheadSize, ReadaheadRandomAccessFileTest,
393     ::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));
394 
395 class ReadaheadSequentialFileTest : public testing::Test,
396                                     public testing::WithParamInterface<size_t> {
397  public:
GetReadaheadSizeList()398   static std::vector<size_t> GetReadaheadSizeList() {
399     return {1lu << 8, 1lu << 12, 1lu << 16, 1lu << 18};
400   }
SetUp()401   void SetUp() override {
402     readahead_size_ = GetParam();
403     scratch_.reset(new char[2 * readahead_size_]);
404     ResetSourceStr();
405   }
ReadaheadSequentialFileTest()406   ReadaheadSequentialFileTest() {}
Read(size_t n)407   std::string Read(size_t n) {
408     Slice result;
409     Status s = test_read_holder_->Read(n, &result, scratch_.get());
410     EXPECT_TRUE(s.ok() || s.IsInvalidArgument());
411     return std::string(result.data(), result.size());
412   }
Skip(size_t n)413   void Skip(size_t n) { test_read_holder_->Skip(n); }
ResetSourceStr(const std::string & str="")414   void ResetSourceStr(const std::string& str = "") {
415     auto read_holder = std::unique_ptr<FSSequentialFile>(
416         new test::SeqStringSource(str, &seq_read_count_));
417     test_read_holder_.reset(new SequentialFileReader(std::move(read_holder),
418                                                      "test", readahead_size_));
419   }
GetReadaheadSize() const420   size_t GetReadaheadSize() const { return readahead_size_; }
421 
422  private:
423   size_t readahead_size_;
424   std::unique_ptr<SequentialFileReader> test_read_holder_;
425   std::unique_ptr<char[]> scratch_;
426   std::atomic<int> seq_read_count_;
427 };
428 
TEST_P(ReadaheadSequentialFileTest,EmptySourceStr)429 TEST_P(ReadaheadSequentialFileTest, EmptySourceStr) {
430   ASSERT_EQ("", Read(0));
431   ASSERT_EQ("", Read(1));
432   ASSERT_EQ("", Read(13));
433 }
434 
TEST_P(ReadaheadSequentialFileTest,SourceStrLenLessThanReadaheadSize)435 TEST_P(ReadaheadSequentialFileTest, SourceStrLenLessThanReadaheadSize) {
436   std::string str = "abcdefghijklmnopqrs";
437   ResetSourceStr(str);
438   ASSERT_EQ(str.substr(0, 3), Read(3));
439   ASSERT_EQ(str.substr(3, 1), Read(1));
440   ASSERT_EQ(str.substr(4), Read(str.size()));
441   ASSERT_EQ("", Read(100));
442 }
443 
TEST_P(ReadaheadSequentialFileTest,SourceStrLenGreaterThanReadaheadSize)444 TEST_P(ReadaheadSequentialFileTest, SourceStrLenGreaterThanReadaheadSize) {
445   Random rng(42);
446   for (int s = 0; s < 1; ++s) {
447     for (int k = 0; k < 100; ++k) {
448       size_t strLen = k * GetReadaheadSize() +
449                       rng.Uniform(static_cast<int>(GetReadaheadSize()));
450       std::string str = rng.HumanReadableString(static_cast<int>(strLen));
451       ResetSourceStr(str);
452       size_t offset = 0;
453       for (int test = 1; test <= 100; ++test) {
454         size_t n = rng.Uniform(static_cast<int>(GetReadaheadSize()));
455         if (s && test % 2) {
456           Skip(n);
457         } else {
458           ASSERT_EQ(str.substr(offset, std::min(n, strLen - offset)), Read(n));
459         }
460         offset = std::min(offset + n, strLen);
461       }
462     }
463   }
464 }
465 
TEST_P(ReadaheadSequentialFileTest,ReadExceedsReadaheadSize)466 TEST_P(ReadaheadSequentialFileTest, ReadExceedsReadaheadSize) {
467   Random rng(42);
468   for (int s = 0; s < 1; ++s) {
469     for (int k = 0; k < 100; ++k) {
470       size_t strLen = k * GetReadaheadSize() +
471                       rng.Uniform(static_cast<int>(GetReadaheadSize()));
472       std::string str = rng.HumanReadableString(static_cast<int>(strLen));
473       ResetSourceStr(str);
474       size_t offset = 0;
475       for (int test = 1; test <= 100; ++test) {
476         size_t n = GetReadaheadSize() +
477                    rng.Uniform(static_cast<int>(GetReadaheadSize()));
478         if (s && test % 2) {
479           Skip(n);
480         } else {
481           ASSERT_EQ(str.substr(offset, std::min(n, strLen - offset)), Read(n));
482         }
483         offset = std::min(offset + n, strLen);
484       }
485     }
486   }
487 }
488 
489 INSTANTIATE_TEST_CASE_P(
490     EmptySourceStr, ReadaheadSequentialFileTest,
491     ::testing::ValuesIn(ReadaheadSequentialFileTest::GetReadaheadSizeList()));
492 INSTANTIATE_TEST_CASE_P(
493     SourceStrLenLessThanReadaheadSize, ReadaheadSequentialFileTest,
494     ::testing::ValuesIn(ReadaheadSequentialFileTest::GetReadaheadSizeList()));
495 INSTANTIATE_TEST_CASE_P(
496     SourceStrLenGreaterThanReadaheadSize, ReadaheadSequentialFileTest,
497     ::testing::ValuesIn(ReadaheadSequentialFileTest::GetReadaheadSizeList()));
498 INSTANTIATE_TEST_CASE_P(
499     ReadExceedsReadaheadSize, ReadaheadSequentialFileTest,
500     ::testing::ValuesIn(ReadaheadSequentialFileTest::GetReadaheadSizeList()));
501 
502 namespace {
GenerateLine(int n)503 std::string GenerateLine(int n) {
504   std::string rv;
505   // Multiples of 17 characters per line, for likely bad buffer alignment
506   for (int i = 0; i < n; ++i) {
507     rv.push_back(static_cast<char>('0' + (i % 10)));
508     rv.append("xxxxxxxxxxxxxxxx");
509   }
510   return rv;
511 }
512 }  // namespace
513 
TEST(LineFileReaderTest,LineFileReaderTest)514 TEST(LineFileReaderTest, LineFileReaderTest) {
515   const int nlines = 1000;
516 
517   std::unique_ptr<MockEnv> mem_env(new MockEnv(Env::Default()));
518   std::shared_ptr<FileSystem> fs = mem_env->GetFileSystem();
519   // Create an input file
520   {
521     std::unique_ptr<FSWritableFile> file;
522     ASSERT_OK(
523         fs->NewWritableFile("testfile", FileOptions(), &file, /*dbg*/ nullptr));
524 
525     for (int i = 0; i < nlines; ++i) {
526       std::string line = GenerateLine(i);
527       line.push_back('\n');
528       ASSERT_OK(file->Append(line, IOOptions(), /*dbg*/ nullptr));
529     }
530   }
531 
532   // Verify with no I/O errors
533   {
534     std::unique_ptr<LineFileReader> reader;
535     ASSERT_OK(LineFileReader::Create(fs, "testfile", FileOptions(), &reader,
536                                      nullptr));
537     std::string line;
538     int count = 0;
539     while (reader->ReadLine(&line)) {
540       ASSERT_EQ(line, GenerateLine(count));
541       ++count;
542       ASSERT_EQ(static_cast<int>(reader->GetLineNumber()), count);
543     }
544     ASSERT_OK(reader->GetStatus());
545     ASSERT_EQ(count, nlines);
546     ASSERT_EQ(static_cast<int>(reader->GetLineNumber()), count);
547     // And still
548     ASSERT_FALSE(reader->ReadLine(&line));
549     ASSERT_OK(reader->GetStatus());
550     ASSERT_EQ(static_cast<int>(reader->GetLineNumber()), count);
551   }
552 
553   // Verify with injected I/O error
554   {
555     std::unique_ptr<LineFileReader> reader;
556     ASSERT_OK(LineFileReader::Create(fs, "testfile", FileOptions(), &reader,
557                                      nullptr));
558     std::string line;
559     int count = 0;
560     // Read part way through the file
561     while (count < nlines / 4) {
562       ASSERT_TRUE(reader->ReadLine(&line));
563       ASSERT_EQ(line, GenerateLine(count));
564       ++count;
565       ASSERT_EQ(static_cast<int>(reader->GetLineNumber()), count);
566     }
567     ASSERT_OK(reader->GetStatus());
568 
569     // Inject error
570     int callback_count = 0;
571     SyncPoint::GetInstance()->SetCallBack(
572         "MemFile::Read:IOStatus", [&](void* arg) {
573           IOStatus* status = static_cast<IOStatus*>(arg);
574           *status = IOStatus::Corruption("test");
575           ++callback_count;
576         });
577     SyncPoint::GetInstance()->EnableProcessing();
578 
579     while (reader->ReadLine(&line)) {
580       ASSERT_EQ(line, GenerateLine(count));
581       ++count;
582       ASSERT_EQ(static_cast<int>(reader->GetLineNumber()), count);
583     }
584     ASSERT_TRUE(reader->GetStatus().IsCorruption());
585     ASSERT_LT(count, nlines / 2);
586     ASSERT_EQ(callback_count, 1);
587 
588     // Still get error & no retry
589     ASSERT_FALSE(reader->ReadLine(&line));
590     ASSERT_TRUE(reader->GetStatus().IsCorruption());
591     ASSERT_EQ(callback_count, 1);
592 
593     SyncPoint::GetInstance()->DisableProcessing();
594     SyncPoint::GetInstance()->ClearAllCallBacks();
595   }
596 }
597 
598 }  // namespace ROCKSDB_NAMESPACE
599 
main(int argc,char ** argv)600 int main(int argc, char** argv) {
601   ::testing::InitGoogleTest(&argc, argv);
602   return RUN_ALL_TESTS();
603 }
604