1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #include "db/blob/blob_file_builder.h"
7 
8 #include <cassert>
9 
10 #include "db/blob/blob_file_addition.h"
11 #include "db/blob/blob_file_completion_callback.h"
12 #include "db/blob/blob_index.h"
13 #include "db/blob/blob_log_format.h"
14 #include "db/blob/blob_log_writer.h"
15 #include "db/version_set.h"
16 #include "file/filename.h"
17 #include "file/read_write_util.h"
18 #include "file/writable_file_writer.h"
19 #include "logging/logging.h"
20 #include "options/cf_options.h"
21 #include "options/options_helper.h"
22 #include "rocksdb/slice.h"
23 #include "rocksdb/status.h"
24 #include "test_util/sync_point.h"
25 #include "trace_replay/io_tracer.h"
26 #include "util/compression.h"
27 
28 namespace ROCKSDB_NAMESPACE {
29 
BlobFileBuilder(VersionSet * versions,FileSystem * fs,const ImmutableOptions * immutable_cf_options,const MutableCFOptions * mutable_cf_options,const FileOptions * file_options,int job_id,uint32_t column_family_id,const std::string & column_family_name,Env::IOPriority io_priority,Env::WriteLifeTimeHint write_hint,const std::shared_ptr<IOTracer> & io_tracer,BlobFileCompletionCallback * blob_callback,std::vector<std::string> * blob_file_paths,std::vector<BlobFileAddition> * blob_file_additions)30 BlobFileBuilder::BlobFileBuilder(
31     VersionSet* versions, FileSystem* fs,
32     const ImmutableOptions* immutable_cf_options,
33     const MutableCFOptions* mutable_cf_options, const FileOptions* file_options,
34     int job_id, uint32_t column_family_id,
35     const std::string& column_family_name, Env::IOPriority io_priority,
36     Env::WriteLifeTimeHint write_hint,
37     const std::shared_ptr<IOTracer>& io_tracer,
38     BlobFileCompletionCallback* blob_callback,
39     std::vector<std::string>* blob_file_paths,
40     std::vector<BlobFileAddition>* blob_file_additions)
41     : BlobFileBuilder([versions]() { return versions->NewFileNumber(); }, fs,
42                       immutable_cf_options, mutable_cf_options, file_options,
43                       job_id, column_family_id, column_family_name, io_priority,
44                       write_hint, io_tracer, blob_callback, blob_file_paths,
45                       blob_file_additions) {}
46 
BlobFileBuilder(std::function<uint64_t ()> file_number_generator,FileSystem * fs,const ImmutableOptions * immutable_cf_options,const MutableCFOptions * mutable_cf_options,const FileOptions * file_options,int job_id,uint32_t column_family_id,const std::string & column_family_name,Env::IOPriority io_priority,Env::WriteLifeTimeHint write_hint,const std::shared_ptr<IOTracer> & io_tracer,BlobFileCompletionCallback * blob_callback,std::vector<std::string> * blob_file_paths,std::vector<BlobFileAddition> * blob_file_additions)47 BlobFileBuilder::BlobFileBuilder(
48     std::function<uint64_t()> file_number_generator, FileSystem* fs,
49     const ImmutableOptions* immutable_cf_options,
50     const MutableCFOptions* mutable_cf_options, const FileOptions* file_options,
51     int job_id, uint32_t column_family_id,
52     const std::string& column_family_name, Env::IOPriority io_priority,
53     Env::WriteLifeTimeHint write_hint,
54     const std::shared_ptr<IOTracer>& io_tracer,
55     BlobFileCompletionCallback* blob_callback,
56     std::vector<std::string>* blob_file_paths,
57     std::vector<BlobFileAddition>* blob_file_additions)
58     : file_number_generator_(std::move(file_number_generator)),
59       fs_(fs),
60       immutable_cf_options_(immutable_cf_options),
61       min_blob_size_(mutable_cf_options->min_blob_size),
62       blob_file_size_(mutable_cf_options->blob_file_size),
63       blob_compression_type_(mutable_cf_options->blob_compression_type),
64       file_options_(file_options),
65       job_id_(job_id),
66       column_family_id_(column_family_id),
67       column_family_name_(column_family_name),
68       io_priority_(io_priority),
69       write_hint_(write_hint),
70       io_tracer_(io_tracer),
71       blob_callback_(blob_callback),
72       blob_file_paths_(blob_file_paths),
73       blob_file_additions_(blob_file_additions),
74       blob_count_(0),
75       blob_bytes_(0) {
76   assert(file_number_generator_);
77   assert(fs_);
78   assert(immutable_cf_options_);
79   assert(file_options_);
80   assert(blob_file_paths_);
81   assert(blob_file_paths_->empty());
82   assert(blob_file_additions_);
83   assert(blob_file_additions_->empty());
84 }
85 
// Defaulted destructor, defined out of line in this translation unit.
// NOTE(review): presumably placed here (rather than in the header) so that
// the types of owned members only need to be complete in this file -- confirm
// against the class declaration.
BlobFileBuilder::~BlobFileBuilder() = default;
87 
Add(const Slice & key,const Slice & value,std::string * blob_index)88 Status BlobFileBuilder::Add(const Slice& key, const Slice& value,
89                             std::string* blob_index) {
90   assert(blob_index);
91   assert(blob_index->empty());
92 
93   if (value.size() < min_blob_size_) {
94     return Status::OK();
95   }
96 
97   {
98     const Status s = OpenBlobFileIfNeeded();
99     if (!s.ok()) {
100       return s;
101     }
102   }
103 
104   Slice blob = value;
105   std::string compressed_blob;
106 
107   {
108     const Status s = CompressBlobIfNeeded(&blob, &compressed_blob);
109     if (!s.ok()) {
110       return s;
111     }
112   }
113 
114   uint64_t blob_file_number = 0;
115   uint64_t blob_offset = 0;
116 
117   {
118     const Status s =
119         WriteBlobToFile(key, blob, &blob_file_number, &blob_offset);
120     if (!s.ok()) {
121       return s;
122     }
123   }
124 
125   {
126     const Status s = CloseBlobFileIfNeeded();
127     if (!s.ok()) {
128       return s;
129     }
130   }
131 
132   BlobIndex::EncodeBlob(blob_index, blob_file_number, blob_offset, blob.size(),
133                         blob_compression_type_);
134 
135   return Status::OK();
136 }
137 
Finish()138 Status BlobFileBuilder::Finish() {
139   if (!IsBlobFileOpen()) {
140     return Status::OK();
141   }
142 
143   return CloseBlobFile();
144 }
145 
IsBlobFileOpen() const146 bool BlobFileBuilder::IsBlobFileOpen() const { return !!writer_; }
147 
OpenBlobFileIfNeeded()148 Status BlobFileBuilder::OpenBlobFileIfNeeded() {
149   if (IsBlobFileOpen()) {
150     return Status::OK();
151   }
152 
153   assert(!blob_count_);
154   assert(!blob_bytes_);
155 
156   assert(file_number_generator_);
157   const uint64_t blob_file_number = file_number_generator_();
158 
159   assert(immutable_cf_options_);
160   assert(!immutable_cf_options_->cf_paths.empty());
161   std::string blob_file_path = BlobFileName(
162       immutable_cf_options_->cf_paths.front().path, blob_file_number);
163 
164   std::unique_ptr<FSWritableFile> file;
165 
166   {
167     assert(file_options_);
168     Status s = NewWritableFile(fs_, blob_file_path, &file, *file_options_);
169 
170     TEST_SYNC_POINT_CALLBACK(
171         "BlobFileBuilder::OpenBlobFileIfNeeded:NewWritableFile", &s);
172 
173     if (!s.ok()) {
174       return s;
175     }
176   }
177 
178   // Note: files get added to blob_file_paths_ right after the open, so they
179   // can be cleaned up upon failure. Contrast this with blob_file_additions_,
180   // which only contains successfully written files.
181   assert(blob_file_paths_);
182   blob_file_paths_->emplace_back(std::move(blob_file_path));
183 
184   assert(file);
185   file->SetIOPriority(io_priority_);
186   file->SetWriteLifeTimeHint(write_hint_);
187   FileTypeSet tmp_set = immutable_cf_options_->checksum_handoff_file_types;
188   Statistics* const statistics = immutable_cf_options_->stats;
189   std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
190       std::move(file), blob_file_paths_->back(), *file_options_,
191       immutable_cf_options_->clock, io_tracer_, statistics,
192       immutable_cf_options_->listeners,
193       immutable_cf_options_->file_checksum_gen_factory.get(),
194       tmp_set.Contains(FileType::kBlobFile)));
195 
196   constexpr bool do_flush = false;
197 
198   std::unique_ptr<BlobLogWriter> blob_log_writer(new BlobLogWriter(
199       std::move(file_writer), immutable_cf_options_->clock, statistics,
200       blob_file_number, immutable_cf_options_->use_fsync, do_flush));
201 
202   constexpr bool has_ttl = false;
203   constexpr ExpirationRange expiration_range;
204 
205   BlobLogHeader header(column_family_id_, blob_compression_type_, has_ttl,
206                        expiration_range);
207 
208   {
209     Status s = blob_log_writer->WriteHeader(header);
210 
211     TEST_SYNC_POINT_CALLBACK(
212         "BlobFileBuilder::OpenBlobFileIfNeeded:WriteHeader", &s);
213 
214     if (!s.ok()) {
215       return s;
216     }
217   }
218 
219   writer_ = std::move(blob_log_writer);
220 
221   assert(IsBlobFileOpen());
222 
223   return Status::OK();
224 }
225 
CompressBlobIfNeeded(Slice * blob,std::string * compressed_blob) const226 Status BlobFileBuilder::CompressBlobIfNeeded(
227     Slice* blob, std::string* compressed_blob) const {
228   assert(blob);
229   assert(compressed_blob);
230   assert(compressed_blob->empty());
231 
232   if (blob_compression_type_ == kNoCompression) {
233     return Status::OK();
234   }
235 
236   CompressionOptions opts;
237   CompressionContext context(blob_compression_type_);
238   constexpr uint64_t sample_for_compression = 0;
239 
240   CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
241                        blob_compression_type_, sample_for_compression);
242 
243   constexpr uint32_t compression_format_version = 2;
244 
245   if (!CompressData(*blob, info, compression_format_version, compressed_blob)) {
246     return Status::Corruption("Error compressing blob");
247   }
248 
249   *blob = Slice(*compressed_blob);
250 
251   return Status::OK();
252 }
253 
WriteBlobToFile(const Slice & key,const Slice & blob,uint64_t * blob_file_number,uint64_t * blob_offset)254 Status BlobFileBuilder::WriteBlobToFile(const Slice& key, const Slice& blob,
255                                         uint64_t* blob_file_number,
256                                         uint64_t* blob_offset) {
257   assert(IsBlobFileOpen());
258   assert(blob_file_number);
259   assert(blob_offset);
260 
261   uint64_t key_offset = 0;
262 
263   Status s = writer_->AddRecord(key, blob, &key_offset, blob_offset);
264 
265   TEST_SYNC_POINT_CALLBACK("BlobFileBuilder::WriteBlobToFile:AddRecord", &s);
266 
267   if (!s.ok()) {
268     return s;
269   }
270 
271   *blob_file_number = writer_->get_log_number();
272 
273   ++blob_count_;
274   blob_bytes_ += BlobLogRecord::kHeaderSize + key.size() + blob.size();
275 
276   return Status::OK();
277 }
278 
CloseBlobFile()279 Status BlobFileBuilder::CloseBlobFile() {
280   assert(IsBlobFileOpen());
281 
282   BlobLogFooter footer;
283   footer.blob_count = blob_count_;
284 
285   std::string checksum_method;
286   std::string checksum_value;
287 
288   Status s = writer_->AppendFooter(footer, &checksum_method, &checksum_value);
289 
290   TEST_SYNC_POINT_CALLBACK("BlobFileBuilder::WriteBlobToFile:AppendFooter", &s);
291 
292   if (!s.ok()) {
293     return s;
294   }
295 
296   const uint64_t blob_file_number = writer_->get_log_number();
297 
298   assert(blob_file_additions_);
299   blob_file_additions_->emplace_back(blob_file_number, blob_count_, blob_bytes_,
300                                      std::move(checksum_method),
301                                      std::move(checksum_value));
302 
303   assert(immutable_cf_options_);
304   ROCKS_LOG_INFO(immutable_cf_options_->logger,
305                  "[%s] [JOB %d] Generated blob file #%" PRIu64 ": %" PRIu64
306                  " total blobs, %" PRIu64 " total bytes",
307                  column_family_name_.c_str(), job_id_, blob_file_number,
308                  blob_count_, blob_bytes_);
309   if (blob_callback_) {
310     s = blob_callback_->OnBlobFileCompleted(blob_file_paths_->back());
311   }
312 
313   writer_.reset();
314   blob_count_ = 0;
315   blob_bytes_ = 0;
316 
317   return s;
318 }
319 
CloseBlobFileIfNeeded()320 Status BlobFileBuilder::CloseBlobFileIfNeeded() {
321   assert(IsBlobFileOpen());
322 
323   const WritableFileWriter* const file_writer = writer_->file();
324   assert(file_writer);
325 
326   if (file_writer->GetFileSize() < blob_file_size_) {
327     return Status::OK();
328   }
329 
330   return CloseBlobFile();
331 }
332 
Abandon()333 void BlobFileBuilder::Abandon() {
334   if (!IsBlobFileOpen()) {
335     return;
336   }
337 
338   if (blob_callback_) {
339     // BlobFileBuilder::Abandon() is called because of error while writing to
340     // Blob files. So we can ignore the below error.
341     blob_callback_->OnBlobFileCompleted(blob_file_paths_->back())
342         .PermitUncheckedError();
343   }
344 
345   writer_.reset();
346   blob_count_ = 0;
347   blob_bytes_ = 0;
348 }
349 }  // namespace ROCKSDB_NAMESPACE
350