1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
4
5 #include "db/log_reader.h"
6 #include "db/log_writer.h"
7 #include "leveldb/env.h"
8 #include "util/coding.h"
9 #include "util/crc32c.h"
10 #include "util/random.h"
11 #include "util/testharness.h"
12
13 namespace leveldb {
14 namespace log {
15
16 // Construct a string of the specified length made out of the supplied
17 // partial string.
BigString(const std::string & partial_string,size_t n)18 static std::string BigString(const std::string& partial_string, size_t n) {
19 std::string result;
20 while (result.size() < n) {
21 result.append(partial_string);
22 }
23 result.resize(n);
24 return result;
25 }
26
27 // Construct a string from a number
NumberString(int n)28 static std::string NumberString(int n) {
29 char buf[50];
30 snprintf(buf, sizeof(buf), "%d.", n);
31 return std::string(buf);
32 }
33
34 // Return a skewed potentially long string
RandomSkewedString(int i,Random * rnd)35 static std::string RandomSkewedString(int i, Random* rnd) {
36 return BigString(NumberString(i), rnd->Skewed(17));
37 }
38
39 class LogTest {
40 public:
LogTest()41 LogTest()
42 : reading_(false),
43 writer_(new Writer(&dest_)),
44 reader_(new Reader(&source_, &report_, true /*checksum*/,
45 0 /*initial_offset*/)) {}
46
~LogTest()47 ~LogTest() {
48 delete writer_;
49 delete reader_;
50 }
51
ReopenForAppend()52 void ReopenForAppend() {
53 delete writer_;
54 writer_ = new Writer(&dest_, dest_.contents_.size());
55 }
56
Write(const std::string & msg)57 void Write(const std::string& msg) {
58 ASSERT_TRUE(!reading_) << "Write() after starting to read";
59 writer_->AddRecord(Slice(msg));
60 }
61
WrittenBytes() const62 size_t WrittenBytes() const { return dest_.contents_.size(); }
63
Read()64 std::string Read() {
65 if (!reading_) {
66 reading_ = true;
67 source_.contents_ = Slice(dest_.contents_);
68 }
69 std::string scratch;
70 Slice record;
71 if (reader_->ReadRecord(&record, &scratch)) {
72 return record.ToString();
73 } else {
74 return "EOF";
75 }
76 }
77
IncrementByte(int offset,int delta)78 void IncrementByte(int offset, int delta) {
79 dest_.contents_[offset] += delta;
80 }
81
SetByte(int offset,char new_byte)82 void SetByte(int offset, char new_byte) {
83 dest_.contents_[offset] = new_byte;
84 }
85
ShrinkSize(int bytes)86 void ShrinkSize(int bytes) {
87 dest_.contents_.resize(dest_.contents_.size() - bytes);
88 }
89
FixChecksum(int header_offset,int len)90 void FixChecksum(int header_offset, int len) {
91 // Compute crc of type/len/data
92 uint32_t crc = crc32c::Value(&dest_.contents_[header_offset + 6], 1 + len);
93 crc = crc32c::Mask(crc);
94 EncodeFixed32(&dest_.contents_[header_offset], crc);
95 }
96
ForceError()97 void ForceError() { source_.force_error_ = true; }
98
DroppedBytes() const99 size_t DroppedBytes() const { return report_.dropped_bytes_; }
100
ReportMessage() const101 std::string ReportMessage() const { return report_.message_; }
102
103 // Returns OK iff recorded error message contains "msg"
MatchError(const std::string & msg) const104 std::string MatchError(const std::string& msg) const {
105 if (report_.message_.find(msg) == std::string::npos) {
106 return report_.message_;
107 } else {
108 return "OK";
109 }
110 }
111
WriteInitialOffsetLog()112 void WriteInitialOffsetLog() {
113 for (int i = 0; i < num_initial_offset_records_; i++) {
114 std::string record(initial_offset_record_sizes_[i],
115 static_cast<char>('a' + i));
116 Write(record);
117 }
118 }
119
StartReadingAt(uint64_t initial_offset)120 void StartReadingAt(uint64_t initial_offset) {
121 delete reader_;
122 reader_ = new Reader(&source_, &report_, true /*checksum*/, initial_offset);
123 }
124
CheckOffsetPastEndReturnsNoRecords(uint64_t offset_past_end)125 void CheckOffsetPastEndReturnsNoRecords(uint64_t offset_past_end) {
126 WriteInitialOffsetLog();
127 reading_ = true;
128 source_.contents_ = Slice(dest_.contents_);
129 Reader* offset_reader = new Reader(&source_, &report_, true /*checksum*/,
130 WrittenBytes() + offset_past_end);
131 Slice record;
132 std::string scratch;
133 ASSERT_TRUE(!offset_reader->ReadRecord(&record, &scratch));
134 delete offset_reader;
135 }
136
CheckInitialOffsetRecord(uint64_t initial_offset,int expected_record_offset)137 void CheckInitialOffsetRecord(uint64_t initial_offset,
138 int expected_record_offset) {
139 WriteInitialOffsetLog();
140 reading_ = true;
141 source_.contents_ = Slice(dest_.contents_);
142 Reader* offset_reader =
143 new Reader(&source_, &report_, true /*checksum*/, initial_offset);
144
145 // Read all records from expected_record_offset through the last one.
146 ASSERT_LT(expected_record_offset, num_initial_offset_records_);
147 for (; expected_record_offset < num_initial_offset_records_;
148 ++expected_record_offset) {
149 Slice record;
150 std::string scratch;
151 ASSERT_TRUE(offset_reader->ReadRecord(&record, &scratch));
152 ASSERT_EQ(initial_offset_record_sizes_[expected_record_offset],
153 record.size());
154 ASSERT_EQ(initial_offset_last_record_offsets_[expected_record_offset],
155 offset_reader->LastRecordOffset());
156 ASSERT_EQ((char)('a' + expected_record_offset), record.data()[0]);
157 }
158 delete offset_reader;
159 }
160
161 private:
162 class StringDest : public WritableFile {
163 public:
Close()164 Status Close() override { return Status::OK(); }
Flush()165 Status Flush() override { return Status::OK(); }
Sync()166 Status Sync() override { return Status::OK(); }
Append(const Slice & slice)167 Status Append(const Slice& slice) override {
168 contents_.append(slice.data(), slice.size());
169 return Status::OK();
170 }
GetName() const171 std::string GetName() const override { return ""; }
172
173 std::string contents_;
174 };
175
176 class StringSource : public SequentialFile {
177 public:
StringSource()178 StringSource() : force_error_(false), returned_partial_(false) {}
179
Read(size_t n,Slice * result,char * scratch)180 Status Read(size_t n, Slice* result, char* scratch) override {
181 ASSERT_TRUE(!returned_partial_) << "must not Read() after eof/error";
182
183 if (force_error_) {
184 force_error_ = false;
185 returned_partial_ = true;
186 return Status::Corruption("read error");
187 }
188
189 if (contents_.size() < n) {
190 n = contents_.size();
191 returned_partial_ = true;
192 }
193 *result = Slice(contents_.data(), n);
194 contents_.remove_prefix(n);
195 return Status::OK();
196 }
197
Skip(uint64_t n)198 Status Skip(uint64_t n) override {
199 if (n > contents_.size()) {
200 contents_.clear();
201 return Status::NotFound("in-memory file skipped past end");
202 }
203
204 contents_.remove_prefix(n);
205
206 return Status::OK();
207 }
GetName() const208 std::string GetName() const { return ""; }
209
210 Slice contents_;
211 bool force_error_;
212 bool returned_partial_;
213 };
214
215 class ReportCollector : public Reader::Reporter {
216 public:
ReportCollector()217 ReportCollector() : dropped_bytes_(0) {}
Corruption(size_t bytes,const Status & status)218 void Corruption(size_t bytes, const Status& status) override {
219 dropped_bytes_ += bytes;
220 message_.append(status.ToString());
221 }
222
223 size_t dropped_bytes_;
224 std::string message_;
225 };
226
227 // Record metadata for testing initial offset functionality
228 static size_t initial_offset_record_sizes_[];
229 static uint64_t initial_offset_last_record_offsets_[];
230 static int num_initial_offset_records_;
231
232 StringDest dest_;
233 StringSource source_;
234 ReportCollector report_;
235 bool reading_;
236 Writer* writer_;
237 Reader* reader_;
238 };
239
240 size_t LogTest::initial_offset_record_sizes_[] = {
241 10000, // Two sizable records in first block
242 10000,
243 2 * log::kBlockSize - 1000, // Span three blocks
244 1,
245 13716, // Consume all but two bytes of block 3.
246 log::kBlockSize - kHeaderSize, // Consume the entirety of block 4.
247 };
248
249 uint64_t LogTest::initial_offset_last_record_offsets_[] = {
250 0,
251 kHeaderSize + 10000,
252 2 * (kHeaderSize + 10000),
253 2 * (kHeaderSize + 10000) + (2 * log::kBlockSize - 1000) + 3 * kHeaderSize,
254 2 * (kHeaderSize + 10000) + (2 * log::kBlockSize - 1000) + 3 * kHeaderSize +
255 kHeaderSize + 1,
256 3 * log::kBlockSize,
257 };
258
259 // LogTest::initial_offset_last_record_offsets_ must be defined before this.
260 int LogTest::num_initial_offset_records_ =
261 sizeof(LogTest::initial_offset_last_record_offsets_) / sizeof(uint64_t);
262
TEST(LogTest,Empty)263 TEST(LogTest, Empty) { ASSERT_EQ("EOF", Read()); }
264
TEST(LogTest,ReadWrite)265 TEST(LogTest, ReadWrite) {
266 Write("foo");
267 Write("bar");
268 Write("");
269 Write("xxxx");
270 ASSERT_EQ("foo", Read());
271 ASSERT_EQ("bar", Read());
272 ASSERT_EQ("", Read());
273 ASSERT_EQ("xxxx", Read());
274 ASSERT_EQ("EOF", Read());
275 ASSERT_EQ("EOF", Read()); // Make sure reads at eof work
276 }
277
TEST(LogTest,ManyBlocks)278 TEST(LogTest, ManyBlocks) {
279 for (int i = 0; i < 100000; i++) {
280 Write(NumberString(i));
281 }
282 for (int i = 0; i < 100000; i++) {
283 ASSERT_EQ(NumberString(i), Read());
284 }
285 ASSERT_EQ("EOF", Read());
286 }
287
TEST(LogTest,Fragmentation)288 TEST(LogTest, Fragmentation) {
289 Write("small");
290 Write(BigString("medium", 50000));
291 Write(BigString("large", 100000));
292 ASSERT_EQ("small", Read());
293 ASSERT_EQ(BigString("medium", 50000), Read());
294 ASSERT_EQ(BigString("large", 100000), Read());
295 ASSERT_EQ("EOF", Read());
296 }
297
TEST(LogTest,MarginalTrailer)298 TEST(LogTest, MarginalTrailer) {
299 // Make a trailer that is exactly the same length as an empty record.
300 const int n = kBlockSize - 2 * kHeaderSize;
301 Write(BigString("foo", n));
302 ASSERT_EQ(kBlockSize - kHeaderSize, WrittenBytes());
303 Write("");
304 Write("bar");
305 ASSERT_EQ(BigString("foo", n), Read());
306 ASSERT_EQ("", Read());
307 ASSERT_EQ("bar", Read());
308 ASSERT_EQ("EOF", Read());
309 }
310
TEST(LogTest,MarginalTrailer2)311 TEST(LogTest, MarginalTrailer2) {
312 // Make a trailer that is exactly the same length as an empty record.
313 const int n = kBlockSize - 2 * kHeaderSize;
314 Write(BigString("foo", n));
315 ASSERT_EQ(kBlockSize - kHeaderSize, WrittenBytes());
316 Write("bar");
317 ASSERT_EQ(BigString("foo", n), Read());
318 ASSERT_EQ("bar", Read());
319 ASSERT_EQ("EOF", Read());
320 ASSERT_EQ(0, DroppedBytes());
321 ASSERT_EQ("", ReportMessage());
322 }
323
TEST(LogTest,ShortTrailer)324 TEST(LogTest, ShortTrailer) {
325 const int n = kBlockSize - 2 * kHeaderSize + 4;
326 Write(BigString("foo", n));
327 ASSERT_EQ(kBlockSize - kHeaderSize + 4, WrittenBytes());
328 Write("");
329 Write("bar");
330 ASSERT_EQ(BigString("foo", n), Read());
331 ASSERT_EQ("", Read());
332 ASSERT_EQ("bar", Read());
333 ASSERT_EQ("EOF", Read());
334 }
335
TEST(LogTest,AlignedEof)336 TEST(LogTest, AlignedEof) {
337 const int n = kBlockSize - 2 * kHeaderSize + 4;
338 Write(BigString("foo", n));
339 ASSERT_EQ(kBlockSize - kHeaderSize + 4, WrittenBytes());
340 ASSERT_EQ(BigString("foo", n), Read());
341 ASSERT_EQ("EOF", Read());
342 }
343
TEST(LogTest,OpenForAppend)344 TEST(LogTest, OpenForAppend) {
345 Write("hello");
346 ReopenForAppend();
347 Write("world");
348 ASSERT_EQ("hello", Read());
349 ASSERT_EQ("world", Read());
350 ASSERT_EQ("EOF", Read());
351 }
352
TEST(LogTest,RandomRead)353 TEST(LogTest, RandomRead) {
354 const int N = 500;
355 Random write_rnd(301);
356 for (int i = 0; i < N; i++) {
357 Write(RandomSkewedString(i, &write_rnd));
358 }
359 Random read_rnd(301);
360 for (int i = 0; i < N; i++) {
361 ASSERT_EQ(RandomSkewedString(i, &read_rnd), Read());
362 }
363 ASSERT_EQ("EOF", Read());
364 }
365
366 // Tests of all the error paths in log_reader.cc follow:
367
TEST(LogTest,ReadError)368 TEST(LogTest, ReadError) {
369 Write("foo");
370 ForceError();
371 ASSERT_EQ("EOF", Read());
372 ASSERT_EQ(kBlockSize, DroppedBytes());
373 ASSERT_EQ("OK", MatchError("read error"));
374 }
375
TEST(LogTest,BadRecordType)376 TEST(LogTest, BadRecordType) {
377 Write("foo");
378 // Type is stored in header[6]
379 IncrementByte(6, 100);
380 FixChecksum(0, 3);
381 ASSERT_EQ("EOF", Read());
382 ASSERT_EQ(3, DroppedBytes());
383 ASSERT_EQ("OK", MatchError("unknown record type"));
384 }
385
TEST(LogTest,TruncatedTrailingRecordIsIgnored)386 TEST(LogTest, TruncatedTrailingRecordIsIgnored) {
387 Write("foo");
388 ShrinkSize(4); // Drop all payload as well as a header byte
389 ASSERT_EQ("EOF", Read());
390 // Truncated last record is ignored, not treated as an error.
391 ASSERT_EQ(0, DroppedBytes());
392 ASSERT_EQ("", ReportMessage());
393 }
394
TEST(LogTest,BadLength)395 TEST(LogTest, BadLength) {
396 const int kPayloadSize = kBlockSize - kHeaderSize;
397 Write(BigString("bar", kPayloadSize));
398 Write("foo");
399 // Least significant size byte is stored in header[4].
400 IncrementByte(4, 1);
401 ASSERT_EQ("foo", Read());
402 ASSERT_EQ(kBlockSize, DroppedBytes());
403 ASSERT_EQ("OK", MatchError("bad record length"));
404 }
405
TEST(LogTest,BadLengthAtEndIsIgnored)406 TEST(LogTest, BadLengthAtEndIsIgnored) {
407 Write("foo");
408 ShrinkSize(1);
409 ASSERT_EQ("EOF", Read());
410 ASSERT_EQ(0, DroppedBytes());
411 ASSERT_EQ("", ReportMessage());
412 }
413
TEST(LogTest,ChecksumMismatch)414 TEST(LogTest, ChecksumMismatch) {
415 Write("foo");
416 IncrementByte(0, 10);
417 ASSERT_EQ("EOF", Read());
418 ASSERT_EQ(10, DroppedBytes());
419 ASSERT_EQ("OK", MatchError("checksum mismatch"));
420 }
421
TEST(LogTest,UnexpectedMiddleType)422 TEST(LogTest, UnexpectedMiddleType) {
423 Write("foo");
424 SetByte(6, kMiddleType);
425 FixChecksum(0, 3);
426 ASSERT_EQ("EOF", Read());
427 ASSERT_EQ(3, DroppedBytes());
428 ASSERT_EQ("OK", MatchError("missing start"));
429 }
430
TEST(LogTest,UnexpectedLastType)431 TEST(LogTest, UnexpectedLastType) {
432 Write("foo");
433 SetByte(6, kLastType);
434 FixChecksum(0, 3);
435 ASSERT_EQ("EOF", Read());
436 ASSERT_EQ(3, DroppedBytes());
437 ASSERT_EQ("OK", MatchError("missing start"));
438 }
439
TEST(LogTest,UnexpectedFullType)440 TEST(LogTest, UnexpectedFullType) {
441 Write("foo");
442 Write("bar");
443 SetByte(6, kFirstType);
444 FixChecksum(0, 3);
445 ASSERT_EQ("bar", Read());
446 ASSERT_EQ("EOF", Read());
447 ASSERT_EQ(3, DroppedBytes());
448 ASSERT_EQ("OK", MatchError("partial record without end"));
449 }
450
TEST(LogTest,UnexpectedFirstType)451 TEST(LogTest, UnexpectedFirstType) {
452 Write("foo");
453 Write(BigString("bar", 100000));
454 SetByte(6, kFirstType);
455 FixChecksum(0, 3);
456 ASSERT_EQ(BigString("bar", 100000), Read());
457 ASSERT_EQ("EOF", Read());
458 ASSERT_EQ(3, DroppedBytes());
459 ASSERT_EQ("OK", MatchError("partial record without end"));
460 }
461
TEST(LogTest,MissingLastIsIgnored)462 TEST(LogTest, MissingLastIsIgnored) {
463 Write(BigString("bar", kBlockSize));
464 // Remove the LAST block, including header.
465 ShrinkSize(14);
466 ASSERT_EQ("EOF", Read());
467 ASSERT_EQ("", ReportMessage());
468 ASSERT_EQ(0, DroppedBytes());
469 }
470
TEST(LogTest,PartialLastIsIgnored)471 TEST(LogTest, PartialLastIsIgnored) {
472 Write(BigString("bar", kBlockSize));
473 // Cause a bad record length in the LAST block.
474 ShrinkSize(1);
475 ASSERT_EQ("EOF", Read());
476 ASSERT_EQ("", ReportMessage());
477 ASSERT_EQ(0, DroppedBytes());
478 }
479
TEST(LogTest,SkipIntoMultiRecord)480 TEST(LogTest, SkipIntoMultiRecord) {
481 // Consider a fragmented record:
482 // first(R1), middle(R1), last(R1), first(R2)
483 // If initial_offset points to a record after first(R1) but before first(R2)
484 // incomplete fragment errors are not actual errors, and must be suppressed
485 // until a new first or full record is encountered.
486 Write(BigString("foo", 3 * kBlockSize));
487 Write("correct");
488 StartReadingAt(kBlockSize);
489
490 ASSERT_EQ("correct", Read());
491 ASSERT_EQ("", ReportMessage());
492 ASSERT_EQ(0, DroppedBytes());
493 ASSERT_EQ("EOF", Read());
494 }
495
TEST(LogTest,ErrorJoinsRecords)496 TEST(LogTest, ErrorJoinsRecords) {
497 // Consider two fragmented records:
498 // first(R1) last(R1) first(R2) last(R2)
499 // where the middle two fragments disappear. We do not want
500 // first(R1),last(R2) to get joined and returned as a valid record.
501
502 // Write records that span two blocks
503 Write(BigString("foo", kBlockSize));
504 Write(BigString("bar", kBlockSize));
505 Write("correct");
506
507 // Wipe the middle block
508 for (int offset = kBlockSize; offset < 2 * kBlockSize; offset++) {
509 SetByte(offset, 'x');
510 }
511
512 ASSERT_EQ("correct", Read());
513 ASSERT_EQ("EOF", Read());
514 const size_t dropped = DroppedBytes();
515 ASSERT_LE(dropped, 2 * kBlockSize + 100);
516 ASSERT_GE(dropped, 2 * kBlockSize);
517 }
518
TEST(LogTest,ReadStart)519 TEST(LogTest, ReadStart) { CheckInitialOffsetRecord(0, 0); }
520
TEST(LogTest,ReadSecondOneOff)521 TEST(LogTest, ReadSecondOneOff) { CheckInitialOffsetRecord(1, 1); }
522
TEST(LogTest,ReadSecondTenThousand)523 TEST(LogTest, ReadSecondTenThousand) { CheckInitialOffsetRecord(10000, 1); }
524
TEST(LogTest,ReadSecondStart)525 TEST(LogTest, ReadSecondStart) { CheckInitialOffsetRecord(10007, 1); }
526
TEST(LogTest,ReadThirdOneOff)527 TEST(LogTest, ReadThirdOneOff) { CheckInitialOffsetRecord(10008, 2); }
528
TEST(LogTest,ReadThirdStart)529 TEST(LogTest, ReadThirdStart) { CheckInitialOffsetRecord(20014, 2); }
530
TEST(LogTest,ReadFourthOneOff)531 TEST(LogTest, ReadFourthOneOff) { CheckInitialOffsetRecord(20015, 3); }
532
TEST(LogTest,ReadFourthFirstBlockTrailer)533 TEST(LogTest, ReadFourthFirstBlockTrailer) {
534 CheckInitialOffsetRecord(log::kBlockSize - 4, 3);
535 }
536
TEST(LogTest,ReadFourthMiddleBlock)537 TEST(LogTest, ReadFourthMiddleBlock) {
538 CheckInitialOffsetRecord(log::kBlockSize + 1, 3);
539 }
540
TEST(LogTest,ReadFourthLastBlock)541 TEST(LogTest, ReadFourthLastBlock) {
542 CheckInitialOffsetRecord(2 * log::kBlockSize + 1, 3);
543 }
544
TEST(LogTest,ReadFourthStart)545 TEST(LogTest, ReadFourthStart) {
546 CheckInitialOffsetRecord(
547 2 * (kHeaderSize + 1000) + (2 * log::kBlockSize - 1000) + 3 * kHeaderSize,
548 3);
549 }
550
TEST(LogTest,ReadInitialOffsetIntoBlockPadding)551 TEST(LogTest, ReadInitialOffsetIntoBlockPadding) {
552 CheckInitialOffsetRecord(3 * log::kBlockSize - 3, 5);
553 }
554
TEST(LogTest,ReadEnd)555 TEST(LogTest, ReadEnd) { CheckOffsetPastEndReturnsNoRecords(0); }
556
TEST(LogTest,ReadPastEnd)557 TEST(LogTest, ReadPastEnd) { CheckOffsetPastEndReturnsNoRecords(5); }
558
559 } // namespace log
560 } // namespace leveldb
561
main(int argc,char ** argv)562 int main(int argc, char** argv) { return leveldb::test::RunAllTests(); }
563