//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/log_reader.h"

#include <stdio.h>

#include "file/sequence_file_reader.h"
#include "port/lang.h"
#include "rocksdb/env.h"
#include "test_util/sync_point.h"
#include "util/coding.h"
#include "util/crc32c.h"

namespace ROCKSDB_NAMESPACE {
namespace log {

Reader::Reporter::~Reporter() {}

Reader::Reader(std::shared_ptr<Logger> info_log,
               std::unique_ptr<SequentialFileReader>&& _file,
               Reporter* reporter, bool checksum, uint64_t log_num)
    : info_log_(info_log),
      file_(std::move(_file)),
      reporter_(reporter),
      checksum_(checksum),
      backing_store_(new char[kBlockSize]),
      buffer_(),
      eof_(false),
      read_error_(false),
      eof_offset_(0),
      last_record_offset_(0),
      end_of_buffer_offset_(0),
      log_number_(log_num),
      recycled_(false) {}

Reader::~Reader() {
  delete[] backing_store_;
}

// For kAbsoluteConsistency, on clean shutdown we don't expect any error
// in the log files.  For other modes, we can ignore only incomplete records
// in the last log file, which are presumably due to a write in progress
// during restart (or from log recycling).
//
// TODO krad: Evaluate if we need to move to a more strict mode where we
// restrict the inconsistency to only the last log
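//
// Typical call pattern (an illustrative sketch, not a contract):
//
//   Slice record;
//   std::string scratch;
//   while (reader.ReadRecord(&record, &scratch, wal_recovery_mode)) {
//     // `record` may point into `scratch` or the reader's internal buffer,
//     // so copy it out if it must outlive the next call.
//   }
//
// Dropped bytes and corruption are surfaced through Reporter::Corruption();
// the return value only says whether a complete logical record was produced.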
bool Reader::ReadRecord(Slice* record, std::string* scratch,
                        WALRecoveryMode wal_recovery_mode) {
  scratch->clear();
  record->clear();
  bool in_fragmented_record = false;
  // Record offset of the logical record that we're reading
  // 0 is a dummy value to make compilers happy
  uint64_t prospective_record_offset = 0;

  Slice fragment;
  while (true) {
    uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
    size_t drop_size = 0;
    const unsigned int record_type = ReadPhysicalRecord(&fragment, &drop_size);
    switch (record_type) {
      case kFullType:
      case kRecyclableFullType:
        if (in_fragmented_record && !scratch->empty()) {
          // Handle bug in earlier versions of log::Writer where
          // it could emit an empty kFirstType record at the tail end
          // of a block followed by a kFullType or kFirstType record
          // at the beginning of the next block.
          ReportCorruption(scratch->size(), "partial record without end(1)");
        }
        prospective_record_offset = physical_record_offset;
        scratch->clear();
        *record = fragment;
        last_record_offset_ = prospective_record_offset;
        return true;

      case kFirstType:
      case kRecyclableFirstType:
        if (in_fragmented_record && !scratch->empty()) {
          // Handle bug in earlier versions of log::Writer where
          // it could emit an empty kFirstType record at the tail end
          // of a block followed by a kFullType or kFirstType record
          // at the beginning of the next block.
          ReportCorruption(scratch->size(), "partial record without end(2)");
        }
        prospective_record_offset = physical_record_offset;
        scratch->assign(fragment.data(), fragment.size());
        in_fragmented_record = true;
        break;

      case kMiddleType:
      case kRecyclableMiddleType:
        if (!in_fragmented_record) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(1)");
        } else {
          scratch->append(fragment.data(), fragment.size());
        }
        break;

      case kLastType:
      case kRecyclableLastType:
        if (!in_fragmented_record) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(2)");
        } else {
          scratch->append(fragment.data(), fragment.size());
          *record = Slice(*scratch);
          last_record_offset_ = prospective_record_offset;
          return true;
        }
        break;

      case kBadHeader:
        if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency ||
            wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery) {
          // In clean shutdown we don't expect any error in the log files.
          // In point-in-time recovery an incomplete record at the end could
          // produce a hole in the recovered data. Report an error here, which
          // higher layers can choose to ignore when it's provable there is no
          // hole.
          ReportCorruption(drop_size, "truncated header");
        }
        FALLTHROUGH_INTENDED;

      case kEof:
        if (in_fragmented_record) {
          if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency ||
              wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery) {
            // In clean shutdown we don't expect any error in the log files.
            // In point-in-time recovery an incomplete record at the end could
            // produce a hole in the recovered data. Report an error here, which
            // higher layers can choose to ignore when it's provable there is no
            // hole.
            ReportCorruption(scratch->size(), "error reading trailing data");
          }
          // This can be caused by the writer dying immediately after
          //  writing a physical record but before completing the next; don't
          //  treat it as a corruption, just ignore the entire logical record.
          scratch->clear();
        }
        return false;

      case kOldRecord:
        if (wal_recovery_mode != WALRecoveryMode::kSkipAnyCorruptedRecords) {
          // Treat a record from a previous instance of the log as EOF.
          if (in_fragmented_record) {
            if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency ||
                wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery) {
              // In clean shutdown we don't expect any error in the log files.
              // In point-in-time recovery an incomplete record at the end could
              // produce a hole in the recovered data. Report an error here,
              // which higher layers can choose to ignore when it's provable
              // there is no hole.
              ReportCorruption(scratch->size(), "error reading trailing data");
            }
            // This can be caused by the writer dying immediately after
            //  writing a physical record but before completing the next; don't
            //  treat it as a corruption, just ignore the entire logical record.
            scratch->clear();
          }
          return false;
        }
        FALLTHROUGH_INTENDED;

      case kBadRecord:
        if (in_fragmented_record) {
          ReportCorruption(scratch->size(), "error in middle of record");
          in_fragmented_record = false;
          scratch->clear();
        }
        break;

      case kBadRecordLen:
        if (eof_) {
          if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency ||
              wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery) {
            // In clean shutdown we don't expect any error in the log files.
            // In point-in-time recovery an incomplete record at the end could
            // produce a hole in the recovered data. Report an error here, which
            // higher layers can choose to ignore when it's provable there is no
            // hole.
            ReportCorruption(drop_size, "truncated record body");
          }
          return false;
        }
        FALLTHROUGH_INTENDED;

      case kBadRecordChecksum:
        if (recycled_ &&
            wal_recovery_mode ==
                WALRecoveryMode::kTolerateCorruptedTailRecords) {
          scratch->clear();
          return false;
        }
        if (record_type == kBadRecordLen) {
          ReportCorruption(drop_size, "bad record length");
        } else {
          ReportCorruption(drop_size, "checksum mismatch");
        }
        if (in_fragmented_record) {
          ReportCorruption(scratch->size(), "error in middle of record");
          in_fragmented_record = false;
          scratch->clear();
        }
        break;

      default: {
        char buf[40];
        snprintf(buf, sizeof(buf), "unknown record type %u", record_type);
        ReportCorruption(
            (fragment.size() + (in_fragmented_record ? scratch->size() : 0)),
            buf);
        in_fragmented_record = false;
        scratch->clear();
        break;
      }
    }
  }
  return false;
}

uint64_t Reader::LastRecordOffset() {
  return last_record_offset_;
}

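// Returns the file offset one past the last physical record consumed, i.e.
// the reader's current position within the log file.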
uint64_t Reader::LastRecordEnd() {
  return end_of_buffer_offset_ - buffer_.size();
}

void Reader::UnmarkEOF() {
  if (read_error_) {
    return;
  }
  eof_ = false;
  if (eof_offset_ == 0) {
    return;
  }
  UnmarkEOFInternal();
}

void Reader::UnmarkEOFInternal() {
  // If the EOF was in the middle of a block (a partial block was read) we have
  // to read the rest of the block as ReadPhysicalRecord can only read full
  // blocks and expects the file position indicator to be aligned to the start
  // of a block.
  //
  //      consumed_bytes + buffer_size() + remaining == kBlockSize
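  //
  // For example (assuming the usual kBlockSize of 32768): if EOF was hit at
  // eof_offset_ == 100 with 30 bytes still unconsumed in buffer_, then
  // consumed_bytes == 70 and remaining == 32668; we read those remaining
  // bytes so the next read stays block-aligned.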

  size_t consumed_bytes = eof_offset_ - buffer_.size();
  size_t remaining = kBlockSize - eof_offset_;

  // backing_store_ is used to concatenate what is left in buffer_ and
  // the remainder of the block. If buffer_ already uses backing_store_,
  // we just append the new data.
  if (buffer_.data() != backing_store_ + consumed_bytes) {
    // buffer_ does not use backing_store_ for storage.
    // Copy what is left in buffer_ to backing_store_.
    memmove(backing_store_ + consumed_bytes, buffer_.data(), buffer_.size());
  }

  Slice read_buffer;
  Status status =
      file_->Read(remaining, &read_buffer, backing_store_ + eof_offset_);

  size_t added = read_buffer.size();
  end_of_buffer_offset_ += added;

  if (!status.ok()) {
    if (added > 0) {
      ReportDrop(added, status);
    }

    read_error_ = true;
    return;
  }

  if (read_buffer.data() != backing_store_ + eof_offset_) {
    // Read did not write to backing_store_
    memmove(backing_store_ + eof_offset_, read_buffer.data(),
            read_buffer.size());
  }

  buffer_ = Slice(backing_store_ + consumed_bytes,
                  eof_offset_ + added - consumed_bytes);

  if (added < remaining) {
    eof_ = true;
    eof_offset_ += added;
  } else {
    eof_offset_ = 0;
  }
}

void Reader::ReportCorruption(size_t bytes, const char* reason) {
  ReportDrop(bytes, Status::Corruption(reason));
}

void Reader::ReportDrop(size_t bytes, const Status& reason) {
  if (reporter_ != nullptr) {
    reporter_->Corruption(bytes, reason);
  }
}

bool Reader::ReadMore(size_t* drop_size, int* error) {
  if (!eof_ && !read_error_) {
    // Last read was a full read, so this is a trailer to skip
    buffer_.clear();
    Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
    TEST_SYNC_POINT_CALLBACK("LogReader::ReadMore:AfterReadFile", &status);
    end_of_buffer_offset_ += buffer_.size();
    if (!status.ok()) {
      buffer_.clear();
      ReportDrop(kBlockSize, status);
      read_error_ = true;
      *error = kEof;
      return false;
    } else if (buffer_.size() < static_cast<size_t>(kBlockSize)) {
      eof_ = true;
      eof_offset_ = buffer_.size();
    }
    return true;
  } else {
    // Note that if buffer_ is non-empty, we have a truncated header at the
    //  end of the file, which can be caused by the writer crashing in the
    //  middle of writing the header. Unless explicitly requested we don't
    //  consider this an error, just report EOF.
    if (buffer_.size()) {
      *drop_size = buffer_.size();
      buffer_.clear();
      *error = kBadHeader;
      return false;
    }
    buffer_.clear();
    *error = kEof;
    return false;
  }
}

unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size) {
  while (true) {
    // We need at least the minimum header size
    if (buffer_.size() < static_cast<size_t>(kHeaderSize)) {
      // the default value of r is meaningless because ReadMore will overwrite
      // it if it returns false; in case it returns true, the return value will
      // not be used anyway
      int r = kEof;
      if (!ReadMore(drop_size, &r)) {
        return r;
      }
      continue;
    }

    // Parse the header
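    // Header layout (see log_format.h):
    //   header[0..3]  masked crc32c of the record, little-endian
    //   header[4..5]  payload length, little-endian
    //   header[6]     record type
    // Recyclable record types additionally carry the log number in
    // header[7..10], hence the larger kRecyclableHeaderSize.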
    const char* header = buffer_.data();
    const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
    const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
    const unsigned int type = header[6];
    const uint32_t length = a | (b << 8);
    int header_size = kHeaderSize;
    if (type >= kRecyclableFullType && type <= kRecyclableLastType) {
      if (end_of_buffer_offset_ - buffer_.size() == 0) {
        recycled_ = true;
      }
      header_size = kRecyclableHeaderSize;
      // We need enough for the larger header
      if (buffer_.size() < static_cast<size_t>(kRecyclableHeaderSize)) {
        int r = kEof;
        if (!ReadMore(drop_size, &r)) {
          return r;
        }
        continue;
      }
      const uint32_t log_num = DecodeFixed32(header + 7);
      if (log_num != log_number_) {
        return kOldRecord;
      }
    }
    if (header_size + length > buffer_.size()) {
      assert(buffer_.size() >= static_cast<size_t>(header_size));
      *drop_size = buffer_.size();
      buffer_.clear();
      // If the end of the read has been reached without seeing
      // `header_size + length` bytes of payload, report a corruption. The
      // higher layers can decide how to handle it based on the recovery mode,
      // whether this occurred at EOF, whether this is the final WAL, etc.
      return kBadRecordLen;
    }

    if (type == kZeroType && length == 0) {
      // Skip zero length record without reporting any drops since
      // such records are produced by the mmap based writing code in
      // env_posix.cc that preallocates file regions.
      // NOTE: this should never happen in DBs written by new RocksDB versions,
      // since we turn off mmap writes to manifest and log files
      buffer_.clear();
      return kBadRecord;
    }

    // Check crc
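    // The stored value is masked (see crc32c::Mask) and covers everything
    // after the length field: the type byte, the log number for recyclable
    // records, and the payload, i.e. bytes [6, header_size + length).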
    if (checksum_) {
      uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
      uint32_t actual_crc = crc32c::Value(header + 6, length + header_size - 6);
      if (actual_crc != expected_crc) {
        // Drop the rest of the buffer since "length" itself may have
        // been corrupted and if we trust it, we could find some
        // fragment of a real log record that just happens to look
        // like a valid log record.
        *drop_size = buffer_.size();
        buffer_.clear();
        return kBadRecordChecksum;
      }
    }

    buffer_.remove_prefix(header_size + length);

    *result = Slice(header + header_size, length);
    return type;
  }
}

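// FragmentBufferedReader keeps partial fragments in fragments_ across calls,
// which makes it suitable for tailing a live WAL: when the file does not yet
// contain a complete fragment, TryReadFragment() signals that more data is
// needed and ReadRecord() returns false without reporting corruption, so the
// caller can simply retry after more bytes have been appended.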
bool FragmentBufferedReader::ReadRecord(Slice* record, std::string* scratch,
                                        WALRecoveryMode /*unused*/) {
  assert(record != nullptr);
  assert(scratch != nullptr);
  record->clear();
  scratch->clear();

  uint64_t prospective_record_offset = 0;
  uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
  size_t drop_size = 0;
  unsigned int fragment_type_or_err = 0;  // Initialize to make compiler happy
  Slice fragment;
  while (TryReadFragment(&fragment, &drop_size, &fragment_type_or_err)) {
    switch (fragment_type_or_err) {
      case kFullType:
      case kRecyclableFullType:
        if (in_fragmented_record_ && !fragments_.empty()) {
          ReportCorruption(fragments_.size(), "partial record without end(1)");
        }
        fragments_.clear();
        *record = fragment;
        prospective_record_offset = physical_record_offset;
        last_record_offset_ = prospective_record_offset;
        in_fragmented_record_ = false;
        return true;

      case kFirstType:
      case kRecyclableFirstType:
        if (in_fragmented_record_ || !fragments_.empty()) {
          ReportCorruption(fragments_.size(), "partial record without end(2)");
        }
        prospective_record_offset = physical_record_offset;
        fragments_.assign(fragment.data(), fragment.size());
        in_fragmented_record_ = true;
        break;

      case kMiddleType:
      case kRecyclableMiddleType:
        if (!in_fragmented_record_) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(1)");
        } else {
          fragments_.append(fragment.data(), fragment.size());
        }
        break;

      case kLastType:
      case kRecyclableLastType:
        if (!in_fragmented_record_) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(2)");
        } else {
          fragments_.append(fragment.data(), fragment.size());
          scratch->assign(fragments_.data(), fragments_.size());
          fragments_.clear();
          *record = Slice(*scratch);
          last_record_offset_ = prospective_record_offset;
          in_fragmented_record_ = false;
          return true;
        }
        break;

      case kBadHeader:
      case kBadRecord:
      case kEof:
      case kOldRecord:
        if (in_fragmented_record_) {
          ReportCorruption(fragments_.size(), "error in middle of record");
          in_fragmented_record_ = false;
          fragments_.clear();
        }
        break;

      case kBadRecordChecksum:
        if (recycled_) {
          fragments_.clear();
          return false;
        }
        ReportCorruption(drop_size, "checksum mismatch");
        if (in_fragmented_record_) {
          ReportCorruption(fragments_.size(), "error in middle of record");
          in_fragmented_record_ = false;
          fragments_.clear();
        }
        break;

      default: {
        char buf[40];
        snprintf(buf, sizeof(buf), "unknown record type %u",
                 fragment_type_or_err);
        ReportCorruption(
            fragment.size() + (in_fragmented_record_ ? fragments_.size() : 0),
            buf);
        in_fragmented_record_ = false;
        fragments_.clear();
        break;
      }
    }
  }
  return false;
}

void FragmentBufferedReader::UnmarkEOF() {
  if (read_error_) {
    return;
  }
  eof_ = false;
  UnmarkEOFInternal();
}

bool FragmentBufferedReader::TryReadMore(size_t* drop_size, int* error) {
  if (!eof_ && !read_error_) {
    // Last read was a full read, so this is a trailer to skip
    buffer_.clear();
    Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
    end_of_buffer_offset_ += buffer_.size();
    if (!status.ok()) {
      buffer_.clear();
      ReportDrop(kBlockSize, status);
      read_error_ = true;
      *error = kEof;
      return false;
    } else if (buffer_.size() < static_cast<size_t>(kBlockSize)) {
      eof_ = true;
      eof_offset_ = buffer_.size();
      TEST_SYNC_POINT_CALLBACK(
          "FragmentBufferedLogReader::TryReadMore:FirstEOF", nullptr);
    }
    return true;
  } else if (!read_error_) {
    UnmarkEOF();
  }
  if (!read_error_) {
    return true;
  }
  *error = kEof;
  *drop_size = buffer_.size();
  if (buffer_.size() > 0) {
    *error = kBadHeader;
  }
  buffer_.clear();
  return false;
}

// Return true if the caller should process *fragment_type_or_err; return
// false when an error stopped us (*fragment_type_or_err is set to the error
// code) or when more data must arrive before a complete fragment can be read.
bool FragmentBufferedReader::TryReadFragment(
    Slice* fragment, size_t* drop_size, unsigned int* fragment_type_or_err) {
  assert(fragment != nullptr);
  assert(drop_size != nullptr);
  assert(fragment_type_or_err != nullptr);

  while (buffer_.size() < static_cast<size_t>(kHeaderSize)) {
    size_t old_size = buffer_.size();
    int error = kEof;
    if (!TryReadMore(drop_size, &error)) {
      *fragment_type_or_err = error;
      return false;
    } else if (old_size == buffer_.size()) {
      return false;
    }
  }
  const char* header = buffer_.data();
  const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
  const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
  const unsigned int type = header[6];
  const uint32_t length = a | (b << 8);
  int header_size = kHeaderSize;
  if (type >= kRecyclableFullType && type <= kRecyclableLastType) {
    if (end_of_buffer_offset_ - buffer_.size() == 0) {
      recycled_ = true;
    }
    header_size = kRecyclableHeaderSize;
    while (buffer_.size() < static_cast<size_t>(kRecyclableHeaderSize)) {
      size_t old_size = buffer_.size();
      int error = kEof;
      if (!TryReadMore(drop_size, &error)) {
        *fragment_type_or_err = error;
        return false;
      } else if (old_size == buffer_.size()) {
        return false;
      }
    }
    const uint32_t log_num = DecodeFixed32(header + 7);
    if (log_num != log_number_) {
      *fragment_type_or_err = kOldRecord;
      return true;
    }
  }

  while (header_size + length > buffer_.size()) {
    size_t old_size = buffer_.size();
    int error = kEof;
    if (!TryReadMore(drop_size, &error)) {
      *fragment_type_or_err = error;
      return false;
    } else if (old_size == buffer_.size()) {
      return false;
    }
  }

  if (type == kZeroType && length == 0) {
    buffer_.clear();
    *fragment_type_or_err = kBadRecord;
    return true;
  }

  if (checksum_) {
    uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
    uint32_t actual_crc = crc32c::Value(header + 6, length + header_size - 6);
    if (actual_crc != expected_crc) {
      *drop_size = buffer_.size();
      buffer_.clear();
      *fragment_type_or_err = kBadRecordChecksum;
      return true;
    }
  }

  buffer_.remove_prefix(header_size + length);

  *fragment = Slice(header + header_size, length);
  *fragment_type_or_err = type;
  return true;
}

}  // namespace log
}  // namespace ROCKSDB_NAMESPACE