1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 #include <algorithm>
7 #include <vector>
8
9 #include "env/mock_env.h"
10 #include "file/line_file_reader.h"
11 #include "file/random_access_file_reader.h"
12 #include "file/readahead_raf.h"
13 #include "file/sequence_file_reader.h"
14 #include "file/writable_file_writer.h"
15 #include "rocksdb/file_system.h"
16 #include "test_util/testharness.h"
17 #include "test_util/testutil.h"
18 #include "util/random.h"
19
20 namespace ROCKSDB_NAMESPACE {
21
22 class WritableFileWriterTest : public testing::Test {};
23
24 const uint32_t kMb = 1 << 20;
25
// Feeds 1000 pseudo-random appends (with occasional flushes) through a
// WritableFileWriter whose bytes_per_sync is 1 MB, and lets the fake file
// below validate every RangeSync() request as well as the final Close().
TEST_F(WritableFileWriterTest, RangeSync) {
  // Fake file that discards its data. It only counts the appended bytes and
  // remembers where the previous RangeSync() ended, so each new request can
  // be checked against that state.
  class FakeWF : public FSWritableFile {
   public:
    explicit FakeWF() = default;
    ~FakeWF() override = default;

    using FSWritableFile::Append;
    IOStatus Append(const Slice& data, const IOOptions& /*options*/,
                    IODebugContext* /*dbg*/) override {
      size_ += data.size();
      return IOStatus::OK();
    }
    IOStatus Truncate(uint64_t /*size*/, const IOOptions& /*options*/,
                      IODebugContext* /*dbg*/) override {
      return IOStatus::OK();
    }
    IOStatus Close(const IOOptions& /*options*/,
                   IODebugContext* /*dbg*/) override {
      // The tail that was never range-synced must be in [1 MB, 2 MB).
      EXPECT_GE(size_, last_synced_ + kMb);
      EXPECT_LT(size_, last_synced_ + 2 * kMb);
      // Make sure random writes generated enough writes.
      EXPECT_GT(size_, 10 * kMb);
      return IOStatus::OK();
    }
    IOStatus Flush(const IOOptions& /*options*/,
                   IODebugContext* /*dbg*/) override {
      return IOStatus::OK();
    }
    IOStatus Sync(const IOOptions& /*options*/,
                  IODebugContext* /*dbg*/) override {
      return IOStatus::OK();
    }
    IOStatus Fsync(const IOOptions& /*options*/,
                   IODebugContext* /*dbg*/) override {
      return IOStatus::OK();
    }
    void SetIOPriority(Env::IOPriority /*pri*/) override {}
    uint64_t GetFileSize(const IOOptions& /*options*/,
                         IODebugContext* /*dbg*/) override {
      return size_;
    }
    void GetPreallocationStatus(size_t* /*block_size*/,
                                size_t* /*last_allocated_block*/) override {}
    size_t GetUniqueId(char* /*id*/, size_t /*max_size*/) const override {
      return 0;
    }
    IOStatus InvalidateCache(size_t /*offset*/, size_t /*length*/) override {
      return IOStatus::OK();
    }

   protected:
    IOStatus Allocate(uint64_t /*offset*/, uint64_t /*len*/,
                      const IOOptions& /*options*/,
                      IODebugContext* /*dbg*/) override {
      return IOStatus::OK();
    }
    IOStatus RangeSync(uint64_t offset, uint64_t nbytes,
                       const IOOptions& /*options*/,
                       IODebugContext* /*dbg*/) override {
      // Both ends of the synced range have to be 4 KB aligned.
      EXPECT_EQ(offset % 4096, 0u);
      EXPECT_EQ(nbytes % 4096, 0u);

      // Ranges must be contiguous: each one starts where the last one ended.
      EXPECT_EQ(offset, last_synced_);
      last_synced_ = offset + nbytes;
      // At least 1 MB of appended data stays unsynced behind the range ...
      EXPECT_GE(size_, last_synced_ + kMb);
      // ... and, once the file is larger than 2 MB, less than 2 MB of it.
      if (size_ > 2 * kMb) {
        EXPECT_LT(size_, last_synced_ + 2 * kMb);
      }
      return IOStatus::OK();
    }

    uint64_t size_ = 0;         // Total bytes passed to Append().
    uint64_t last_synced_ = 0;  // End offset of the latest RangeSync().
  };

  EnvOptions env_options;
  env_options.bytes_per_sync = kMb;
  std::unique_ptr<FakeWF> fake_file(new FakeWF);
  std::unique_ptr<WritableFileWriter> writer(new WritableFileWriter(
      std::move(fake_file), "" /* don't care */, env_options));

  Random rnd(301);
  // Source bytes for the appends; the fake file never looks at them.
  std::unique_ptr<char[]> payload(new char[10 * kMb]);
  for (int iter = 0; iter < 1000; ++iter) {
    // The final 300 iterations use a higher skew limit.
    int skew_limit = 10;
    if (iter >= 700) {
      skew_limit = 15;
    }
    // Kept as a single expression so the order of the two RNG draws is
    // whatever the compiler already chose for it.
    const uint32_t len = rnd.Skewed(skew_limit) * 100 + rnd.Uniform(100);
    ASSERT_OK(writer->Append(Slice(payload.get(), len)));

    // Flush in a chance of 1/10.
    const bool flush_now = (rnd.Uniform(10) == 0);
    if (flush_now) {
      ASSERT_OK(writer->Flush());
    }
  }
  ASSERT_OK(writer->Close());
}
124
// Writes random data through WritableFileWriter in 50 attempts that vary the
// writer buffer size, whether direct I/O is reported by the file, and whether
// intermediate flushes happen, then checks that the bytes that reached the
// fake file equal the bytes that were appended.
TEST_F(WritableFileWriterTest, IncrementalBuffer) {
  // Fake file that mirrors everything written to it into a caller-owned
  // std::string.
  class FakeWF : public FSWritableFile {
   public:
    // `file_data` receives the file content and must outlive this object.
    // `direct_io` is the value reported by use_direct_io().
    explicit FakeWF(std::string* file_data, bool direct_io)
        : file_data_(file_data), use_direct_io_(direct_io) {}
    ~FakeWF() override {}

    using FSWritableFile::Append;
    IOStatus Append(const Slice& data, const IOOptions& /*options*/,
                    IODebugContext* /*dbg*/) override {
      file_data_->append(data.data(), data.size());
      return IOStatus::OK();
    }
    using FSWritableFile::PositionedAppend;
    IOStatus PositionedAppend(const Slice& data, uint64_t pos,
                              const IOOptions& /*options*/,
                              IODebugContext* /*dbg*/) override {
      // Positioned writes are expected to be 512-byte aligned in both
      // offset and length.
      EXPECT_TRUE(pos % 512 == 0);
      EXPECT_TRUE(data.size() % 512 == 0);
      // Writing at `pos` replaces whatever was stored from `pos` onwards.
      file_data_->resize(pos);
      file_data_->append(data.data(), data.size());
      return IOStatus::OK();
    }

    IOStatus Truncate(uint64_t size, const IOOptions& /*options*/,
                      IODebugContext* /*dbg*/) override {
      file_data_->resize(size);
      return IOStatus::OK();
    }
    IOStatus Close(const IOOptions& /*options*/,
                   IODebugContext* /*dbg*/) override {
      return IOStatus::OK();
    }
    IOStatus Flush(const IOOptions& /*options*/,
                   IODebugContext* /*dbg*/) override {
      return IOStatus::OK();
    }
    IOStatus Sync(const IOOptions& /*options*/,
                  IODebugContext* /*dbg*/) override {
      return IOStatus::OK();
    }
    IOStatus Fsync(const IOOptions& /*options*/,
                   IODebugContext* /*dbg*/) override {
      return IOStatus::OK();
    }
    void SetIOPriority(Env::IOPriority /*pri*/) override {}
    uint64_t GetFileSize(const IOOptions& /*options*/,
                         IODebugContext* /*dbg*/) override {
      // Derived from the stored content rather than from a separate counter:
      // PositionedAppend() may overwrite already written bytes and
      // Truncate() may shrink the file, so a running sum of write sizes
      // would not match the real length.
      return file_data_->size();
    }
    void GetPreallocationStatus(size_t* /*block_size*/,
                                size_t* /*last_allocated_block*/) override {}
    size_t GetUniqueId(char* /*id*/, size_t /*max_size*/) const override {
      return 0;
    }
    IOStatus InvalidateCache(size_t /*offset*/, size_t /*length*/) override {
      return IOStatus::OK();
    }
    bool use_direct_io() const override { return use_direct_io_; }

    std::string* file_data_;  // Not owned.
    bool use_direct_io_;
  };

  Random r(301);
  const int kNumAttempts = 50;
  for (int attempt = 0; attempt < kNumAttempts; attempt++) {
    // Every third attempt performs no intermediate flushes.
    const bool no_flush = (attempt % 3 == 0);
    // Odd attempts report direct I/O, except in LITE builds.
#ifndef ROCKSDB_LITE
    const bool use_direct_io = (attempt % 2 == 1);
#else
    const bool use_direct_io = false;
#endif
    EnvOptions env_options;
    // First half of the attempts uses a 512 KB buffer, second half 700 KB.
    env_options.writable_file_max_buffer_size =
        (attempt < kNumAttempts / 2) ? 512 * 1024 : 700 * 1024;
    std::string actual;
    std::unique_ptr<FakeWF> wf(new FakeWF(&actual, use_direct_io));
    std::unique_ptr<WritableFileWriter> writer(new WritableFileWriter(
        std::move(wf), "" /* don't care */, env_options));

    std::string target;
    for (int i = 0; i < 20; i++) {
      uint32_t num = r.Skewed(16) * 100 + r.Uniform(100);
      std::string random_string = r.RandomString(num);
      ASSERT_OK(writer->Append(Slice(random_string.c_str(), num)));
      target.append(random_string.c_str(), num);

      // In some attempts, flush in a chance of 1/10.
      if (!no_flush && r.Uniform(10) == 0) {
        ASSERT_OK(writer->Flush());
      }
    }
    ASSERT_OK(writer->Flush());
    ASSERT_OK(writer->Close());
    ASSERT_EQ(target.size(), actual.size());
    ASSERT_EQ(target, actual);
  }
}
232
233 #ifndef ROCKSDB_LITE
TEST_F(WritableFileWriterTest,AppendStatusReturn)234 TEST_F(WritableFileWriterTest, AppendStatusReturn) {
235 class FakeWF : public FSWritableFile {
236 public:
237 explicit FakeWF() : use_direct_io_(false), io_error_(false) {}
238
239 bool use_direct_io() const override { return use_direct_io_; }
240
241 using FSWritableFile::Append;
242 IOStatus Append(const Slice& /*data*/, const IOOptions& /*options*/,
243 IODebugContext* /*dbg*/) override {
244 if (io_error_) {
245 return IOStatus::IOError("Fake IO error");
246 }
247 return IOStatus::OK();
248 }
249 using FSWritableFile::PositionedAppend;
250 IOStatus PositionedAppend(const Slice& /*data*/, uint64_t,
251 const IOOptions& /*options*/,
252 IODebugContext* /*dbg*/) override {
253 if (io_error_) {
254 return IOStatus::IOError("Fake IO error");
255 }
256 return IOStatus::OK();
257 }
258 IOStatus Close(const IOOptions& /*options*/,
259 IODebugContext* /*dbg*/) override {
260 return IOStatus::OK();
261 }
262 IOStatus Flush(const IOOptions& /*options*/,
263 IODebugContext* /*dbg*/) override {
264 return IOStatus::OK();
265 }
266 IOStatus Sync(const IOOptions& /*options*/,
267 IODebugContext* /*dbg*/) override {
268 return IOStatus::OK();
269 }
270 void Setuse_direct_io(bool val) { use_direct_io_ = val; }
271 void SetIOError(bool val) { io_error_ = val; }
272
273 protected:
274 bool use_direct_io_;
275 bool io_error_;
276 };
277 std::unique_ptr<FakeWF> wf(new FakeWF());
278 wf->Setuse_direct_io(true);
279 std::unique_ptr<WritableFileWriter> writer(
280 new WritableFileWriter(std::move(wf), "" /* don't care */, EnvOptions()));
281
282 ASSERT_OK(writer->Append(std::string(2 * kMb, 'a')));
283
284 // Next call to WritableFile::Append() should fail
285 FakeWF* fwf = static_cast<FakeWF*>(writer->writable_file());
286 fwf->SetIOError(true);
287 ASSERT_NOK(writer->Append(std::string(2 * kMb, 'b')));
288 }
289 #endif
290
291 class ReadaheadRandomAccessFileTest
292 : public testing::Test,
293 public testing::WithParamInterface<size_t> {
294 public:
GetReadaheadSizeList()295 static std::vector<size_t> GetReadaheadSizeList() {
296 return {1lu << 12, 1lu << 16};
297 }
SetUp()298 void SetUp() override {
299 readahead_size_ = GetParam();
300 scratch_.reset(new char[2 * readahead_size_]);
301 ResetSourceStr();
302 }
ReadaheadRandomAccessFileTest()303 ReadaheadRandomAccessFileTest() : control_contents_() {}
Read(uint64_t offset,size_t n)304 std::string Read(uint64_t offset, size_t n) {
305 Slice result;
306 Status s = test_read_holder_->Read(offset, n, IOOptions(), &result,
307 scratch_.get(), nullptr);
308 EXPECT_TRUE(s.ok() || s.IsInvalidArgument());
309 return std::string(result.data(), result.size());
310 }
ResetSourceStr(const std::string & str="")311 void ResetSourceStr(const std::string& str = "") {
312 std::unique_ptr<FSWritableFile> sink(
313 new test::StringSink(&control_contents_));
314 std::unique_ptr<WritableFileWriter> write_holder(new WritableFileWriter(
315 std::move(sink), "" /* don't care */, FileOptions()));
316 Status s = write_holder->Append(Slice(str));
317 EXPECT_OK(s);
318 s = write_holder->Flush();
319 EXPECT_OK(s);
320 std::unique_ptr<FSRandomAccessFile> read_holder(
321 new test::StringSource(control_contents_));
322 test_read_holder_ =
323 NewReadaheadRandomAccessFile(std::move(read_holder), readahead_size_);
324 }
GetReadaheadSize() const325 size_t GetReadaheadSize() const { return readahead_size_; }
326
327 private:
328 size_t readahead_size_;
329 Slice control_contents_;
330 std::unique_ptr<FSRandomAccessFile> test_read_holder_;
331 std::unique_ptr<char[]> scratch_;
332 };
333
// With the empty source installed by SetUp(), every read returns no data,
// whatever offset and length are requested.
TEST_P(ReadaheadRandomAccessFileTest, EmptySourceStr) {
  const std::string kNothing;
  ASSERT_EQ(kNothing, Read(0, 1));
  ASSERT_EQ(kNothing, Read(0, 0));
  ASSERT_EQ(kNothing, Read(13, 13));
}
339
// Reads from a 19-byte source: ranges inside the data, the whole string, a
// range running past the end (clipped to the data) and a range fully past the
// end (empty).
TEST_P(ReadaheadRandomAccessFileTest, SourceStrLenLessThanReadaheadSize) {
  const std::string str = "abcdefghijklmnopqrs";
  ResetSourceStr(str);

  ASSERT_EQ(str.substr(3, 4), Read(3, 4));
  ASSERT_EQ(str.substr(0, 3), Read(0, 3));
  ASSERT_EQ(str, Read(0, str.size()));

  // Asking for 30 bytes at offset 7 yields only what is left of the string.
  const int bytes_left = static_cast<int>(str.size()) - 7;
  const int expected_len = std::min(bytes_left, 30);
  ASSERT_EQ(str.substr(7, expected_len), Read(7, 30));

  ASSERT_EQ("", Read(100, 100));
}
350
// For 100 source strings of growing length (k readahead units plus a random
// remainder), issues 100 reads each at a random offset with a random length
// below the readahead size and compares against the matching substring.
TEST_P(ReadaheadRandomAccessFileTest, SourceStrLenGreaterThanReadaheadSize) {
  Random rng(42);
  const size_t readahead = GetReadaheadSize();
  for (int k = 0; k < 100; ++k) {
    const size_t strLen =
        k * readahead + rng.Uniform(static_cast<int>(readahead));
    const std::string str = rng.HumanReadableString(static_cast<int>(strLen));
    ResetSourceStr(str);

    int reads_left = 100;
    while (reads_left-- > 0) {
      const size_t offset = rng.Uniform(static_cast<int>(strLen));
      const size_t n = rng.Uniform(static_cast<int>(readahead));
      // A read never returns more than what remains after `offset`.
      const size_t expected_len = std::min(n, strLen - offset);
      ASSERT_EQ(str.substr(offset, expected_len), Read(offset, n));
    }
  }
}
366
// Issues 100 reads whose length is at least the readahead size (and below
// twice that size) at random offsets of a source that is between four and
// five readahead units long.
TEST_P(ReadaheadRandomAccessFileTest, ReadExceedsReadaheadSize) {
  Random rng(7);
  const size_t readahead = GetReadaheadSize();
  const size_t strLen =
      4 * readahead + rng.Uniform(static_cast<int>(readahead));
  const std::string str = rng.HumanReadableString(static_cast<int>(strLen));
  ResetSourceStr(str);

  int reads_left = 100;
  while (reads_left-- > 0) {
    const size_t offset = rng.Uniform(static_cast<int>(strLen));
    const size_t n = readahead + rng.Uniform(static_cast<int>(readahead));
    // A read never returns more than what remains after `offset`.
    const size_t expected_len = std::min(n, strLen - offset);
    ASSERT_EQ(str.substr(offset, expected_len), Read(offset, n));
  }
}
381
382 INSTANTIATE_TEST_CASE_P(
383 EmptySourceStr, ReadaheadRandomAccessFileTest,
384 ::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));
385 INSTANTIATE_TEST_CASE_P(
386 SourceStrLenLessThanReadaheadSize, ReadaheadRandomAccessFileTest,
387 ::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));
388 INSTANTIATE_TEST_CASE_P(
389 SourceStrLenGreaterThanReadaheadSize, ReadaheadRandomAccessFileTest,
390 ::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));
391 INSTANTIATE_TEST_CASE_P(
392 ReadExceedsReadaheadSize, ReadaheadRandomAccessFileTest,
393 ::testing::ValuesIn(ReadaheadRandomAccessFileTest::GetReadaheadSizeList()));
394
395 class ReadaheadSequentialFileTest : public testing::Test,
396 public testing::WithParamInterface<size_t> {
397 public:
GetReadaheadSizeList()398 static std::vector<size_t> GetReadaheadSizeList() {
399 return {1lu << 8, 1lu << 12, 1lu << 16, 1lu << 18};
400 }
SetUp()401 void SetUp() override {
402 readahead_size_ = GetParam();
403 scratch_.reset(new char[2 * readahead_size_]);
404 ResetSourceStr();
405 }
ReadaheadSequentialFileTest()406 ReadaheadSequentialFileTest() {}
Read(size_t n)407 std::string Read(size_t n) {
408 Slice result;
409 Status s = test_read_holder_->Read(n, &result, scratch_.get());
410 EXPECT_TRUE(s.ok() || s.IsInvalidArgument());
411 return std::string(result.data(), result.size());
412 }
Skip(size_t n)413 void Skip(size_t n) { test_read_holder_->Skip(n); }
ResetSourceStr(const std::string & str="")414 void ResetSourceStr(const std::string& str = "") {
415 auto read_holder = std::unique_ptr<FSSequentialFile>(
416 new test::SeqStringSource(str, &seq_read_count_));
417 test_read_holder_.reset(new SequentialFileReader(std::move(read_holder),
418 "test", readahead_size_));
419 }
GetReadaheadSize() const420 size_t GetReadaheadSize() const { return readahead_size_; }
421
422 private:
423 size_t readahead_size_;
424 std::unique_ptr<SequentialFileReader> test_read_holder_;
425 std::unique_ptr<char[]> scratch_;
426 std::atomic<int> seq_read_count_;
427 };
428
// With the empty source installed by SetUp(), sequential reads of any length
// return no data.
TEST_P(ReadaheadSequentialFileTest, EmptySourceStr) {
  const std::string kNothing;
  ASSERT_EQ(kNothing, Read(0));
  ASSERT_EQ(kNothing, Read(1));
  ASSERT_EQ(kNothing, Read(13));
}
434
// Consumes a 19-byte source in three sequential reads (3 bytes, 1 byte, then
// the remainder) and checks that a further read returns nothing.
TEST_P(ReadaheadSequentialFileTest, SourceStrLenLessThanReadaheadSize) {
  const std::string str = "abcdefghijklmnopqrs";
  ResetSourceStr(str);

  const std::string head = Read(3);
  ASSERT_EQ(str.substr(0, 3), head);

  const std::string next_byte = Read(1);
  ASSERT_EQ(str.substr(3, 1), next_byte);

  // Requesting more than is left returns just the tail of the string.
  const std::string tail = Read(str.size());
  ASSERT_EQ(str.substr(4), tail);

  ASSERT_EQ("", Read(100));
}
443
// For 100 source strings of growing length, performs 100 sequential reads of
// random length below the readahead size and compares each against the
// substring at the tracked offset.
TEST_P(ReadaheadSequentialFileTest, SourceStrLenGreaterThanReadaheadSize) {
  Random rng(42);
  const size_t readahead = GetReadaheadSize();
  // The outer loop only ever runs with mode == 0, so the Skip() branch below
  // is never taken as written.
  for (int mode = 0; mode < 1; ++mode) {
    for (int k = 0; k < 100; ++k) {
      const size_t strLen =
          k * readahead + rng.Uniform(static_cast<int>(readahead));
      const std::string str =
          rng.HumanReadableString(static_cast<int>(strLen));
      ResetSourceStr(str);

      size_t offset = 0;
      for (int test = 1; test <= 100; ++test) {
        const size_t n = rng.Uniform(static_cast<int>(readahead));
        const bool skip_instead = (mode != 0) && (test % 2 != 0);
        if (!skip_instead) {
          const size_t expected_len = std::min(n, strLen - offset);
          ASSERT_EQ(str.substr(offset, expected_len), Read(n));
        } else {
          Skip(n);
        }
        // The position advances by n but never beyond the end of the data.
        offset = std::min(offset + n, strLen);
      }
    }
  }
}
465
// Same walk as SourceStrLenGreaterThanReadaheadSize, but every request is at
// least the readahead size (and below twice that size).
TEST_P(ReadaheadSequentialFileTest, ReadExceedsReadaheadSize) {
  Random rng(42);
  const size_t readahead = GetReadaheadSize();
  // The outer loop only ever runs with mode == 0, so the Skip() branch below
  // is never taken as written.
  for (int mode = 0; mode < 1; ++mode) {
    for (int k = 0; k < 100; ++k) {
      const size_t strLen =
          k * readahead + rng.Uniform(static_cast<int>(readahead));
      const std::string str =
          rng.HumanReadableString(static_cast<int>(strLen));
      ResetSourceStr(str);

      size_t offset = 0;
      for (int test = 1; test <= 100; ++test) {
        const size_t n =
            readahead + rng.Uniform(static_cast<int>(readahead));
        const bool skip_instead = (mode != 0) && (test % 2 != 0);
        if (!skip_instead) {
          const size_t expected_len = std::min(n, strLen - offset);
          ASSERT_EQ(str.substr(offset, expected_len), Read(n));
        } else {
          Skip(n);
        }
        // The position advances by n but never beyond the end of the data.
        offset = std::min(offset + n, strLen);
      }
    }
  }
}
488
489 INSTANTIATE_TEST_CASE_P(
490 EmptySourceStr, ReadaheadSequentialFileTest,
491 ::testing::ValuesIn(ReadaheadSequentialFileTest::GetReadaheadSizeList()));
492 INSTANTIATE_TEST_CASE_P(
493 SourceStrLenLessThanReadaheadSize, ReadaheadSequentialFileTest,
494 ::testing::ValuesIn(ReadaheadSequentialFileTest::GetReadaheadSizeList()));
495 INSTANTIATE_TEST_CASE_P(
496 SourceStrLenGreaterThanReadaheadSize, ReadaheadSequentialFileTest,
497 ::testing::ValuesIn(ReadaheadSequentialFileTest::GetReadaheadSizeList()));
498 INSTANTIATE_TEST_CASE_P(
499 ReadExceedsReadaheadSize, ReadaheadSequentialFileTest,
500 ::testing::ValuesIn(ReadaheadSequentialFileTest::GetReadaheadSizeList()));
501
namespace {
// Returns a line built from `n` groups of 17 characters: one digit (the group
// index modulo 10) followed by sixteen 'x' characters. The result is empty
// when `n` is zero or negative.
std::string GenerateLine(int n) {
  // Multiples of 17 characters per line, for likely bad buffer alignment
  static const char kFiller[] = "xxxxxxxxxxxxxxxx";
  std::string line;
  int group = 0;
  while (group < n) {
    line += static_cast<char>('0' + (group % 10));
    line += kFiller;
    ++group;
  }
  return line;
}
}  // namespace
513
// Writes 1000 lines of increasing length into a MockEnv file and reads them
// back through LineFileReader, first without errors and then with a
// Corruption status injected at the "MemFile::Read:IOStatus" sync point.
TEST(LineFileReaderTest, LineFileReaderTest) {
  const int nlines = 1000;
  const std::string kFileName = "testfile";

  std::unique_ptr<MockEnv> mem_env(new MockEnv(Env::Default()));
  std::shared_ptr<FileSystem> fs = mem_env->GetFileSystem();

  // Create an input file
  {
    std::unique_ptr<FSWritableFile> file;
    ASSERT_OK(
        fs->NewWritableFile(kFileName, FileOptions(), &file, /*dbg*/ nullptr));

    for (int i = 0; i < nlines; ++i) {
      const std::string line = GenerateLine(i) + '\n';
      ASSERT_OK(file->Append(line, IOOptions(), /*dbg*/ nullptr));
    }
  }

  // Verify with no I/O errors
  {
    std::unique_ptr<LineFileReader> reader;
    ASSERT_OK(LineFileReader::Create(fs, kFileName, FileOptions(), &reader,
                                     nullptr));
    std::string line;
    int lines_read = 0;
    for (;;) {
      if (!reader->ReadLine(&line)) {
        break;
      }
      ASSERT_EQ(line, GenerateLine(lines_read));
      ++lines_read;
      ASSERT_EQ(static_cast<int>(reader->GetLineNumber()), lines_read);
    }
    ASSERT_OK(reader->GetStatus());
    ASSERT_EQ(lines_read, nlines);
    ASSERT_EQ(static_cast<int>(reader->GetLineNumber()), lines_read);
    // And still
    ASSERT_FALSE(reader->ReadLine(&line));
    ASSERT_OK(reader->GetStatus());
    ASSERT_EQ(static_cast<int>(reader->GetLineNumber()), lines_read);
  }

  // Verify with injected I/O error
  {
    std::unique_ptr<LineFileReader> reader;
    ASSERT_OK(LineFileReader::Create(fs, kFileName, FileOptions(), &reader,
                                     nullptr));
    std::string line;
    int lines_read = 0;
    // Read part way through the file
    const int clean_lines = nlines / 4;
    for (; lines_read < clean_lines;) {
      ASSERT_TRUE(reader->ReadLine(&line));
      ASSERT_EQ(line, GenerateLine(lines_read));
      ++lines_read;
      ASSERT_EQ(static_cast<int>(reader->GetLineNumber()), lines_read);
    }
    ASSERT_OK(reader->GetStatus());

    // Inject error
    int callback_count = 0;
    SyncPoint* const sync_point = SyncPoint::GetInstance();
    sync_point->SetCallBack("MemFile::Read:IOStatus", [&](void* arg) {
      // Overwrite the status passed to the sync point and count the call.
      IOStatus* status = static_cast<IOStatus*>(arg);
      *status = IOStatus::Corruption("test");
      ++callback_count;
    });
    sync_point->EnableProcessing();

    // Lines may still be returned until the reader reports the failure.
    for (;;) {
      if (!reader->ReadLine(&line)) {
        break;
      }
      ASSERT_EQ(line, GenerateLine(lines_read));
      ++lines_read;
      ASSERT_EQ(static_cast<int>(reader->GetLineNumber()), lines_read);
    }
    ASSERT_TRUE(reader->GetStatus().IsCorruption());
    ASSERT_LT(lines_read, nlines / 2);
    ASSERT_EQ(callback_count, 1);

    // Still get error & no retry
    ASSERT_FALSE(reader->ReadLine(&line));
    ASSERT_TRUE(reader->GetStatus().IsCorruption());
    ASSERT_EQ(callback_count, 1);

    sync_point->DisableProcessing();
    sync_point->ClearAllCallBacks();
  }
}
597
598 } // namespace ROCKSDB_NAMESPACE
599
main(int argc,char ** argv)600 int main(int argc, char** argv) {
601 ::testing::InitGoogleTest(&argc, argv);
602 return RUN_ALL_TESTS();
603 }
604