1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #include "db/blob/blob_file_reader.h"
7 
8 #include <cassert>
9 #include <string>
10 
11 #include "db/blob/blob_log_format.h"
12 #include "file/filename.h"
13 #include "options/cf_options.h"
14 #include "rocksdb/file_system.h"
15 #include "rocksdb/slice.h"
16 #include "rocksdb/status.h"
17 #include "test_util/sync_point.h"
18 #include "util/compression.h"
19 #include "util/crc32c.h"
20 
21 namespace ROCKSDB_NAMESPACE {
22 
Create(const ImmutableOptions & immutable_cf_options,const FileOptions & file_options,uint32_t column_family_id,HistogramImpl * blob_file_read_hist,uint64_t blob_file_number,const std::shared_ptr<IOTracer> & io_tracer,std::unique_ptr<BlobFileReader> * blob_file_reader)23 Status BlobFileReader::Create(
24     const ImmutableOptions& immutable_cf_options,
25     const FileOptions& file_options, uint32_t column_family_id,
26     HistogramImpl* blob_file_read_hist, uint64_t blob_file_number,
27     const std::shared_ptr<IOTracer>& io_tracer,
28     std::unique_ptr<BlobFileReader>* blob_file_reader) {
29   assert(blob_file_reader);
30   assert(!*blob_file_reader);
31 
32   uint64_t file_size = 0;
33   std::unique_ptr<RandomAccessFileReader> file_reader;
34 
35   {
36     const Status s =
37         OpenFile(immutable_cf_options, file_options, blob_file_read_hist,
38                  blob_file_number, io_tracer, &file_size, &file_reader);
39     if (!s.ok()) {
40       return s;
41     }
42   }
43 
44   assert(file_reader);
45 
46   CompressionType compression_type = kNoCompression;
47 
48   {
49     const Status s =
50         ReadHeader(file_reader.get(), column_family_id, &compression_type);
51     if (!s.ok()) {
52       return s;
53     }
54   }
55 
56   {
57     const Status s = ReadFooter(file_size, file_reader.get());
58     if (!s.ok()) {
59       return s;
60     }
61   }
62 
63   blob_file_reader->reset(
64       new BlobFileReader(std::move(file_reader), file_size, compression_type));
65 
66   return Status::OK();
67 }
68 
OpenFile(const ImmutableOptions & immutable_cf_options,const FileOptions & file_opts,HistogramImpl * blob_file_read_hist,uint64_t blob_file_number,const std::shared_ptr<IOTracer> & io_tracer,uint64_t * file_size,std::unique_ptr<RandomAccessFileReader> * file_reader)69 Status BlobFileReader::OpenFile(
70     const ImmutableOptions& immutable_cf_options, const FileOptions& file_opts,
71     HistogramImpl* blob_file_read_hist, uint64_t blob_file_number,
72     const std::shared_ptr<IOTracer>& io_tracer, uint64_t* file_size,
73     std::unique_ptr<RandomAccessFileReader>* file_reader) {
74   assert(file_size);
75   assert(file_reader);
76 
77   const auto& cf_paths = immutable_cf_options.cf_paths;
78   assert(!cf_paths.empty());
79 
80   const std::string blob_file_path =
81       BlobFileName(cf_paths.front().path, blob_file_number);
82 
83   FileSystem* const fs = immutable_cf_options.fs.get();
84   assert(fs);
85 
86   constexpr IODebugContext* dbg = nullptr;
87 
88   {
89     TEST_SYNC_POINT("BlobFileReader::OpenFile:GetFileSize");
90 
91     const Status s =
92         fs->GetFileSize(blob_file_path, IOOptions(), file_size, dbg);
93     if (!s.ok()) {
94       return s;
95     }
96   }
97 
98   if (*file_size < BlobLogHeader::kSize + BlobLogFooter::kSize) {
99     return Status::Corruption("Malformed blob file");
100   }
101 
102   std::unique_ptr<FSRandomAccessFile> file;
103 
104   {
105     TEST_SYNC_POINT("BlobFileReader::OpenFile:NewRandomAccessFile");
106 
107     const Status s =
108         fs->NewRandomAccessFile(blob_file_path, file_opts, &file, dbg);
109     if (!s.ok()) {
110       return s;
111     }
112   }
113 
114   assert(file);
115 
116   if (immutable_cf_options.advise_random_on_open) {
117     file->Hint(FSRandomAccessFile::kRandom);
118   }
119 
120   file_reader->reset(new RandomAccessFileReader(
121       std::move(file), blob_file_path, immutable_cf_options.clock, io_tracer,
122       immutable_cf_options.stats, BLOB_DB_BLOB_FILE_READ_MICROS,
123       blob_file_read_hist, immutable_cf_options.rate_limiter.get(),
124       immutable_cf_options.listeners));
125 
126   return Status::OK();
127 }
128 
ReadHeader(const RandomAccessFileReader * file_reader,uint32_t column_family_id,CompressionType * compression_type)129 Status BlobFileReader::ReadHeader(const RandomAccessFileReader* file_reader,
130                                   uint32_t column_family_id,
131                                   CompressionType* compression_type) {
132   assert(file_reader);
133   assert(compression_type);
134 
135   Slice header_slice;
136   Buffer buf;
137   AlignedBuf aligned_buf;
138 
139   {
140     TEST_SYNC_POINT("BlobFileReader::ReadHeader:ReadFromFile");
141 
142     constexpr uint64_t read_offset = 0;
143     constexpr size_t read_size = BlobLogHeader::kSize;
144 
145     const Status s = ReadFromFile(file_reader, read_offset, read_size,
146                                   &header_slice, &buf, &aligned_buf);
147     if (!s.ok()) {
148       return s;
149     }
150 
151     TEST_SYNC_POINT_CALLBACK("BlobFileReader::ReadHeader:TamperWithResult",
152                              &header_slice);
153   }
154 
155   BlobLogHeader header;
156 
157   {
158     const Status s = header.DecodeFrom(header_slice);
159     if (!s.ok()) {
160       return s;
161     }
162   }
163 
164   constexpr ExpirationRange no_expiration_range;
165 
166   if (header.has_ttl || header.expiration_range != no_expiration_range) {
167     return Status::Corruption("Unexpected TTL blob file");
168   }
169 
170   if (header.column_family_id != column_family_id) {
171     return Status::Corruption("Column family ID mismatch");
172   }
173 
174   *compression_type = header.compression;
175 
176   return Status::OK();
177 }
178 
ReadFooter(uint64_t file_size,const RandomAccessFileReader * file_reader)179 Status BlobFileReader::ReadFooter(uint64_t file_size,
180                                   const RandomAccessFileReader* file_reader) {
181   assert(file_size >= BlobLogHeader::kSize + BlobLogFooter::kSize);
182   assert(file_reader);
183 
184   Slice footer_slice;
185   Buffer buf;
186   AlignedBuf aligned_buf;
187 
188   {
189     TEST_SYNC_POINT("BlobFileReader::ReadFooter:ReadFromFile");
190 
191     const uint64_t read_offset = file_size - BlobLogFooter::kSize;
192     constexpr size_t read_size = BlobLogFooter::kSize;
193 
194     const Status s = ReadFromFile(file_reader, read_offset, read_size,
195                                   &footer_slice, &buf, &aligned_buf);
196     if (!s.ok()) {
197       return s;
198     }
199 
200     TEST_SYNC_POINT_CALLBACK("BlobFileReader::ReadFooter:TamperWithResult",
201                              &footer_slice);
202   }
203 
204   BlobLogFooter footer;
205 
206   {
207     const Status s = footer.DecodeFrom(footer_slice);
208     if (!s.ok()) {
209       return s;
210     }
211   }
212 
213   constexpr ExpirationRange no_expiration_range;
214 
215   if (footer.expiration_range != no_expiration_range) {
216     return Status::Corruption("Unexpected TTL blob file");
217   }
218 
219   return Status::OK();
220 }
221 
ReadFromFile(const RandomAccessFileReader * file_reader,uint64_t read_offset,size_t read_size,Slice * slice,Buffer * buf,AlignedBuf * aligned_buf)222 Status BlobFileReader::ReadFromFile(const RandomAccessFileReader* file_reader,
223                                     uint64_t read_offset, size_t read_size,
224                                     Slice* slice, Buffer* buf,
225                                     AlignedBuf* aligned_buf) {
226   assert(slice);
227   assert(buf);
228   assert(aligned_buf);
229 
230   assert(file_reader);
231 
232   Status s;
233 
234   if (file_reader->use_direct_io()) {
235     constexpr char* scratch = nullptr;
236 
237     s = file_reader->Read(IOOptions(), read_offset, read_size, slice, scratch,
238                           aligned_buf);
239   } else {
240     buf->reset(new char[read_size]);
241     constexpr AlignedBuf* aligned_scratch = nullptr;
242 
243     s = file_reader->Read(IOOptions(), read_offset, read_size, slice,
244                           buf->get(), aligned_scratch);
245   }
246 
247   if (!s.ok()) {
248     return s;
249   }
250 
251   if (slice->size() != read_size) {
252     return Status::Corruption("Failed to read data from blob file");
253   }
254 
255   return Status::OK();
256 }
257 
BlobFileReader(std::unique_ptr<RandomAccessFileReader> && file_reader,uint64_t file_size,CompressionType compression_type)258 BlobFileReader::BlobFileReader(
259     std::unique_ptr<RandomAccessFileReader>&& file_reader, uint64_t file_size,
260     CompressionType compression_type)
261     : file_reader_(std::move(file_reader)),
262       file_size_(file_size),
263       compression_type_(compression_type) {
264   assert(file_reader_);
265 }
266 
267 BlobFileReader::~BlobFileReader() = default;
268 
GetBlob(const ReadOptions & read_options,const Slice & user_key,uint64_t offset,uint64_t value_size,CompressionType compression_type,PinnableSlice * value,uint64_t * bytes_read) const269 Status BlobFileReader::GetBlob(const ReadOptions& read_options,
270                                const Slice& user_key, uint64_t offset,
271                                uint64_t value_size,
272                                CompressionType compression_type,
273                                PinnableSlice* value,
274                                uint64_t* bytes_read) const {
275   assert(value);
276 
277   const uint64_t key_size = user_key.size();
278 
279   if (!IsValidBlobOffset(offset, key_size, value_size, file_size_)) {
280     return Status::Corruption("Invalid blob offset");
281   }
282 
283   if (compression_type != compression_type_) {
284     return Status::Corruption("Compression type mismatch when reading blob");
285   }
286 
287   // Note: if verify_checksum is set, we read the entire blob record to be able
288   // to perform the verification; otherwise, we just read the blob itself. Since
289   // the offset in BlobIndex actually points to the blob value, we need to make
290   // an adjustment in the former case.
291   const uint64_t adjustment =
292       read_options.verify_checksums
293           ? BlobLogRecord::CalculateAdjustmentForRecordHeader(key_size)
294           : 0;
295   assert(offset >= adjustment);
296 
297   const uint64_t record_offset = offset - adjustment;
298   const uint64_t record_size = value_size + adjustment;
299 
300   Slice record_slice;
301   Buffer buf;
302   AlignedBuf aligned_buf;
303 
304   {
305     TEST_SYNC_POINT("BlobFileReader::GetBlob:ReadFromFile");
306 
307     const Status s = ReadFromFile(file_reader_.get(), record_offset,
308                                   static_cast<size_t>(record_size),
309                                   &record_slice, &buf, &aligned_buf);
310     if (!s.ok()) {
311       return s;
312     }
313 
314     TEST_SYNC_POINT_CALLBACK("BlobFileReader::GetBlob:TamperWithResult",
315                              &record_slice);
316   }
317 
318   if (read_options.verify_checksums) {
319     const Status s = VerifyBlob(record_slice, user_key, value_size);
320     if (!s.ok()) {
321       return s;
322     }
323   }
324 
325   const Slice value_slice(record_slice.data() + adjustment, value_size);
326 
327   {
328     const Status s =
329         UncompressBlobIfNeeded(value_slice, compression_type, value);
330     if (!s.ok()) {
331       return s;
332     }
333   }
334 
335   if (bytes_read) {
336     *bytes_read = record_size;
337   }
338 
339   return Status::OK();
340 }
341 
VerifyBlob(const Slice & record_slice,const Slice & user_key,uint64_t value_size)342 Status BlobFileReader::VerifyBlob(const Slice& record_slice,
343                                   const Slice& user_key, uint64_t value_size) {
344   BlobLogRecord record;
345 
346   const Slice header_slice(record_slice.data(), BlobLogRecord::kHeaderSize);
347 
348   {
349     const Status s = record.DecodeHeaderFrom(header_slice);
350     if (!s.ok()) {
351       return s;
352     }
353   }
354 
355   if (record.key_size != user_key.size()) {
356     return Status::Corruption("Key size mismatch when reading blob");
357   }
358 
359   if (record.value_size != value_size) {
360     return Status::Corruption("Value size mismatch when reading blob");
361   }
362 
363   record.key =
364       Slice(record_slice.data() + BlobLogRecord::kHeaderSize, record.key_size);
365   if (record.key != user_key) {
366     return Status::Corruption("Key mismatch when reading blob");
367   }
368 
369   record.value = Slice(record.key.data() + record.key_size, value_size);
370 
371   {
372     TEST_SYNC_POINT_CALLBACK("BlobFileReader::VerifyBlob:CheckBlobCRC",
373                              &record);
374 
375     const Status s = record.CheckBlobCRC();
376     if (!s.ok()) {
377       return s;
378     }
379   }
380 
381   return Status::OK();
382 }
383 
UncompressBlobIfNeeded(const Slice & value_slice,CompressionType compression_type,PinnableSlice * value)384 Status BlobFileReader::UncompressBlobIfNeeded(const Slice& value_slice,
385                                               CompressionType compression_type,
386                                               PinnableSlice* value) {
387   assert(value);
388 
389   if (compression_type == kNoCompression) {
390     SaveValue(value_slice, value);
391 
392     return Status::OK();
393   }
394 
395   UncompressionContext context(compression_type);
396   UncompressionInfo info(context, UncompressionDict::GetEmptyDict(),
397                          compression_type);
398 
399   size_t uncompressed_size = 0;
400   constexpr uint32_t compression_format_version = 2;
401   constexpr MemoryAllocator* allocator = nullptr;
402 
403   CacheAllocationPtr output =
404       UncompressData(info, value_slice.data(), value_slice.size(),
405                      &uncompressed_size, compression_format_version, allocator);
406 
407   TEST_SYNC_POINT_CALLBACK(
408       "BlobFileReader::UncompressBlobIfNeeded:TamperWithResult", &output);
409 
410   if (!output) {
411     return Status::Corruption("Unable to uncompress blob");
412   }
413 
414   SaveValue(Slice(output.get(), uncompressed_size), value);
415 
416   return Status::OK();
417 }
418 
SaveValue(const Slice & src,PinnableSlice * dst)419 void BlobFileReader::SaveValue(const Slice& src, PinnableSlice* dst) {
420   assert(dst);
421 
422   if (dst->IsPinned()) {
423     dst->Reset();
424   }
425 
426   dst->PinSelf(src);
427 }
428 
429 }  // namespace ROCKSDB_NAMESPACE
430