1 2 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 3 // This source code is licensed under both the GPLv2 (found in the 4 // COPYING file in the root directory) and Apache 2.0 License 5 // (found in the LICENSE.Apache file in the root directory). 6 #ifndef ROCKSDB_LITE 7 #include "utilities/blob_db/blob_file.h" 8 9 #include <stdio.h> 10 #include <cinttypes> 11 12 #include <algorithm> 13 #include <memory> 14 15 #include "db/column_family.h" 16 #include "db/db_impl/db_impl.h" 17 #include "db/dbformat.h" 18 #include "env/composite_env_wrapper.h" 19 #include "file/filename.h" 20 #include "file/readahead_raf.h" 21 #include "logging/logging.h" 22 #include "utilities/blob_db/blob_db_impl.h" 23 24 namespace ROCKSDB_NAMESPACE { 25 26 namespace blob_db { 27 28 BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn, 29 Logger* info_log) 30 : parent_(p), path_to_dir_(bdir), file_number_(fn), info_log_(info_log) {} 31 32 BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn, 33 Logger* info_log, uint32_t column_family_id, 34 CompressionType compression, bool has_ttl, 35 const ExpirationRange& expiration_range) 36 : parent_(p), 37 path_to_dir_(bdir), 38 file_number_(fn), 39 info_log_(info_log), 40 column_family_id_(column_family_id), 41 compression_(compression), 42 has_ttl_(has_ttl), 43 expiration_range_(expiration_range), 44 header_(column_family_id, compression, has_ttl, expiration_range), 45 header_valid_(true) {} 46 47 BlobFile::~BlobFile() { 48 if (obsolete_) { 49 std::string pn(PathName()); 50 Status s = Env::Default()->DeleteFile(PathName()); 51 if (!s.ok()) { 52 // ROCKS_LOG_INFO(db_options_.info_log, 53 // "File could not be deleted %s", pn.c_str()); 54 } 55 } 56 } 57 58 uint32_t BlobFile::GetColumnFamilyId() const { return column_family_id_; } 59 60 std::string BlobFile::PathName() const { 61 return BlobFileName(path_to_dir_, file_number_); 62 } 63 64 std::shared_ptr<Reader> BlobFile::OpenRandomAccessReader( 65 Env* env, const DBOptions& db_options, 66 const EnvOptions& env_options) const { 67 constexpr size_t kReadaheadSize = 2 * 1024 * 1024; 68 std::unique_ptr<RandomAccessFile> sfile; 69 std::string path_name(PathName()); 70 Status s = env->NewRandomAccessFile(path_name, &sfile, env_options); 71 if (!s.ok()) { 72 // report something here. 73 return nullptr; 74 } 75 sfile = NewReadaheadRandomAccessFile(std::move(sfile), kReadaheadSize); 76 77 std::unique_ptr<RandomAccessFileReader> sfile_reader; 78 sfile_reader.reset(new RandomAccessFileReader( 79 NewLegacyRandomAccessFileWrapper(sfile), path_name)); 80 81 std::shared_ptr<Reader> log_reader = std::make_shared<Reader>( 82 std::move(sfile_reader), db_options.env, db_options.statistics.get()); 83 84 return log_reader; 85 } 86 87 std::string BlobFile::DumpState() const { 88 char str[1000]; 89 snprintf( 90 str, sizeof(str), 91 "path: %s fn: %" PRIu64 " blob_count: %" PRIu64 " file_size: %" PRIu64 92 " closed: %d obsolete: %d expiration_range: (%" PRIu64 ", %" PRIu64 93 "), writer: %d reader: %d", 94 path_to_dir_.c_str(), file_number_, blob_count_.load(), file_size_.load(), 95 closed_.load(), obsolete_.load(), expiration_range_.first, 96 expiration_range_.second, (!!log_writer_), (!!ra_file_reader_)); 97 return str; 98 } 99 100 void BlobFile::MarkObsolete(SequenceNumber sequence) { 101 assert(Immutable()); 102 obsolete_sequence_ = sequence; 103 obsolete_.store(true); 104 } 105 106 bool BlobFile::NeedsFsync(bool hard, uint64_t bytes_per_sync) const { 107 assert(last_fsync_ <= file_size_); 108 return (hard) ? file_size_ > last_fsync_ 109 : (file_size_ - last_fsync_) >= bytes_per_sync; 110 } 111 112 Status BlobFile::WriteFooterAndCloseLocked(SequenceNumber sequence) { 113 BlobLogFooter footer; 114 footer.blob_count = blob_count_; 115 if (HasTTL()) { 116 footer.expiration_range = expiration_range_; 117 } 118 119 // this will close the file and reset the Writable File Pointer. 120 Status s = log_writer_->AppendFooter(footer); 121 if (s.ok()) { 122 closed_ = true; 123 immutable_sequence_ = sequence; 124 file_size_ += BlobLogFooter::kSize; 125 } 126 // delete the sequential writer 127 log_writer_.reset(); 128 return s; 129 } 130 131 Status BlobFile::ReadFooter(BlobLogFooter* bf) { 132 if (file_size_ < (BlobLogHeader::kSize + BlobLogFooter::kSize)) { 133 return Status::IOError("File does not have footer", PathName()); 134 } 135 136 uint64_t footer_offset = file_size_ - BlobLogFooter::kSize; 137 // assume that ra_file_reader_ is valid before we enter this 138 assert(ra_file_reader_); 139 140 Slice result; 141 char scratch[BlobLogFooter::kSize + 10]; 142 Status s = ra_file_reader_->Read(footer_offset, BlobLogFooter::kSize, &result, 143 scratch); 144 if (!s.ok()) return s; 145 if (result.size() != BlobLogFooter::kSize) { 146 // should not happen 147 return Status::IOError("EOF reached before footer"); 148 } 149 150 s = bf->DecodeFrom(result); 151 return s; 152 } 153 154 Status BlobFile::SetFromFooterLocked(const BlobLogFooter& footer) { 155 // assume that file has been fully fsync'd 156 last_fsync_.store(file_size_); 157 blob_count_ = footer.blob_count; 158 expiration_range_ = footer.expiration_range; 159 closed_ = true; 160 return Status::OK(); 161 } 162 163 Status BlobFile::Fsync() { 164 Status s; 165 if (log_writer_.get()) { 166 s = log_writer_->Sync(); 167 last_fsync_.store(file_size_.load()); 168 } 169 return s; 170 } 171 172 void BlobFile::CloseRandomAccessLocked() { 173 ra_file_reader_.reset(); 174 last_access_ = -1; 175 } 176 177 Status BlobFile::GetReader(Env* env, const EnvOptions& env_options, 178 std::shared_ptr<RandomAccessFileReader>* reader, 179 bool* fresh_open) { 180 assert(reader != nullptr); 181 assert(fresh_open != nullptr); 182 *fresh_open = false; 183 int64_t current_time = 0; 184 env->GetCurrentTime(¤t_time); 185 last_access_.store(current_time); 186 Status s; 187 188 { 189 ReadLock lockbfile_r(&mutex_); 190 if (ra_file_reader_) { 191 *reader = ra_file_reader_; 192 return s; 193 } 194 } 195 196 WriteLock lockbfile_w(&mutex_); 197 // Double check. 198 if (ra_file_reader_) { 199 *reader = ra_file_reader_; 200 return s; 201 } 202 203 std::unique_ptr<RandomAccessFile> rfile; 204 s = env->NewRandomAccessFile(PathName(), &rfile, env_options); 205 if (!s.ok()) { 206 ROCKS_LOG_ERROR(info_log_, 207 "Failed to open blob file for random-read: %s status: '%s'" 208 " exists: '%s'", 209 PathName().c_str(), s.ToString().c_str(), 210 env->FileExists(PathName()).ToString().c_str()); 211 return s; 212 } 213 214 ra_file_reader_ = std::make_shared<RandomAccessFileReader>( 215 NewLegacyRandomAccessFileWrapper(rfile), PathName()); 216 *reader = ra_file_reader_; 217 *fresh_open = true; 218 return s; 219 } 220 221 Status BlobFile::ReadMetadata(Env* env, const EnvOptions& env_options) { 222 assert(Immutable()); 223 // Get file size. 224 uint64_t file_size = 0; 225 Status s = env->GetFileSize(PathName(), &file_size); 226 if (s.ok()) { 227 file_size_ = file_size; 228 } else { 229 ROCKS_LOG_ERROR(info_log_, 230 "Failed to get size of blob file %" PRIu64 231 ", status: %s", 232 file_number_, s.ToString().c_str()); 233 return s; 234 } 235 if (file_size < BlobLogHeader::kSize) { 236 ROCKS_LOG_ERROR(info_log_, 237 "Incomplete blob file blob file %" PRIu64 238 ", size: %" PRIu64, 239 file_number_, file_size); 240 return Status::Corruption("Incomplete blob file header."); 241 } 242 243 // Create file reader. 244 std::unique_ptr<RandomAccessFile> file; 245 s = env->NewRandomAccessFile(PathName(), &file, env_options); 246 if (!s.ok()) { 247 ROCKS_LOG_ERROR(info_log_, 248 "Failed to open blob file %" PRIu64 ", status: %s", 249 file_number_, s.ToString().c_str()); 250 return s; 251 } 252 std::unique_ptr<RandomAccessFileReader> file_reader( 253 new RandomAccessFileReader(NewLegacyRandomAccessFileWrapper(file), 254 PathName())); 255 256 // Read file header. 257 char header_buf[BlobLogHeader::kSize]; 258 Slice header_slice; 259 s = file_reader->Read(0, BlobLogHeader::kSize, &header_slice, header_buf); 260 if (!s.ok()) { 261 ROCKS_LOG_ERROR(info_log_, 262 "Failed to read header of blob file %" PRIu64 263 ", status: %s", 264 file_number_, s.ToString().c_str()); 265 return s; 266 } 267 BlobLogHeader header; 268 s = header.DecodeFrom(header_slice); 269 if (!s.ok()) { 270 ROCKS_LOG_ERROR(info_log_, 271 "Failed to decode header of blob file %" PRIu64 272 ", status: %s", 273 file_number_, s.ToString().c_str()); 274 return s; 275 } 276 column_family_id_ = header.column_family_id; 277 compression_ = header.compression; 278 has_ttl_ = header.has_ttl; 279 if (has_ttl_) { 280 expiration_range_ = header.expiration_range; 281 } 282 header_valid_ = true; 283 284 // Read file footer. 285 if (file_size_ < BlobLogHeader::kSize + BlobLogFooter::kSize) { 286 // OK not to have footer. 287 assert(!footer_valid_); 288 return Status::OK(); 289 } 290 char footer_buf[BlobLogFooter::kSize]; 291 Slice footer_slice; 292 s = file_reader->Read(file_size - BlobLogFooter::kSize, BlobLogFooter::kSize, 293 &footer_slice, footer_buf); 294 if (!s.ok()) { 295 ROCKS_LOG_ERROR(info_log_, 296 "Failed to read footer of blob file %" PRIu64 297 ", status: %s", 298 file_number_, s.ToString().c_str()); 299 return s; 300 } 301 BlobLogFooter footer; 302 s = footer.DecodeFrom(footer_slice); 303 if (!s.ok()) { 304 // OK not to have footer. 305 assert(!footer_valid_); 306 return Status::OK(); 307 } 308 blob_count_ = footer.blob_count; 309 if (has_ttl_) { 310 assert(header.expiration_range.first <= footer.expiration_range.first); 311 assert(header.expiration_range.second >= footer.expiration_range.second); 312 expiration_range_ = footer.expiration_range; 313 } 314 footer_valid_ = true; 315 return Status::OK(); 316 } 317 318 } // namespace blob_db 319 } // namespace ROCKSDB_NAMESPACE 320 #endif // ROCKSDB_LITE 321