1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
4 
#include <memory>

#include "db/log_reader.h"
#include "db/log_writer.h"
#include "leveldb/env.h"
#include "util/coding.h"
#include "util/crc32c.h"
#include "util/random.h"
#include "util/testharness.h"
12 
13 namespace leveldb {
14 namespace log {
15 
// Construct a string of exactly `n` bytes by repeating `partial_string` and
// truncating the last copy. If `partial_string` is empty there is nothing to
// repeat, so the result is `n` '\0' bytes instead of looping forever.
static std::string BigString(const std::string& partial_string, size_t n) {
  std::string result;
  if (!partial_string.empty()) {
    // At most one extra copy is appended before the final resize.
    result.reserve(n + partial_string.size());
    while (result.size() < n) {
      result.append(partial_string);
    }
  }
  result.resize(n);
  return result;
}
26 
// Returns the decimal text of `n` with a trailing '.', e.g. 42 -> "42.".
static std::string NumberString(int n) {
  std::string text = std::to_string(n);
  text.push_back('.');
  return text;
}
33 
34 // Return a skewed potentially long string
RandomSkewedString(int i,Random * rnd)35 static std::string RandomSkewedString(int i, Random* rnd) {
36   return BigString(NumberString(i), rnd->Skewed(17));
37 }
38 
39 class LogTest {
40  public:
LogTest()41   LogTest()
42       : reading_(false),
43         writer_(new Writer(&dest_)),
44         reader_(new Reader(&source_, &report_, true /*checksum*/,
45                            0 /*initial_offset*/)) {}
46 
~LogTest()47   ~LogTest() {
48     delete writer_;
49     delete reader_;
50   }
51 
ReopenForAppend()52   void ReopenForAppend() {
53     delete writer_;
54     writer_ = new Writer(&dest_, dest_.contents_.size());
55   }
56 
Write(const std::string & msg)57   void Write(const std::string& msg) {
58     ASSERT_TRUE(!reading_) << "Write() after starting to read";
59     writer_->AddRecord(Slice(msg));
60   }
61 
WrittenBytes() const62   size_t WrittenBytes() const { return dest_.contents_.size(); }
63 
Read()64   std::string Read() {
65     if (!reading_) {
66       reading_ = true;
67       source_.contents_ = Slice(dest_.contents_);
68     }
69     std::string scratch;
70     Slice record;
71     if (reader_->ReadRecord(&record, &scratch)) {
72       return record.ToString();
73     } else {
74       return "EOF";
75     }
76   }
77 
IncrementByte(int offset,int delta)78   void IncrementByte(int offset, int delta) {
79     dest_.contents_[offset] += delta;
80   }
81 
SetByte(int offset,char new_byte)82   void SetByte(int offset, char new_byte) {
83     dest_.contents_[offset] = new_byte;
84   }
85 
ShrinkSize(int bytes)86   void ShrinkSize(int bytes) {
87     dest_.contents_.resize(dest_.contents_.size() - bytes);
88   }
89 
FixChecksum(int header_offset,int len)90   void FixChecksum(int header_offset, int len) {
91     // Compute crc of type/len/data
92     uint32_t crc = crc32c::Value(&dest_.contents_[header_offset + 6], 1 + len);
93     crc = crc32c::Mask(crc);
94     EncodeFixed32(&dest_.contents_[header_offset], crc);
95   }
96 
ForceError()97   void ForceError() { source_.force_error_ = true; }
98 
DroppedBytes() const99   size_t DroppedBytes() const { return report_.dropped_bytes_; }
100 
ReportMessage() const101   std::string ReportMessage() const { return report_.message_; }
102 
103   // Returns OK iff recorded error message contains "msg"
MatchError(const std::string & msg) const104   std::string MatchError(const std::string& msg) const {
105     if (report_.message_.find(msg) == std::string::npos) {
106       return report_.message_;
107     } else {
108       return "OK";
109     }
110   }
111 
WriteInitialOffsetLog()112   void WriteInitialOffsetLog() {
113     for (int i = 0; i < num_initial_offset_records_; i++) {
114       std::string record(initial_offset_record_sizes_[i],
115                          static_cast<char>('a' + i));
116       Write(record);
117     }
118   }
119 
StartReadingAt(uint64_t initial_offset)120   void StartReadingAt(uint64_t initial_offset) {
121     delete reader_;
122     reader_ = new Reader(&source_, &report_, true /*checksum*/, initial_offset);
123   }
124 
CheckOffsetPastEndReturnsNoRecords(uint64_t offset_past_end)125   void CheckOffsetPastEndReturnsNoRecords(uint64_t offset_past_end) {
126     WriteInitialOffsetLog();
127     reading_ = true;
128     source_.contents_ = Slice(dest_.contents_);
129     Reader* offset_reader = new Reader(&source_, &report_, true /*checksum*/,
130                                        WrittenBytes() + offset_past_end);
131     Slice record;
132     std::string scratch;
133     ASSERT_TRUE(!offset_reader->ReadRecord(&record, &scratch));
134     delete offset_reader;
135   }
136 
CheckInitialOffsetRecord(uint64_t initial_offset,int expected_record_offset)137   void CheckInitialOffsetRecord(uint64_t initial_offset,
138                                 int expected_record_offset) {
139     WriteInitialOffsetLog();
140     reading_ = true;
141     source_.contents_ = Slice(dest_.contents_);
142     Reader* offset_reader =
143         new Reader(&source_, &report_, true /*checksum*/, initial_offset);
144 
145     // Read all records from expected_record_offset through the last one.
146     ASSERT_LT(expected_record_offset, num_initial_offset_records_);
147     for (; expected_record_offset < num_initial_offset_records_;
148          ++expected_record_offset) {
149       Slice record;
150       std::string scratch;
151       ASSERT_TRUE(offset_reader->ReadRecord(&record, &scratch));
152       ASSERT_EQ(initial_offset_record_sizes_[expected_record_offset],
153                 record.size());
154       ASSERT_EQ(initial_offset_last_record_offsets_[expected_record_offset],
155                 offset_reader->LastRecordOffset());
156       ASSERT_EQ((char)('a' + expected_record_offset), record.data()[0]);
157     }
158     delete offset_reader;
159   }
160 
161  private:
162   class StringDest : public WritableFile {
163    public:
Close()164     Status Close() override { return Status::OK(); }
Flush()165     Status Flush() override { return Status::OK(); }
Sync()166     Status Sync() override { return Status::OK(); }
Append(const Slice & slice)167     Status Append(const Slice& slice) override {
168       contents_.append(slice.data(), slice.size());
169       return Status::OK();
170     }
GetName() const171     std::string GetName() const override { return ""; }
172 
173     std::string contents_;
174   };
175 
176   class StringSource : public SequentialFile {
177    public:
StringSource()178     StringSource() : force_error_(false), returned_partial_(false) {}
179 
Read(size_t n,Slice * result,char * scratch)180     Status Read(size_t n, Slice* result, char* scratch) override {
181       ASSERT_TRUE(!returned_partial_) << "must not Read() after eof/error";
182 
183       if (force_error_) {
184         force_error_ = false;
185         returned_partial_ = true;
186         return Status::Corruption("read error");
187       }
188 
189       if (contents_.size() < n) {
190         n = contents_.size();
191         returned_partial_ = true;
192       }
193       *result = Slice(contents_.data(), n);
194       contents_.remove_prefix(n);
195       return Status::OK();
196     }
197 
Skip(uint64_t n)198     Status Skip(uint64_t n) override {
199       if (n > contents_.size()) {
200         contents_.clear();
201         return Status::NotFound("in-memory file skipped past end");
202       }
203 
204       contents_.remove_prefix(n);
205 
206       return Status::OK();
207     }
GetName() const208     std::string GetName() const { return ""; }
209 
210     Slice contents_;
211     bool force_error_;
212     bool returned_partial_;
213   };
214 
215   class ReportCollector : public Reader::Reporter {
216    public:
ReportCollector()217     ReportCollector() : dropped_bytes_(0) {}
Corruption(size_t bytes,const Status & status)218     void Corruption(size_t bytes, const Status& status) override {
219       dropped_bytes_ += bytes;
220       message_.append(status.ToString());
221     }
222 
223     size_t dropped_bytes_;
224     std::string message_;
225   };
226 
227   // Record metadata for testing initial offset functionality
228   static size_t initial_offset_record_sizes_[];
229   static uint64_t initial_offset_last_record_offsets_[];
230   static int num_initial_offset_records_;
231 
232   StringDest dest_;
233   StringSource source_;
234   ReportCollector report_;
235   bool reading_;
236   Writer* writer_;
237   Reader* reader_;
238 };
239 
240 size_t LogTest::initial_offset_record_sizes_[] = {
241     10000,  // Two sizable records in first block
242     10000,
243     2 * log::kBlockSize - 1000,  // Span three blocks
244     1,
245     13716,                          // Consume all but two bytes of block 3.
246     log::kBlockSize - kHeaderSize,  // Consume the entirety of block 4.
247 };
248 
249 uint64_t LogTest::initial_offset_last_record_offsets_[] = {
250     0,
251     kHeaderSize + 10000,
252     2 * (kHeaderSize + 10000),
253     2 * (kHeaderSize + 10000) + (2 * log::kBlockSize - 1000) + 3 * kHeaderSize,
254     2 * (kHeaderSize + 10000) + (2 * log::kBlockSize - 1000) + 3 * kHeaderSize +
255         kHeaderSize + 1,
256     3 * log::kBlockSize,
257 };
258 
259 // LogTest::initial_offset_last_record_offsets_ must be defined before this.
260 int LogTest::num_initial_offset_records_ =
261     sizeof(LogTest::initial_offset_last_record_offsets_) / sizeof(uint64_t);
262 
TEST(LogTest, Empty) {
  // A log with nothing written to it reads as EOF straight away.
  const std::string first = Read();
  ASSERT_EQ("EOF", first);
}
264 
TEST(LogTest, ReadWrite) {
  // Small records, including an empty one, come back in write order.
  const std::string records[] = {"foo", "bar", "", "xxxx"};
  for (const std::string& r : records) {
    Write(r);
  }
  for (const std::string& r : records) {
    ASSERT_EQ(r, Read());
  }
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ("EOF", Read());  // Reading again at EOF keeps returning EOF.
}
277 
TEST(LogTest, ManyBlocks) {
  // Enough small records to fill many blocks.
  const int kNumRecords = 100000;
  for (int n = 0; n < kNumRecords; ++n) {
    Write(NumberString(n));
  }
  for (int n = 0; n < kNumRecords; ++n) {
    ASSERT_EQ(NumberString(n), Read());
  }
  ASSERT_EQ("EOF", Read());
}
287 
TEST(LogTest, Fragmentation) {
  // Large records must be split into fragments and reassembled on read.
  const std::string medium = BigString("medium", 50000);
  const std::string large = BigString("large", 100000);
  Write("small");
  Write(medium);
  Write(large);
  ASSERT_EQ("small", Read());
  ASSERT_EQ(medium, Read());
  ASSERT_EQ(large, Read());
  ASSERT_EQ("EOF", Read());
}
297 
TEST(LogTest, MarginalTrailer) {
  // Leave exactly kHeaderSize bytes at the end of the first block: just
  // enough room for the header of an empty record.
  const int n = kBlockSize - 2 * kHeaderSize;
  const std::string filler = BigString("foo", n);
  Write(filler);
  ASSERT_EQ(kBlockSize - kHeaderSize, WrittenBytes());
  Write("");
  Write("bar");
  ASSERT_EQ(filler, Read());
  ASSERT_EQ("", Read());
  ASSERT_EQ("bar", Read());
  ASSERT_EQ("EOF", Read());
}
310 
TEST(LogTest, MarginalTrailer2) {
  // Leave exactly kHeaderSize bytes at the end of the first block, then write
  // a non-empty record. Whatever occupies the leftover bytes must not be
  // reported as corruption.
  const int n = kBlockSize - 2 * kHeaderSize;
  const std::string filler = BigString("foo", n);
  Write(filler);
  ASSERT_EQ(kBlockSize - kHeaderSize, WrittenBytes());
  Write("bar");
  ASSERT_EQ(filler, Read());
  ASSERT_EQ("bar", Read());
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(0, DroppedBytes());
  ASSERT_EQ("", ReportMessage());
}
323 
TEST(LogTest, ShortTrailer) {
  // Leave only kHeaderSize - 4 bytes at the end of the first block, too few
  // for even an empty record's header.
  const int n = kBlockSize - 2 * kHeaderSize + 4;
  const std::string filler = BigString("foo", n);
  Write(filler);
  ASSERT_EQ(kBlockSize - kHeaderSize + 4, WrittenBytes());
  Write("");
  Write("bar");
  ASSERT_EQ(filler, Read());
  ASSERT_EQ("", Read());
  ASSERT_EQ("bar", Read());
  ASSERT_EQ("EOF", Read());
}
335 
TEST(LogTest, AlignedEof) {
  // The log ends kHeaderSize - 4 bytes short of a block boundary. Reading
  // must still stop cleanly at EOF.
  const int n = kBlockSize - 2 * kHeaderSize + 4;
  const std::string only = BigString("foo", n);
  Write(only);
  ASSERT_EQ(kBlockSize - kHeaderSize + 4, WrittenBytes());
  ASSERT_EQ(only, Read());
  ASSERT_EQ("EOF", Read());
}
343 
TEST(LogTest, OpenForAppend) {
  // A record written after reopening for append follows the earlier one.
  Write("hello");
  ReopenForAppend();
  Write("world");
  const std::string expected[] = {"hello", "world", "EOF"};
  for (const std::string& want : expected) {
    ASSERT_EQ(want, Read());
  }
}
352 
TEST(LogTest, RandomRead) {
  // Two identically seeded generators let the read side regenerate exactly
  // the strings that were written.
  const int kNumRecords = 500;
  const int kSeed = 301;
  Random writer_rnd(kSeed);
  for (int k = 0; k < kNumRecords; ++k) {
    Write(RandomSkewedString(k, &writer_rnd));
  }
  Random reader_rnd(kSeed);
  for (int k = 0; k < kNumRecords; ++k) {
    ASSERT_EQ(RandomSkewedString(k, &reader_rnd), Read());
  }
  ASSERT_EQ("EOF", Read());
}
365 
366 // Tests of all the error paths in log_reader.cc follow:
367 
TEST(LogTest, ReadError) {
  // A failing read from the source ends the log. A full block's worth of
  // bytes is reported as dropped.
  Write("foo");
  ForceError();
  const std::string got = Read();
  ASSERT_EQ("EOF", got);
  ASSERT_EQ(kBlockSize, DroppedBytes());
  ASSERT_EQ("OK", MatchError("read error"));
}
375 
TEST(LogTest, BadRecordType) {
  Write("foo");
  // header[6] holds the record type. Corrupt it, then recompute the checksum
  // so that the type byte is the only thing wrong with the record.
  IncrementByte(6, 100);
  FixChecksum(0, 3);
  const std::string got = Read();
  ASSERT_EQ("EOF", got);
  ASSERT_EQ(3, DroppedBytes());
  ASSERT_EQ("OK", MatchError("unknown record type"));
}
385 
TEST(LogTest, TruncatedTrailingRecordIsIgnored) {
  Write("foo");
  // Cut off the 3-byte payload plus one header byte, leaving only a partial
  // header at the end of the file.
  ShrinkSize(4);
  ASSERT_EQ("EOF", Read());
  // A torn final record counts as a clean end of log, not as corruption.
  ASSERT_EQ(0, DroppedBytes());
  ASSERT_EQ("", ReportMessage());
}
394 
TEST(LogTest, BadLength) {
  // The first record fills its block exactly (header + payload == kBlockSize).
  const int kPayloadSize = kBlockSize - kHeaderSize;
  Write(BigString("bar", kPayloadSize));
  Write("foo");
  // Bump the least significant length byte (header[4]) so the first record
  // claims more data than its block holds.
  IncrementByte(4, 1);
  // The corrupt block is dropped whole; the next record is still readable.
  ASSERT_EQ("foo", Read());
  ASSERT_EQ(kBlockSize, DroppedBytes());
  ASSERT_EQ("OK", MatchError("bad record length"));
}
405 
TEST(LogTest, BadLengthAtEndIsIgnored) {
  // Dropping the final payload byte makes the last record's length exceed
  // what is left of the file. At the end of the log this is not reported.
  Write("foo");
  ShrinkSize(1);
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(0, DroppedBytes());
  ASSERT_EQ("", ReportMessage());
}
413 
TEST(LogTest, ChecksumMismatch) {
  Write("foo");
  // The stored CRC begins at header[0]; perturb its first byte.
  IncrementByte(0, 10);
  ASSERT_EQ("EOF", Read());
  // The whole record (header plus the 3-byte payload) is dropped.
  ASSERT_EQ(10, DroppedBytes());
  ASSERT_EQ("OK", MatchError("checksum mismatch"));
}
421 
TEST(LogTest, UnexpectedMiddleType) {
  // Relabel the only record as a MIDDLE fragment with no FIRST before it.
  Write("foo");
  SetByte(6, kMiddleType);
  FixChecksum(0, 3);
  const std::string got = Read();
  ASSERT_EQ("EOF", got);
  ASSERT_EQ(3, DroppedBytes());
  ASSERT_EQ("OK", MatchError("missing start"));
}
430 
TEST(LogTest, UnexpectedLastType) {
  // Relabel the only record as a LAST fragment with no FIRST before it.
  Write("foo");
  SetByte(6, kLastType);
  FixChecksum(0, 3);
  const std::string got = Read();
  ASSERT_EQ("EOF", got);
  ASSERT_EQ(3, DroppedBytes());
  ASSERT_EQ("OK", MatchError("missing start"));
}
439 
TEST(LogTest, UnexpectedFullType) {
  // Turn the first record into a FIRST fragment that is never finished. The
  // FULL record after it must still be returned, and the orphaned fragment
  // reported.
  Write("foo");
  Write("bar");
  SetByte(6, kFirstType);
  FixChecksum(0, 3);
  ASSERT_EQ("bar", Read());
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(3, DroppedBytes());
  ASSERT_EQ("OK", MatchError("partial record without end"));
}
450 
TEST(LogTest, UnexpectedFirstType) {
  // Same as a dangling FIRST fragment, but the record that follows is itself
  // large enough to start with its own FIRST fragment.
  const std::string big = BigString("bar", 100000);
  Write("foo");
  Write(big);
  SetByte(6, kFirstType);
  FixChecksum(0, 3);
  ASSERT_EQ(big, Read());
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(3, DroppedBytes());
  ASSERT_EQ("OK", MatchError("partial record without end"));
}
461 
TEST(LogTest, MissingLastIsIgnored) {
  // A kBlockSize-byte payload cannot fit in one block, so its tail goes into
  // a final fragment in the next block.
  Write(BigString("bar", kBlockSize));
  // Remove that final fragment entirely, header included.
  ShrinkSize(14);
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ("", ReportMessage());
  ASSERT_EQ(0, DroppedBytes());
}
470 
TEST(LogTest, PartialLastIsIgnored) {
  Write(BigString("bar", kBlockSize));
  // Truncate a single byte so the final fragment's length no longer fits.
  ShrinkSize(1);
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ("", ReportMessage());
  ASSERT_EQ(0, DroppedBytes());
}
479 
TEST(LogTest, SkipIntoMultiRecord) {
  // Fragment layout:
  //    first(R1), middle(R1), last(R1), first(R2)
  // A reader that starts after first(R1) but before R2 meets R1's remaining
  // fragments without their beginning. That is expected, not corruption, so
  // nothing may be reported until a FIRST or FULL record turns up.
  Write(BigString("foo", 3 * kBlockSize));
  Write("correct");
  StartReadingAt(kBlockSize);

  const std::string got = Read();
  ASSERT_EQ("correct", got);
  ASSERT_EQ("", ReportMessage());
  ASSERT_EQ(0, DroppedBytes());
  ASSERT_EQ("EOF", Read());
}
495 
TEST(LogTest, ErrorJoinsRecords) {
  // Two records that each span two blocks:
  //    first(R1) last(R1) first(R2) last(R2)
  // If the middle two fragments are destroyed, first(R1) and last(R2) must
  // not be stitched together and returned as a bogus record.
  Write(BigString("foo", kBlockSize));
  Write(BigString("bar", kBlockSize));
  Write("correct");

  // Overwrite every byte of the second block.
  int pos = kBlockSize;
  while (pos < 2 * kBlockSize) {
    SetByte(pos, 'x');
    ++pos;
  }

  ASSERT_EQ("correct", Read());
  ASSERT_EQ("EOF", Read());
  const size_t dropped = DroppedBytes();
  ASSERT_LE(dropped, 2 * kBlockSize + 100);
  ASSERT_GE(dropped, 2 * kBlockSize);
}
518 
TEST(LogTest, ReadStart) {
  // Offset 0 is where the first record begins.
  CheckInitialOffsetRecord(0, 0);
}
520 
TEST(LogTest, ReadSecondOneOff) {
  // One byte into record 0: reading resumes with record 1.
  CheckInitialOffsetRecord(1, 1);
}
522 
TEST(LogTest, ReadSecondTenThousand) {
  // Still inside record 0, which occupies kHeaderSize + 10000 bytes.
  CheckInitialOffsetRecord(10000, 1);
}
524 
TEST(LogTest, ReadSecondStart) {
  // Exactly where record 1 begins (kHeaderSize + 10000).
  CheckInitialOffsetRecord(10007, 1);
}
526 
TEST(LogTest, ReadThirdOneOff) {
  // One byte past the start of record 1: reading resumes with record 2.
  CheckInitialOffsetRecord(10008, 2);
}
528 
TEST(LogTest, ReadThirdStart) {
  // Exactly where record 2 begins, 2 * (kHeaderSize + 10000).
  CheckInitialOffsetRecord(20014, 2);
}
530 
TEST(LogTest, ReadFourthOneOff) {
  // One byte past the start of record 2, which spans three blocks.
  CheckInitialOffsetRecord(20015, 3);
}
532 
TEST(LogTest, ReadFourthFirstBlockTrailer) {
  // Four bytes before the end of the first block, inside record 2's first
  // fragment; the next record to begin is record 3.
  const uint64_t offset = log::kBlockSize - 4;
  CheckInitialOffsetRecord(offset, 3);
}
536 
TEST(LogTest, ReadFourthMiddleBlock) {
  // Inside the second block, which holds only part of record 2.
  const uint64_t offset = log::kBlockSize + 1;
  CheckInitialOffsetRecord(offset, 3);
}
540 
TEST(LogTest, ReadFourthLastBlock) {
  // Inside the third block, within record 2's final fragment.
  const uint64_t offset = 2 * log::kBlockSize + 1;
  CheckInitialOffsetRecord(offset, 3);
}
544 
TEST(LogTest, ReadFourthStart) {
  // Start exactly at record 3. Before it come two 10000-byte records (each
  // with one header) and record 2, whose 2 * kBlockSize - 1000 byte payload
  // is split into three fragments and so carries three headers. This equals
  // initial_offset_last_record_offsets_[3]. (The previous expression used
  // 1000 instead of 10000, which landed mid-block rather than on the record
  // boundary this test is named for.)
  CheckInitialOffsetRecord(
      2 * (kHeaderSize + 10000) + (2 * log::kBlockSize - 1000) + 3 * kHeaderSize,
      3);
}
550 
TEST(LogTest, ReadInitialOffsetIntoBlockPadding) {
  // Three bytes before the start of block 4, next to the two bytes record 4
  // leaves unused. The first record beginning at or after it is record 5.
  const uint64_t offset = 3 * log::kBlockSize - 3;
  CheckInitialOffsetRecord(offset, 5);
}
554 
TEST(LogTest, ReadEnd) {
  // A reader starting exactly at the end of the log finds no record.
  const uint64_t beyond_end = 0;
  CheckOffsetPastEndReturnsNoRecords(beyond_end);
}
556 
TEST(LogTest, ReadPastEnd) {
  // A reader starting beyond the end of the log finds no record.
  const uint64_t beyond_end = 5;
  CheckOffsetPastEndReturnsNoRecords(beyond_end);
}
558 
559 }  // namespace log
560 }  // namespace leveldb
561 
main(int argc,char ** argv)562 int main(int argc, char** argv) { return leveldb::test::RunAllTests(); }
563