1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #include "rocksdb/sst_file_writer.h"
7
8 #include <vector>
9
10 #include "db/dbformat.h"
11 #include "file/writable_file_writer.h"
12 #include "rocksdb/file_system.h"
13 #include "rocksdb/table.h"
14 #include "table/block_based/block_based_table_builder.h"
15 #include "table/sst_file_writer_collectors.h"
16 #include "test_util/sync_point.h"
17
18 namespace ROCKSDB_NAMESPACE {
19
20 const std::string ExternalSstFilePropertyNames::kVersion =
21 "rocksdb.external_sst_file.version";
22 const std::string ExternalSstFilePropertyNames::kGlobalSeqno =
23 "rocksdb.external_sst_file.global_seqno";
24
25 #ifndef ROCKSDB_LITE
26
// When more than this many bytes have been written since the previous
// page-cache hint, SstFileWriter::Rep::InvalidatePageCache() issues another
// one (if page-cache invalidation is enabled).
constexpr size_t kFadviseTrigger = size_t{1} << 20;  // 1MB
28
29 struct SstFileWriter::Rep {
RepROCKSDB_NAMESPACE::SstFileWriter::Rep30 Rep(const EnvOptions& _env_options, const Options& options,
31 Env::IOPriority _io_priority, const Comparator* _user_comparator,
32 ColumnFamilyHandle* _cfh, bool _invalidate_page_cache, bool _skip_filters)
33 : env_options(_env_options),
34 ioptions(options),
35 mutable_cf_options(options),
36 io_priority(_io_priority),
37 internal_comparator(_user_comparator),
38 cfh(_cfh),
39 invalidate_page_cache(_invalidate_page_cache),
40 last_fadvise_size(0),
41 skip_filters(_skip_filters) {}
42
43 std::unique_ptr<WritableFileWriter> file_writer;
44 std::unique_ptr<TableBuilder> builder;
45 EnvOptions env_options;
46 ImmutableOptions ioptions;
47 MutableCFOptions mutable_cf_options;
48 Env::IOPriority io_priority;
49 InternalKeyComparator internal_comparator;
50 ExternalSstFileInfo file_info;
51 InternalKey ikey;
52 std::string column_family_name;
53 ColumnFamilyHandle* cfh;
54 // If true, We will give the OS a hint that this file pages is not needed
55 // every time we write 1MB to the file.
56 bool invalidate_page_cache;
57 // The size of the file during the last time we called Fadvise to remove
58 // cached pages from page cache.
59 uint64_t last_fadvise_size;
60 bool skip_filters;
AddROCKSDB_NAMESPACE::SstFileWriter::Rep61 Status Add(const Slice& user_key, const Slice& value,
62 const ValueType value_type) {
63 if (!builder) {
64 return Status::InvalidArgument("File is not opened");
65 }
66
67 if (file_info.num_entries == 0) {
68 file_info.smallest_key.assign(user_key.data(), user_key.size());
69 } else {
70 if (internal_comparator.user_comparator()->Compare(
71 user_key, file_info.largest_key) <= 0) {
72 // Make sure that keys are added in order
73 return Status::InvalidArgument(
74 "Keys must be added in strict ascending order.");
75 }
76 }
77
78 // TODO(tec) : For external SST files we could omit the seqno and type.
79 switch (value_type) {
80 case ValueType::kTypeValue:
81 ikey.Set(user_key, 0 /* Sequence Number */,
82 ValueType::kTypeValue /* Put */);
83 break;
84 case ValueType::kTypeMerge:
85 ikey.Set(user_key, 0 /* Sequence Number */,
86 ValueType::kTypeMerge /* Merge */);
87 break;
88 case ValueType::kTypeDeletion:
89 ikey.Set(user_key, 0 /* Sequence Number */,
90 ValueType::kTypeDeletion /* Delete */);
91 break;
92 default:
93 return Status::InvalidArgument("Value type is not supported");
94 }
95 builder->Add(ikey.Encode(), value);
96
97 // update file info
98 file_info.num_entries++;
99 file_info.largest_key.assign(user_key.data(), user_key.size());
100 file_info.file_size = builder->FileSize();
101
102 InvalidatePageCache(false /* closing */).PermitUncheckedError();
103 return Status::OK();
104 }
105
DeleteRangeROCKSDB_NAMESPACE::SstFileWriter::Rep106 Status DeleteRange(const Slice& begin_key, const Slice& end_key) {
107 if (!builder) {
108 return Status::InvalidArgument("File is not opened");
109 }
110
111 RangeTombstone tombstone(begin_key, end_key, 0 /* Sequence Number */);
112 if (file_info.num_range_del_entries == 0) {
113 file_info.smallest_range_del_key.assign(tombstone.start_key_.data(),
114 tombstone.start_key_.size());
115 file_info.largest_range_del_key.assign(tombstone.end_key_.data(),
116 tombstone.end_key_.size());
117 } else {
118 if (internal_comparator.user_comparator()->Compare(
119 tombstone.start_key_, file_info.smallest_range_del_key) < 0) {
120 file_info.smallest_range_del_key.assign(tombstone.start_key_.data(),
121 tombstone.start_key_.size());
122 }
123 if (internal_comparator.user_comparator()->Compare(
124 tombstone.end_key_, file_info.largest_range_del_key) > 0) {
125 file_info.largest_range_del_key.assign(tombstone.end_key_.data(),
126 tombstone.end_key_.size());
127 }
128 }
129
130 auto ikey_and_end_key = tombstone.Serialize();
131 builder->Add(ikey_and_end_key.first.Encode(), ikey_and_end_key.second);
132
133 // update file info
134 file_info.num_range_del_entries++;
135 file_info.file_size = builder->FileSize();
136
137 InvalidatePageCache(false /* closing */).PermitUncheckedError();
138 return Status::OK();
139 }
140
InvalidatePageCacheROCKSDB_NAMESPACE::SstFileWriter::Rep141 Status InvalidatePageCache(bool closing) {
142 Status s = Status::OK();
143 if (invalidate_page_cache == false) {
144 // Fadvise disabled
145 return s;
146 }
147 uint64_t bytes_since_last_fadvise =
148 builder->FileSize() - last_fadvise_size;
149 if (bytes_since_last_fadvise > kFadviseTrigger || closing) {
150 TEST_SYNC_POINT_CALLBACK("SstFileWriter::Rep::InvalidatePageCache",
151 &(bytes_since_last_fadvise));
152 // Tell the OS that we don't need this file in page cache
153 s = file_writer->InvalidateCache(0, 0);
154 if (s.IsNotSupported()) {
155 // NotSupported is fine as it could be a file type that doesn't use page
156 // cache.
157 s = Status::OK();
158 }
159 last_fadvise_size = builder->FileSize();
160 }
161 return s;
162 }
163 };
164
SstFileWriter(const EnvOptions & env_options,const Options & options,const Comparator * user_comparator,ColumnFamilyHandle * column_family,bool invalidate_page_cache,Env::IOPriority io_priority,bool skip_filters)165 SstFileWriter::SstFileWriter(const EnvOptions& env_options,
166 const Options& options,
167 const Comparator* user_comparator,
168 ColumnFamilyHandle* column_family,
169 bool invalidate_page_cache,
170 Env::IOPriority io_priority, bool skip_filters)
171 : rep_(new Rep(env_options, options, io_priority, user_comparator,
172 column_family, invalidate_page_cache, skip_filters)) {
173 rep_->file_info.file_size = 0;
174 }
175
~SstFileWriter()176 SstFileWriter::~SstFileWriter() {
177 if (rep_->builder) {
178 // User did not call Finish() or Finish() failed, we need to
179 // abandon the builder.
180 rep_->builder->Abandon();
181 }
182 }
183
Open(const std::string & file_path)184 Status SstFileWriter::Open(const std::string& file_path) {
185 Rep* r = rep_.get();
186 Status s;
187 std::unique_ptr<FSWritableFile> sst_file;
188 FileOptions cur_file_opts(r->env_options);
189 s = r->ioptions.env->GetFileSystem()->NewWritableFile(
190 file_path, cur_file_opts, &sst_file, nullptr);
191 if (!s.ok()) {
192 return s;
193 }
194
195 sst_file->SetIOPriority(r->io_priority);
196
197 CompressionType compression_type;
198 CompressionOptions compression_opts;
199 if (r->mutable_cf_options.bottommost_compression !=
200 kDisableCompressionOption) {
201 compression_type = r->mutable_cf_options.bottommost_compression;
202 if (r->mutable_cf_options.bottommost_compression_opts.enabled) {
203 compression_opts = r->mutable_cf_options.bottommost_compression_opts;
204 } else {
205 compression_opts = r->mutable_cf_options.compression_opts;
206 }
207 } else if (!r->ioptions.compression_per_level.empty()) {
208 // Use the compression of the last level if we have per level compression
209 compression_type = *(r->ioptions.compression_per_level.rbegin());
210 compression_opts = r->mutable_cf_options.compression_opts;
211 } else {
212 compression_type = r->mutable_cf_options.compression;
213 compression_opts = r->mutable_cf_options.compression_opts;
214 }
215
216 IntTblPropCollectorFactories int_tbl_prop_collector_factories;
217
218 // SstFileWriter properties collector to add SstFileWriter version.
219 int_tbl_prop_collector_factories.emplace_back(
220 new SstFileWriterPropertiesCollectorFactory(2 /* version */,
221 0 /* global_seqno*/));
222
223 // User collector factories
224 auto user_collector_factories =
225 r->ioptions.table_properties_collector_factories;
226 for (size_t i = 0; i < user_collector_factories.size(); i++) {
227 int_tbl_prop_collector_factories.emplace_back(
228 new UserKeyTablePropertiesCollectorFactory(
229 user_collector_factories[i]));
230 }
231 int unknown_level = -1;
232 uint32_t cf_id;
233
234 if (r->cfh != nullptr) {
235 // user explicitly specified that this file will be ingested into cfh,
236 // we can persist this information in the file.
237 cf_id = r->cfh->GetID();
238 r->column_family_name = r->cfh->GetName();
239 } else {
240 r->column_family_name = "";
241 cf_id = TablePropertiesCollectorFactory::Context::kUnknownColumnFamily;
242 }
243 // SstFileWriter is used to create sst files that can be added to database
244 // later. Therefore, no real db_id and db_session_id are associated with it.
245 // Here we mimic the way db_session_id behaves by resetting the db_session_id
246 // every time SstFileWriter is used, and in this case db_id is set to be "SST
247 // Writer".
248 std::string db_session_id = r->ioptions.env->GenerateUniqueId();
249 if (!db_session_id.empty() && db_session_id.back() == '\n') {
250 db_session_id.pop_back();
251 }
252 TableBuilderOptions table_builder_options(
253 r->ioptions, r->mutable_cf_options, r->internal_comparator,
254 &int_tbl_prop_collector_factories, compression_type, compression_opts,
255 cf_id, r->column_family_name, unknown_level, false /* is_bottommost */,
256 TableFileCreationReason::kMisc, 0 /* creation_time */,
257 0 /* oldest_key_time */, 0 /* file_creation_time */,
258 "SST Writer" /* db_id */, db_session_id, 0 /* target_file_size */);
259 // XXX: when we can remove skip_filters from the SstFileWriter public API
260 // we can remove it from TableBuilderOptions.
261 table_builder_options.skip_filters = r->skip_filters;
262 FileTypeSet tmp_set = r->ioptions.checksum_handoff_file_types;
263 r->file_writer.reset(new WritableFileWriter(
264 std::move(sst_file), file_path, r->env_options, r->ioptions.clock,
265 nullptr /* io_tracer */, nullptr /* stats */, r->ioptions.listeners,
266 r->ioptions.file_checksum_gen_factory.get(),
267 tmp_set.Contains(FileType::kTableFile)));
268
269 // TODO(tec) : If table_factory is using compressed block cache, we will
270 // be adding the external sst file blocks into it, which is wasteful.
271 r->builder.reset(r->ioptions.table_factory->NewTableBuilder(
272 table_builder_options, r->file_writer.get()));
273
274 r->file_info = ExternalSstFileInfo();
275 r->file_info.file_path = file_path;
276 r->file_info.version = 2;
277 return s;
278 }
279
Add(const Slice & user_key,const Slice & value)280 Status SstFileWriter::Add(const Slice& user_key, const Slice& value) {
281 return rep_->Add(user_key, value, ValueType::kTypeValue);
282 }
283
Put(const Slice & user_key,const Slice & value)284 Status SstFileWriter::Put(const Slice& user_key, const Slice& value) {
285 return rep_->Add(user_key, value, ValueType::kTypeValue);
286 }
287
Merge(const Slice & user_key,const Slice & value)288 Status SstFileWriter::Merge(const Slice& user_key, const Slice& value) {
289 return rep_->Add(user_key, value, ValueType::kTypeMerge);
290 }
291
Delete(const Slice & user_key)292 Status SstFileWriter::Delete(const Slice& user_key) {
293 return rep_->Add(user_key, Slice(), ValueType::kTypeDeletion);
294 }
295
DeleteRange(const Slice & begin_key,const Slice & end_key)296 Status SstFileWriter::DeleteRange(const Slice& begin_key,
297 const Slice& end_key) {
298 return rep_->DeleteRange(begin_key, end_key);
299 }
300
Finish(ExternalSstFileInfo * file_info)301 Status SstFileWriter::Finish(ExternalSstFileInfo* file_info) {
302 Rep* r = rep_.get();
303 if (!r->builder) {
304 return Status::InvalidArgument("File is not opened");
305 }
306 if (r->file_info.num_entries == 0 &&
307 r->file_info.num_range_del_entries == 0) {
308 return Status::InvalidArgument("Cannot create sst file with no entries");
309 }
310
311 Status s = r->builder->Finish();
312 r->file_info.file_size = r->builder->FileSize();
313
314 if (s.ok()) {
315 s = r->file_writer->Sync(r->ioptions.use_fsync);
316 r->InvalidatePageCache(true /* closing */).PermitUncheckedError();
317 if (s.ok()) {
318 s = r->file_writer->Close();
319 }
320 }
321 if (s.ok()) {
322 r->file_info.file_checksum = r->file_writer->GetFileChecksum();
323 r->file_info.file_checksum_func_name =
324 r->file_writer->GetFileChecksumFuncName();
325 }
326 if (!s.ok()) {
327 r->ioptions.env->DeleteFile(r->file_info.file_path);
328 }
329
330 if (file_info != nullptr) {
331 *file_info = r->file_info;
332 }
333
334 r->builder.reset();
335 return s;
336 }
337
FileSize()338 uint64_t SstFileWriter::FileSize() {
339 return rep_->file_info.file_size;
340 }
341 #endif // !ROCKSDB_LITE
342
343 } // namespace ROCKSDB_NAMESPACE
344