1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
4
5 #include "db/log_reader.h"
6 #include "db/log_writer.h"
7 #include "leveldb/env.h"
8 #include "util/coding.h"
9 #include "util/crc32c.h"
10 #include "util/random.h"
11 #include "util/testharness.h"
12
13 namespace leveldb {
14 namespace log {
15
// Construct a string of exactly |n| bytes by repeating |partial_string| and
// truncating the final copy as needed (e.g. ("abc", 7) -> "abcabca").
//
// An empty |partial_string| yields an empty result: there is nothing to
// repeat, and without this guard the append loop would never terminate.
static std::string BigString(const std::string& partial_string, size_t n) {
  std::string result;
  if (partial_string.empty()) {
    return result;
  }
  // Room for the last, possibly overshooting, append so the loop never
  // has to reallocate.
  result.reserve(n + partial_string.size());
  while (result.size() < n) {
    result.append(partial_string);
  }
  result.resize(n);
  return result;
}
26
// Format |n| in decimal with a trailing '.', e.g. 42 -> "42.".
static std::string NumberString(int n) {
  char text[50];
  snprintf(text, sizeof text, "%d.", n);
  return text;  // implicit std::string construction from the C string
}
33
34 // Return a skewed potentially long string
RandomSkewedString(int i,Random * rnd)35 static std::string RandomSkewedString(int i, Random* rnd) {
36 return BigString(NumberString(i), rnd->Skewed(17));
37 }
38
39 class LogTest {
40 private:
41 class StringDest : public WritableFile {
42 public:
43 std::string contents_;
44
Close()45 virtual Status Close() { return Status::OK(); }
Flush()46 virtual Status Flush() { return Status::OK(); }
Sync()47 virtual Status Sync() { return Status::OK(); }
Append(const Slice & slice)48 virtual Status Append(const Slice& slice) {
49 contents_.append(slice.data(), slice.size());
50 return Status::OK();
51 }
52 };
53
54 class StringSource : public SequentialFile {
55 public:
56 Slice contents_;
57 bool force_error_;
58 bool returned_partial_;
StringSource()59 StringSource() : force_error_(false), returned_partial_(false) { }
60
Read(size_t n,Slice * result,char * scratch)61 virtual Status Read(size_t n, Slice* result, char* scratch) {
62 ASSERT_TRUE(!returned_partial_) << "must not Read() after eof/error";
63
64 if (force_error_) {
65 force_error_ = false;
66 returned_partial_ = true;
67 return Status::Corruption("read error");
68 }
69
70 if (contents_.size() < n) {
71 n = contents_.size();
72 returned_partial_ = true;
73 }
74 *result = Slice(contents_.data(), n);
75 contents_.remove_prefix(n);
76 return Status::OK();
77 }
78
Skip(uint64_t n)79 virtual Status Skip(uint64_t n) {
80 if (n > contents_.size()) {
81 contents_.clear();
82 return Status::NotFound("in-memory file skipped past end");
83 }
84
85 contents_.remove_prefix(n);
86
87 return Status::OK();
88 }
89 };
90
91 class ReportCollector : public Reader::Reporter {
92 public:
93 size_t dropped_bytes_;
94 std::string message_;
95
ReportCollector()96 ReportCollector() : dropped_bytes_(0) { }
Corruption(size_t bytes,const Status & status)97 virtual void Corruption(size_t bytes, const Status& status) {
98 dropped_bytes_ += bytes;
99 message_.append(status.ToString());
100 }
101 };
102
103 StringDest dest_;
104 StringSource source_;
105 ReportCollector report_;
106 bool reading_;
107 Writer* writer_;
108 Reader* reader_;
109
110 // Record metadata for testing initial offset functionality
111 static size_t initial_offset_record_sizes_[];
112 static uint64_t initial_offset_last_record_offsets_[];
113 static int num_initial_offset_records_;
114
115 public:
LogTest()116 LogTest() : reading_(false),
117 writer_(new Writer(&dest_)),
118 reader_(new Reader(&source_, &report_, true/*checksum*/,
119 0/*initial_offset*/)) {
120 }
121
~LogTest()122 ~LogTest() {
123 delete writer_;
124 delete reader_;
125 }
126
ReopenForAppend()127 void ReopenForAppend() {
128 delete writer_;
129 writer_ = new Writer(&dest_, dest_.contents_.size());
130 }
131
Write(const std::string & msg)132 void Write(const std::string& msg) {
133 ASSERT_TRUE(!reading_) << "Write() after starting to read";
134 writer_->AddRecord(Slice(msg));
135 }
136
WrittenBytes() const137 size_t WrittenBytes() const {
138 return dest_.contents_.size();
139 }
140
Read()141 std::string Read() {
142 if (!reading_) {
143 reading_ = true;
144 source_.contents_ = Slice(dest_.contents_);
145 }
146 std::string scratch;
147 Slice record;
148 if (reader_->ReadRecord(&record, &scratch)) {
149 return record.ToString();
150 } else {
151 return "EOF";
152 }
153 }
154
IncrementByte(int offset,int delta)155 void IncrementByte(int offset, int delta) {
156 dest_.contents_[offset] += delta;
157 }
158
SetByte(int offset,char new_byte)159 void SetByte(int offset, char new_byte) {
160 dest_.contents_[offset] = new_byte;
161 }
162
ShrinkSize(int bytes)163 void ShrinkSize(int bytes) {
164 dest_.contents_.resize(dest_.contents_.size() - bytes);
165 }
166
FixChecksum(int header_offset,int len)167 void FixChecksum(int header_offset, int len) {
168 // Compute crc of type/len/data
169 uint32_t crc = crc32c::Value(&dest_.contents_[header_offset+6], 1 + len);
170 crc = crc32c::Mask(crc);
171 EncodeFixed32(&dest_.contents_[header_offset], crc);
172 }
173
ForceError()174 void ForceError() {
175 source_.force_error_ = true;
176 }
177
DroppedBytes() const178 size_t DroppedBytes() const {
179 return report_.dropped_bytes_;
180 }
181
ReportMessage() const182 std::string ReportMessage() const {
183 return report_.message_;
184 }
185
186 // Returns OK iff recorded error message contains "msg"
MatchError(const std::string & msg) const187 std::string MatchError(const std::string& msg) const {
188 if (report_.message_.find(msg) == std::string::npos) {
189 return report_.message_;
190 } else {
191 return "OK";
192 }
193 }
194
WriteInitialOffsetLog()195 void WriteInitialOffsetLog() {
196 for (int i = 0; i < num_initial_offset_records_; i++) {
197 std::string record(initial_offset_record_sizes_[i],
198 static_cast<char>('a' + i));
199 Write(record);
200 }
201 }
202
StartReadingAt(uint64_t initial_offset)203 void StartReadingAt(uint64_t initial_offset) {
204 delete reader_;
205 reader_ = new Reader(&source_, &report_, true/*checksum*/, initial_offset);
206 }
207
CheckOffsetPastEndReturnsNoRecords(uint64_t offset_past_end)208 void CheckOffsetPastEndReturnsNoRecords(uint64_t offset_past_end) {
209 WriteInitialOffsetLog();
210 reading_ = true;
211 source_.contents_ = Slice(dest_.contents_);
212 Reader* offset_reader = new Reader(&source_, &report_, true/*checksum*/,
213 WrittenBytes() + offset_past_end);
214 Slice record;
215 std::string scratch;
216 ASSERT_TRUE(!offset_reader->ReadRecord(&record, &scratch));
217 delete offset_reader;
218 }
219
CheckInitialOffsetRecord(uint64_t initial_offset,int expected_record_offset)220 void CheckInitialOffsetRecord(uint64_t initial_offset,
221 int expected_record_offset) {
222 WriteInitialOffsetLog();
223 reading_ = true;
224 source_.contents_ = Slice(dest_.contents_);
225 Reader* offset_reader = new Reader(&source_, &report_, true/*checksum*/,
226 initial_offset);
227
228 // Read all records from expected_record_offset through the last one.
229 ASSERT_LT(expected_record_offset, num_initial_offset_records_);
230 for (; expected_record_offset < num_initial_offset_records_;
231 ++expected_record_offset) {
232 Slice record;
233 std::string scratch;
234 ASSERT_TRUE(offset_reader->ReadRecord(&record, &scratch));
235 ASSERT_EQ(initial_offset_record_sizes_[expected_record_offset],
236 record.size());
237 ASSERT_EQ(initial_offset_last_record_offsets_[expected_record_offset],
238 offset_reader->LastRecordOffset());
239 ASSERT_EQ((char)('a' + expected_record_offset), record.data()[0]);
240 }
241 delete offset_reader;
242 }
243 };
244
245 size_t LogTest::initial_offset_record_sizes_[] =
246 {10000, // Two sizable records in first block
247 10000,
248 2 * log::kBlockSize - 1000, // Span three blocks
249 1,
250 13716, // Consume all but two bytes of block 3.
251 log::kBlockSize - kHeaderSize, // Consume the entirety of block 4.
252 };
253
254 uint64_t LogTest::initial_offset_last_record_offsets_[] =
255 {0,
256 kHeaderSize + 10000,
257 2 * (kHeaderSize + 10000),
258 2 * (kHeaderSize + 10000) +
259 (2 * log::kBlockSize - 1000) + 3 * kHeaderSize,
260 2 * (kHeaderSize + 10000) +
261 (2 * log::kBlockSize - 1000) + 3 * kHeaderSize
262 + kHeaderSize + 1,
263 3 * log::kBlockSize,
264 };
265
266 // LogTest::initial_offset_last_record_offsets_ must be defined before this.
267 int LogTest::num_initial_offset_records_ =
268 sizeof(LogTest::initial_offset_last_record_offsets_)/sizeof(uint64_t);
269
// Reading a log that was never written yields EOF immediately.
TEST(LogTest, Empty) {
  const std::string first = Read();
  ASSERT_EQ("EOF", first);
}

// Records come back in write order, including an empty record, and EOF is
// returned again on every read past the end.
TEST(LogTest, ReadWrite) {
  const char* const kRecords[] = { "foo", "bar", "", "xxxx" };
  const int kCount = sizeof(kRecords) / sizeof(kRecords[0]);
  for (int idx = 0; idx < kCount; ++idx) {
    Write(kRecords[idx]);
  }
  for (int idx = 0; idx < kCount; ++idx) {
    ASSERT_EQ(std::string(kRecords[idx]), Read());
  }
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ("EOF", Read());  // Make sure reads at eof work
}
286
// A large number of small records round-trips intact.
TEST(LogTest, ManyBlocks) {
  const int kNumRecords = 100000;
  for (int rec = 0; rec < kNumRecords; ++rec) {
    Write(NumberString(rec));
  }
  for (int rec = 0; rec < kNumRecords; ++rec) {
    ASSERT_EQ(NumberString(rec), Read());
  }
  ASSERT_EQ("EOF", Read());
}

// Mixed small and large records round-trip; the large ones exercise the
// fragmentation path.
TEST(LogTest, Fragmentation) {
  const std::string small_rec = "small";
  const std::string medium_rec = BigString("medium", 50000);
  const std::string large_rec = BigString("large", 100000);
  Write(small_rec);
  Write(medium_rec);
  Write(large_rec);
  ASSERT_EQ(small_rec, Read());
  ASSERT_EQ(medium_rec, Read());
  ASSERT_EQ(large_rec, Read());
  ASSERT_EQ("EOF", Read());
}
306
// The first record leaves exactly kHeaderSize bytes at the end of the first
// block, which is just enough for an empty record's header.
TEST(LogTest, MarginalTrailer) {
  // Make a trailer that is exactly the same length as an empty record.
  const int payload_len = kBlockSize - 2 * kHeaderSize;
  const std::string big = BigString("foo", payload_len);
  Write(big);
  ASSERT_EQ(kBlockSize - kHeaderSize, WrittenBytes());
  Write("");
  Write("bar");

  ASSERT_EQ(big, Read());
  ASSERT_EQ("", Read());
  ASSERT_EQ("bar", Read());
  ASSERT_EQ("EOF", Read());
}

// Same kHeaderSize-byte gap, but the next record is non-empty; nothing may
// be reported as dropped.
TEST(LogTest, MarginalTrailer2) {
  // Make a trailer that is exactly the same length as an empty record.
  const int payload_len = kBlockSize - 2 * kHeaderSize;
  const std::string big = BigString("foo", payload_len);
  Write(big);
  ASSERT_EQ(kBlockSize - kHeaderSize, WrittenBytes());
  Write("bar");

  ASSERT_EQ(big, Read());
  ASSERT_EQ("bar", Read());
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(0, DroppedBytes());
  ASSERT_EQ("", ReportMessage());
}

// The first record leaves fewer than kHeaderSize bytes in the first block.
TEST(LogTest, ShortTrailer) {
  const int payload_len = kBlockSize - 2 * kHeaderSize + 4;
  const std::string big = BigString("foo", payload_len);
  Write(big);
  ASSERT_EQ(kBlockSize - kHeaderSize + 4, WrittenBytes());
  Write("");
  Write("bar");

  ASSERT_EQ(big, Read());
  ASSERT_EQ("", Read());
  ASSERT_EQ("bar", Read());
  ASSERT_EQ("EOF", Read());
}

// Same short gap, but the log ends right after the first record.
TEST(LogTest, AlignedEof) {
  const int payload_len = kBlockSize - 2 * kHeaderSize + 4;
  const std::string big = BigString("foo", payload_len);
  Write(big);
  ASSERT_EQ(kBlockSize - kHeaderSize + 4, WrittenBytes());

  ASSERT_EQ(big, Read());
  ASSERT_EQ("EOF", Read());
}
352
// A writer reopened at the current end of the log keeps appending after
// the records the previous writer produced.
TEST(LogTest, OpenForAppend) {
  Write("hello");
  ReopenForAppend();
  Write("world");

  const std::string first = Read();
  ASSERT_EQ("hello", first);
  const std::string second = Read();
  ASSERT_EQ("world", second);
  ASSERT_EQ("EOF", Read());
}

// Records of randomly skewed lengths round-trip. An identically seeded
// generator regenerates the expected contents on the read side.
TEST(LogTest, RandomRead) {
  const int kNumRecords = 500;
  Random write_rnd(301);
  for (int rec = 0; rec < kNumRecords; ++rec) {
    Write(RandomSkewedString(rec, &write_rnd));
  }

  Random read_rnd(301);
  for (int rec = 0; rec < kNumRecords; ++rec) {
    const std::string expected = RandomSkewedString(rec, &read_rnd);
    ASSERT_EQ(expected, Read());
  }
  ASSERT_EQ("EOF", Read());
}
374
375 // Tests of all the error paths in log_reader.cc follow:
376
TEST(LogTest,ReadError)377 TEST(LogTest, ReadError) {
378 Write("foo");
379 ForceError();
380 ASSERT_EQ("EOF", Read());
381 ASSERT_EQ(kBlockSize, DroppedBytes());
382 ASSERT_EQ("OK", MatchError("read error"));
383 }
384
TEST(LogTest,BadRecordType)385 TEST(LogTest, BadRecordType) {
386 Write("foo");
387 // Type is stored in header[6]
388 IncrementByte(6, 100);
389 FixChecksum(0, 3);
390 ASSERT_EQ("EOF", Read());
391 ASSERT_EQ(3, DroppedBytes());
392 ASSERT_EQ("OK", MatchError("unknown record type"));
393 }
394
TEST(LogTest,TruncatedTrailingRecordIsIgnored)395 TEST(LogTest, TruncatedTrailingRecordIsIgnored) {
396 Write("foo");
397 ShrinkSize(4); // Drop all payload as well as a header byte
398 ASSERT_EQ("EOF", Read());
399 // Truncated last record is ignored, not treated as an error.
400 ASSERT_EQ(0, DroppedBytes());
401 ASSERT_EQ("", ReportMessage());
402 }
403
// A corrupted length field drops the rest of the block, and reading resumes
// with the record in the next block.
TEST(LogTest, BadLength) {
  const int kPayloadSize = kBlockSize - kHeaderSize;
  const std::string filler = BigString("bar", kPayloadSize);
  Write(filler);
  Write("foo");
  // Least significant size byte is stored in header[4].
  const int kLengthLowByteOffset = 4;
  IncrementByte(kLengthLowByteOffset, 1);

  ASSERT_EQ("foo", Read());
  ASSERT_EQ(kBlockSize, DroppedBytes());
  ASSERT_EQ("OK", MatchError("bad record length"));
}

// A record that runs past the end of the file is ignored, not reported.
TEST(LogTest, BadLengthAtEndIsIgnored) {
  Write("foo");
  ShrinkSize(1);

  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(0, DroppedBytes());
  ASSERT_EQ("", ReportMessage());
}

// Corrupting the stored checksum causes the record to be dropped and
// reported as a checksum mismatch.
TEST(LogTest, ChecksumMismatch) {
  Write("foo");
  IncrementByte(0, 10);

  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(10, DroppedBytes());
  ASSERT_EQ("OK", MatchError("checksum mismatch"));
}
430
// A MIDDLE fragment with no preceding FIRST is dropped as "missing start".
TEST(LogTest, UnexpectedMiddleType) {
  Write("foo");
  SetByte(6, kMiddleType);
  FixChecksum(0, 3);

  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(3, DroppedBytes());
  ASSERT_EQ("OK", MatchError("missing start"));
}

// A LAST fragment with no preceding FIRST is dropped as "missing start".
TEST(LogTest, UnexpectedLastType) {
  Write("foo");
  SetByte(6, kLastType);
  FixChecksum(0, 3);

  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(3, DroppedBytes());
  ASSERT_EQ("OK", MatchError("missing start"));
}

// A FIRST fragment followed by a complete record: the dangling fragment is
// dropped and the complete record is still returned.
TEST(LogTest, UnexpectedFullType) {
  Write("foo");
  Write("bar");
  SetByte(6, kFirstType);
  FixChecksum(0, 3);

  ASSERT_EQ("bar", Read());
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(3, DroppedBytes());
  ASSERT_EQ("OK", MatchError("partial record without end"));
}

// A FIRST fragment followed by another multi-fragment record: the dangling
// fragment is dropped and the following record is still reassembled.
TEST(LogTest, UnexpectedFirstType) {
  const std::string big = BigString("bar", 100000);
  Write("foo");
  Write(big);
  SetByte(6, kFirstType);
  FixChecksum(0, 3);

  ASSERT_EQ(big, Read());
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(3, DroppedBytes());
  ASSERT_EQ("OK", MatchError("partial record without end"));
}
470
// A multi-fragment record whose trailing fragment is missing at end of file
// is ignored, not reported as corruption.
TEST(LogTest, MissingLastIsIgnored) {
  Write(BigString("bar", kBlockSize));
  // Remove the trailing (LAST) fragment, including its header.
  const int kLastFragmentBytes = 14;
  ShrinkSize(kLastFragmentBytes);

  ASSERT_EQ("EOF", Read());
  ASSERT_EQ("", ReportMessage());
  ASSERT_EQ(0, DroppedBytes());
}

// A trailing fragment that is cut short at end of file is ignored, not
// reported as corruption.
TEST(LogTest, PartialLastIsIgnored) {
  Write(BigString("bar", kBlockSize));
  // Cause a bad record length in the LAST fragment.
  ShrinkSize(1);

  ASSERT_EQ("EOF", Read());
  ASSERT_EQ("", ReportMessage());
  ASSERT_EQ(0, DroppedBytes());
}
488
TEST(LogTest, SkipIntoMultiRecord) {
  // Consider a fragmented record:
  //    first(R1), middle(R1), last(R1), first(R2)
  // If initial_offset points to a record after first(R1) but before first(R2)
  // incomplete fragment errors are not actual errors, and must be suppressed
  // until a new first or full record is encountered.
  const std::string spanning = BigString("foo", 3 * kBlockSize);
  Write(spanning);
  Write("correct");
  StartReadingAt(kBlockSize);

  ASSERT_EQ("correct", Read());
  ASSERT_EQ("", ReportMessage());
  ASSERT_EQ(0, DroppedBytes());
  ASSERT_EQ("EOF", Read());
}

TEST(LogTest, ErrorJoinsRecords) {
  // Consider two fragmented records:
  //    first(R1) last(R1) first(R2) last(R2)
  // where the middle two fragments disappear.  We do not want
  // first(R1),last(R2) to get joined and returned as a valid record.

  // Write records that span two blocks
  Write(BigString("foo", kBlockSize));
  Write(BigString("bar", kBlockSize));
  Write("correct");

  // Wipe the middle block
  const int wipe_begin = kBlockSize;
  const int wipe_end = 2 * kBlockSize;
  int pos = wipe_begin;
  while (pos < wipe_end) {
    SetByte(pos, 'x');
    ++pos;
  }

  ASSERT_EQ("correct", Read());
  ASSERT_EQ("EOF", Read());
  const size_t dropped = DroppedBytes();
  ASSERT_LE(dropped, 2 * kBlockSize + 100);
  ASSERT_GE(dropped, 2 * kBlockSize);
}
527
// Initial-offset tests: each writes the log described by
// initial_offset_record_sizes_ and checks which record a Reader positioned
// at the given byte offset returns first.

// Offset zero starts at the very first record.
TEST(LogTest, ReadStart) {
  const uint64_t kOffset = 0;
  CheckInitialOffsetRecord(kOffset, 0);
}

// Starting even one byte into the first record skips it.
TEST(LogTest, ReadSecondOneOff) {
  const uint64_t kOffset = 1;
  CheckInitialOffsetRecord(kOffset, 1);
}

// An offset deep inside the first record also skips it.
TEST(LogTest, ReadSecondTenThousand) {
  const uint64_t kOffset = 10000;
  CheckInitialOffsetRecord(kOffset, 1);
}

// Exactly at the second record's start (10000 + a 7-byte header, assuming
// kHeaderSize == 7).
TEST(LogTest, ReadSecondStart) {
  const uint64_t kOffset = 10007;
  CheckInitialOffsetRecord(kOffset, 1);
}

// One byte past the second record's start skips it.
TEST(LogTest, ReadThirdOneOff) {
  const uint64_t kOffset = 10008;
  CheckInitialOffsetRecord(kOffset, 2);
}

// Exactly at the third record's start (two records plus their headers).
TEST(LogTest, ReadThirdStart) {
  const uint64_t kOffset = 20014;
  CheckInitialOffsetRecord(kOffset, 2);
}

// One byte past the third record's start skips the whole three-block record.
TEST(LogTest, ReadFourthOneOff) {
  const uint64_t kOffset = 20015;
  CheckInitialOffsetRecord(kOffset, 3);
}

// An offset in the last few bytes of the first block.
TEST(LogTest, ReadFourthFirstBlockTrailer) {
  const uint64_t kOffset = log::kBlockSize - 4;
  CheckInitialOffsetRecord(kOffset, 3);
}

// An offset just inside the second block.
TEST(LogTest, ReadFourthMiddleBlock) {
  const uint64_t kOffset = log::kBlockSize + 1;
  CheckInitialOffsetRecord(kOffset, 3);
}

// An offset just inside the third block.
TEST(LogTest, ReadFourthLastBlock) {
  const uint64_t kOffset = 2 * log::kBlockSize + 1;
  CheckInitialOffsetRecord(kOffset, 3);
}
567
// Start reading exactly at the fourth record's header. The offset counts
// the two 10000-byte records (one header each) and the
// 2 * kBlockSize - 1000 byte record (three fragments, so three headers).
// This equals initial_offset_last_record_offsets_[3]. The previous value
// used 1000 in place of 10000, so it did not point at the record start this
// test is named for.
TEST(LogTest, ReadFourthStart) {
  CheckInitialOffsetRecord(
      2 * (kHeaderSize + 10000) + (2 * log::kBlockSize - 1000) +
          3 * kHeaderSize,
      3);
}
573
// An offset inside the unused padding at the end of the third block lands
// on the record that fills the fourth block.
TEST(LogTest, ReadInitialOffsetIntoBlockPadding) {
  const uint64_t kOffset = 3 * log::kBlockSize - 3;
  CheckInitialOffsetRecord(kOffset, 5);
}

// Starting exactly at the end of the log returns no records.
TEST(LogTest, ReadEnd) {
  const uint64_t kBytesPastEnd = 0;
  CheckOffsetPastEndReturnsNoRecords(kBytesPastEnd);
}

// Starting beyond the end of the log returns no records.
TEST(LogTest, ReadPastEnd) {
  const uint64_t kBytesPastEnd = 5;
  CheckOffsetPastEndReturnsNoRecords(kBytesPastEnd);
}
585
586 } // namespace log
587 } // namespace leveldb
588
main(int argc,char ** argv)589 int main(int argc, char** argv) {
590 return leveldb::test::RunAllTests();
591 }
592