1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #include "rocksdb/sst_file_writer.h"
7 
8 #include <vector>
9 
10 #include "db/dbformat.h"
11 #include "env/composite_env_wrapper.h"
12 #include "file/writable_file_writer.h"
13 #include "rocksdb/table.h"
14 #include "table/block_based/block_based_table_builder.h"
15 #include "table/sst_file_writer_collectors.h"
16 #include "test_util/sync_point.h"
17 
18 namespace ROCKSDB_NAMESPACE {
19 
20 const std::string ExternalSstFilePropertyNames::kVersion =
21     "rocksdb.external_sst_file.version";
22 const std::string ExternalSstFilePropertyNames::kGlobalSeqno =
23     "rocksdb.external_sst_file.global_seqno";
24 
25 #ifndef ROCKSDB_LITE
26 
// Number of newly written bytes after which the page cache is dropped again
// when invalidate_page_cache is enabled (1 MiB).
const size_t kFadviseTrigger = size_t{1} << 20;
28 
29 struct SstFileWriter::Rep {
RepROCKSDB_NAMESPACE::SstFileWriter::Rep30   Rep(const EnvOptions& _env_options, const Options& options,
31       Env::IOPriority _io_priority, const Comparator* _user_comparator,
32       ColumnFamilyHandle* _cfh, bool _invalidate_page_cache, bool _skip_filters)
33       : env_options(_env_options),
34         ioptions(options),
35         mutable_cf_options(options),
36         io_priority(_io_priority),
37         internal_comparator(_user_comparator),
38         cfh(_cfh),
39         invalidate_page_cache(_invalidate_page_cache),
40         last_fadvise_size(0),
41         skip_filters(_skip_filters) {}
42 
43   std::unique_ptr<WritableFileWriter> file_writer;
44   std::unique_ptr<TableBuilder> builder;
45   EnvOptions env_options;
46   ImmutableCFOptions ioptions;
47   MutableCFOptions mutable_cf_options;
48   Env::IOPriority io_priority;
49   InternalKeyComparator internal_comparator;
50   ExternalSstFileInfo file_info;
51   InternalKey ikey;
52   std::string column_family_name;
53   ColumnFamilyHandle* cfh;
54   // If true, We will give the OS a hint that this file pages is not needed
55   // every time we write 1MB to the file.
56   bool invalidate_page_cache;
57   // The size of the file during the last time we called Fadvise to remove
58   // cached pages from page cache.
59   uint64_t last_fadvise_size;
60   bool skip_filters;
AddROCKSDB_NAMESPACE::SstFileWriter::Rep61   Status Add(const Slice& user_key, const Slice& value,
62              const ValueType value_type) {
63     if (!builder) {
64       return Status::InvalidArgument("File is not opened");
65     }
66 
67     if (file_info.num_entries == 0) {
68       file_info.smallest_key.assign(user_key.data(), user_key.size());
69     } else {
70       if (internal_comparator.user_comparator()->Compare(
71               user_key, file_info.largest_key) <= 0) {
72         // Make sure that keys are added in order
73         return Status::InvalidArgument(
74             "Keys must be added in strict ascending order.");
75       }
76     }
77 
78     // TODO(tec) : For external SST files we could omit the seqno and type.
79     switch (value_type) {
80       case ValueType::kTypeValue:
81         ikey.Set(user_key, 0 /* Sequence Number */,
82                  ValueType::kTypeValue /* Put */);
83         break;
84       case ValueType::kTypeMerge:
85         ikey.Set(user_key, 0 /* Sequence Number */,
86                  ValueType::kTypeMerge /* Merge */);
87         break;
88       case ValueType::kTypeDeletion:
89         ikey.Set(user_key, 0 /* Sequence Number */,
90                  ValueType::kTypeDeletion /* Delete */);
91         break;
92       default:
93         return Status::InvalidArgument("Value type is not supported");
94     }
95     builder->Add(ikey.Encode(), value);
96 
97     // update file info
98     file_info.num_entries++;
99     file_info.largest_key.assign(user_key.data(), user_key.size());
100     file_info.file_size = builder->FileSize();
101 
102     InvalidatePageCache(false /* closing */);
103 
104     return Status::OK();
105   }
106 
DeleteRangeROCKSDB_NAMESPACE::SstFileWriter::Rep107   Status DeleteRange(const Slice& begin_key, const Slice& end_key) {
108     if (!builder) {
109       return Status::InvalidArgument("File is not opened");
110     }
111 
112     RangeTombstone tombstone(begin_key, end_key, 0 /* Sequence Number */);
113     if (file_info.num_range_del_entries == 0) {
114       file_info.smallest_range_del_key.assign(tombstone.start_key_.data(),
115                                               tombstone.start_key_.size());
116       file_info.largest_range_del_key.assign(tombstone.end_key_.data(),
117                                              tombstone.end_key_.size());
118     } else {
119       if (internal_comparator.user_comparator()->Compare(
120               tombstone.start_key_, file_info.smallest_range_del_key) < 0) {
121         file_info.smallest_range_del_key.assign(tombstone.start_key_.data(),
122                                                 tombstone.start_key_.size());
123       }
124       if (internal_comparator.user_comparator()->Compare(
125               tombstone.end_key_, file_info.largest_range_del_key) > 0) {
126         file_info.largest_range_del_key.assign(tombstone.end_key_.data(),
127                                                tombstone.end_key_.size());
128       }
129     }
130 
131     auto ikey_and_end_key = tombstone.Serialize();
132     builder->Add(ikey_and_end_key.first.Encode(), ikey_and_end_key.second);
133 
134     // update file info
135     file_info.num_range_del_entries++;
136     file_info.file_size = builder->FileSize();
137 
138     InvalidatePageCache(false /* closing */);
139 
140     return Status::OK();
141   }
142 
InvalidatePageCacheROCKSDB_NAMESPACE::SstFileWriter::Rep143   void InvalidatePageCache(bool closing) {
144     if (invalidate_page_cache == false) {
145       // Fadvise disabled
146       return;
147     }
148     uint64_t bytes_since_last_fadvise =
149       builder->FileSize() - last_fadvise_size;
150     if (bytes_since_last_fadvise > kFadviseTrigger || closing) {
151       TEST_SYNC_POINT_CALLBACK("SstFileWriter::Rep::InvalidatePageCache",
152                                &(bytes_since_last_fadvise));
153       // Tell the OS that we dont need this file in page cache
154       file_writer->InvalidateCache(0, 0);
155       last_fadvise_size = builder->FileSize();
156     }
157   }
158 
159 };
160 
SstFileWriter(const EnvOptions & env_options,const Options & options,const Comparator * user_comparator,ColumnFamilyHandle * column_family,bool invalidate_page_cache,Env::IOPriority io_priority,bool skip_filters)161 SstFileWriter::SstFileWriter(const EnvOptions& env_options,
162                              const Options& options,
163                              const Comparator* user_comparator,
164                              ColumnFamilyHandle* column_family,
165                              bool invalidate_page_cache,
166                              Env::IOPriority io_priority, bool skip_filters)
167     : rep_(new Rep(env_options, options, io_priority, user_comparator,
168                    column_family, invalidate_page_cache, skip_filters)) {
169   rep_->file_info.file_size = 0;
170 }
171 
~SstFileWriter()172 SstFileWriter::~SstFileWriter() {
173   if (rep_->builder) {
174     // User did not call Finish() or Finish() failed, we need to
175     // abandon the builder.
176     rep_->builder->Abandon();
177   }
178 }
179 
Open(const std::string & file_path)180 Status SstFileWriter::Open(const std::string& file_path) {
181   Rep* r = rep_.get();
182   Status s;
183   std::unique_ptr<WritableFile> sst_file;
184   s = r->ioptions.env->NewWritableFile(file_path, &sst_file, r->env_options);
185   if (!s.ok()) {
186     return s;
187   }
188 
189   sst_file->SetIOPriority(r->io_priority);
190 
191   CompressionType compression_type;
192   CompressionOptions compression_opts;
193   if (r->ioptions.bottommost_compression != kDisableCompressionOption) {
194     compression_type = r->ioptions.bottommost_compression;
195     if (r->ioptions.bottommost_compression_opts.enabled) {
196       compression_opts = r->ioptions.bottommost_compression_opts;
197     } else {
198       compression_opts = r->ioptions.compression_opts;
199     }
200   } else if (!r->ioptions.compression_per_level.empty()) {
201     // Use the compression of the last level if we have per level compression
202     compression_type = *(r->ioptions.compression_per_level.rbegin());
203     compression_opts = r->ioptions.compression_opts;
204   } else {
205     compression_type = r->mutable_cf_options.compression;
206     compression_opts = r->ioptions.compression_opts;
207   }
208   uint64_t sample_for_compression =
209       r->mutable_cf_options.sample_for_compression;
210 
211   std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
212       int_tbl_prop_collector_factories;
213 
214   // SstFileWriter properties collector to add SstFileWriter version.
215   int_tbl_prop_collector_factories.emplace_back(
216       new SstFileWriterPropertiesCollectorFactory(2 /* version */,
217                                                   0 /* global_seqno*/));
218 
219   // User collector factories
220   auto user_collector_factories =
221       r->ioptions.table_properties_collector_factories;
222   for (size_t i = 0; i < user_collector_factories.size(); i++) {
223     int_tbl_prop_collector_factories.emplace_back(
224         new UserKeyTablePropertiesCollectorFactory(
225             user_collector_factories[i]));
226   }
227   int unknown_level = -1;
228   uint32_t cf_id;
229 
230   if (r->cfh != nullptr) {
231     // user explicitly specified that this file will be ingested into cfh,
232     // we can persist this information in the file.
233     cf_id = r->cfh->GetID();
234     r->column_family_name = r->cfh->GetName();
235   } else {
236     r->column_family_name = "";
237     cf_id = TablePropertiesCollectorFactory::Context::kUnknownColumnFamily;
238   }
239 
240   TableBuilderOptions table_builder_options(
241       r->ioptions, r->mutable_cf_options, r->internal_comparator,
242       &int_tbl_prop_collector_factories, compression_type,
243       sample_for_compression, compression_opts, r->skip_filters,
244       r->column_family_name, unknown_level);
245   r->file_writer.reset(
246       new WritableFileWriter(NewLegacyWritableFileWrapper(std::move(sst_file)),
247                              file_path, r->env_options, r->ioptions.env,
248                              nullptr /* stats */, r->ioptions.listeners));
249 
250   // TODO(tec) : If table_factory is using compressed block cache, we will
251   // be adding the external sst file blocks into it, which is wasteful.
252   r->builder.reset(r->ioptions.table_factory->NewTableBuilder(
253       table_builder_options, cf_id, r->file_writer.get()));
254 
255   r->file_info = ExternalSstFileInfo();
256   r->file_info.file_path = file_path;
257   r->file_info.version = 2;
258   return s;
259 }
260 
Add(const Slice & user_key,const Slice & value)261 Status SstFileWriter::Add(const Slice& user_key, const Slice& value) {
262   return rep_->Add(user_key, value, ValueType::kTypeValue);
263 }
264 
Put(const Slice & user_key,const Slice & value)265 Status SstFileWriter::Put(const Slice& user_key, const Slice& value) {
266   return rep_->Add(user_key, value, ValueType::kTypeValue);
267 }
268 
Merge(const Slice & user_key,const Slice & value)269 Status SstFileWriter::Merge(const Slice& user_key, const Slice& value) {
270   return rep_->Add(user_key, value, ValueType::kTypeMerge);
271 }
272 
Delete(const Slice & user_key)273 Status SstFileWriter::Delete(const Slice& user_key) {
274   return rep_->Add(user_key, Slice(), ValueType::kTypeDeletion);
275 }
276 
DeleteRange(const Slice & begin_key,const Slice & end_key)277 Status SstFileWriter::DeleteRange(const Slice& begin_key,
278                                   const Slice& end_key) {
279   return rep_->DeleteRange(begin_key, end_key);
280 }
281 
Finish(ExternalSstFileInfo * file_info)282 Status SstFileWriter::Finish(ExternalSstFileInfo* file_info) {
283   Rep* r = rep_.get();
284   if (!r->builder) {
285     return Status::InvalidArgument("File is not opened");
286   }
287   if (r->file_info.num_entries == 0 &&
288       r->file_info.num_range_del_entries == 0) {
289     return Status::InvalidArgument("Cannot create sst file with no entries");
290   }
291 
292   Status s = r->builder->Finish();
293   r->file_info.file_size = r->builder->FileSize();
294 
295   if (s.ok()) {
296     s = r->file_writer->Sync(r->ioptions.use_fsync);
297     r->InvalidatePageCache(true /* closing */);
298     if (s.ok()) {
299       s = r->file_writer->Close();
300     }
301   }
302   if (!s.ok()) {
303     r->ioptions.env->DeleteFile(r->file_info.file_path);
304   }
305 
306   if (file_info != nullptr) {
307     *file_info = r->file_info;
308   }
309 
310   r->builder.reset();
311   return s;
312 }
313 
FileSize()314 uint64_t SstFileWriter::FileSize() {
315   return rep_->file_info.file_size;
316 }
317 #endif  // !ROCKSDB_LITE
318 
319 }  // namespace ROCKSDB_NAMESPACE
320