//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/log_reader.h"

#include <stdio.h>
#include "file/sequence_file_reader.h"
#include "rocksdb/env.h"
#include "test_util/sync_point.h"
#include "util/coding.h"
#include "util/crc32c.h"
#include "util/util.h"

namespace ROCKSDB_NAMESPACE {
namespace log {

Reader::Reporter::~Reporter() {
}

Reader::Reader(std::shared_ptr<Logger> info_log,
               std::unique_ptr<SequentialFileReader>&& _file,
               Reporter* reporter, bool checksum, uint64_t log_num)
    : info_log_(info_log),
      file_(std::move(_file)),
      reporter_(reporter),
      checksum_(checksum),
      backing_store_(new char[kBlockSize]),
      buffer_(),
      eof_(false),
      read_error_(false),
      eof_offset_(0),
      last_record_offset_(0),
      end_of_buffer_offset_(0),
      log_number_(log_num),
      recycled_(false) {}

Reader::~Reader() {
  delete[] backing_store_;
}

// For kAbsoluteConsistency, on clean shutdown we don't expect any error
// in the log files.  For other modes, we can ignore only incomplete records
// in the last log file, which are presumably due to a write in progress
// during restart (or from log recycling).
//
// TODO krad: Evaluate if we need to move to a more strict mode where we
// restrict the inconsistency to only the last log
bool Reader::ReadRecord(Slice* record, std::string* scratch,
                        WALRecoveryMode wal_recovery_mode) {
  scratch->clear();
  record->clear();
  bool in_fragmented_record = false;
  // Record offset of the logical record that we're reading
  // 0 is a dummy value to make compilers happy
  uint64_t prospective_record_offset = 0;

  Slice fragment;
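  // Each iteration of this loop consumes one physical record. A logical
  // record that crosses a block boundary is stored as a sequence of
  // kFirstType / kMiddleType / kLastType fragments, which are reassembled
  // in *scratch before being returned.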
  while (true) {
    uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
    size_t drop_size = 0;
    const unsigned int record_type = ReadPhysicalRecord(&fragment, &drop_size);
    switch (record_type) {
      case kFullType:
      case kRecyclableFullType:
        if (in_fragmented_record && !scratch->empty()) {
          // Handle bug in earlier versions of log::Writer where
          // it could emit an empty kFirstType record at the tail end
          // of a block followed by a kFullType or kFirstType record
          // at the beginning of the next block.
          ReportCorruption(scratch->size(), "partial record without end(1)");
        }
        prospective_record_offset = physical_record_offset;
        scratch->clear();
        *record = fragment;
        last_record_offset_ = prospective_record_offset;
        return true;

      case kFirstType:
      case kRecyclableFirstType:
        if (in_fragmented_record && !scratch->empty()) {
          // Handle bug in earlier versions of log::Writer where
          // it could emit an empty kFirstType record at the tail end
          // of a block followed by a kFullType or kFirstType record
          // at the beginning of the next block.
          ReportCorruption(scratch->size(), "partial record without end(2)");
        }
        prospective_record_offset = physical_record_offset;
        scratch->assign(fragment.data(), fragment.size());
        in_fragmented_record = true;
        break;

      case kMiddleType:
      case kRecyclableMiddleType:
        if (!in_fragmented_record) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(1)");
        } else {
          scratch->append(fragment.data(), fragment.size());
        }
        break;

      case kLastType:
      case kRecyclableLastType:
        if (!in_fragmented_record) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(2)");
        } else {
          scratch->append(fragment.data(), fragment.size());
          *record = Slice(*scratch);
          last_record_offset_ = prospective_record_offset;
          return true;
        }
        break;

      case kBadHeader:
        if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency) {
          // in clean shutdown we don't expect any error in the log files
          ReportCorruption(drop_size, "truncated header");
        }
        FALLTHROUGH_INTENDED;

      case kEof:
        if (in_fragmented_record) {
          if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency) {
            // in clean shutdown we don't expect any error in the log files
            ReportCorruption(scratch->size(), "error reading trailing data");
          }
          // This can be caused by the writer dying immediately after
          // writing a physical record but before completing the next; don't
          // treat it as a corruption, just ignore the entire logical record.
          scratch->clear();
        }
        return false;

      case kOldRecord:
        if (wal_recovery_mode != WALRecoveryMode::kSkipAnyCorruptedRecords) {
          // Treat a record from a previous instance of the log as EOF.
          if (in_fragmented_record) {
            if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency) {
              // in clean shutdown we don't expect any error in the log files
              ReportCorruption(scratch->size(), "error reading trailing data");
            }
            // This can be caused by the writer dying immediately after
            // writing a physical record but before completing the next; don't
            // treat it as a corruption, just ignore the entire logical record.
            scratch->clear();
          }
          return false;
        }
        FALLTHROUGH_INTENDED;

      case kBadRecord:
        if (in_fragmented_record) {
          ReportCorruption(scratch->size(), "error in middle of record");
          in_fragmented_record = false;
          scratch->clear();
        }
        break;

      case kBadRecordLen:
      case kBadRecordChecksum:
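        // Corruption at the tail of a recycled log file is expected: bytes
        // past the last complete record may be left over from the file's
        // previous incarnation, so treat it as a clean end of the log.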
        if (recycled_ &&
            wal_recovery_mode ==
                WALRecoveryMode::kTolerateCorruptedTailRecords) {
          scratch->clear();
          return false;
        }
        if (record_type == kBadRecordLen) {
          ReportCorruption(drop_size, "bad record length");
        } else {
          ReportCorruption(drop_size, "checksum mismatch");
        }
        if (in_fragmented_record) {
          ReportCorruption(scratch->size(), "error in middle of record");
          in_fragmented_record = false;
          scratch->clear();
        }
        break;

      default: {
        char buf[40];
        snprintf(buf, sizeof(buf), "unknown record type %u", record_type);
        ReportCorruption(
            (fragment.size() + (in_fragmented_record ? scratch->size() : 0)),
            buf);
        in_fragmented_record = false;
        scratch->clear();
        break;
      }
    }
  }
  return false;
}

uint64_t Reader::LastRecordOffset() {
  return last_record_offset_;
}

void Reader::UnmarkEOF() {
  if (read_error_) {
    return;
  }
  eof_ = false;
  if (eof_offset_ == 0) {
    return;
  }
  UnmarkEOFInternal();
}

void Reader::UnmarkEOFInternal() {
  // If the EOF was in the middle of a block (a partial block was read) we have
  // to read the rest of the block as ReadPhysicalRecord can only read full
  // blocks and expects the file position indicator to be aligned to the start
  // of a block.
  //
  //      consumed_bytes + buffer_size() + remaining == kBlockSize

  size_t consumed_bytes = eof_offset_ - buffer_.size();
  size_t remaining = kBlockSize - eof_offset_;

  // backing_store_ is used to concatenate what is left in buffer_ and
  // the remainder of the block. If buffer_ already uses backing_store_,
  // we just append the new data.
  if (buffer_.data() != backing_store_ + consumed_bytes) {
    // buffer_ does not use backing_store_ for storage.
    // Copy what is left in buffer_ to backing_store_.
    memmove(backing_store_ + consumed_bytes, buffer_.data(), buffer_.size());
  }

  Slice read_buffer;
  Status status = file_->Read(remaining, &read_buffer,
    backing_store_ + eof_offset_);

  size_t added = read_buffer.size();
  end_of_buffer_offset_ += added;

  if (!status.ok()) {
    if (added > 0) {
      ReportDrop(added, status);
    }

    read_error_ = true;
    return;
  }

  if (read_buffer.data() != backing_store_ + eof_offset_) {
    // Read did not write to backing_store_
    memmove(backing_store_ + eof_offset_, read_buffer.data(),
      read_buffer.size());
  }

  buffer_ = Slice(backing_store_ + consumed_bytes,
    eof_offset_ + added - consumed_bytes);

  if (added < remaining) {
    eof_ = true;
    eof_offset_ += added;
  } else {
    eof_offset_ = 0;
  }
}

void Reader::ReportCorruption(size_t bytes, const char* reason) {
  ReportDrop(bytes, Status::Corruption(reason));
}

void Reader::ReportDrop(size_t bytes, const Status& reason) {
  if (reporter_ != nullptr) {
    reporter_->Corruption(bytes, reason);
  }
}

bool Reader::ReadMore(size_t* drop_size, int* error) {
  if (!eof_ && !read_error_) {
    // Last read was a full read, so this is a trailer to skip
    buffer_.clear();
    Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
    end_of_buffer_offset_ += buffer_.size();
    if (!status.ok()) {
      buffer_.clear();
      ReportDrop(kBlockSize, status);
      read_error_ = true;
      *error = kEof;
      return false;
    } else if (buffer_.size() < static_cast<size_t>(kBlockSize)) {
      eof_ = true;
      eof_offset_ = buffer_.size();
    }
    return true;
  } else {
    // Note that if buffer_ is non-empty, we have a truncated header at the
    // end of the file, which can be caused by the writer crashing in the
    // middle of writing the header. Unless explicitly requested, we don't
    // consider this an error, just report EOF.
    if (buffer_.size()) {
      *drop_size = buffer_.size();
      buffer_.clear();
      *error = kBadHeader;
      return false;
    }
    buffer_.clear();
    *error = kEof;
    return false;
  }
}

unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size) {
  while (true) {
    // We need at least the minimum header size
    if (buffer_.size() < static_cast<size_t>(kHeaderSize)) {
      // the default value of r is meaningless because ReadMore will overwrite
      // it if it returns false; in case it returns true, the return value will
      // not be used anyway
      int r = kEof;
      if (!ReadMore(drop_size, &r)) {
        return r;
      }
      continue;
    }

    // Parse the header
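    // Physical record layout (see log_format.h):
    //   legacy:     checksum (4B) | length (2B, little-endian) | type (1B)
    //   recyclable: checksum (4B) | length (2B, little-endian) | type (1B)
    //               | log_number (4B)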
    const char* header = buffer_.data();
    const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
    const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
    const unsigned int type = header[6];
    const uint32_t length = a | (b << 8);
    int header_size = kHeaderSize;
    if (type >= kRecyclableFullType && type <= kRecyclableLastType) {
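      // recycled_ is latched when the record at file offset 0 uses a
      // recyclable type; ReadRecord consults it to tolerate a corrupted
      // tail left over from the log file's previous incarnation.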
      if (end_of_buffer_offset_ - buffer_.size() == 0) {
        recycled_ = true;
      }
      header_size = kRecyclableHeaderSize;
      // We need enough for the larger header
      if (buffer_.size() < static_cast<size_t>(kRecyclableHeaderSize)) {
        int r = kEof;
        if (!ReadMore(drop_size, &r)) {
          return r;
        }
        continue;
      }
      const uint32_t log_num = DecodeFixed32(header + 7);
      if (log_num != log_number_) {
        return kOldRecord;
      }
    }
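    // |length| counts only the payload, so a physical record occupies
    // header_size + length bytes in total and never crosses a kBlockSize
    // boundary. A length exceeding what is left in the buffer therefore
    // means a corrupt header (mid-file) or a truncated record (at EOF).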
    if (header_size + length > buffer_.size()) {
      *drop_size = buffer_.size();
      buffer_.clear();
      if (!eof_) {
        return kBadRecordLen;
      }
      // If the end of the file has been reached without reading |length|
      // bytes of payload, assume the writer died in the middle of writing the
      // record. Don't report a corruption unless requested.
      if (*drop_size) {
        return kBadHeader;
      }
      return kEof;
    }

    if (type == kZeroType && length == 0) {
      // Skip zero length record without reporting any drops since
      // such records are produced by the mmap based writing code in
      // env_posix.cc that preallocates file regions.
      // NOTE: this should never happen in DBs written by new RocksDB versions,
      // since we turn off mmap writes to manifest and log files
      buffer_.clear();
      return kBadRecord;
    }

    // Check crc
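    // The stored checksum is a masked crc32c (see crc32c::Mask) computed
    // over the type byte, the log number for recyclable records, and the
    // payload (i.e. everything after the length field).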
    if (checksum_) {
      uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
      uint32_t actual_crc = crc32c::Value(header + 6, length + header_size - 6);
      if (actual_crc != expected_crc) {
        // Drop the rest of the buffer since "length" itself may have
        // been corrupted and if we trust it, we could find some
        // fragment of a real log record that just happens to look
        // like a valid log record.
        *drop_size = buffer_.size();
        buffer_.clear();
        return kBadRecordChecksum;
      }
    }

    buffer_.remove_prefix(header_size + length);

    *result = Slice(header + header_size, length);
    return type;
  }
}

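// Unlike Reader::ReadRecord, FragmentBufferedReader keeps a partially
// assembled logical record in fragments_ across calls: when the buffered
// data ends in the middle of a record it returns false without dropping
// the pending fragments, so a later call can resume once more data has
// been appended to the file (the tailing use case).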
bool FragmentBufferedReader::ReadRecord(Slice* record, std::string* scratch,
                                        WALRecoveryMode /*unused*/) {
  assert(record != nullptr);
  assert(scratch != nullptr);
  record->clear();
  scratch->clear();

  uint64_t prospective_record_offset = 0;
  uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
  size_t drop_size = 0;
  unsigned int fragment_type_or_err = 0;  // Initialize to make compiler happy
  Slice fragment;
  while (TryReadFragment(&fragment, &drop_size, &fragment_type_or_err)) {
    switch (fragment_type_or_err) {
      case kFullType:
      case kRecyclableFullType:
        if (in_fragmented_record_ && !fragments_.empty()) {
          ReportCorruption(fragments_.size(), "partial record without end(1)");
        }
        fragments_.clear();
        *record = fragment;
        prospective_record_offset = physical_record_offset;
        last_record_offset_ = prospective_record_offset;
        in_fragmented_record_ = false;
        return true;

      case kFirstType:
      case kRecyclableFirstType:
        if (in_fragmented_record_ || !fragments_.empty()) {
          ReportCorruption(fragments_.size(), "partial record without end(2)");
        }
        prospective_record_offset = physical_record_offset;
        fragments_.assign(fragment.data(), fragment.size());
        in_fragmented_record_ = true;
        break;

      case kMiddleType:
      case kRecyclableMiddleType:
        if (!in_fragmented_record_) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(1)");
        } else {
          fragments_.append(fragment.data(), fragment.size());
        }
        break;

      case kLastType:
      case kRecyclableLastType:
        if (!in_fragmented_record_) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(2)");
        } else {
          fragments_.append(fragment.data(), fragment.size());
          scratch->assign(fragments_.data(), fragments_.size());
          fragments_.clear();
          *record = Slice(*scratch);
          last_record_offset_ = prospective_record_offset;
          in_fragmented_record_ = false;
          return true;
        }
        break;

      case kBadHeader:
      case kBadRecord:
      case kEof:
      case kOldRecord:
        if (in_fragmented_record_) {
          ReportCorruption(fragments_.size(), "error in middle of record");
          in_fragmented_record_ = false;
          fragments_.clear();
        }
        break;

      case kBadRecordChecksum:
        if (recycled_) {
          fragments_.clear();
          return false;
        }
        ReportCorruption(drop_size, "checksum mismatch");
        if (in_fragmented_record_) {
          ReportCorruption(fragments_.size(), "error in middle of record");
          in_fragmented_record_ = false;
          fragments_.clear();
        }
        break;

      default: {
        char buf[40];
        snprintf(buf, sizeof(buf), "unknown record type %u",
                 fragment_type_or_err);
        ReportCorruption(
            fragment.size() + (in_fragmented_record_ ? fragments_.size() : 0),
            buf);
        in_fragmented_record_ = false;
        fragments_.clear();
        break;
      }
    }
  }
  return false;
}

void FragmentBufferedReader::UnmarkEOF() {
  if (read_error_) {
    return;
  }
  eof_ = false;
  UnmarkEOFInternal();
}

bool FragmentBufferedReader::TryReadMore(size_t* drop_size, int* error) {
  if (!eof_ && !read_error_) {
    // Last read was a full read, so this is a trailer to skip
    buffer_.clear();
    Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
    end_of_buffer_offset_ += buffer_.size();
    if (!status.ok()) {
      buffer_.clear();
      ReportDrop(kBlockSize, status);
      read_error_ = true;
      *error = kEof;
      return false;
    } else if (buffer_.size() < static_cast<size_t>(kBlockSize)) {
      eof_ = true;
      eof_offset_ = buffer_.size();
      TEST_SYNC_POINT_CALLBACK(
          "FragmentBufferedLogReader::TryReadMore:FirstEOF", nullptr);
    }
    return true;
  } else if (!read_error_) {
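    // We previously hit EOF; clear the marker and re-probe the file in case
    // new data has been appended since the last read.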
    UnmarkEOF();
  }
  if (!read_error_) {
    return true;
  }
  *error = kEof;
  *drop_size = buffer_.size();
  if (buffer_.size() > 0) {
    *error = kBadHeader;
  }
  buffer_.clear();
  return false;
}

// Return true if the caller should process the fragment_type_or_err.
bool FragmentBufferedReader::TryReadFragment(
    Slice* fragment, size_t* drop_size, unsigned int* fragment_type_or_err) {
  assert(fragment != nullptr);
  assert(drop_size != nullptr);
  assert(fragment_type_or_err != nullptr);

  while (buffer_.size() < static_cast<size_t>(kHeaderSize)) {
    size_t old_size = buffer_.size();
    int error = kEof;
    if (!TryReadMore(drop_size, &error)) {
      *fragment_type_or_err = error;
      return false;
    } else if (old_size == buffer_.size()) {
      return false;
    }
  }
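  // Same physical record layout as in Reader::ReadPhysicalRecord:
  // checksum (4B) | length (2B, little-endian) | type (1B), plus a 4-byte
  // log_number for recyclable types.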
  const char* header = buffer_.data();
  const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
  const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
  const unsigned int type = header[6];
  const uint32_t length = a | (b << 8);
  int header_size = kHeaderSize;
  if (type >= kRecyclableFullType && type <= kRecyclableLastType) {
    if (end_of_buffer_offset_ - buffer_.size() == 0) {
      recycled_ = true;
    }
    header_size = kRecyclableHeaderSize;
    while (buffer_.size() < static_cast<size_t>(kRecyclableHeaderSize)) {
      size_t old_size = buffer_.size();
      int error = kEof;
      if (!TryReadMore(drop_size, &error)) {
        *fragment_type_or_err = error;
        return false;
      } else if (old_size == buffer_.size()) {
        return false;
      }
    }
    const uint32_t log_num = DecodeFixed32(header + 7);
    if (log_num != log_number_) {
      *fragment_type_or_err = kOldRecord;
      return true;
    }
  }

  while (header_size + length > buffer_.size()) {
    size_t old_size = buffer_.size();
    int error = kEof;
    if (!TryReadMore(drop_size, &error)) {
      *fragment_type_or_err = error;
      return false;
    } else if (old_size == buffer_.size()) {
      return false;
    }
  }

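  // Zero-length kZeroType records are padding produced by preallocated
  // file regions (see the note in Reader::ReadPhysicalRecord); skip them
  // without reporting a drop.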
  if (type == kZeroType && length == 0) {
    buffer_.clear();
    *fragment_type_or_err = kBadRecord;
    return true;
  }

  if (checksum_) {
    uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
    uint32_t actual_crc = crc32c::Value(header + 6, length + header_size - 6);
    if (actual_crc != expected_crc) {
      *drop_size = buffer_.size();
      buffer_.clear();
      *fragment_type_or_err = kBadRecordChecksum;
      return true;
    }
  }

  buffer_.remove_prefix(header_size + length);

  *fragment = Slice(header + header_size, length);
  *fragment_type_or_err = type;
  return true;
}

}  // namespace log
}  // namespace ROCKSDB_NAMESPACE