1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #include "rocksdb/sst_file_writer.h"
7 
8 #include <vector>
9 
10 #include "db/dbformat.h"
11 #include "file/writable_file_writer.h"
12 #include "rocksdb/file_system.h"
13 #include "rocksdb/table.h"
14 #include "table/block_based/block_based_table_builder.h"
15 #include "table/sst_file_writer_collectors.h"
16 #include "test_util/sync_point.h"
17 
18 namespace ROCKSDB_NAMESPACE {
19 
20 const std::string ExternalSstFilePropertyNames::kVersion =
21     "rocksdb.external_sst_file.version";
22 const std::string ExternalSstFilePropertyNames::kGlobalSeqno =
23     "rocksdb.external_sst_file.global_seqno";
24 
25 #ifndef ROCKSDB_LITE
26 
const size_t kFadviseTrigger = size_t{1} << 20;  // 1 MiB between page-cache drop hints
28 
29 struct SstFileWriter::Rep {
RepROCKSDB_NAMESPACE::SstFileWriter::Rep30   Rep(const EnvOptions& _env_options, const Options& options,
31       Env::IOPriority _io_priority, const Comparator* _user_comparator,
32       ColumnFamilyHandle* _cfh, bool _invalidate_page_cache, bool _skip_filters)
33       : env_options(_env_options),
34         ioptions(options),
35         mutable_cf_options(options),
36         io_priority(_io_priority),
37         internal_comparator(_user_comparator),
38         cfh(_cfh),
39         invalidate_page_cache(_invalidate_page_cache),
40         last_fadvise_size(0),
41         skip_filters(_skip_filters) {}
42 
43   std::unique_ptr<WritableFileWriter> file_writer;
44   std::unique_ptr<TableBuilder> builder;
45   EnvOptions env_options;
46   ImmutableOptions ioptions;
47   MutableCFOptions mutable_cf_options;
48   Env::IOPriority io_priority;
49   InternalKeyComparator internal_comparator;
50   ExternalSstFileInfo file_info;
51   InternalKey ikey;
52   std::string column_family_name;
53   ColumnFamilyHandle* cfh;
54   // If true, We will give the OS a hint that this file pages is not needed
55   // every time we write 1MB to the file.
56   bool invalidate_page_cache;
57   // The size of the file during the last time we called Fadvise to remove
58   // cached pages from page cache.
59   uint64_t last_fadvise_size;
60   bool skip_filters;
AddROCKSDB_NAMESPACE::SstFileWriter::Rep61   Status Add(const Slice& user_key, const Slice& value,
62              const ValueType value_type) {
63     if (!builder) {
64       return Status::InvalidArgument("File is not opened");
65     }
66 
67     if (file_info.num_entries == 0) {
68       file_info.smallest_key.assign(user_key.data(), user_key.size());
69     } else {
70       if (internal_comparator.user_comparator()->Compare(
71               user_key, file_info.largest_key) <= 0) {
72         // Make sure that keys are added in order
73         return Status::InvalidArgument(
74             "Keys must be added in strict ascending order.");
75       }
76     }
77 
78     // TODO(tec) : For external SST files we could omit the seqno and type.
79     switch (value_type) {
80       case ValueType::kTypeValue:
81         ikey.Set(user_key, 0 /* Sequence Number */,
82                  ValueType::kTypeValue /* Put */);
83         break;
84       case ValueType::kTypeMerge:
85         ikey.Set(user_key, 0 /* Sequence Number */,
86                  ValueType::kTypeMerge /* Merge */);
87         break;
88       case ValueType::kTypeDeletion:
89         ikey.Set(user_key, 0 /* Sequence Number */,
90                  ValueType::kTypeDeletion /* Delete */);
91         break;
92       default:
93         return Status::InvalidArgument("Value type is not supported");
94     }
95     builder->Add(ikey.Encode(), value);
96 
97     // update file info
98     file_info.num_entries++;
99     file_info.largest_key.assign(user_key.data(), user_key.size());
100     file_info.file_size = builder->FileSize();
101 
102     InvalidatePageCache(false /* closing */).PermitUncheckedError();
103     return Status::OK();
104   }
105 
DeleteRangeROCKSDB_NAMESPACE::SstFileWriter::Rep106   Status DeleteRange(const Slice& begin_key, const Slice& end_key) {
107     if (!builder) {
108       return Status::InvalidArgument("File is not opened");
109     }
110 
111     RangeTombstone tombstone(begin_key, end_key, 0 /* Sequence Number */);
112     if (file_info.num_range_del_entries == 0) {
113       file_info.smallest_range_del_key.assign(tombstone.start_key_.data(),
114                                               tombstone.start_key_.size());
115       file_info.largest_range_del_key.assign(tombstone.end_key_.data(),
116                                              tombstone.end_key_.size());
117     } else {
118       if (internal_comparator.user_comparator()->Compare(
119               tombstone.start_key_, file_info.smallest_range_del_key) < 0) {
120         file_info.smallest_range_del_key.assign(tombstone.start_key_.data(),
121                                                 tombstone.start_key_.size());
122       }
123       if (internal_comparator.user_comparator()->Compare(
124               tombstone.end_key_, file_info.largest_range_del_key) > 0) {
125         file_info.largest_range_del_key.assign(tombstone.end_key_.data(),
126                                                tombstone.end_key_.size());
127       }
128     }
129 
130     auto ikey_and_end_key = tombstone.Serialize();
131     builder->Add(ikey_and_end_key.first.Encode(), ikey_and_end_key.second);
132 
133     // update file info
134     file_info.num_range_del_entries++;
135     file_info.file_size = builder->FileSize();
136 
137     InvalidatePageCache(false /* closing */).PermitUncheckedError();
138     return Status::OK();
139   }
140 
InvalidatePageCacheROCKSDB_NAMESPACE::SstFileWriter::Rep141   Status InvalidatePageCache(bool closing) {
142     Status s = Status::OK();
143     if (invalidate_page_cache == false) {
144       // Fadvise disabled
145       return s;
146     }
147     uint64_t bytes_since_last_fadvise =
148       builder->FileSize() - last_fadvise_size;
149     if (bytes_since_last_fadvise > kFadviseTrigger || closing) {
150       TEST_SYNC_POINT_CALLBACK("SstFileWriter::Rep::InvalidatePageCache",
151                                &(bytes_since_last_fadvise));
152       // Tell the OS that we don't need this file in page cache
153       s = file_writer->InvalidateCache(0, 0);
154       if (s.IsNotSupported()) {
155         // NotSupported is fine as it could be a file type that doesn't use page
156         // cache.
157         s = Status::OK();
158       }
159       last_fadvise_size = builder->FileSize();
160     }
161     return s;
162   }
163 };
164 
SstFileWriter(const EnvOptions & env_options,const Options & options,const Comparator * user_comparator,ColumnFamilyHandle * column_family,bool invalidate_page_cache,Env::IOPriority io_priority,bool skip_filters)165 SstFileWriter::SstFileWriter(const EnvOptions& env_options,
166                              const Options& options,
167                              const Comparator* user_comparator,
168                              ColumnFamilyHandle* column_family,
169                              bool invalidate_page_cache,
170                              Env::IOPriority io_priority, bool skip_filters)
171     : rep_(new Rep(env_options, options, io_priority, user_comparator,
172                    column_family, invalidate_page_cache, skip_filters)) {
173   rep_->file_info.file_size = 0;
174 }
175 
~SstFileWriter()176 SstFileWriter::~SstFileWriter() {
177   if (rep_->builder) {
178     // User did not call Finish() or Finish() failed, we need to
179     // abandon the builder.
180     rep_->builder->Abandon();
181   }
182 }
183 
Open(const std::string & file_path)184 Status SstFileWriter::Open(const std::string& file_path) {
185   Rep* r = rep_.get();
186   Status s;
187   std::unique_ptr<FSWritableFile> sst_file;
188   FileOptions cur_file_opts(r->env_options);
189   s = r->ioptions.env->GetFileSystem()->NewWritableFile(
190       file_path, cur_file_opts, &sst_file, nullptr);
191   if (!s.ok()) {
192     return s;
193   }
194 
195   sst_file->SetIOPriority(r->io_priority);
196 
197   CompressionType compression_type;
198   CompressionOptions compression_opts;
199   if (r->mutable_cf_options.bottommost_compression !=
200       kDisableCompressionOption) {
201     compression_type = r->mutable_cf_options.bottommost_compression;
202     if (r->mutable_cf_options.bottommost_compression_opts.enabled) {
203       compression_opts = r->mutable_cf_options.bottommost_compression_opts;
204     } else {
205       compression_opts = r->mutable_cf_options.compression_opts;
206     }
207   } else if (!r->ioptions.compression_per_level.empty()) {
208     // Use the compression of the last level if we have per level compression
209     compression_type = *(r->ioptions.compression_per_level.rbegin());
210     compression_opts = r->mutable_cf_options.compression_opts;
211   } else {
212     compression_type = r->mutable_cf_options.compression;
213     compression_opts = r->mutable_cf_options.compression_opts;
214   }
215 
216   IntTblPropCollectorFactories int_tbl_prop_collector_factories;
217 
218   // SstFileWriter properties collector to add SstFileWriter version.
219   int_tbl_prop_collector_factories.emplace_back(
220       new SstFileWriterPropertiesCollectorFactory(2 /* version */,
221                                                   0 /* global_seqno*/));
222 
223   // User collector factories
224   auto user_collector_factories =
225       r->ioptions.table_properties_collector_factories;
226   for (size_t i = 0; i < user_collector_factories.size(); i++) {
227     int_tbl_prop_collector_factories.emplace_back(
228         new UserKeyTablePropertiesCollectorFactory(
229             user_collector_factories[i]));
230   }
231   int unknown_level = -1;
232   uint32_t cf_id;
233 
234   if (r->cfh != nullptr) {
235     // user explicitly specified that this file will be ingested into cfh,
236     // we can persist this information in the file.
237     cf_id = r->cfh->GetID();
238     r->column_family_name = r->cfh->GetName();
239   } else {
240     r->column_family_name = "";
241     cf_id = TablePropertiesCollectorFactory::Context::kUnknownColumnFamily;
242   }
243   // SstFileWriter is used to create sst files that can be added to database
244   // later. Therefore, no real db_id and db_session_id are associated with it.
245   // Here we mimic the way db_session_id behaves by resetting the db_session_id
246   // every time SstFileWriter is used, and in this case db_id is set to be "SST
247   // Writer".
248   std::string db_session_id = r->ioptions.env->GenerateUniqueId();
249   if (!db_session_id.empty() && db_session_id.back() == '\n') {
250     db_session_id.pop_back();
251   }
252   TableBuilderOptions table_builder_options(
253       r->ioptions, r->mutable_cf_options, r->internal_comparator,
254       &int_tbl_prop_collector_factories, compression_type, compression_opts,
255       cf_id, r->column_family_name, unknown_level, false /* is_bottommost */,
256       TableFileCreationReason::kMisc, 0 /* creation_time */,
257       0 /* oldest_key_time */, 0 /* file_creation_time */,
258       "SST Writer" /* db_id */, db_session_id, 0 /* target_file_size */);
259   // XXX: when we can remove skip_filters from the SstFileWriter public API
260   // we can remove it from TableBuilderOptions.
261   table_builder_options.skip_filters = r->skip_filters;
262   FileTypeSet tmp_set = r->ioptions.checksum_handoff_file_types;
263   r->file_writer.reset(new WritableFileWriter(
264       std::move(sst_file), file_path, r->env_options, r->ioptions.clock,
265       nullptr /* io_tracer */, nullptr /* stats */, r->ioptions.listeners,
266       r->ioptions.file_checksum_gen_factory.get(),
267       tmp_set.Contains(FileType::kTableFile)));
268 
269   // TODO(tec) : If table_factory is using compressed block cache, we will
270   // be adding the external sst file blocks into it, which is wasteful.
271   r->builder.reset(r->ioptions.table_factory->NewTableBuilder(
272       table_builder_options, r->file_writer.get()));
273 
274   r->file_info = ExternalSstFileInfo();
275   r->file_info.file_path = file_path;
276   r->file_info.version = 2;
277   return s;
278 }
279 
Add(const Slice & user_key,const Slice & value)280 Status SstFileWriter::Add(const Slice& user_key, const Slice& value) {
281   return rep_->Add(user_key, value, ValueType::kTypeValue);
282 }
283 
Put(const Slice & user_key,const Slice & value)284 Status SstFileWriter::Put(const Slice& user_key, const Slice& value) {
285   return rep_->Add(user_key, value, ValueType::kTypeValue);
286 }
287 
Merge(const Slice & user_key,const Slice & value)288 Status SstFileWriter::Merge(const Slice& user_key, const Slice& value) {
289   return rep_->Add(user_key, value, ValueType::kTypeMerge);
290 }
291 
Delete(const Slice & user_key)292 Status SstFileWriter::Delete(const Slice& user_key) {
293   return rep_->Add(user_key, Slice(), ValueType::kTypeDeletion);
294 }
295 
DeleteRange(const Slice & begin_key,const Slice & end_key)296 Status SstFileWriter::DeleteRange(const Slice& begin_key,
297                                   const Slice& end_key) {
298   return rep_->DeleteRange(begin_key, end_key);
299 }
300 
Finish(ExternalSstFileInfo * file_info)301 Status SstFileWriter::Finish(ExternalSstFileInfo* file_info) {
302   Rep* r = rep_.get();
303   if (!r->builder) {
304     return Status::InvalidArgument("File is not opened");
305   }
306   if (r->file_info.num_entries == 0 &&
307       r->file_info.num_range_del_entries == 0) {
308     return Status::InvalidArgument("Cannot create sst file with no entries");
309   }
310 
311   Status s = r->builder->Finish();
312   r->file_info.file_size = r->builder->FileSize();
313 
314   if (s.ok()) {
315     s = r->file_writer->Sync(r->ioptions.use_fsync);
316     r->InvalidatePageCache(true /* closing */).PermitUncheckedError();
317     if (s.ok()) {
318       s = r->file_writer->Close();
319     }
320   }
321   if (s.ok()) {
322     r->file_info.file_checksum = r->file_writer->GetFileChecksum();
323     r->file_info.file_checksum_func_name =
324         r->file_writer->GetFileChecksumFuncName();
325   }
326   if (!s.ok()) {
327     r->ioptions.env->DeleteFile(r->file_info.file_path);
328   }
329 
330   if (file_info != nullptr) {
331     *file_info = r->file_info;
332   }
333 
334   r->builder.reset();
335   return s;
336 }
337 
FileSize()338 uint64_t SstFileWriter::FileSize() {
339   return rep_->file_info.file_size;
340 }
341 #endif  // !ROCKSDB_LITE
342 
343 }  // namespace ROCKSDB_NAMESPACE
344