1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #include "db/blob/blob_file_reader.h"
7
8 #include <cassert>
9 #include <string>
10
11 #include "db/blob/blob_log_format.h"
12 #include "file/filename.h"
13 #include "options/cf_options.h"
14 #include "rocksdb/file_system.h"
15 #include "rocksdb/slice.h"
16 #include "rocksdb/status.h"
17 #include "test_util/sync_point.h"
18 #include "util/compression.h"
19 #include "util/crc32c.h"
20
21 namespace ROCKSDB_NAMESPACE {
22
Create(const ImmutableOptions & immutable_cf_options,const FileOptions & file_options,uint32_t column_family_id,HistogramImpl * blob_file_read_hist,uint64_t blob_file_number,const std::shared_ptr<IOTracer> & io_tracer,std::unique_ptr<BlobFileReader> * blob_file_reader)23 Status BlobFileReader::Create(
24 const ImmutableOptions& immutable_cf_options,
25 const FileOptions& file_options, uint32_t column_family_id,
26 HistogramImpl* blob_file_read_hist, uint64_t blob_file_number,
27 const std::shared_ptr<IOTracer>& io_tracer,
28 std::unique_ptr<BlobFileReader>* blob_file_reader) {
29 assert(blob_file_reader);
30 assert(!*blob_file_reader);
31
32 uint64_t file_size = 0;
33 std::unique_ptr<RandomAccessFileReader> file_reader;
34
35 {
36 const Status s =
37 OpenFile(immutable_cf_options, file_options, blob_file_read_hist,
38 blob_file_number, io_tracer, &file_size, &file_reader);
39 if (!s.ok()) {
40 return s;
41 }
42 }
43
44 assert(file_reader);
45
46 CompressionType compression_type = kNoCompression;
47
48 {
49 const Status s =
50 ReadHeader(file_reader.get(), column_family_id, &compression_type);
51 if (!s.ok()) {
52 return s;
53 }
54 }
55
56 {
57 const Status s = ReadFooter(file_size, file_reader.get());
58 if (!s.ok()) {
59 return s;
60 }
61 }
62
63 blob_file_reader->reset(
64 new BlobFileReader(std::move(file_reader), file_size, compression_type));
65
66 return Status::OK();
67 }
68
OpenFile(const ImmutableOptions & immutable_cf_options,const FileOptions & file_opts,HistogramImpl * blob_file_read_hist,uint64_t blob_file_number,const std::shared_ptr<IOTracer> & io_tracer,uint64_t * file_size,std::unique_ptr<RandomAccessFileReader> * file_reader)69 Status BlobFileReader::OpenFile(
70 const ImmutableOptions& immutable_cf_options, const FileOptions& file_opts,
71 HistogramImpl* blob_file_read_hist, uint64_t blob_file_number,
72 const std::shared_ptr<IOTracer>& io_tracer, uint64_t* file_size,
73 std::unique_ptr<RandomAccessFileReader>* file_reader) {
74 assert(file_size);
75 assert(file_reader);
76
77 const auto& cf_paths = immutable_cf_options.cf_paths;
78 assert(!cf_paths.empty());
79
80 const std::string blob_file_path =
81 BlobFileName(cf_paths.front().path, blob_file_number);
82
83 FileSystem* const fs = immutable_cf_options.fs.get();
84 assert(fs);
85
86 constexpr IODebugContext* dbg = nullptr;
87
88 {
89 TEST_SYNC_POINT("BlobFileReader::OpenFile:GetFileSize");
90
91 const Status s =
92 fs->GetFileSize(blob_file_path, IOOptions(), file_size, dbg);
93 if (!s.ok()) {
94 return s;
95 }
96 }
97
98 if (*file_size < BlobLogHeader::kSize + BlobLogFooter::kSize) {
99 return Status::Corruption("Malformed blob file");
100 }
101
102 std::unique_ptr<FSRandomAccessFile> file;
103
104 {
105 TEST_SYNC_POINT("BlobFileReader::OpenFile:NewRandomAccessFile");
106
107 const Status s =
108 fs->NewRandomAccessFile(blob_file_path, file_opts, &file, dbg);
109 if (!s.ok()) {
110 return s;
111 }
112 }
113
114 assert(file);
115
116 if (immutable_cf_options.advise_random_on_open) {
117 file->Hint(FSRandomAccessFile::kRandom);
118 }
119
120 file_reader->reset(new RandomAccessFileReader(
121 std::move(file), blob_file_path, immutable_cf_options.clock, io_tracer,
122 immutable_cf_options.stats, BLOB_DB_BLOB_FILE_READ_MICROS,
123 blob_file_read_hist, immutable_cf_options.rate_limiter.get(),
124 immutable_cf_options.listeners));
125
126 return Status::OK();
127 }
128
ReadHeader(const RandomAccessFileReader * file_reader,uint32_t column_family_id,CompressionType * compression_type)129 Status BlobFileReader::ReadHeader(const RandomAccessFileReader* file_reader,
130 uint32_t column_family_id,
131 CompressionType* compression_type) {
132 assert(file_reader);
133 assert(compression_type);
134
135 Slice header_slice;
136 Buffer buf;
137 AlignedBuf aligned_buf;
138
139 {
140 TEST_SYNC_POINT("BlobFileReader::ReadHeader:ReadFromFile");
141
142 constexpr uint64_t read_offset = 0;
143 constexpr size_t read_size = BlobLogHeader::kSize;
144
145 const Status s = ReadFromFile(file_reader, read_offset, read_size,
146 &header_slice, &buf, &aligned_buf);
147 if (!s.ok()) {
148 return s;
149 }
150
151 TEST_SYNC_POINT_CALLBACK("BlobFileReader::ReadHeader:TamperWithResult",
152 &header_slice);
153 }
154
155 BlobLogHeader header;
156
157 {
158 const Status s = header.DecodeFrom(header_slice);
159 if (!s.ok()) {
160 return s;
161 }
162 }
163
164 constexpr ExpirationRange no_expiration_range;
165
166 if (header.has_ttl || header.expiration_range != no_expiration_range) {
167 return Status::Corruption("Unexpected TTL blob file");
168 }
169
170 if (header.column_family_id != column_family_id) {
171 return Status::Corruption("Column family ID mismatch");
172 }
173
174 *compression_type = header.compression;
175
176 return Status::OK();
177 }
178
ReadFooter(uint64_t file_size,const RandomAccessFileReader * file_reader)179 Status BlobFileReader::ReadFooter(uint64_t file_size,
180 const RandomAccessFileReader* file_reader) {
181 assert(file_size >= BlobLogHeader::kSize + BlobLogFooter::kSize);
182 assert(file_reader);
183
184 Slice footer_slice;
185 Buffer buf;
186 AlignedBuf aligned_buf;
187
188 {
189 TEST_SYNC_POINT("BlobFileReader::ReadFooter:ReadFromFile");
190
191 const uint64_t read_offset = file_size - BlobLogFooter::kSize;
192 constexpr size_t read_size = BlobLogFooter::kSize;
193
194 const Status s = ReadFromFile(file_reader, read_offset, read_size,
195 &footer_slice, &buf, &aligned_buf);
196 if (!s.ok()) {
197 return s;
198 }
199
200 TEST_SYNC_POINT_CALLBACK("BlobFileReader::ReadFooter:TamperWithResult",
201 &footer_slice);
202 }
203
204 BlobLogFooter footer;
205
206 {
207 const Status s = footer.DecodeFrom(footer_slice);
208 if (!s.ok()) {
209 return s;
210 }
211 }
212
213 constexpr ExpirationRange no_expiration_range;
214
215 if (footer.expiration_range != no_expiration_range) {
216 return Status::Corruption("Unexpected TTL blob file");
217 }
218
219 return Status::OK();
220 }
221
ReadFromFile(const RandomAccessFileReader * file_reader,uint64_t read_offset,size_t read_size,Slice * slice,Buffer * buf,AlignedBuf * aligned_buf)222 Status BlobFileReader::ReadFromFile(const RandomAccessFileReader* file_reader,
223 uint64_t read_offset, size_t read_size,
224 Slice* slice, Buffer* buf,
225 AlignedBuf* aligned_buf) {
226 assert(slice);
227 assert(buf);
228 assert(aligned_buf);
229
230 assert(file_reader);
231
232 Status s;
233
234 if (file_reader->use_direct_io()) {
235 constexpr char* scratch = nullptr;
236
237 s = file_reader->Read(IOOptions(), read_offset, read_size, slice, scratch,
238 aligned_buf);
239 } else {
240 buf->reset(new char[read_size]);
241 constexpr AlignedBuf* aligned_scratch = nullptr;
242
243 s = file_reader->Read(IOOptions(), read_offset, read_size, slice,
244 buf->get(), aligned_scratch);
245 }
246
247 if (!s.ok()) {
248 return s;
249 }
250
251 if (slice->size() != read_size) {
252 return Status::Corruption("Failed to read data from blob file");
253 }
254
255 return Status::OK();
256 }
257
BlobFileReader(std::unique_ptr<RandomAccessFileReader> && file_reader,uint64_t file_size,CompressionType compression_type)258 BlobFileReader::BlobFileReader(
259 std::unique_ptr<RandomAccessFileReader>&& file_reader, uint64_t file_size,
260 CompressionType compression_type)
261 : file_reader_(std::move(file_reader)),
262 file_size_(file_size),
263 compression_type_(compression_type) {
264 assert(file_reader_);
265 }
266
267 BlobFileReader::~BlobFileReader() = default;
268
GetBlob(const ReadOptions & read_options,const Slice & user_key,uint64_t offset,uint64_t value_size,CompressionType compression_type,PinnableSlice * value,uint64_t * bytes_read) const269 Status BlobFileReader::GetBlob(const ReadOptions& read_options,
270 const Slice& user_key, uint64_t offset,
271 uint64_t value_size,
272 CompressionType compression_type,
273 PinnableSlice* value,
274 uint64_t* bytes_read) const {
275 assert(value);
276
277 const uint64_t key_size = user_key.size();
278
279 if (!IsValidBlobOffset(offset, key_size, value_size, file_size_)) {
280 return Status::Corruption("Invalid blob offset");
281 }
282
283 if (compression_type != compression_type_) {
284 return Status::Corruption("Compression type mismatch when reading blob");
285 }
286
287 // Note: if verify_checksum is set, we read the entire blob record to be able
288 // to perform the verification; otherwise, we just read the blob itself. Since
289 // the offset in BlobIndex actually points to the blob value, we need to make
290 // an adjustment in the former case.
291 const uint64_t adjustment =
292 read_options.verify_checksums
293 ? BlobLogRecord::CalculateAdjustmentForRecordHeader(key_size)
294 : 0;
295 assert(offset >= adjustment);
296
297 const uint64_t record_offset = offset - adjustment;
298 const uint64_t record_size = value_size + adjustment;
299
300 Slice record_slice;
301 Buffer buf;
302 AlignedBuf aligned_buf;
303
304 {
305 TEST_SYNC_POINT("BlobFileReader::GetBlob:ReadFromFile");
306
307 const Status s = ReadFromFile(file_reader_.get(), record_offset,
308 static_cast<size_t>(record_size),
309 &record_slice, &buf, &aligned_buf);
310 if (!s.ok()) {
311 return s;
312 }
313
314 TEST_SYNC_POINT_CALLBACK("BlobFileReader::GetBlob:TamperWithResult",
315 &record_slice);
316 }
317
318 if (read_options.verify_checksums) {
319 const Status s = VerifyBlob(record_slice, user_key, value_size);
320 if (!s.ok()) {
321 return s;
322 }
323 }
324
325 const Slice value_slice(record_slice.data() + adjustment, value_size);
326
327 {
328 const Status s =
329 UncompressBlobIfNeeded(value_slice, compression_type, value);
330 if (!s.ok()) {
331 return s;
332 }
333 }
334
335 if (bytes_read) {
336 *bytes_read = record_size;
337 }
338
339 return Status::OK();
340 }
341
VerifyBlob(const Slice & record_slice,const Slice & user_key,uint64_t value_size)342 Status BlobFileReader::VerifyBlob(const Slice& record_slice,
343 const Slice& user_key, uint64_t value_size) {
344 BlobLogRecord record;
345
346 const Slice header_slice(record_slice.data(), BlobLogRecord::kHeaderSize);
347
348 {
349 const Status s = record.DecodeHeaderFrom(header_slice);
350 if (!s.ok()) {
351 return s;
352 }
353 }
354
355 if (record.key_size != user_key.size()) {
356 return Status::Corruption("Key size mismatch when reading blob");
357 }
358
359 if (record.value_size != value_size) {
360 return Status::Corruption("Value size mismatch when reading blob");
361 }
362
363 record.key =
364 Slice(record_slice.data() + BlobLogRecord::kHeaderSize, record.key_size);
365 if (record.key != user_key) {
366 return Status::Corruption("Key mismatch when reading blob");
367 }
368
369 record.value = Slice(record.key.data() + record.key_size, value_size);
370
371 {
372 TEST_SYNC_POINT_CALLBACK("BlobFileReader::VerifyBlob:CheckBlobCRC",
373 &record);
374
375 const Status s = record.CheckBlobCRC();
376 if (!s.ok()) {
377 return s;
378 }
379 }
380
381 return Status::OK();
382 }
383
UncompressBlobIfNeeded(const Slice & value_slice,CompressionType compression_type,PinnableSlice * value)384 Status BlobFileReader::UncompressBlobIfNeeded(const Slice& value_slice,
385 CompressionType compression_type,
386 PinnableSlice* value) {
387 assert(value);
388
389 if (compression_type == kNoCompression) {
390 SaveValue(value_slice, value);
391
392 return Status::OK();
393 }
394
395 UncompressionContext context(compression_type);
396 UncompressionInfo info(context, UncompressionDict::GetEmptyDict(),
397 compression_type);
398
399 size_t uncompressed_size = 0;
400 constexpr uint32_t compression_format_version = 2;
401 constexpr MemoryAllocator* allocator = nullptr;
402
403 CacheAllocationPtr output =
404 UncompressData(info, value_slice.data(), value_slice.size(),
405 &uncompressed_size, compression_format_version, allocator);
406
407 TEST_SYNC_POINT_CALLBACK(
408 "BlobFileReader::UncompressBlobIfNeeded:TamperWithResult", &output);
409
410 if (!output) {
411 return Status::Corruption("Unable to uncompress blob");
412 }
413
414 SaveValue(Slice(output.get(), uncompressed_size), value);
415
416 return Status::OK();
417 }
418
SaveValue(const Slice & src,PinnableSlice * dst)419 void BlobFileReader::SaveValue(const Slice& src, PinnableSlice* dst) {
420 assert(dst);
421
422 if (dst->IsPinned()) {
423 dst->Reset();
424 }
425
426 dst->PinSelf(src);
427 }
428
429 } // namespace ROCKSDB_NAMESPACE
430