1 
2 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
3 //  This source code is licensed under both the GPLv2 (found in the
4 //  COPYING file in the root directory) and Apache 2.0 License
5 //  (found in the LICENSE.Apache file in the root directory).
6 #ifndef ROCKSDB_LITE
7 #include "utilities/blob_db/blob_file.h"
8 
9 #include <stdio.h>
10 #include <cinttypes>
11 
12 #include <algorithm>
13 #include <memory>
14 
15 #include "db/column_family.h"
16 #include "db/db_impl/db_impl.h"
17 #include "db/dbformat.h"
18 #include "env/composite_env_wrapper.h"
19 #include "file/filename.h"
20 #include "file/readahead_raf.h"
21 #include "logging/logging.h"
22 #include "utilities/blob_db/blob_db_impl.h"
23 
24 namespace ROCKSDB_NAMESPACE {
25 
26 namespace blob_db {
27 
BlobFile(const BlobDBImpl * p,const std::string & bdir,uint64_t fn,Logger * info_log)28 BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn,
29                    Logger* info_log)
30     : parent_(p), path_to_dir_(bdir), file_number_(fn), info_log_(info_log) {}
31 
BlobFile(const BlobDBImpl * p,const std::string & bdir,uint64_t fn,Logger * info_log,uint32_t column_family_id,CompressionType compression,bool has_ttl,const ExpirationRange & expiration_range)32 BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn,
33                    Logger* info_log, uint32_t column_family_id,
34                    CompressionType compression, bool has_ttl,
35                    const ExpirationRange& expiration_range)
36     : parent_(p),
37       path_to_dir_(bdir),
38       file_number_(fn),
39       info_log_(info_log),
40       column_family_id_(column_family_id),
41       compression_(compression),
42       has_ttl_(has_ttl),
43       expiration_range_(expiration_range),
44       header_(column_family_id, compression, has_ttl, expiration_range),
45       header_valid_(true) {}
46 
~BlobFile()47 BlobFile::~BlobFile() {
48   if (obsolete_) {
49     std::string pn(PathName());
50     Status s = Env::Default()->DeleteFile(PathName());
51     if (!s.ok()) {
52       // ROCKS_LOG_INFO(db_options_.info_log,
53       // "File could not be deleted %s", pn.c_str());
54     }
55   }
56 }
57 
GetColumnFamilyId() const58 uint32_t BlobFile::GetColumnFamilyId() const { return column_family_id_; }
59 
PathName() const60 std::string BlobFile::PathName() const {
61   return BlobFileName(path_to_dir_, file_number_);
62 }
63 
OpenRandomAccessReader(Env * env,const DBOptions & db_options,const EnvOptions & env_options) const64 std::shared_ptr<Reader> BlobFile::OpenRandomAccessReader(
65     Env* env, const DBOptions& db_options,
66     const EnvOptions& env_options) const {
67   constexpr size_t kReadaheadSize = 2 * 1024 * 1024;
68   std::unique_ptr<RandomAccessFile> sfile;
69   std::string path_name(PathName());
70   Status s = env->NewRandomAccessFile(path_name, &sfile, env_options);
71   if (!s.ok()) {
72     // report something here.
73     return nullptr;
74   }
75   sfile = NewReadaheadRandomAccessFile(std::move(sfile), kReadaheadSize);
76 
77   std::unique_ptr<RandomAccessFileReader> sfile_reader;
78   sfile_reader.reset(new RandomAccessFileReader(
79       NewLegacyRandomAccessFileWrapper(sfile), path_name));
80 
81   std::shared_ptr<Reader> log_reader = std::make_shared<Reader>(
82       std::move(sfile_reader), db_options.env, db_options.statistics.get());
83 
84   return log_reader;
85 }
86 
DumpState() const87 std::string BlobFile::DumpState() const {
88   char str[1000];
89   snprintf(
90       str, sizeof(str),
91       "path: %s fn: %" PRIu64 " blob_count: %" PRIu64 " file_size: %" PRIu64
92       " closed: %d obsolete: %d expiration_range: (%" PRIu64 ", %" PRIu64
93       "), writer: %d reader: %d",
94       path_to_dir_.c_str(), file_number_, blob_count_.load(), file_size_.load(),
95       closed_.load(), obsolete_.load(), expiration_range_.first,
96       expiration_range_.second, (!!log_writer_), (!!ra_file_reader_));
97   return str;
98 }
99 
MarkObsolete(SequenceNumber sequence)100 void BlobFile::MarkObsolete(SequenceNumber sequence) {
101   assert(Immutable());
102   obsolete_sequence_ = sequence;
103   obsolete_.store(true);
104 }
105 
NeedsFsync(bool hard,uint64_t bytes_per_sync) const106 bool BlobFile::NeedsFsync(bool hard, uint64_t bytes_per_sync) const {
107   assert(last_fsync_ <= file_size_);
108   return (hard) ? file_size_ > last_fsync_
109                 : (file_size_ - last_fsync_) >= bytes_per_sync;
110 }
111 
WriteFooterAndCloseLocked(SequenceNumber sequence)112 Status BlobFile::WriteFooterAndCloseLocked(SequenceNumber sequence) {
113   BlobLogFooter footer;
114   footer.blob_count = blob_count_;
115   if (HasTTL()) {
116     footer.expiration_range = expiration_range_;
117   }
118 
119   // this will close the file and reset the Writable File Pointer.
120   Status s = log_writer_->AppendFooter(footer);
121   if (s.ok()) {
122     closed_ = true;
123     immutable_sequence_ = sequence;
124     file_size_ += BlobLogFooter::kSize;
125   }
126   // delete the sequential writer
127   log_writer_.reset();
128   return s;
129 }
130 
ReadFooter(BlobLogFooter * bf)131 Status BlobFile::ReadFooter(BlobLogFooter* bf) {
132   if (file_size_ < (BlobLogHeader::kSize + BlobLogFooter::kSize)) {
133     return Status::IOError("File does not have footer", PathName());
134   }
135 
136   uint64_t footer_offset = file_size_ - BlobLogFooter::kSize;
137   // assume that ra_file_reader_ is valid before we enter this
138   assert(ra_file_reader_);
139 
140   Slice result;
141   char scratch[BlobLogFooter::kSize + 10];
142   Status s = ra_file_reader_->Read(footer_offset, BlobLogFooter::kSize, &result,
143                                    scratch);
144   if (!s.ok()) return s;
145   if (result.size() != BlobLogFooter::kSize) {
146     // should not happen
147     return Status::IOError("EOF reached before footer");
148   }
149 
150   s = bf->DecodeFrom(result);
151   return s;
152 }
153 
SetFromFooterLocked(const BlobLogFooter & footer)154 Status BlobFile::SetFromFooterLocked(const BlobLogFooter& footer) {
155   // assume that file has been fully fsync'd
156   last_fsync_.store(file_size_);
157   blob_count_ = footer.blob_count;
158   expiration_range_ = footer.expiration_range;
159   closed_ = true;
160   return Status::OK();
161 }
162 
Fsync()163 Status BlobFile::Fsync() {
164   Status s;
165   if (log_writer_.get()) {
166     s = log_writer_->Sync();
167     last_fsync_.store(file_size_.load());
168   }
169   return s;
170 }
171 
CloseRandomAccessLocked()172 void BlobFile::CloseRandomAccessLocked() {
173   ra_file_reader_.reset();
174   last_access_ = -1;
175 }
176 
GetReader(Env * env,const EnvOptions & env_options,std::shared_ptr<RandomAccessFileReader> * reader,bool * fresh_open)177 Status BlobFile::GetReader(Env* env, const EnvOptions& env_options,
178                            std::shared_ptr<RandomAccessFileReader>* reader,
179                            bool* fresh_open) {
180   assert(reader != nullptr);
181   assert(fresh_open != nullptr);
182   *fresh_open = false;
183   int64_t current_time = 0;
184   env->GetCurrentTime(&current_time);
185   last_access_.store(current_time);
186   Status s;
187 
188   {
189     ReadLock lockbfile_r(&mutex_);
190     if (ra_file_reader_) {
191       *reader = ra_file_reader_;
192       return s;
193     }
194   }
195 
196   WriteLock lockbfile_w(&mutex_);
197   // Double check.
198   if (ra_file_reader_) {
199     *reader = ra_file_reader_;
200     return s;
201   }
202 
203   std::unique_ptr<RandomAccessFile> rfile;
204   s = env->NewRandomAccessFile(PathName(), &rfile, env_options);
205   if (!s.ok()) {
206     ROCKS_LOG_ERROR(info_log_,
207                     "Failed to open blob file for random-read: %s status: '%s'"
208                     " exists: '%s'",
209                     PathName().c_str(), s.ToString().c_str(),
210                     env->FileExists(PathName()).ToString().c_str());
211     return s;
212   }
213 
214   ra_file_reader_ = std::make_shared<RandomAccessFileReader>(
215       NewLegacyRandomAccessFileWrapper(rfile), PathName());
216   *reader = ra_file_reader_;
217   *fresh_open = true;
218   return s;
219 }
220 
ReadMetadata(Env * env,const EnvOptions & env_options)221 Status BlobFile::ReadMetadata(Env* env, const EnvOptions& env_options) {
222   assert(Immutable());
223   // Get file size.
224   uint64_t file_size = 0;
225   Status s = env->GetFileSize(PathName(), &file_size);
226   if (s.ok()) {
227     file_size_ = file_size;
228   } else {
229     ROCKS_LOG_ERROR(info_log_,
230                     "Failed to get size of blob file %" PRIu64
231                     ", status: %s",
232                     file_number_, s.ToString().c_str());
233     return s;
234   }
235   if (file_size < BlobLogHeader::kSize) {
236     ROCKS_LOG_ERROR(info_log_,
237                     "Incomplete blob file blob file %" PRIu64
238                     ", size: %" PRIu64,
239                     file_number_, file_size);
240     return Status::Corruption("Incomplete blob file header.");
241   }
242 
243   // Create file reader.
244   std::unique_ptr<RandomAccessFile> file;
245   s = env->NewRandomAccessFile(PathName(), &file, env_options);
246   if (!s.ok()) {
247     ROCKS_LOG_ERROR(info_log_,
248                     "Failed to open blob file %" PRIu64 ", status: %s",
249                     file_number_, s.ToString().c_str());
250     return s;
251   }
252   std::unique_ptr<RandomAccessFileReader> file_reader(
253       new RandomAccessFileReader(NewLegacyRandomAccessFileWrapper(file),
254                                  PathName()));
255 
256   // Read file header.
257   char header_buf[BlobLogHeader::kSize];
258   Slice header_slice;
259   s = file_reader->Read(0, BlobLogHeader::kSize, &header_slice, header_buf);
260   if (!s.ok()) {
261     ROCKS_LOG_ERROR(info_log_,
262                     "Failed to read header of blob file %" PRIu64
263                     ", status: %s",
264                     file_number_, s.ToString().c_str());
265     return s;
266   }
267   BlobLogHeader header;
268   s = header.DecodeFrom(header_slice);
269   if (!s.ok()) {
270     ROCKS_LOG_ERROR(info_log_,
271                     "Failed to decode header of blob file %" PRIu64
272                     ", status: %s",
273                     file_number_, s.ToString().c_str());
274     return s;
275   }
276   column_family_id_ = header.column_family_id;
277   compression_ = header.compression;
278   has_ttl_ = header.has_ttl;
279   if (has_ttl_) {
280     expiration_range_ = header.expiration_range;
281   }
282   header_valid_ = true;
283 
284   // Read file footer.
285   if (file_size_ < BlobLogHeader::kSize + BlobLogFooter::kSize) {
286     // OK not to have footer.
287     assert(!footer_valid_);
288     return Status::OK();
289   }
290   char footer_buf[BlobLogFooter::kSize];
291   Slice footer_slice;
292   s = file_reader->Read(file_size - BlobLogFooter::kSize, BlobLogFooter::kSize,
293                         &footer_slice, footer_buf);
294   if (!s.ok()) {
295     ROCKS_LOG_ERROR(info_log_,
296                     "Failed to read footer of blob file %" PRIu64
297                     ", status: %s",
298                     file_number_, s.ToString().c_str());
299     return s;
300   }
301   BlobLogFooter footer;
302   s = footer.DecodeFrom(footer_slice);
303   if (!s.ok()) {
304     // OK not to have footer.
305     assert(!footer_valid_);
306     return Status::OK();
307   }
308   blob_count_ = footer.blob_count;
309   if (has_ttl_) {
310     assert(header.expiration_range.first <= footer.expiration_range.first);
311     assert(header.expiration_range.second >= footer.expiration_range.second);
312     expiration_range_ = footer.expiration_range;
313   }
314   footer_valid_ = true;
315   return Status::OK();
316 }
317 
318 }  // namespace blob_db
319 }  // namespace ROCKSDB_NAMESPACE
320 #endif  // ROCKSDB_LITE
321