1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
4 
5 #include "db/log_reader.h"
6 #include "db/log_writer.h"
7 #include "leveldb/env.h"
8 #include "util/coding.h"
9 #include "util/crc32c.h"
10 #include "util/random.h"
11 #include "util/testharness.h"
12 
13 namespace leveldb {
14 namespace log {
15 
// Construct a string of exactly n bytes by repeating partial_string and
// truncating the final copy.
//
// If partial_string is empty there is nothing to repeat, so the result is
// the empty string. Without this guard the loop below never terminates for
// n > 0, because appending "" does not grow the result.
static std::string BigString(const std::string& partial_string, size_t n) {
  std::string result;
  if (partial_string.empty()) {
    return result;
  }
  // At most one extra copy of partial_string is appended past n before the
  // final resize, so this avoids repeated reallocation for large n.
  result.reserve(n + partial_string.size());
  while (result.size() < n) {
    result.append(partial_string);
  }
  result.resize(n);
  return result;
}
26 
// Render n in decimal followed by a single '.', e.g. 42 -> "42.".
// The trailing dot keeps consecutive numbers distinguishable when the
// result is repeated by BigString().
static std::string NumberString(int n) {
  char text[50];
  const int written = snprintf(text, sizeof(text), "%d.", n);
  return std::string(text, static_cast<size_t>(written));
}
33 
34 // Return a skewed potentially long string
RandomSkewedString(int i,Random * rnd)35 static std::string RandomSkewedString(int i, Random* rnd) {
36   return BigString(NumberString(i), rnd->Skewed(17));
37 }
38 
39 class LogTest {
40  private:
41   class StringDest : public WritableFile {
42    public:
43     std::string contents_;
44 
Close()45     virtual Status Close() { return Status::OK(); }
Flush()46     virtual Status Flush() { return Status::OK(); }
Sync()47     virtual Status Sync() { return Status::OK(); }
Append(const Slice & slice)48     virtual Status Append(const Slice& slice) {
49       contents_.append(slice.data(), slice.size());
50       return Status::OK();
51     }
52   };
53 
54   class StringSource : public SequentialFile {
55    public:
56     Slice contents_;
57     bool force_error_;
58     bool returned_partial_;
StringSource()59     StringSource() : force_error_(false), returned_partial_(false) { }
60 
Read(size_t n,Slice * result,char * scratch)61     virtual Status Read(size_t n, Slice* result, char* scratch) {
62       ASSERT_TRUE(!returned_partial_) << "must not Read() after eof/error";
63 
64       if (force_error_) {
65         force_error_ = false;
66         returned_partial_ = true;
67         return Status::Corruption("read error");
68       }
69 
70       if (contents_.size() < n) {
71         n = contents_.size();
72         returned_partial_ = true;
73       }
74       *result = Slice(contents_.data(), n);
75       contents_.remove_prefix(n);
76       return Status::OK();
77     }
78 
Skip(uint64_t n)79     virtual Status Skip(uint64_t n) {
80       if (n > contents_.size()) {
81         contents_.clear();
82         return Status::NotFound("in-memory file skipped past end");
83       }
84 
85       contents_.remove_prefix(n);
86 
87       return Status::OK();
88     }
89   };
90 
91   class ReportCollector : public Reader::Reporter {
92    public:
93     size_t dropped_bytes_;
94     std::string message_;
95 
ReportCollector()96     ReportCollector() : dropped_bytes_(0) { }
Corruption(size_t bytes,const Status & status)97     virtual void Corruption(size_t bytes, const Status& status) {
98       dropped_bytes_ += bytes;
99       message_.append(status.ToString());
100     }
101   };
102 
103   StringDest dest_;
104   StringSource source_;
105   ReportCollector report_;
106   bool reading_;
107   Writer* writer_;
108   Reader* reader_;
109 
110   // Record metadata for testing initial offset functionality
111   static size_t initial_offset_record_sizes_[];
112   static uint64_t initial_offset_last_record_offsets_[];
113   static int num_initial_offset_records_;
114 
115  public:
LogTest()116   LogTest() : reading_(false),
117               writer_(new Writer(&dest_)),
118               reader_(new Reader(&source_, &report_, true/*checksum*/,
119                       0/*initial_offset*/)) {
120   }
121 
~LogTest()122   ~LogTest() {
123     delete writer_;
124     delete reader_;
125   }
126 
ReopenForAppend()127   void ReopenForAppend() {
128     delete writer_;
129     writer_ = new Writer(&dest_, dest_.contents_.size());
130   }
131 
Write(const std::string & msg)132   void Write(const std::string& msg) {
133     ASSERT_TRUE(!reading_) << "Write() after starting to read";
134     writer_->AddRecord(Slice(msg));
135   }
136 
WrittenBytes() const137   size_t WrittenBytes() const {
138     return dest_.contents_.size();
139   }
140 
Read()141   std::string Read() {
142     if (!reading_) {
143       reading_ = true;
144       source_.contents_ = Slice(dest_.contents_);
145     }
146     std::string scratch;
147     Slice record;
148     if (reader_->ReadRecord(&record, &scratch)) {
149       return record.ToString();
150     } else {
151       return "EOF";
152     }
153   }
154 
IncrementByte(int offset,int delta)155   void IncrementByte(int offset, int delta) {
156     dest_.contents_[offset] += delta;
157   }
158 
SetByte(int offset,char new_byte)159   void SetByte(int offset, char new_byte) {
160     dest_.contents_[offset] = new_byte;
161   }
162 
ShrinkSize(int bytes)163   void ShrinkSize(int bytes) {
164     dest_.contents_.resize(dest_.contents_.size() - bytes);
165   }
166 
FixChecksum(int header_offset,int len)167   void FixChecksum(int header_offset, int len) {
168     // Compute crc of type/len/data
169     uint32_t crc = crc32c::Value(&dest_.contents_[header_offset+6], 1 + len);
170     crc = crc32c::Mask(crc);
171     EncodeFixed32(&dest_.contents_[header_offset], crc);
172   }
173 
ForceError()174   void ForceError() {
175     source_.force_error_ = true;
176   }
177 
DroppedBytes() const178   size_t DroppedBytes() const {
179     return report_.dropped_bytes_;
180   }
181 
ReportMessage() const182   std::string ReportMessage() const {
183     return report_.message_;
184   }
185 
186   // Returns OK iff recorded error message contains "msg"
MatchError(const std::string & msg) const187   std::string MatchError(const std::string& msg) const {
188     if (report_.message_.find(msg) == std::string::npos) {
189       return report_.message_;
190     } else {
191       return "OK";
192     }
193   }
194 
WriteInitialOffsetLog()195   void WriteInitialOffsetLog() {
196     for (int i = 0; i < num_initial_offset_records_; i++) {
197       std::string record(initial_offset_record_sizes_[i],
198                          static_cast<char>('a' + i));
199       Write(record);
200     }
201   }
202 
StartReadingAt(uint64_t initial_offset)203   void StartReadingAt(uint64_t initial_offset) {
204     delete reader_;
205     reader_ = new Reader(&source_, &report_, true/*checksum*/, initial_offset);
206   }
207 
CheckOffsetPastEndReturnsNoRecords(uint64_t offset_past_end)208   void CheckOffsetPastEndReturnsNoRecords(uint64_t offset_past_end) {
209     WriteInitialOffsetLog();
210     reading_ = true;
211     source_.contents_ = Slice(dest_.contents_);
212     Reader* offset_reader = new Reader(&source_, &report_, true/*checksum*/,
213                                        WrittenBytes() + offset_past_end);
214     Slice record;
215     std::string scratch;
216     ASSERT_TRUE(!offset_reader->ReadRecord(&record, &scratch));
217     delete offset_reader;
218   }
219 
CheckInitialOffsetRecord(uint64_t initial_offset,int expected_record_offset)220   void CheckInitialOffsetRecord(uint64_t initial_offset,
221                                 int expected_record_offset) {
222     WriteInitialOffsetLog();
223     reading_ = true;
224     source_.contents_ = Slice(dest_.contents_);
225     Reader* offset_reader = new Reader(&source_, &report_, true/*checksum*/,
226                                        initial_offset);
227 
228     // Read all records from expected_record_offset through the last one.
229     ASSERT_LT(expected_record_offset, num_initial_offset_records_);
230     for (; expected_record_offset < num_initial_offset_records_;
231          ++expected_record_offset) {
232       Slice record;
233       std::string scratch;
234       ASSERT_TRUE(offset_reader->ReadRecord(&record, &scratch));
235       ASSERT_EQ(initial_offset_record_sizes_[expected_record_offset],
236                 record.size());
237       ASSERT_EQ(initial_offset_last_record_offsets_[expected_record_offset],
238                 offset_reader->LastRecordOffset());
239       ASSERT_EQ((char)('a' + expected_record_offset), record.data()[0]);
240     }
241     delete offset_reader;
242   }
243 };
244 
245 size_t LogTest::initial_offset_record_sizes_[] =
246     {10000,  // Two sizable records in first block
247      10000,
248      2 * log::kBlockSize - 1000,  // Span three blocks
249      1,
250      13716,  // Consume all but two bytes of block 3.
251      log::kBlockSize - kHeaderSize, // Consume the entirety of block 4.
252     };
253 
254 uint64_t LogTest::initial_offset_last_record_offsets_[] =
255     {0,
256      kHeaderSize + 10000,
257      2 * (kHeaderSize + 10000),
258      2 * (kHeaderSize + 10000) +
259          (2 * log::kBlockSize - 1000) + 3 * kHeaderSize,
260      2 * (kHeaderSize + 10000) +
261          (2 * log::kBlockSize - 1000) + 3 * kHeaderSize
262          + kHeaderSize + 1,
263      3 * log::kBlockSize,
264     };
265 
266 // LogTest::initial_offset_last_record_offsets_ must be defined before this.
267 int LogTest::num_initial_offset_records_ =
268     sizeof(LogTest::initial_offset_last_record_offsets_)/sizeof(uint64_t);
269 
// A log with nothing written reports EOF on the very first read.
TEST(LogTest, Empty) {
  const std::string first_read = Read();
  ASSERT_EQ("EOF", first_read);
}
273 
// A handful of small records, including an empty one, read back in write
// order. Reads past the end keep returning EOF.
TEST(LogTest, ReadWrite) {
  const char* const kRecords[] = { "foo", "bar", "", "xxxx" };
  const size_t kNumRecords = sizeof(kRecords) / sizeof(kRecords[0]);
  for (size_t i = 0; i < kNumRecords; i++) {
    Write(kRecords[i]);
  }
  for (size_t i = 0; i < kNumRecords; i++) {
    ASSERT_EQ(kRecords[i], Read());
  }
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ("EOF", Read());  // Make sure reads at eof work
}
286 
// Enough small records to spill across many blocks all read back in order.
TEST(LogTest, ManyBlocks) {
  const int kNumRecords = 100000;
  for (int n = 0; n < kNumRecords; ++n) {
    Write(NumberString(n));
  }
  for (int n = 0; n < kNumRecords; ++n) {
    ASSERT_EQ(NumberString(n), Read());
  }
  ASSERT_EQ("EOF", Read());
}
296 
// Records larger than a block are split into fragments by the writer and
// must be reassembled intact by the reader.
TEST(LogTest, Fragmentation) {
  const std::string small_record = "small";
  const std::string medium_record = BigString("medium", 50000);
  const std::string large_record = BigString("large", 100000);

  Write(small_record);
  Write(medium_record);
  Write(large_record);

  ASSERT_EQ(small_record, Read());
  ASSERT_EQ(medium_record, Read());
  ASSERT_EQ(large_record, Read());
  ASSERT_EQ("EOF", Read());
}
306 
// The first record leaves exactly kHeaderSize bytes in block 0. That is just
// enough room for the header of an empty record, which is written next.
TEST(LogTest, MarginalTrailer) {
  const int n = kBlockSize - 2*kHeaderSize;
  const std::string filler = BigString("foo", n);
  Write(filler);
  ASSERT_EQ(kBlockSize - kHeaderSize, WrittenBytes());

  Write("");
  Write("bar");

  ASSERT_EQ(filler, Read());
  ASSERT_EQ("", Read());
  ASSERT_EQ("bar", Read());
  ASSERT_EQ("EOF", Read());
}
319 
// Same kHeaderSize-byte trailer as MarginalTrailer, but a non-empty record
// follows. Nothing may be reported as corrupt.
TEST(LogTest, MarginalTrailer2) {
  const int n = kBlockSize - 2*kHeaderSize;
  const std::string filler = BigString("foo", n);
  Write(filler);
  ASSERT_EQ(kBlockSize - kHeaderSize, WrittenBytes());

  Write("bar");

  ASSERT_EQ(filler, Read());
  ASSERT_EQ("bar", Read());
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(0, DroppedBytes());
  ASSERT_EQ("", ReportMessage());
}
332 
// The first record leaves kHeaderSize - 4 bytes in block 0, too few to hold
// another record header. The records written afterwards must still read back.
TEST(LogTest, ShortTrailer) {
  const int n = kBlockSize - 2*kHeaderSize + 4;
  const std::string filler = BigString("foo", n);
  Write(filler);
  ASSERT_EQ(kBlockSize - kHeaderSize + 4, WrittenBytes());

  Write("");
  Write("bar");

  ASSERT_EQ(filler, Read());
  ASSERT_EQ("", Read());
  ASSERT_EQ("bar", Read());
  ASSERT_EQ("EOF", Read());
}
344 
// The file ends kHeaderSize - 4 bytes short of a block boundary, right after
// the only record. Reading must return that record and then EOF.
TEST(LogTest, AlignedEof) {
  const int n = kBlockSize - 2*kHeaderSize + 4;
  const std::string only_record = BigString("foo", n);
  Write(only_record);
  ASSERT_EQ(kBlockSize - kHeaderSize + 4, WrittenBytes());
  ASSERT_EQ(only_record, Read());
  ASSERT_EQ("EOF", Read());
}
352 
// A Writer reopened at the current end of the file appends after the
// existing record. Both records read back in order.
TEST(LogTest, OpenForAppend) {
  const std::string first = "hello";
  const std::string second = "world";

  Write(first);
  ReopenForAppend();
  Write(second);

  ASSERT_EQ(first, Read());
  ASSERT_EQ(second, Read());
  ASSERT_EQ("EOF", Read());
}
361 
// Records of pseudo-random skewed lengths round-trip. Both generators share
// one seed, so the verification pass regenerates exactly what was written.
TEST(LogTest, RandomRead) {
  const int kNumRecords = 500;
  const uint32_t kSeed = 301;

  Random write_rnd(kSeed);
  for (int i = 0; i < kNumRecords; i++) {
    Write(RandomSkewedString(i, &write_rnd));
  }

  Random verify_rnd(kSeed);
  for (int i = 0; i < kNumRecords; i++) {
    ASSERT_EQ(RandomSkewedString(i, &verify_rnd), Read());
  }
  ASSERT_EQ("EOF", Read());
}
374 
375 // Tests of all the error paths in log_reader.cc follow:
376 
// A failing file read ends reading. A full block is reported as dropped,
// together with the file's error message.
TEST(LogTest, ReadError) {
  Write("foo");
  ForceError();

  const std::string result = Read();
  ASSERT_EQ("EOF", result);
  ASSERT_EQ(kBlockSize, DroppedBytes());
  ASSERT_EQ("OK", MatchError("read error"));
}
384 
// Change the type byte to an unknown value, then repair the checksum so the
// type is the only thing wrong. The 3-byte payload must be dropped.
TEST(LogTest, BadRecordType) {
  const int kTypeOffset = 6;  // Type is stored in header[6]
  Write("foo");
  IncrementByte(kTypeOffset, 100);
  FixChecksum(0, 3);

  const std::string result = Read();
  ASSERT_EQ("EOF", result);
  ASSERT_EQ(3, DroppedBytes());
  ASSERT_EQ("OK", MatchError("unknown record type"));
}
394 
// Cutting off the 3-byte payload and the last header byte leaves an
// incomplete final header. That is treated as a clean end of log, not an error.
TEST(LogTest, TruncatedTrailingRecordIsIgnored) {
  const int kBytesToDrop = 4;  // 3 payload bytes + 1 header byte
  Write("foo");
  ShrinkSize(kBytesToDrop);

  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(0, DroppedBytes());
  ASSERT_EQ("", ReportMessage());
}
403 
// The first record exactly fills block 0. Bumping its length makes it claim
// more bytes than the block holds, so that whole block is dropped. The
// record in the next block must still be readable.
TEST(LogTest, BadLength) {
  const int kPayloadSize = kBlockSize - kHeaderSize;
  const int kLengthLowByte = 4;  // Least significant size byte is header[4].

  Write(BigString("bar", kPayloadSize));
  Write("foo");
  IncrementByte(kLengthLowByte, 1);

  ASSERT_EQ("foo", Read());
  ASSERT_EQ(kBlockSize, DroppedBytes());
  ASSERT_EQ("OK", MatchError("bad record length"));
}
414 
// The final record's header claims one more payload byte than the file
// holds. At end of file this is treated as truncation and not reported.
TEST(LogTest, BadLengthAtEndIsIgnored) {
  Write("foo");
  ShrinkSize(1);

  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(0, DroppedBytes());
  ASSERT_EQ("", ReportMessage());
}
422 
// Corrupt the stored checksum (header[0..3]). The whole record, header plus
// the 3-byte payload, is dropped and reported.
TEST(LogTest, ChecksumMismatch) {
  Write("foo");
  IncrementByte(0, 10);

  const std::string result = Read();
  ASSERT_EQ("EOF", result);
  ASSERT_EQ(10, DroppedBytes());
  ASSERT_EQ("OK", MatchError("checksum mismatch"));
}
430 
// A lone MIDDLE fragment with no preceding FIRST is dropped as "missing start".
TEST(LogTest, UnexpectedMiddleType) {
  const int kTypeOffset = 6;
  Write("foo");
  SetByte(kTypeOffset, kMiddleType);
  FixChecksum(0, 3);

  const std::string result = Read();
  ASSERT_EQ("EOF", result);
  ASSERT_EQ(3, DroppedBytes());
  ASSERT_EQ("OK", MatchError("missing start"));
}
439 
// A lone LAST fragment with no preceding FIRST is dropped as "missing start".
TEST(LogTest, UnexpectedLastType) {
  const int kTypeOffset = 6;
  Write("foo");
  SetByte(kTypeOffset, kLastType);
  FixChecksum(0, 3);

  const std::string result = Read();
  ASSERT_EQ("EOF", result);
  ASSERT_EQ(3, DroppedBytes());
  ASSERT_EQ("OK", MatchError("missing start"));
}
448 
// Retype "foo" as a FIRST fragment that is never completed. The next,
// complete record "bar" must still be returned, and the 3 dangling bytes
// are reported as a partial record.
TEST(LogTest, UnexpectedFullType) {
  const int kTypeOffset = 6;
  Write("foo");
  Write("bar");
  SetByte(kTypeOffset, kFirstType);
  FixChecksum(0, 3);

  ASSERT_EQ("bar", Read());
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(3, DroppedBytes());
  ASSERT_EQ("OK", MatchError("partial record without end"));
}
459 
// Like UnexpectedFullType, but the record that follows the dangling FIRST
// fragment is itself fragmented. It must still be reassembled correctly.
TEST(LogTest, UnexpectedFirstType) {
  const int kTypeOffset = 6;
  const std::string big = BigString("bar", 100000);
  Write("foo");
  Write(big);
  SetByte(kTypeOffset, kFirstType);
  FixChecksum(0, 3);

  ASSERT_EQ(big, Read());
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(3, DroppedBytes());
  ASSERT_EQ("OK", MatchError("partial record without end"));
}
470 
// A kBlockSize record is written as a FIRST fragment that fills block 0,
// followed by a 14-byte LAST fragment (header plus remaining payload).
// Removing that LAST fragment entirely looks like a truncated tail and is
// not reported.
TEST(LogTest, MissingLastIsIgnored) {
  const int kLastFragmentBytes = 14;
  Write(BigString("bar", kBlockSize));
  ShrinkSize(kLastFragmentBytes);

  ASSERT_EQ("EOF", Read());
  ASSERT_EQ("", ReportMessage());
  ASSERT_EQ(0, DroppedBytes());
}
479 
// Trimming one byte off the final (LAST) fragment leaves its header claiming
// more bytes than remain. At end of file this is ignored, not reported.
TEST(LogTest, PartialLastIsIgnored) {
  Write(BigString("bar", kBlockSize));
  ShrinkSize(1);

  ASSERT_EQ("EOF", Read());
  ASSERT_EQ("", ReportMessage());
  ASSERT_EQ(0, DroppedBytes());
}
488 
// Given a fragmented record followed by another record:
//    first(R1), middle(R1), last(R1), first(R2)
// a reader whose initial_offset lands after first(R1) but before first(R2)
// will see orphaned fragments. These are expected, not corruption, and must
// be silently skipped until the next first/full record.
TEST(LogTest, SkipIntoMultiRecord) {
  const std::string spanning = BigString("foo", 3*kBlockSize);
  Write(spanning);
  Write("correct");
  StartReadingAt(kBlockSize);

  ASSERT_EQ("correct", Read());
  ASSERT_EQ("", ReportMessage());
  ASSERT_EQ(0, DroppedBytes());
  ASSERT_EQ("EOF", Read());
}
504 
// Two records each span two blocks:
//    first(R1) last(R1) first(R2) last(R2)
// Wiping the middle block destroys last(R1) and first(R2). The reader must
// not glue first(R1) onto last(R2) and return that as a valid record.
TEST(LogTest, ErrorJoinsRecords) {
  Write(BigString("foo", kBlockSize));
  Write(BigString("bar", kBlockSize));
  Write("correct");

  // Overwrite every byte of block 1 with garbage.
  const int kWipeBegin = kBlockSize;
  const int kWipeEnd = 2*kBlockSize;
  for (int pos = kWipeBegin; pos != kWipeEnd; ++pos) {
    SetByte(pos, 'x');
  }

  ASSERT_EQ("correct", Read());
  ASSERT_EQ("EOF", Read());

  // Roughly two blocks' worth of bytes should be reported lost.
  const size_t dropped = DroppedBytes();
  ASSERT_LE(dropped, 2*kBlockSize + 100);
  ASSERT_GE(dropped, 2*kBlockSize);
}
527 
// Starting at offset 0 returns every record, beginning with record 0.
TEST(LogTest, ReadStart) {
  const uint64_t kStartOffset = 0;
  const int kFirstExpected = 0;
  CheckInitialOffsetRecord(kStartOffset, kFirstExpected);
}
531 
// Starting one byte into record 0 skips it and resumes at record 1.
TEST(LogTest, ReadSecondOneOff) {
  const uint64_t kStartOffset = 1;
  const int kFirstExpected = 1;
  CheckInitialOffsetRecord(kStartOffset, kFirstExpected);
}
535 
// Starting inside record 0's payload still resumes at record 1.
TEST(LogTest, ReadSecondTenThousand) {
  const uint64_t kStartOffset = 10000;
  const int kFirstExpected = 1;
  CheckInitialOffsetRecord(kStartOffset, kFirstExpected);
}
539 
// Starting exactly at record 1's header returns record 1 first.
TEST(LogTest, ReadSecondStart) {
  const uint64_t kStartOffset = 10007;
  const int kFirstExpected = 1;
  CheckInitialOffsetRecord(kStartOffset, kFirstExpected);
}
543 
// Starting one byte past record 1's start resumes at record 2.
TEST(LogTest, ReadThirdOneOff) {
  const uint64_t kStartOffset = 10008;
  const int kFirstExpected = 2;
  CheckInitialOffsetRecord(kStartOffset, kFirstExpected);
}
547 
// Starting exactly at record 2's header returns record 2 first.
TEST(LogTest, ReadThirdStart) {
  const uint64_t kStartOffset = 20014;
  const int kFirstExpected = 2;
  CheckInitialOffsetRecord(kStartOffset, kFirstExpected);
}
551 
// Starting one byte into the multi-block record 2 skips all of its
// fragments and resumes at record 3.
TEST(LogTest, ReadFourthOneOff) {
  const uint64_t kStartOffset = 20015;
  const int kFirstExpected = 3;
  CheckInitialOffsetRecord(kStartOffset, kFirstExpected);
}
555 
// Starting in the last few bytes of block 0 resumes at record 3.
TEST(LogTest, ReadFourthFirstBlockTrailer) {
  const uint64_t kStartOffset = log::kBlockSize - 4;
  const int kFirstExpected = 3;
  CheckInitialOffsetRecord(kStartOffset, kFirstExpected);
}
559 
// Starting inside the middle fragment of record 2 resumes at record 3.
TEST(LogTest, ReadFourthMiddleBlock) {
  const uint64_t kStartOffset = log::kBlockSize + 1;
  const int kFirstExpected = 3;
  CheckInitialOffsetRecord(kStartOffset, kFirstExpected);
}
563 
// Starting inside the last fragment of record 2 resumes at record 3.
TEST(LogTest, ReadFourthLastBlock) {
  const uint64_t kStartOffset = 2 * log::kBlockSize + 1;
  const int kFirstExpected = 3;
  CheckInitialOffsetRecord(kStartOffset, kFirstExpected);
}
567 
// Starting exactly at record 3's header must return record 3 first.
// The offset is the same expression as initial_offset_last_record_offsets_[3]:
// two 10000-byte records with their headers, then the three-fragment record 2
// with its three headers. Using 1000 here instead of 10000 would start inside
// record 2's last fragment and duplicate ReadFourthLastBlock rather than
// test an exact record boundary.
TEST(LogTest, ReadFourthStart) {
  CheckInitialOffsetRecord(
      2 * (kHeaderSize + 10000) + (2 * log::kBlockSize - 1000) + 3 * kHeaderSize,
      3);
}
573 
// Starting inside the 2-byte unused tail of the third block resumes at
// record 5, which begins the next block.
TEST(LogTest, ReadInitialOffsetIntoBlockPadding) {
  const uint64_t kStartOffset = 3 * log::kBlockSize - 3;
  const int kFirstExpected = 5;
  CheckInitialOffsetRecord(kStartOffset, kFirstExpected);
}
577 
// A reader starting exactly at the end of the log finds no record.
TEST(LogTest, ReadEnd) {
  const uint64_t kOffsetPastEnd = 0;
  CheckOffsetPastEndReturnsNoRecords(kOffsetPastEnd);
}
581 
// A reader starting beyond the end of the log finds no record.
TEST(LogTest, ReadPastEnd) {
  const uint64_t kOffsetPastEnd = 5;
  CheckOffsetPastEndReturnsNoRecords(kOffsetPastEnd);
}
585 
586 }  // namespace log
587 }  // namespace leveldb
588 
main(int argc,char ** argv)589 int main(int argc, char** argv) {
590   return leveldb::test::RunAllTests();
591 }
592